diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 38ee410720..cc28e77278 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -87,7 +87,7 @@ pub struct InMemoryLayerInner { /// The values are stored in a serialized format in this file. /// Each serialized Value is preceded by a 'u32' length field. /// PerSeg::page_versions map stores offsets into this file. - file: Arc>, + file: EphemeralFile, resource_units: GlobalResourceUnits, } @@ -381,11 +381,7 @@ impl InMemoryLayer { } pub(crate) fn try_len(&self) -> Option { - self.inner - .try_read() - .map(|i| i.file.try_read().map(|i| i.len()).ok()) - .ok() - .flatten() + self.inner.try_read().map(|i| i.file.len()).ok() } pub(crate) fn assert_writable(&self) { @@ -417,7 +413,7 @@ impl InMemoryLayer { // Look up the keys in the provided keyspace and update // the reconstruct state with whatever is found. pub(crate) async fn get_values_reconstruct_data( - &self, + self: &Arc, keyspace: KeySpace, end_lsn: Lsn, reconstruct_state: &mut ValuesReconstructState, @@ -475,13 +471,13 @@ impl InMemoryLayer { } } - let read_from = inner.file.clone(); + let read_from = Arc::clone(self); let read_ctx = ctx.attached_child(); reconstruct_state .spawn_io(async move { - let locked = read_from.read().await; + let inner = read_from.inner.read().await; let f = vectored_dio_read::execute( - &*locked, + &inner.file, reads .iter() .flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)), @@ -511,6 +507,13 @@ impl InMemoryLayer { } assert!(senders.is_empty()); + + // Keep layer existent until this IO is done; + // This is kinda forced for InMemoryLayer because we need to inner.read() anyway, + // but it's less obvious for DeltaLayer and ImageLayer. So, keep this explicit + // drop for consistency among all three layer types. + drop(inner); + drop(read_from); }) .await; @@ -543,8 +546,7 @@ impl InMemoryLayer { /// Get layer size. pub async fn size(&self) -> Result { let inner = self.inner.read().await; - let locked = inner.file.try_read().expect("no contention"); - Ok(locked.len()) + Ok(inner.file.len()) } pub fn estimated_in_mem_size(&self) -> u64 { @@ -562,10 +564,8 @@ impl InMemoryLayer { ) -> Result { trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); - let file = Arc::new(tokio::sync::RwLock::new( - EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, ctx).await?, - )); - let key = InMemoryLayerFileId(file.read().await.page_cache_file_id()); + let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, ctx).await?; + let key = InMemoryLayerFileId(file.page_cache_file_id()); Ok(InMemoryLayer { file_id: key, @@ -598,7 +598,7 @@ impl InMemoryLayer { let mut inner = self.inner.write().await; self.assert_writable(); - let base_offset = inner.file.read().await.len(); + let base_offset = inner.file.len(); let SerializedValueBatch { raw, @@ -608,12 +608,8 @@ impl InMemoryLayer { } = serialized_batch; // Write the batch to the file - // FIXME: can't borrow arc - let new_size = { - let mut locked = inner.file.write().await; - locked.write_raw(&raw, ctx).await?; - locked.len() - }; + inner.file.write_raw(&raw, ctx).await?; + let new_size = inner.file.len(); let expected_new_len = base_offset .checked_add(raw.len().into_u64()) @@ -676,7 +672,7 @@ impl InMemoryLayer { pub(crate) async fn tick(&self) -> Option { let mut inner = self.inner.write().await; - let size = inner.file.read().await.len(); + let size = inner.file.len(); inner.resource_units.publish_size(size) } @@ -772,7 +768,7 @@ impl InMemoryLayer { match l0_flush_global_state { l0_flush::Inner::Direct { .. } => { - let file_contents = inner.file.read().await.load_to_io_buf(ctx).await?; + let file_contents = inner.file.load_to_io_buf(ctx).await?; let file_contents = file_contents.freeze(); for (key, vec_map) in inner.index.iter() {