mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
pagserver/delta_layer: use vectored blob read
This commit does a few things: * update the delta layer to use vectored blob reads * thread the start Lsn for the read down to the delta layer * thread the max size of the vectored read to the delta layer
This commit is contained in:
@@ -329,10 +329,20 @@ impl ReadableLayerDesc {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
match self {
|
||||
ReadableLayerDesc::Persistent { desc, lsn_ceil, .. } => {
|
||||
ReadableLayerDesc::Persistent {
|
||||
desc,
|
||||
lsn_floor,
|
||||
lsn_ceil,
|
||||
} => {
|
||||
let layer = layer_manager.get_from_desc(desc);
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, *lsn_ceil, reconstruct_state, ctx)
|
||||
.get_values_reconstruct_data(
|
||||
keyspace,
|
||||
*lsn_floor,
|
||||
*lsn_ceil,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
ReadableLayerDesc::InMemory { handle, lsn_ceil } => {
|
||||
|
||||
@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::blob_io::{BlobWriter, VectoredBlobReader, VectoredRead, VectoredReadExtended};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
@@ -41,13 +41,13 @@ use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
@@ -213,6 +213,7 @@ pub struct DeltaLayerInner {
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
vectored_blob_reader: VectoredBlobReader,
|
||||
|
||||
/// Reader object for reading blocks from the file.
|
||||
file: FileBlockReader,
|
||||
@@ -248,7 +249,7 @@ impl DeltaLayer {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
|
||||
let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?;
|
||||
|
||||
inner.dump(ctx).await
|
||||
}
|
||||
@@ -284,20 +285,25 @@ impl DeltaLayer {
|
||||
async fn load(
|
||||
&self,
|
||||
access_kind: LayerAccessKind,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<&Arc<DeltaLayerInner>> {
|
||||
self.access_stats.record_access(access_kind, ctx);
|
||||
// Quick exit if already loaded
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner(ctx))
|
||||
.get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx))
|
||||
.await
|
||||
.with_context(|| format!("Failed to load delta layer {}", self.path()))
|
||||
}
|
||||
|
||||
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
|
||||
async fn load_inner(
|
||||
&self,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<DeltaLayerInner>> {
|
||||
let path = self.path();
|
||||
|
||||
let loaded = DeltaLayerInner::load(&path, None, ctx)
|
||||
let loaded = DeltaLayerInner::load(&path, None, max_vectored_read_size, ctx)
|
||||
.await
|
||||
.and_then(|res| res)?;
|
||||
|
||||
@@ -691,6 +697,12 @@ impl DeltaLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
struct DeltaVecBlobMeta {
|
||||
pub key: Key,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
|
||||
/// - inner has the success or transient failure
|
||||
@@ -698,15 +710,16 @@ impl DeltaLayerInner {
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
summary: Option<Summary>,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let file = FileBlockReader::new(file);
|
||||
let block_reader = FileBlockReader::new(file);
|
||||
|
||||
let summary_blk = match file.read_blk(0, ctx).await {
|
||||
let summary_blk = match block_reader.read_blk(0, ctx).await {
|
||||
Ok(blk) => blk,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
|
||||
};
|
||||
@@ -728,8 +741,16 @@ impl DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: don't open file twice
|
||||
let file = match VirtualFile::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size);
|
||||
|
||||
Ok(Ok(DeltaLayerInner {
|
||||
file,
|
||||
file: block_reader,
|
||||
vectored_blob_reader,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
}))
|
||||
@@ -834,6 +855,7 @@ impl DeltaLayerInner {
|
||||
pub(super) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
@@ -845,57 +867,88 @@ impl DeltaLayerInner {
|
||||
file,
|
||||
);
|
||||
|
||||
let mut offsets: BTreeMap<Key, Vec<(Lsn, u64)>> = BTreeMap::new();
|
||||
let mut reads = Vec::new();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut ignore_key = None;
|
||||
let mut current_read: Option<VectoredRead<DeltaVecBlobMeta>> = None;
|
||||
let mut prev_idx = None;
|
||||
|
||||
// Scan the page versions backwards, starting from the last key in the range.
|
||||
// to collect all the offsets at which need to be read.
|
||||
let end_key = DeltaKey::from_key_lsn(&range.end, Lsn(end_lsn.0 - 1));
|
||||
// to collect all the offsets which need to be read.
|
||||
let start_key = DeltaKey::from_key_lsn(&range.start, start_lsn);
|
||||
tree_reader
|
||||
.visit(
|
||||
&end_key.0,
|
||||
VisitDirection::Backwards,
|
||||
&start_key.0,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, value| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(raw_key);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
|
||||
if entry_lsn >= end_lsn {
|
||||
let (prev_key, prev_lsn, prev_blob) = match prev_idx {
|
||||
None => {
|
||||
prev_idx = Some((key, lsn, blob_ref));
|
||||
return true;
|
||||
}
|
||||
Some(prev) => prev,
|
||||
};
|
||||
|
||||
let key_start_lsn =
|
||||
std::cmp::max(reconstruct_state.get_cached_lsn(&key), Some(start_lsn));
|
||||
if Some(prev_lsn) < key_start_lsn || prev_lsn >= end_lsn {
|
||||
prev_idx = Some((key, lsn, blob_ref));
|
||||
return true;
|
||||
}
|
||||
|
||||
if key < range.start {
|
||||
if prev_key >= range.end {
|
||||
if let Some(read) = current_read.take() {
|
||||
reads.push(read);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if key >= range.end {
|
||||
return true;
|
||||
}
|
||||
let extended = match &mut current_read {
|
||||
None => VectoredReadExtended::No,
|
||||
Some(read) => {
|
||||
// The index is traversed forwards. If the blob is `will_init`, it renders the already
|
||||
// accumulated blobs obsolete.
|
||||
if read.last_meta().map(|meta| meta.key) == Some(prev_key)
|
||||
&& prev_blob.will_init()
|
||||
{
|
||||
read.truncate_at(|meta| meta.key == prev_key);
|
||||
}
|
||||
|
||||
if Some(key) == ignore_key {
|
||||
return true;
|
||||
}
|
||||
read.extend(
|
||||
prev_blob.pos(),
|
||||
blob_ref.pos(),
|
||||
DeltaVecBlobMeta {
|
||||
key: prev_key,
|
||||
lsn: prev_lsn,
|
||||
},
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(cached_lsn) = reconstruct_state.get_cached_lsn(&key) {
|
||||
if entry_lsn <= cached_lsn {
|
||||
return key != range.start;
|
||||
}
|
||||
}
|
||||
|
||||
let blob_ref = BlobRef(value);
|
||||
let lsns_at = offsets.entry(key).or_default();
|
||||
lsns_at.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
if blob_ref.will_init() {
|
||||
if key == range.start {
|
||||
return false;
|
||||
} else {
|
||||
ignore_key = Some(key);
|
||||
return true;
|
||||
if extended == VectoredReadExtended::No {
|
||||
let next_read = VectoredRead::new(
|
||||
prev_blob.pos(),
|
||||
blob_ref.pos(),
|
||||
DeltaVecBlobMeta {
|
||||
key: prev_key,
|
||||
lsn: prev_lsn,
|
||||
},
|
||||
self.vectored_blob_reader.get_max_read_size(),
|
||||
);
|
||||
|
||||
let prev_read = current_read.replace(next_read);
|
||||
|
||||
if let Some(read) = prev_read {
|
||||
reads.push(read);
|
||||
}
|
||||
}
|
||||
|
||||
prev_idx = Some((key, lsn, blob_ref));
|
||||
true
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
@@ -906,46 +959,63 @@ impl DeltaLayerInner {
|
||||
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
|
||||
}
|
||||
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerValue)
|
||||
.build();
|
||||
let mut ignore_key = None;
|
||||
|
||||
let cursor = file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (key, lsns_at) in offsets {
|
||||
for (lsn, block_offset) in lsns_at {
|
||||
let res = cursor.read_blob_into_buf(block_offset, &mut buf, ctx).await;
|
||||
let mut buf = Some(BytesMut::with_capacity(
|
||||
self.vectored_blob_reader.get_max_read_size(),
|
||||
));
|
||||
for read in reads.into_iter().rev() {
|
||||
let res = self
|
||||
.vectored_blob_reader
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"))
|
||||
.await;
|
||||
|
||||
if let Err(e) = res {
|
||||
reconstruct_state.on_key_error(
|
||||
key,
|
||||
PageReconstructError::from(anyhow!(e).context(format!(
|
||||
"Failed to read blob from virtual file {}",
|
||||
file.file.path
|
||||
))),
|
||||
);
|
||||
let blobs = match res {
|
||||
Ok(blobs) => blobs,
|
||||
Err(err) => {
|
||||
let kind = err.kind();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
reconstruct_state.on_key_error(
|
||||
blob_meta.key,
|
||||
PageReconstructError::from(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
file.file.path,
|
||||
kind
|
||||
)),
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
break;
|
||||
for meta in blobs.metas.iter().rev() {
|
||||
if Some(meta.meta.key) == ignore_key {
|
||||
continue;
|
||||
}
|
||||
|
||||
let value = Value::des(&buf);
|
||||
let value = Value::des(&blobs.buf[meta.start..meta.end]);
|
||||
|
||||
if let Err(e) = value {
|
||||
reconstruct_state.on_key_error(
|
||||
key,
|
||||
meta.meta.key,
|
||||
PageReconstructError::from(anyhow!(e).context(format!(
|
||||
"Failed to deserialize file blob from virtual file {}",
|
||||
"Failed to deserialize blob from virtual file {}",
|
||||
file.file.path
|
||||
))),
|
||||
);
|
||||
|
||||
break;
|
||||
ignore_key = Some(meta.meta.key);
|
||||
continue;
|
||||
}
|
||||
|
||||
let key_situation = reconstruct_state.update_key(&key, lsn, value.unwrap());
|
||||
let key_situation =
|
||||
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value.unwrap());
|
||||
if key_situation == ValueReconstructSituation::Complete {
|
||||
break;
|
||||
ignore_key = Some(meta.meta.key);
|
||||
}
|
||||
}
|
||||
|
||||
buf = Some(blobs.buf);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -267,6 +267,7 @@ impl Layer {
|
||||
pub(crate) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
@@ -282,7 +283,14 @@ impl Layer {
|
||||
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, &self.0, ctx)
|
||||
.get_values_reconstruct_data(
|
||||
keyspace,
|
||||
start_lsn,
|
||||
end_lsn,
|
||||
reconstruct_data,
|
||||
&self.0,
|
||||
ctx,
|
||||
)
|
||||
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
|
||||
.await
|
||||
}
|
||||
@@ -1299,9 +1307,14 @@ impl DownloadedLayer {
|
||||
owner.desc.key_range.clone(),
|
||||
owner.desc.lsn_range.clone(),
|
||||
));
|
||||
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Delta))
|
||||
delta_layer::DeltaLayerInner::load(
|
||||
&owner.path,
|
||||
summary,
|
||||
owner.conf.max_vectored_read_size,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Delta))
|
||||
} else {
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
@@ -1310,9 +1323,15 @@ impl DownloadedLayer {
|
||||
owner.desc.key_range.clone(),
|
||||
lsn,
|
||||
));
|
||||
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, owner.conf.max_vectored_read_size, ctx)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Image))
|
||||
image_layer::ImageLayerInner::load(
|
||||
&owner.path,
|
||||
lsn,
|
||||
summary,
|
||||
owner.conf.max_vectored_read_size,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Image))
|
||||
};
|
||||
|
||||
match res {
|
||||
@@ -1365,6 +1384,7 @@ impl DownloadedLayer {
|
||||
async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
owner: &Arc<LayerInner>,
|
||||
@@ -1374,7 +1394,7 @@ impl DownloadedLayer {
|
||||
|
||||
match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
|
||||
Delta(d) => {
|
||||
d.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, ctx)
|
||||
d.get_values_reconstruct_data(keyspace, start_lsn, end_lsn, reconstruct_data, ctx)
|
||||
.await
|
||||
}
|
||||
Image(i) => {
|
||||
|
||||
Reference in New Issue
Block a user