Compare commits

...

3 Commits

Author SHA1 Message Date
Vlad Lazar
386acd2ae0 sq 2025-05-08 18:13:45 +02:00
Vlad Lazar
bb8508a84e sq 2025-05-08 15:14:07 +02:00
Vlad Lazar
20788f0732 pageserver: allow read path to plan in-mem layer with concurrent write
Problem

Get-page tracing revealed situations where planning a read against an
in-memory layer took around 150ms. Upon investigation, the culprit was
the in-memory layer's inner file lock: a batch being written holds the
write lock while a read being planned waits for the read lock.

Solution

Lift the index out of the RW lock and allow the read path to operate on
an older version of it via ArcSwap. Note that the read IO operation
itself still waits for any ongoing writes to finish, but at least we get
to plan concurrently with the write.
2025-05-08 14:22:25 +02:00
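
A minimal sketch of the pattern this commit describes (not the actual
pageserver API): the index lives behind an ArcSwap of a persistent rpds
map, so the read path plans against a lock-free snapshot while the write
path clones the map in O(1), applies its batch, and publishes the new
version. The names (Layer, plan_read, put) and the u64 keys/offsets are
simplified stand-ins for Key, IndexEntry and the real methods; assumes
the arc-swap and rpds crates.

use std::sync::Arc;

use arc_swap::ArcSwap;
use rpds::RedBlackTreeMapSync;

// Simplified index: key -> offset into the ephemeral file.
type Index = RedBlackTreeMapSync<u64, u64>;

struct Layer {
    index: ArcSwap<Index>,
}

impl Layer {
    fn new() -> Self {
        Layer {
            index: ArcSwap::new(Arc::new(Index::default())),
        }
    }

    /// Read path: plan against whatever index version is current.
    /// No lock is taken; a batch still being written is simply not
    /// visible yet, which is fine because the read doesn't need it.
    fn plan_read(&self, key: u64) -> Option<u64> {
        let index = self.index.load(); // cheap, lock-free snapshot
        index.get(&key).copied()
    }

    /// Write path (serialized externally, e.g. by the inner write lock):
    /// clone the persistent map in O(1), update it, then publish the new
    /// version atomically for subsequent readers.
    fn put(&self, key: u64, offset: u64) {
        let mut updated = (**self.index.load()).clone();
        updated.insert_mut(key, offset);
        self.index.store(Arc::new(updated));
    }
}

fn main() {
    let layer = Layer::new();
    layer.put(1, 4096);
    assert_eq!(layer.plan_read(1), Some(4096));
    assert_eq!(layer.plan_read(2), None);
}

The diff below applies this idea to InMemoryLayer: the index moves out of
InMemoryLayerInner's RwLock into an ArcSwap'd RedBlackTreeMapSync, the
read path calls index.load(), and put_batch clones, updates, and stores
the index while still holding the inner write lock.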


@@ -5,7 +5,7 @@
 //! its position in the file, is kept in memory, though.
 //!
 use std::cmp::Ordering;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::fmt::Write;
 use std::ops::Range;
 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
@@ -13,6 +13,7 @@ use std::sync::{Arc, OnceLock};
 use std::time::Instant;
 
 use anyhow::Result;
+use arc_swap::ArcSwap;
 use camino::Utf8PathBuf;
 use pageserver_api::key::{CompactKey, Key};
 use pageserver_api::keyspace::KeySpace;
@@ -23,7 +24,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::id::TimelineId;
 use utils::lsn::Lsn;
-use utils::vec_map::VecMap;
+use utils::vec_map::{VecMap, VecMapOrdering};
 use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
 
 use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
@@ -41,6 +42,8 @@ use crate::{l0_flush, page_cache};
 pub(crate) mod vectored_dio_read;
 
+use rpds::RedBlackTreeMapSync;
+
 #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
 pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
@@ -63,7 +66,21 @@ pub struct InMemoryLayer {
     opened_at: Instant,
 
+    /// All versions of all pages in the layer are kept here. Indexed
+    /// by block number and LSN. The [`IndexEntry`] is an offset into the
+    /// ephemeral file where the page version is stored.
+    ///
+    /// Note that the read path ([`Self::get_values_reconstruct_data`]) and the write path
+    /// ([`Self::put_batch`]) may use the index concurrently. The core observation is that
+    /// the read path will not proceed if it needs the entries in a batch that's currently
+    /// being written. Hence, it's fine for the read path to operate on a version of the
+    /// index which doesn't include the updates.
+    index: ArcSwap<RedBlackTreeMapSync<CompactKey, VecMap<Lsn, IndexEntry>>>,
+
-    /// The above fields never change, except for `end_lsn`, which is only set once.
+    /// The above fields never change, except for `end_lsn`, which is only set once,
+    /// and `index`, which is static at a given LSN and is shared between the read
+    /// and write paths via an Arc swap.
     ///
     /// All other changing parts are in `inner`, and protected by a mutex.
     inner: RwLock<InMemoryLayerInner>,
@@ -81,11 +98,6 @@ impl std::fmt::Debug for InMemoryLayer {
 }
 
 pub struct InMemoryLayerInner {
-    /// All versions of all pages in the layer are kept here. Indexed
-    /// by block number and LSN. The [`IndexEntry`] is an offset into the
-    /// ephemeral file where the page version is stored.
-    index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
-
     /// The values are stored in a serialized format in this file.
     /// Each serialized Value is preceded by a 'u32' length field.
     /// PerSeg::page_versions map stores offsets into this file.
@@ -105,7 +117,7 @@ const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
     trailing_ones
 };
 
-/// See [`InMemoryLayerInner::index`].
+/// See [`InMemoryLayer::index`].
 ///
 /// For memory efficiency, the data is packed into a u64.
 ///
@@ -425,8 +437,6 @@ impl InMemoryLayer {
             .page_content_kind(PageContentKind::InMemoryLayer)
             .attached_child();
 
-        let inner = self.inner.read().await;
-
         struct ValueRead {
            entry_lsn: Lsn,
            read: vectored_dio_read::LogicalRead<Vec<u8>>,
@@ -434,11 +444,9 @@ impl InMemoryLayer {
         let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
         let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
 
+        let index = self.index.load();
         for range in keyspace.ranges.iter() {
-            for (key, vec_map) in inner
-                .index
-                .range(range.start.to_compact()..range.end.to_compact())
-            {
+            for (key, vec_map) in index.range(range.start.to_compact()..range.end.to_compact()) {
                 let key = Key::from_compact(*key);
                 let slice = vec_map.slice_range(lsn_range.clone());
@@ -466,7 +474,7 @@ impl InMemoryLayer {
                 }
             }
         }
-        drop(inner); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
+
         let read_from = Arc::clone(self);
         let read_ctx = ctx.attached_child();
         reconstruct_state
@@ -573,8 +581,8 @@ impl InMemoryLayer {
             start_lsn,
             end_lsn: OnceLock::new(),
             opened_at: Instant::now(),
+            index: ArcSwap::new(Arc::new(RedBlackTreeMapSync::default())),
             inner: RwLock::new(InMemoryLayerInner {
-                index: BTreeMap::new(),
                 file,
                 resource_units: GlobalResourceUnits::new(),
             }),
@@ -616,6 +624,9 @@ impl InMemoryLayer {
             .unwrap();
         assert_eq!(new_size, expected_new_len);
 
+        // Constant time clone
+        let mut updated_index = (**self.index.load()).clone();
+
         // Update the index with the new entries
         for meta in metadata {
             let SerializedValueMeta {
@@ -639,17 +650,30 @@ impl InMemoryLayer {
                 will_init,
             })?;
 
-            let vec_map = inner.index.entry(key).or_default();
-            let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
-            if old.is_some() {
-                // This should not break anything, but is unexpected: ingestion code aims to filter out
-                // multiple writes to the same key at the same LSN. This happens in cases where our
-                // ingestion code generates some write like an empty page, and we see a write from postgres
-                // to the same key in the same wal record. If one such write makes it through, we
-                // index the most recent write, implicitly ignoring the earlier write. We log a warning
-                // because this case is unexpected, and we would like tests to fail if this happens.
-                warn!("Key {} at {} written twice at same LSN", key, lsn);
-            }
+            let vec_map = updated_index.get_mut(&key);
+            match vec_map {
+                Some(key_entries) => {
+                    let old = key_entries
+                        .append_or_update_last(lsn, index_entry)
+                        .unwrap()
+                        .0;
+                    if old.is_some() {
+                        // This should not break anything, but is unexpected: ingestion code aims to filter out
+                        // multiple writes to the same key at the same LSN. This happens in cases where our
+                        // ingestion code generates some write like an empty page, and we see a write from postgres
+                        // to the same key in the same wal record. If one such write makes it through, we
+                        // index the most recent write, implicitly ignoring the earlier write. We log a warning
+                        // because this case is unexpected, and we would like tests to fail if this happens.
+                        warn!("Key {} at {} written twice at same LSN", key, lsn);
+                    }
+                }
+                None => {
+                    let mut entries = VecMap::with_capacity(1, VecMapOrdering::Greater);
+                    entries.append(lsn, index_entry).expect("just created");
+                    updated_index.insert_mut(key, entries);
+                }
+            }
+
             self.estimated_in_mem_size.fetch_add(
                 (std::mem::size_of::<CompactKey>()
                     + std::mem::size_of::<Lsn>()
@@ -658,6 +682,8 @@ impl InMemoryLayer {
            );
        }
 
+        self.index.store(Arc::new(updated_index));
+
         inner.resource_units.maybe_publish_size(new_size);
 
         Ok(())
@@ -700,8 +726,8 @@ impl InMemoryLayer {
         #[cfg(debug_assertions)]
         {
-            let inner = self.inner.write().await;
-            for vec_map in inner.index.values() {
+            let index = self.index.load();
+            for vec_map in index.values() {
                 for (lsn, _) in vec_map.as_slice() {
                     assert!(*lsn < end_lsn);
                 }
@@ -743,13 +769,13 @@ impl InMemoryLayer {
         let key_count = if let Some(key_range) = key_range {
             let key_range = key_range.start.to_compact()..key_range.end.to_compact();
-            inner
-                .index
+            self.index
+                .load()
                 .iter()
                 .filter(|(k, _)| key_range.contains(k))
                 .count()
         } else {
-            inner.index.len()
+            self.index.load().size()
         };
 
         if key_count == 0 {
             return Ok(None);
@@ -772,7 +798,8 @@ impl InMemoryLayer {
         let file_contents = inner.file.load_to_io_buf(ctx).await?;
         let file_contents = file_contents.freeze();
 
-        for (key, vec_map) in inner.index.iter() {
+        let index = self.index.load();
+        for (key, vec_map) in index.iter() {
             // Write all page versions
             for (lsn, entry) in vec_map
                 .as_slice()
@@ -945,4 +972,17 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn test_rpds() {
+        let mut tree = rpds::RedBlackTreeMap::<usize, bool>::default();
+        tree = tree.insert(1, true);
+        tree = tree.insert(2, true);
+
+        let mut new_tree = tree.clone();
+        *new_tree.get_mut(&2).unwrap() = false;
+
+        // Structural sharing: mutating the clone must not affect the original.
+        assert!(*tree.get(&2).unwrap());
+        assert!(!*new_tree.get(&2).unwrap());
+    }
 }
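
The test above demonstrates the structural sharing that makes the
"Constant time clone" in put_batch cheap. As a standalone illustration
(not part of the patch), cloning a persistent rpds map only bumps a
reference count on the shared tree, whereas cloning a std BTreeMap
deep-copies every entry:

use std::collections::BTreeMap;
use std::time::Instant;

use rpds::RedBlackTreeMapSync;

fn main() {
    const N: u64 = 100_000;

    let mut persistent = RedBlackTreeMapSync::<u64, u64>::default();
    let mut btree = BTreeMap::<u64, u64>::new();
    for i in 0..N {
        persistent.insert_mut(i, i);
        btree.insert(i, i);
    }

    let t = Instant::now();
    let _p = persistent.clone(); // O(1): shares the whole tree
    println!("rpds clone:  {:?}", t.elapsed());

    let t = Instant::now();
    let _b = btree.clone(); // O(N): copies every entry
    println!("btree clone: {:?}", t.elapsed());
}

The trade-off is that point updates on the persistent map pay a path-copy
cost on the way back up the tree, which the write path accepts in
exchange for letting the read path plan without taking the file lock.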