pageserver: allow in-mem reads to be planned during writes (#11937)

## Problem

Get page tracing revealed situations where planning a read against an in-memory
layer takes around 150ms. Upon investigation, the culprit is the in-memory
layer's inner file lock: a batch being written holds the write lock while a read
being planned waits for the read lock. See [this
trace](https://neonprod.grafana.net/explore?schemaVersion=1&panes=%7B%22j61%22:%7B%22datasource%22:%22JMfY_5TVz%22,%22queries%22:%5B%7B%22refId%22:%22traceId%22,%22queryType%22:%22traceql%22,%22query%22:%22412ec4522fe1750798aca54aec2680ac%22,%22datasource%22:%7B%22type%22:%22tempo%22,%22uid%22:%22JMfY_5TVz%22%7D,%22limit%22:20,%22tableType%22:%22traces%22,%22metricsQueryType%22:%22range%22%7D%5D,%22range%22:%7B%22to%22:%221746702606349%22,%22from%22:%221746681006349%22%7D,%22panelsState%22:%7B%22trace%22:%7B%22spanId%22:%2291e9f1879c9bccc0%22%7D%7D%7D,%226d0%22:%7B%22datasource%22:%22JMfY_5TVz%22,%22queries%22:%5B%7B%22refId%22:%22traceId%22,%22queryType%22:%22traceql%22,%22query%22:%2220a4757706b16af0e1fbab83f9d2e925%22,%22datasource%22:%7B%22type%22:%22tempo%22,%22uid%22:%22JMfY_5TVz%22%7D,%22limit%22:20,%22tableType%22:%22traces%22,%22metricsQueryType%22:%22range%22%7D%5D,%22range%22:%7B%22to%22:%221746702614807%22,%22from%22:%221746681014807%22%7D,%22panelsState%22:%7B%22trace%22:%7B%22spanId%22:%2260e7825512bc2a6b%22%7D%7D%7D%7D)
for example.
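
For context, here is a minimal sketch of the pre-change layout, using simplified
stand-in types (`Key`, `Lsn`, `IndexEntry` as plain `u64`s and a `Vec<u8>` in place
of the real ephemeral file; `put_batch`/`plan_reads` are illustrative names): the
index sits behind the same `RwLock` as the file, so planning a read queues behind
any in-flight batch write.

```rust
use std::collections::BTreeMap;
use tokio::sync::RwLock;

type Key = u64; // stand-in for CompactKey
type Lsn = u64;
type IndexEntry = u64; // offset into the append-only file

struct InMemoryLayerInner {
    // index and file live behind the same lock
    index: BTreeMap<Key, Vec<(Lsn, IndexEntry)>>,
    file: Vec<u8>, // stand-in for the ephemeral file
}

struct InMemoryLayer {
    inner: RwLock<InMemoryLayerInner>,
}

impl InMemoryLayer {
    async fn put_batch(&self, key: Key, lsn: Lsn, bytes: &[u8]) {
        // The write guard is held for the whole append; in the real code this
        // spans the file IO for the batch.
        let mut inner = self.inner.write().await;
        let off = inner.file.len() as IndexEntry;
        inner.file.extend_from_slice(bytes);
        inner.index.entry(key).or_default().push((lsn, off));
    }

    async fn plan_reads(&self, key: Key) -> Vec<(Lsn, IndexEntry)> {
        // ... so this read lock can wait on a batch write even though planning
        // only needs the index.
        let inner = self.inner.read().await;
        inner.index.get(&key).cloned().unwrap_or_default()
    }
}
```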

## Summary of changes

Lift the index into its own `RwLock` so that reads can at least be planned while
write IO is in progress.

I tried a smarter approach in
https://github.com/neondatabase/neon/pull/11866 (arc-swap plus a structurally
shared data structure), but that killed ingest performance for small keys.
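
And a sketch of the layout after this change, with the same stand-in types as
above (the real field is `RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>`,
and the method names here are illustrative, not the actual API): the index gets
its own lock, writes append to the file first and only take the index lock
afterwards, so read planning never waits behind write IO.

```rust
use std::collections::BTreeMap;
use tokio::sync::RwLock;

type Key = u64; // stand-in for CompactKey
type Lsn = u64;
type IndexEntry = u64; // offset into the append-only file

struct InMemoryLayerInner {
    file: Vec<u8>, // stand-in for the append-only ephemeral file
}

struct InMemoryLayer {
    /// Lock order when both are needed: `inner` first, then `index`.
    index: RwLock<BTreeMap<Key, Vec<(Lsn, IndexEntry)>>>,
    inner: RwLock<InMemoryLayerInner>,
}

impl InMemoryLayer {
    /// Read planning only touches `index`; it no longer queues behind write IO.
    async fn plan_reads(&self, key: Key) -> Vec<(Lsn, IndexEntry)> {
        let index = self.index.read().await;
        index.get(&key).cloned().unwrap_or_default()
        // guard dropped here; the actual value reads lock `inner` afterwards
    }

    /// Append to the file under `inner`, release it, then update `index`.
    async fn put(&self, key: Key, lsn: Lsn, bytes: &[u8]) {
        let offset = {
            let mut inner = self.inner.write().await;
            let off = inner.file.len() as IndexEntry;
            inner.file.extend_from_slice(bytes); // file write without holding `index`
            off
        };
        let mut index = self.index.write().await;
        index.entry(key).or_default().push((lsn, offset));
    }
}
```

Because the file is append-only, releasing `inner` before taking `index` is safe:
a reader that sees an index entry will always find the corresponding bytes in the
file.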

## Benchmarking

* No statistically significant difference for the Rust ingest benchmarks compared
to main.

Vlad Lazar · 2025-05-21 12:08:49 +01:00 · committed by GitHub · parent 6f4f3691a5 · commit 08bb72e516

@@ -63,7 +63,28 @@ pub struct InMemoryLayer {
opened_at: Instant,
/// The above fields never change, except for `end_lsn`, which is only set once.
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The [`IndexEntry`] is an offset into the
/// ephemeral file where the page version is stored.
///
/// We use a separate lock for the index to reduce the critical section
/// during which reads cannot be planned.
///
/// If you need access to both the index and the underlying file at the same time,
/// respect the following locking order to avoid deadlocks:
/// 1. [`InMemoryLayer::inner`]
/// 2. [`InMemoryLayer::index`]
///
/// Note that the file backing [`InMemoryLayer::inner`] is append-only,
/// so it is not necessary to hold the index lock and the inner lock simultaneously.
/// This avoids holding index locks across IO, and is crucial for avoiding read tail latency.
/// In particular:
/// 1. It is safe to read and release [`InMemoryLayer::index`] before locking and reading from [`InMemoryLayer::inner`].
/// 2. It is safe to write and release [`InMemoryLayer::inner`] before locking and updating [`InMemoryLayer::index`].
index: RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
/// The above fields never change, except for `end_lsn`, which is only set once,
/// and `index` (see rationale there).
/// All other changing parts are in `inner`, and protected by a mutex.
inner: RwLock<InMemoryLayerInner>,
@@ -81,11 +102,6 @@ impl std::fmt::Debug for InMemoryLayer {
}
pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The [`IndexEntry`] is an offset into the
/// ephemeral file where the page version is stored.
index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
@@ -105,7 +121,7 @@ const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
trailing_ones
};
/// See [`InMemoryLayerInner::index`].
/// See [`InMemoryLayer::index`].
///
/// For memory efficiency, the data is packed into a u64.
///
@@ -425,7 +441,7 @@ impl InMemoryLayer {
.page_content_kind(PageContentKind::InMemoryLayer)
.attached_child();
let inner = self.inner.read().await;
let index = self.index.read().await;
struct ValueRead {
entry_lsn: Lsn,
@@ -435,10 +451,7 @@ impl InMemoryLayer {
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
.index
.range(range.start.to_compact()..range.end.to_compact())
{
for (key, vec_map) in index.range(range.start.to_compact()..range.end.to_compact()) {
let key = Key::from_compact(*key);
let slice = vec_map.slice_range(lsn_range.clone());
@@ -466,7 +479,7 @@ impl InMemoryLayer {
}
}
}
drop(inner); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
drop(index); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
let read_from = Arc::clone(self);
let read_ctx = ctx.attached_child();
reconstruct_state
@@ -573,8 +586,8 @@ impl InMemoryLayer {
start_lsn,
end_lsn: OnceLock::new(),
opened_at: Instant::now(),
index: RwLock::new(BTreeMap::new()),
inner: RwLock::new(InMemoryLayerInner {
index: BTreeMap::new(),
file,
resource_units: GlobalResourceUnits::new(),
}),
@@ -592,31 +605,39 @@ impl InMemoryLayer {
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
let (base_offset, metadata) = {
let mut inner = self.inner.write().await;
self.assert_writable();
let base_offset = inner.file.len();
let base_offset = inner.file.len();
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
// the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
// the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
inner.resource_units.maybe_publish_size(new_size);
(base_offset, metadata)
};
// Update the index with the new entries
let mut index = self.index.write().await;
for meta in metadata {
let SerializedValueMeta {
key,
@@ -639,7 +660,7 @@ impl InMemoryLayer {
will_init,
})?;
let vec_map = inner.index.entry(key).or_default();
let vec_map = index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
if old.is_some() {
// This should not break anything, but is unexpected: ingestion code aims to filter out
@@ -658,8 +679,6 @@ impl InMemoryLayer {
);
}
inner.resource_units.maybe_publish_size(new_size);
Ok(())
}
@@ -680,6 +699,18 @@ impl InMemoryLayer {
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
///
/// A note on locking:
/// The current API of [`InMemoryLayer`] does not ensure that there's no ongoing
/// writes while freezing the layer. This is enforced at a higher level via
/// [`crate::tenant::Timeline::write_lock`]. Freeze might be called via two code paths:
/// 1. Via the active [`crate::tenant::timeline::TimelineWriter`]. This holds the
/// Timeline::write_lock for its lifetime. The rolling is handled in
/// [`crate::tenant::timeline::TimelineWriter::put_batch`]. It's a &mut self function,
/// so it can't be called from different threads.
/// 2. In the background via [`crate::tenant::Timeline::maybe_freeze_ephemeral_layer`].
/// This only proceeds if try_lock on Timeline::write_lock succeeds (i.e. there's no active writer),
/// hence there can be no concurrent writes
pub async fn freeze(&self, end_lsn: Lsn) {
assert!(
self.start_lsn < end_lsn,
@@ -700,8 +731,8 @@ impl InMemoryLayer {
#[cfg(debug_assertions)]
{
let inner = self.inner.write().await;
for vec_map in inner.index.values() {
let index = self.index.read().await;
for vec_map in index.values() {
for (lsn, _) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
}
@@ -724,14 +755,11 @@ impl InMemoryLayer {
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
// though: another thread might have grabbed a reference to this layer
// in `get_layer_for_write' just before the checkpointer called
// `freeze`, and then `write_to_disk` on it. When the thread gets the
// lock, it will see that it's not writeable anymore and retry, but it
// would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now.
// write lock on it, so we shouldn't block anyone. See the comment on
// [`InMemoryLayer::freeze`] to understand how locking between the append path
// and layer flushing works.
let inner = self.inner.read().await;
let index = self.index.read().await;
use l0_flush::Inner;
let _concurrency_permit = match l0_flush_global_state {
@@ -743,13 +771,9 @@ impl InMemoryLayer {
let key_count = if let Some(key_range) = key_range {
let key_range = key_range.start.to_compact()..key_range.end.to_compact();
inner
.index
.iter()
.filter(|(k, _)| key_range.contains(k))
.count()
index.iter().filter(|(k, _)| key_range.contains(k)).count()
} else {
inner.index.len()
index.len()
};
if key_count == 0 {
return Ok(None);
@@ -772,7 +796,7 @@ impl InMemoryLayer {
let file_contents = inner.file.load_to_io_buf(ctx).await?;
let file_contents = file_contents.freeze();
for (key, vec_map) in inner.index.iter() {
for (key, vec_map) in index.iter() {
// Write all page versions
for (lsn, entry) in vec_map
.as_slice()