diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 5d558e66cc..09a211f2a6 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -13,6 +13,7 @@ use std::sync::{Arc, OnceLock};
 use std::time::Instant;
 
 use anyhow::Result;
+use arc_swap::ArcSwap;
 use camino::Utf8PathBuf;
 use pageserver_api::key::{CompactKey, Key};
 use pageserver_api::keyspace::KeySpace;
@@ -63,7 +64,21 @@ pub struct InMemoryLayer {
     opened_at: Instant,
 
+    /// All versions of all pages in the layer are kept here. Indexed
+    /// by block number and LSN. The [`IndexEntry`] is an offset into the
+    /// ephemeral file where the page version is stored.
+    ///
+    /// Note that the read path ([`Self::get_values_reconstruct_data`]) and the write path
+    /// ([`Self::put_batch`]) may use the index concurrently. The core observation is that
+    /// the read path will not proceed if it needs entries from a batch that's currently
+    /// being written. Hence, it's fine for the read path to operate on a version of the
+    /// index that doesn't include those updates.
+    index: ArcSwap<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
+
+    /// The above fields never change, except for `end_lsn`, which is only set once,
+    /// and `index`, which is static at a given LSN and is shared between the read
+    /// and write paths via an Arc swap.
+    ///
     /// All other changing parts are in `inner`, and protected by a mutex.
     inner: RwLock<InMemoryLayerInner>,
@@ -81,11 +96,6 @@ impl std::fmt::Debug for InMemoryLayer {
 }
 
 pub struct InMemoryLayerInner {
-    /// All versions of all pages in the layer are kept here. Indexed
-    /// by block number and LSN. The [`IndexEntry`] is an offset into the
-    /// ephemeral file where the page version is stored.
-    index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
-
     /// The values are stored in a serialized format in this file.
     /// Each serialized Value is preceded by a 'u32' length field.
     /// PerSeg::page_versions map stores offsets into this file.
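The key structural change above is swapping the mutex-protected `index` for a copy-on-write map behind `arc_swap::ArcSwap`. Before the call-site changes below, here is a minimal, self-contained sketch of that pattern (the `Index` type, `u64` keys, and the simplified `put_batch` are hypothetical stand-ins for the pageserver's `CompactKey`/`IndexEntry` machinery, not code from this patch): readers take a cheap, consistent snapshot with `load()`, while the writer clones the current map, mutates the clone, and publishes it atomically with `store()`.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

use arc_swap::ArcSwap; // requires the `arc-swap` crate

/// Hypothetical stand-in for the layer's index: key -> file offset.
struct Index {
    map: ArcSwap<BTreeMap<u64, u64>>,
}

impl Index {
    fn new() -> Self {
        Self {
            map: ArcSwap::new(Arc::new(BTreeMap::new())),
        }
    }

    /// Read path: operates on an immutable snapshot. A concurrent write
    /// publishes a *new* map, so this snapshot stays internally consistent.
    fn get(&self, key: u64) -> Option<u64> {
        self.map.load().get(&key).copied()
    }

    /// Write path: clone, update, store. No compare-and-swap loop is
    /// needed because writers are assumed to be serialized externally
    /// (in the patch, `put_batch` runs under the `inner` write lock).
    fn put_batch(&self, entries: impl IntoIterator<Item = (u64, u64)>) {
        let mut updated = (**self.map.load()).clone();
        updated.extend(entries);
        self.map.store(Arc::new(updated));
    }
}

fn main() {
    let idx = Index::new();
    idx.put_batch([(1, 100), (2, 200)]);
    assert_eq!(idx.get(1), Some(100));
}
```

The clone per batch trades some write-path allocation for reads that never take the `inner` lock; with a single serialized writer this is the simplest correct use of `ArcSwap`.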
@@ -425,8 +435,6 @@ impl InMemoryLayer {
             .page_content_kind(PageContentKind::InMemoryLayer)
             .attached_child();
 
-        let inner = self.inner.read().await;
-
         struct ValueRead {
             entry_lsn: Lsn,
             read: vectored_dio_read::LogicalRead<Vec<u8>>,
         }
@@ -434,11 +442,9 @@ impl InMemoryLayer {
         let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
         let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
 
+        let index = self.index.load();
         for range in keyspace.ranges.iter() {
-            for (key, vec_map) in inner
-                .index
-                .range(range.start.to_compact()..range.end.to_compact())
-            {
+            for (key, vec_map) in index.range(range.start.to_compact()..range.end.to_compact()) {
                 let key = Key::from_compact(*key);
                 let slice = vec_map.slice_range(lsn_range.clone());
@@ -466,7 +472,7 @@ impl InMemoryLayer {
                 }
             }
         }
-        drop(inner); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
+
         let read_from = Arc::clone(self);
         let read_ctx = ctx.attached_child();
         reconstruct_state
@@ -573,8 +579,8 @@ impl InMemoryLayer {
             start_lsn,
             end_lsn: OnceLock::new(),
             opened_at: Instant::now(),
+            index: ArcSwap::new(Arc::new(BTreeMap::default())),
             inner: RwLock::new(InMemoryLayerInner {
-                index: BTreeMap::new(),
                 file,
                 resource_units: GlobalResourceUnits::new(),
             }),
@@ -616,6 +622,8 @@ impl InMemoryLayer {
             .unwrap();
         assert_eq!(new_size, expected_new_len);
 
+        let mut updated_index = (**self.index.load()).clone();
+
         // Update the index with the new entries
         for meta in metadata {
             let SerializedValueMeta {
@@ -639,7 +647,7 @@ impl InMemoryLayer {
                 will_init,
             })?;
 
-            let vec_map = inner.index.entry(key).or_default();
+            let vec_map = updated_index.entry(key).or_default();
             let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
             if old.is_some() {
                 // This should not break anything, but is unexpected: ingestion code aims to filter out
@@ -658,6 +666,8 @@ impl InMemoryLayer {
             );
         }
 
+        self.index.store(Arc::new(updated_index));
+
         inner.resource_units.maybe_publish_size(new_size);
 
         Ok(())
@@ -700,8 +710,8 @@ impl InMemoryLayer {
         #[cfg(debug_assertions)]
         {
-            let inner = self.inner.write().await;
-            for vec_map in inner.index.values() {
+            let index = self.index.load();
+            for vec_map in index.values() {
                 for (lsn, _) in vec_map.as_slice() {
                     assert!(*lsn < end_lsn);
                 }
             }
@@ -743,13 +753,13 @@ impl InMemoryLayer {
         let key_count = if let Some(key_range) = key_range {
             let key_range = key_range.start.to_compact()..key_range.end.to_compact();
 
-            inner
-                .index
+            self.index
+                .load()
                 .iter()
                 .filter(|(k, _)| key_range.contains(k))
                 .count()
         } else {
-            inner.index.len()
+            self.index.load().len()
         };
         if key_count == 0 {
             return Ok(None);
@@ -772,7 +782,8 @@ impl InMemoryLayer {
         let file_contents = inner.file.load_to_io_buf(ctx).await?;
         let file_contents = file_contents.freeze();
 
-        for (key, vec_map) in inner.index.iter() {
+        let index = self.index.load();
+        for (key, vec_map) in index.iter() {
             // Write all page versions
             for (lsn, entry) in vec_map
                 .as_slice()
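To make the doc comment's "core observation" concrete, here is a small hedged demo (again a toy `u64` map rather than the pageserver types): a snapshot loaded before a batch is published never changes underneath the reader, while fresh `load()`s observe the new version. This is why the read path can safely operate on an index version that excludes an in-flight batch, and why `get_values_reconstruct_data` no longer needs the `inner` read lock.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

use arc_swap::ArcSwap;

fn main() {
    let index: ArcSwap<BTreeMap<u64, u64>> = ArcSwap::new(Arc::new(BTreeMap::new()));

    // Reader: snapshot the (still empty) index.
    let snapshot = index.load_full(); // Arc<BTreeMap<u64, u64>>

    // Writer: publish a new version containing one entry, mirroring the
    // clone/update/store sequence in `put_batch`.
    let mut updated = (**index.load()).clone();
    updated.insert(1, 100);
    index.store(Arc::new(updated));

    // The old snapshot is untouched; a fresh load sees the new entry.
    assert!(snapshot.get(&1).is_none());
    assert_eq!(index.load().get(&1), Some(&100));
}
```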