From 20788f073239a2ff5952c4094c5b9db75b7e1d6f Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Thu, 8 May 2025 14:22:25 +0200
Subject: [PATCH] pageserver: allow read path to plan in-mem layer with concurrent write

Problem

Get page tracing revealed situations where planning a read against an in-memory
layer takes around 150ms. Upon investigation, the culprit is the in-mem layer's
inner file lock: a batch being written holds the write lock while a read being
planned waits for the read lock.

Solution

Lift the index out of the RW lock and allow the read path to operate on an older
version of it via ArcSwap. Note that the read IO operation itself will still wait
for any ongoing writes to finish, but at least we get to plan concurrently with
the write.
---
 .../tenant/storage_layer/inmemory_layer.rs | 51 +++++++++++--------
 1 file changed, 31 insertions(+), 20 deletions(-)

diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 5d558e66cc..09a211f2a6 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -13,6 +13,7 @@ use std::sync::{Arc, OnceLock};
 use std::time::Instant;
 
 use anyhow::Result;
+use arc_swap::ArcSwap;
 use camino::Utf8PathBuf;
 use pageserver_api::key::{CompactKey, Key};
 use pageserver_api::keyspace::KeySpace;
@@ -63,7 +64,21 @@ pub struct InMemoryLayer {
 
     opened_at: Instant,
 
+    /// All versions of all pages in the layer are kept here. Indexed
+    /// by block number and LSN. The [`IndexEntry`] is an offset into the
+    /// ephemeral file where the page version is stored.
+    ///
+    /// Note that the read path ([`Self::get_values_reconstruct_data`]) and the write path
+    /// [`Self::put_batch`] may use the index concurrently. The core observation is that
+    /// the read path will not proceed if it needs the entries in a batch that's currently
+    /// being written. Hence, it's fine for the read path to operate on the version of the
+    /// index which doesn't include the updates.
+    index: ArcSwap<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
+
     /// The above fields never change, except for `end_lsn`, which is only set once.
+    /// and `index` which is static at a given LSN and is shared between the read
+    /// and write paths via an Arc swap.
+    ///
     /// All other changing parts are in `inner`, and protected by a mutex.
     inner: RwLock<InMemoryLayerInner>,
 
@@ -81,11 +96,6 @@ impl std::fmt::Debug for InMemoryLayer {
 }
 
 pub struct InMemoryLayerInner {
-    /// All versions of all pages in the layer are kept here. Indexed
-    /// by block number and LSN. The [`IndexEntry`] is an offset into the
-    /// ephemeral file where the page version is stored.
-    index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
-
     /// The values are stored in a serialized format in this file.
     /// Each serialized Value is preceded by a 'u32' length field.
     /// PerSeg::page_versions map stores offsets into this file.
@@ -425,8 +435,6 @@ impl InMemoryLayer {
             .page_content_kind(PageContentKind::InMemoryLayer)
             .attached_child();
-        let inner = self.inner.read().await;
-
         struct ValueRead {
             entry_lsn: Lsn,
             read: vectored_dio_read::LogicalRead<Vec<u8>>,
         }
@@ -434,11 +442,9 @@
         let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
         let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
 
+        let index = self.index.load();
         for range in keyspace.ranges.iter() {
-            for (key, vec_map) in inner
-                .index
-                .range(range.start.to_compact()..range.end.to_compact())
-            {
+            for (key, vec_map) in index.range(range.start.to_compact()..range.end.to_compact()) {
                 let key = Key::from_compact(*key);
                 let slice = vec_map.slice_range(lsn_range.clone());
 
@@ -466,7 +472,7 @@
                 }
             }
         }
-        drop(inner); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
+
         let read_from = Arc::clone(self);
         let read_ctx = ctx.attached_child();
         reconstruct_state
@@ -573,8 +579,8 @@
             start_lsn,
             end_lsn: OnceLock::new(),
             opened_at: Instant::now(),
+            index: ArcSwap::new(Arc::new(BTreeMap::default())),
             inner: RwLock::new(InMemoryLayerInner {
-                index: BTreeMap::new(),
                 file,
                 resource_units: GlobalResourceUnits::new(),
             }),
@@ -616,6 +622,8 @@
             .unwrap();
         assert_eq!(new_size, expected_new_len);
 
+        let mut updated_index = (**self.index.load()).clone();
+
         // Update the index with the new entries
         for meta in metadata {
             let SerializedValueMeta {
@@ -639,7 +647,7 @@
                 will_init,
             })?;
 
-            let vec_map = inner.index.entry(key).or_default();
+            let vec_map = updated_index.entry(key).or_default();
             let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
             if old.is_some() {
                 // This should not break anything, but is unexpected: ingestion code aims to filter out
@@ -658,6 +666,8 @@
             );
         }
 
+        self.index.store(Arc::new(updated_index));
+
         inner.resource_units.maybe_publish_size(new_size);
 
         Ok(())
@@ -700,8 +710,8 @@
 
         #[cfg(debug_assertions)]
         {
-            let inner = self.inner.write().await;
-            for vec_map in inner.index.values() {
+            let index = self.index.load();
+            for vec_map in index.values() {
                 for (lsn, _) in vec_map.as_slice() {
                     assert!(*lsn < end_lsn);
                 }
@@ -743,13 +753,13 @@
         let key_count = if let Some(key_range) = key_range {
             let key_range = key_range.start.to_compact()..key_range.end.to_compact();
-            inner
-                .index
+            self.index
+                .load()
                 .iter()
                 .filter(|(k, _)| key_range.contains(k))
                 .count()
         } else {
-            inner.index.len()
+            self.index.load().len()
         };
         if key_count == 0 {
             return Ok(None);
         }
@@ -772,7 +782,8 @@
         let file_contents = inner.file.load_to_io_buf(ctx).await?;
         let file_contents = file_contents.freeze();
 
-        for (key, vec_map) in inner.index.iter() {
+        let index = self.index.load();
+        for (key, vec_map) in index.iter() {
            // Write all page versions
             for (lsn, entry) in vec_map
                 .as_slice()
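
Illustrative note (not part of the patch): the change applies a copy-on-write
pattern with the arc-swap crate. Below is a minimal, self-contained sketch of
that pattern; the `Layer`, `plan_read`, and `put_batch` names and the u64
key/offset types are invented for the example and are not the pageserver's API.
Readers load a snapshot of the index and plan against it without taking any
lock; the single writer clones the current map, applies a batch, and publishes
the new version atomically.

    use std::collections::BTreeMap;
    use std::sync::Arc;

    use arc_swap::ArcSwap;

    // Stand-in for the real index type; the patch uses
    // BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>.
    type Index = BTreeMap<u64, u64>;

    struct Layer {
        index: ArcSwap<Index>,
    }

    impl Layer {
        fn new() -> Self {
            Self {
                index: ArcSwap::new(Arc::new(BTreeMap::new())),
            }
        }

        /// Read path: grab a consistent snapshot of the index without blocking
        /// the writer. The snapshot may lag a concurrent put_batch, which is
        /// acceptable because a reader never needs entries from a batch that is
        /// still being written.
        fn plan_read(&self, key: u64) -> Option<u64> {
            let index = self.index.load();
            index.get(&key).copied()
        }

        /// Write path: clone the current index, apply the batch, then publish
        /// the new version atomically. Assumes a single writer at a time (the
        /// patch serializes writers through the `inner` RwLock), so no
        /// compare-and-swap retry loop is needed.
        fn put_batch(&self, batch: &[(u64, u64)]) {
            let mut updated = (**self.index.load()).clone();
            for (key, offset) in batch {
                updated.insert(*key, *offset);
            }
            self.index.store(Arc::new(updated));
        }
    }

The trade-off is a full clone of the index for each written batch in exchange
for lock-free read planning; per the commit message, the read IO itself still
waits for in-flight writes to finish, so only the planning step becomes
concurrent with the write.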