This commit is contained in:
Vlad Lazar
2025-05-08 18:13:45 +02:00
parent bb8508a84e
commit 386acd2ae0

View File

@@ -5,7 +5,7 @@
//! its position in the file, is kept in memory, though.
//!
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::fmt::Write;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
@@ -24,7 +24,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;
use utils::vec_map::{VecMap, VecMapOrdering};
use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
@@ -42,6 +42,8 @@ use crate::{l0_flush, page_cache};
pub(crate) mod vectored_dio_read;
use rpds::RedBlackTreeMapSync;
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
@@ -73,7 +75,7 @@ pub struct InMemoryLayer {
/// the read path will not proceed if it needs the entries in a batch that's currently
/// being written. Hence, it's fine for the read path to operate on the version of the
/// index which doesn't include the updates.
index: ArcSwap<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
index: ArcSwap<RedBlackTreeMapSync<CompactKey, VecMap<Lsn, IndexEntry>>>,
/// The above fields never change, except for `end_lsn`, which is only set once.
/// and `index` which is static at a given LSN and is shared between the read
@@ -579,7 +581,7 @@ impl InMemoryLayer {
start_lsn,
end_lsn: OnceLock::new(),
opened_at: Instant::now(),
index: ArcSwap::new(Arc::new(BTreeMap::default())),
index: ArcSwap::new(Arc::new(RedBlackTreeMapSync::default())),
inner: RwLock::new(InMemoryLayerInner {
file,
resource_units: GlobalResourceUnits::new(),
@@ -622,6 +624,7 @@ impl InMemoryLayer {
.unwrap();
assert_eq!(new_size, expected_new_len);
// Constant time clone
let mut updated_index = (**self.index.load()).clone();
// Update the index with the new entries
@@ -647,17 +650,30 @@ impl InMemoryLayer {
will_init,
})?;
let vec_map = updated_index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
if old.is_some() {
// This should not break anything, but is unexpected: ingestion code aims to filter out
// multiple writes to the same key at the same LSN. This happens in cases where our
// ingenstion code generates some write like an empty page, and we see a write from postgres
// to the same key in the same wal record. If one such write makes it through, we
// index the most recent write, implicitly ignoring the earlier write. We log a warning
// because this case is unexpected, and we would like tests to fail if this happens.
warn!("Key {} at {} written twice at same LSN", key, lsn);
let vec_map = updated_index.get_mut(&key);
match vec_map {
Some(key_entries) => {
let old = key_entries
.append_or_update_last(lsn, index_entry)
.unwrap()
.0;
if old.is_some() {
// This should not break anything, but is unexpected: ingestion code aims to filter out
// multiple writes to the same key at the same LSN. This happens in cases where our
// ingenstion code generates some write like an empty page, and we see a write from postgres
// to the same key in the same wal record. If one such write makes it through, we
// index the most recent write, implicitly ignoring the earlier write. We log a warning
// because this case is unexpected, and we would like tests to fail if this happens.
warn!("Key {} at {} written twice at same LSN", key, lsn);
}
}
None => {
let mut entries = VecMap::with_capacity(1, VecMapOrdering::Greater);
entries.append(lsn, index_entry).expect("just created");
updated_index.insert_mut(key, entries);
}
}
self.estimated_in_mem_size.fetch_add(
(std::mem::size_of::<CompactKey>()
+ std::mem::size_of::<Lsn>()
@@ -759,7 +775,7 @@ impl InMemoryLayer {
.filter(|(k, _)| key_range.contains(k))
.count()
} else {
self.index.load().len()
self.index.load().size()
};
if key_count == 0 {
return Ok(None);
@@ -956,4 +972,17 @@ mod tests {
);
}
}
#[test]
fn test_rpds() {
let mut tree = rpds::RedBlackTreeMap::<usize, bool>::default();
tree = tree.insert(1, true);
tree = tree.insert(2, true);
let mut new_tree = tree.clone();
*new_tree.get_mut(&2).unwrap() = false;
println!("{}", tree.get(&2).unwrap());
println!("{}", new_tree.get(&2).unwrap());
}
}