mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
pageserver: allow read path to plan in-mem layer with concurrent write
Problem: Get-page tracing revealed situations where planning a read against an in-memory layer took around 150 ms. Upon investigation, the culprit was the in-memory layer's inner file RW lock: a batch being written holds the write lock while a read being planned waits for the read lock. Solution: Lift the index out of the RW lock and let the read path operate on an older version of it via `ArcSwap`. Note that the read IO operation itself still waits for any ongoing writes to finish, but at least read planning now proceeds concurrently with the write.
This commit is contained in:
@@ -13,6 +13,7 @@ use std::sync::{Arc, OnceLock};
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Result;
|
||||
use arc_swap::ArcSwap;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::{CompactKey, Key};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
@@ -63,7 +64,21 @@ pub struct InMemoryLayer {
|
||||
|
||||
opened_at: Instant,
|
||||
|
||||
/// All versions of all pages in the layer are kept here. Indexed
|
||||
/// by block number and LSN. The [`IndexEntry`] is an offset into the
|
||||
/// ephemeral file where the page version is stored.
|
||||
///
|
||||
/// Note that the read path ([`Self::get_values_reconstruct_data`]) and the write path
|
||||
/// [`Self::put_batch`] may use the index concurrently. The core observation is that
|
||||
/// the read path will not proceed if it needs the entries in a batch that's currently
|
||||
/// being written. Hence, it's fine for the read path to operate on the version of the
|
||||
/// index which doesn't include the updates.
|
||||
index: ArcSwap<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
|
||||
|
||||
/// The above fields never change, except for `end_lsn`, which is only set once.
|
||||
/// and `index` which is static at a given LSN and is shared between the read
|
||||
/// and write paths via an Arc swap.
|
||||
///
|
||||
/// All other changing parts are in `inner`, and protected by a mutex.
|
||||
inner: RwLock<InMemoryLayerInner>,
|
||||
|
||||
@@ -81,11 +96,6 @@ impl std::fmt::Debug for InMemoryLayer {
|
||||
}
|
||||
|
||||
pub struct InMemoryLayerInner {
|
||||
/// All versions of all pages in the layer are kept here. Indexed
|
||||
/// by block number and LSN. The [`IndexEntry`] is an offset into the
|
||||
/// ephemeral file where the page version is stored.
|
||||
index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
|
||||
|
||||
/// The values are stored in a serialized format in this file.
|
||||
/// Each serialized Value is preceded by a 'u32' length field.
|
||||
/// PerSeg::page_versions map stores offsets into this file.
|
||||
@@ -425,8 +435,6 @@ impl InMemoryLayer {
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.attached_child();
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
struct ValueRead {
|
||||
entry_lsn: Lsn,
|
||||
read: vectored_dio_read::LogicalRead<Vec<u8>>,
|
||||
@@ -434,11 +442,9 @@ impl InMemoryLayer {
|
||||
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
|
||||
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
|
||||
|
||||
let index = self.index.load();
|
||||
for range in keyspace.ranges.iter() {
|
||||
for (key, vec_map) in inner
|
||||
.index
|
||||
.range(range.start.to_compact()..range.end.to_compact())
|
||||
{
|
||||
for (key, vec_map) in index.range(range.start.to_compact()..range.end.to_compact()) {
|
||||
let key = Key::from_compact(*key);
|
||||
let slice = vec_map.slice_range(lsn_range.clone());
|
||||
|
||||
@@ -466,7 +472,7 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(inner); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
|
||||
|
||||
let read_from = Arc::clone(self);
|
||||
let read_ctx = ctx.attached_child();
|
||||
reconstruct_state
|
||||
@@ -573,8 +579,8 @@ impl InMemoryLayer {
|
||||
start_lsn,
|
||||
end_lsn: OnceLock::new(),
|
||||
opened_at: Instant::now(),
|
||||
index: ArcSwap::new(Arc::new(BTreeMap::default())),
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
index: BTreeMap::new(),
|
||||
file,
|
||||
resource_units: GlobalResourceUnits::new(),
|
||||
}),
|
||||
@@ -616,6 +622,8 @@ impl InMemoryLayer {
|
||||
.unwrap();
|
||||
assert_eq!(new_size, expected_new_len);
|
||||
|
||||
let mut updated_index = (**self.index.load()).clone();
|
||||
|
||||
// Update the index with the new entries
|
||||
for meta in metadata {
|
||||
let SerializedValueMeta {
|
||||
@@ -639,7 +647,7 @@ impl InMemoryLayer {
|
||||
will_init,
|
||||
})?;
|
||||
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
let vec_map = updated_index.entry(key).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
|
||||
if old.is_some() {
|
||||
// This should not break anything, but is unexpected: ingestion code aims to filter out
|
||||
@@ -658,6 +666,8 @@ impl InMemoryLayer {
|
||||
);
|
||||
}
|
||||
|
||||
self.index.store(Arc::new(updated_index));
|
||||
|
||||
inner.resource_units.maybe_publish_size(new_size);
|
||||
|
||||
Ok(())
|
||||
@@ -700,8 +710,8 @@ impl InMemoryLayer {
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
let inner = self.inner.write().await;
|
||||
for vec_map in inner.index.values() {
|
||||
let index = self.index.load();
|
||||
for vec_map in index.values() {
|
||||
for (lsn, _) in vec_map.as_slice() {
|
||||
assert!(*lsn < end_lsn);
|
||||
}
|
||||
@@ -743,13 +753,13 @@ impl InMemoryLayer {
|
||||
let key_count = if let Some(key_range) = key_range {
|
||||
let key_range = key_range.start.to_compact()..key_range.end.to_compact();
|
||||
|
||||
inner
|
||||
.index
|
||||
self.index
|
||||
.load()
|
||||
.iter()
|
||||
.filter(|(k, _)| key_range.contains(k))
|
||||
.count()
|
||||
} else {
|
||||
inner.index.len()
|
||||
self.index.load().len()
|
||||
};
|
||||
if key_count == 0 {
|
||||
return Ok(None);
|
||||
@@ -772,7 +782,8 @@ impl InMemoryLayer {
|
||||
let file_contents = inner.file.load_to_io_buf(ctx).await?;
|
||||
let file_contents = file_contents.freeze();
|
||||
|
||||
for (key, vec_map) in inner.index.iter() {
|
||||
let index = self.index.load();
|
||||
for (key, vec_map) in index.iter() {
|
||||
// Write all page versions
|
||||
for (lsn, entry) in vec_map
|
||||
.as_slice()
|
||||
|
||||
Reference in New Issue
Block a user