diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 9e6ca4b50f..b14a91c657 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -5,7 +5,7 @@ //! its position in the file, is kept in memory, though. //! use std::cmp::Ordering; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::fmt::Write; use std::ops::Range; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering}; @@ -24,7 +24,7 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::TimelineId; use utils::lsn::Lsn; -use utils::vec_map::VecMap; +use utils::vec_map::{VecMap, VecMapOrdering}; use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta}; use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState}; @@ -42,6 +42,8 @@ use crate::{l0_flush, page_cache}; pub(crate) mod vectored_dio_read; +use rpds::RedBlackTreeMapSync; + #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] pub(crate) struct InMemoryLayerFileId(page_cache::FileId); @@ -73,7 +75,7 @@ pub struct InMemoryLayer { /// the read path will not proceed if it needs the entries in a batch that's currently /// being written. Hence, it's fine for the read path to operate on the version of the /// index which doesn't include the updates. - index: ArcSwap>>, + index: ArcSwap>>, /// The above fields never change, except for `end_lsn`, which is only set once. /// and `index` which is static at a given LSN and is shared between the read @@ -579,7 +581,7 @@ impl InMemoryLayer { start_lsn, end_lsn: OnceLock::new(), opened_at: Instant::now(), - index: ArcSwap::new(Arc::new(BTreeMap::default())), + index: ArcSwap::new(Arc::new(RedBlackTreeMapSync::default())), inner: RwLock::new(InMemoryLayerInner { file, resource_units: GlobalResourceUnits::new(), @@ -622,6 +624,7 @@ impl InMemoryLayer { .unwrap(); assert_eq!(new_size, expected_new_len); + // Constant time clone let mut updated_index = (**self.index.load()).clone(); // Update the index with the new entries @@ -647,17 +650,30 @@ impl InMemoryLayer { will_init, })?; - let vec_map = updated_index.entry(key).or_default(); - let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0; - if old.is_some() { - // This should not break anything, but is unexpected: ingestion code aims to filter out - // multiple writes to the same key at the same LSN. This happens in cases where our - // ingenstion code generates some write like an empty page, and we see a write from postgres - // to the same key in the same wal record. If one such write makes it through, we - // index the most recent write, implicitly ignoring the earlier write. We log a warning - // because this case is unexpected, and we would like tests to fail if this happens. - warn!("Key {} at {} written twice at same LSN", key, lsn); + let vec_map = updated_index.get_mut(&key); + match vec_map { + Some(key_entries) => { + let old = key_entries + .append_or_update_last(lsn, index_entry) + .unwrap() + .0; + if old.is_some() { + // This should not break anything, but is unexpected: ingestion code aims to filter out + // multiple writes to the same key at the same LSN. This happens in cases where our + // ingenstion code generates some write like an empty page, and we see a write from postgres + // to the same key in the same wal record. If one such write makes it through, we + // index the most recent write, implicitly ignoring the earlier write. We log a warning + // because this case is unexpected, and we would like tests to fail if this happens. + warn!("Key {} at {} written twice at same LSN", key, lsn); + } + } + None => { + let mut entries = VecMap::with_capacity(1, VecMapOrdering::Greater); + entries.append(lsn, index_entry).expect("just created"); + updated_index.insert_mut(key, entries); + } } + self.estimated_in_mem_size.fetch_add( (std::mem::size_of::() + std::mem::size_of::() @@ -759,7 +775,7 @@ impl InMemoryLayer { .filter(|(k, _)| key_range.contains(k)) .count() } else { - self.index.load().len() + self.index.load().size() }; if key_count == 0 { return Ok(None); @@ -956,4 +972,17 @@ mod tests { ); } } + + #[test] + fn test_rpds() { + let mut tree = rpds::RedBlackTreeMap::::default(); + tree = tree.insert(1, true); + tree = tree.insert(2, true); + + let mut new_tree = tree.clone(); + *new_tree.get_mut(&2).unwrap() = false; + + println!("{}", tree.get(&2).unwrap()); + println!("{}", new_tree.get(&2).unwrap()); + } }