mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 22:50:38 +00:00
pageserver/inmemory_layer: add vectored reconstruct
Collect the values for a keyspace in one go. First, collect all the offsets at which a read is required and order them. Second, perform all of the reads. Note that read amplification is still present; a future patch set will deal with this problem.
This commit is contained in:
@@ -9,13 +9,15 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::block_io::BlockReader;
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant::storage_layer::ValueReconstructResult;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::walrecord;
|
||||
use anyhow::{ensure, Result};
|
||||
use anyhow::{anyhow, ensure, Result};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use tracing::*;
|
||||
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
|
||||
@@ -25,7 +27,7 @@ use std::fmt::Write as _;
|
||||
use std::ops::Range;
|
||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||
|
||||
use super::{DeltaLayerWriter, ResidentLayer};
|
||||
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructState, ValuesReconstructState};
|
||||
|
||||
pub struct InMemoryLayer {
|
||||
conf: &'static PageServerConf,
|
||||
@@ -67,6 +69,13 @@ pub struct InMemoryLayerInner {
|
||||
file: EphemeralFile,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Ord, PartialOrd)]
|
||||
struct BlockRead {
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
block_offset: u64,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for InMemoryLayerInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("InMemoryLayerInner").finish()
|
||||
@@ -202,6 +211,85 @@ impl InMemoryLayer {
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
}
|
||||
}
|
||||
|
||||
// Look up the keys in the provided keyspace and update
|
||||
// the reconstruct state with whatever is found.
|
||||
//
|
||||
// If the key is cached, go no further than the cached Lsn.
|
||||
pub(crate) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
end_lsn: Lsn,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
let reader = inner.file.block_cursor();
|
||||
|
||||
let mut completed_keys = HashSet::new();
|
||||
let mut min_heap = BinaryHeap::new();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
if let Some(vec_map) = inner.index.get(&key) {
|
||||
let range = match reconstruct_state.get_cached_lsn(&key) {
|
||||
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
|
||||
None => self.start_lsn..end_lsn,
|
||||
};
|
||||
|
||||
let slice = vec_map.slice_range(range);
|
||||
for (entry_lsn, pos) in slice.iter().rev() {
|
||||
min_heap.push(BlockRead {
|
||||
key,
|
||||
lsn: *entry_lsn,
|
||||
block_offset: *pos,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
}
|
||||
|
||||
let keyspace_size = keyspace.total_size();
|
||||
|
||||
while completed_keys.len() < keyspace_size && !min_heap.is_empty() {
|
||||
let block_read = min_heap.pop().unwrap();
|
||||
if completed_keys.contains(&block_read.key) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let buf = reader.read_blob(block_read.block_offset, &ctx).await;
|
||||
if let Err(e) = buf {
|
||||
reconstruct_state
|
||||
.on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
|
||||
completed_keys.insert(block_read.key);
|
||||
continue;
|
||||
}
|
||||
|
||||
let value = Value::des(&buf.unwrap());
|
||||
if let Err(e) = value {
|
||||
reconstruct_state
|
||||
.on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
|
||||
completed_keys.insert(block_read.key);
|
||||
continue;
|
||||
}
|
||||
|
||||
let key_done =
|
||||
reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
|
||||
if key_done {
|
||||
completed_keys.insert(block_read.key);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for InMemoryLayer {
|
||||
|
||||
Reference in New Issue
Block a user