mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
pageserver/image_layer: use vectored blob read
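This commit updates the image layer to read blobs from disk through the vectored blob reader (`VectoredBlobReader`) instead of reading them one at a time. The maximum vectored read size config (`max_vectored_read_size`) is also threaded through to the image layer.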
This commit is contained in:
@@ -27,7 +27,7 @@ use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::blob_io::{BlobWriter, VectoredBlobReader, VectoredRead, VectoredReadExtended};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -38,7 +38,7 @@ use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
@@ -154,6 +154,7 @@ pub struct ImageLayerInner {
|
||||
|
||||
/// Reader object for reading blocks from the file.
|
||||
file: FileBlockReader,
|
||||
vectored_blob_reader: VectoredBlobReader,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ImageLayerInner {
|
||||
@@ -210,7 +211,7 @@ impl ImageLayer {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
|
||||
let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?;
|
||||
|
||||
inner.dump(ctx).await?;
|
||||
|
||||
@@ -240,21 +241,32 @@ impl ImageLayer {
|
||||
async fn load(
|
||||
&self,
|
||||
access_kind: LayerAccessKind,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<&ImageLayerInner> {
|
||||
self.access_stats.record_access(access_kind, ctx);
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner(ctx))
|
||||
.get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx))
|
||||
.await
|
||||
.with_context(|| format!("Failed to load image layer {}", self.path()))
|
||||
}
|
||||
|
||||
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
|
||||
async fn load_inner(
|
||||
&self,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<ImageLayerInner> {
|
||||
let path = self.path();
|
||||
|
||||
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
|
||||
.await
|
||||
.and_then(|res| res)?;
|
||||
let loaded = ImageLayerInner::load(
|
||||
&path,
|
||||
self.desc.image_layer_lsn(),
|
||||
None,
|
||||
max_vectored_read_size,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.and_then(|res| res)?;
|
||||
|
||||
// not production code
|
||||
let actual_filename = path.file_name().unwrap().to_owned();
|
||||
@@ -353,6 +365,11 @@ impl ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-blob metadata attached to each entry of a vectored read over an image layer.
///
/// One value is recorded for every blob added to a `VectoredRead`, so that once the
/// read completes (or fails) the result can be attributed back to the key it belongs to.
#[derive(Copy, Clone, Debug)]
|
||||
struct ImageVecBlobMeta {
|
||||
    /// Key whose image is stored in the blob this metadata is attached to.
pub key: Key,
|
||||
}
|
||||
|
||||
impl ImageLayerInner {
|
||||
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
|
||||
/// - inner has the success or transient failure
|
||||
@@ -361,14 +378,15 @@ impl ImageLayerInner {
|
||||
path: &Utf8Path,
|
||||
lsn: Lsn,
|
||||
summary: Option<Summary>,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = match file.read_blk(0, ctx).await {
|
||||
let block_reader = FileBlockReader::new(file);
|
||||
let summary_blk = match block_reader.read_blk(0, ctx).await {
|
||||
Ok(blk) => blk,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
|
||||
};
|
||||
@@ -394,11 +412,19 @@ impl ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: don't open file twice
|
||||
let file = match VirtualFile::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size);
|
||||
|
||||
Ok(Ok(ImageLayerInner {
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
lsn,
|
||||
file,
|
||||
file: block_reader,
|
||||
vectored_blob_reader,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -452,9 +478,12 @@ impl ImageLayerInner {
|
||||
let file = &self.file;
|
||||
let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
|
||||
|
||||
let mut blocks = Vec::new();
|
||||
let mut reads = Vec::new();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut current_read: Option<VectoredRead<ImageVecBlobMeta>> = None;
|
||||
let mut prev_idx = None;
|
||||
|
||||
let mut raw_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
range.start.write_to_byte_slice(&mut raw_key);
|
||||
|
||||
@@ -462,19 +491,48 @@ impl ImageLayerInner {
|
||||
.visit(
|
||||
&raw_key,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, value| {
|
||||
|raw_key, offset| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
|
||||
if key < range.start {
|
||||
return true;
|
||||
}
|
||||
let (prev_key, prev_offset) = match prev_idx {
|
||||
None => {
|
||||
prev_idx = Some((key, offset));
|
||||
return true;
|
||||
}
|
||||
Some(prev) => prev,
|
||||
};
|
||||
|
||||
if prev_key >= range.end {
|
||||
if let Some(read) = current_read.take() {
|
||||
reads.push(read);
|
||||
}
|
||||
|
||||
if key >= range.end {
|
||||
return false;
|
||||
}
|
||||
|
||||
blocks.push((key, value));
|
||||
let extended = match &mut current_read {
|
||||
None => VectoredReadExtended::No,
|
||||
Some(read) => {
|
||||
read.extend(prev_offset, offset, ImageVecBlobMeta { key: prev_key })
|
||||
}
|
||||
};
|
||||
|
||||
if extended == VectoredReadExtended::No {
|
||||
let next_read = VectoredRead::new(
|
||||
prev_offset,
|
||||
offset,
|
||||
ImageVecBlobMeta { key: prev_key },
|
||||
self.vectored_blob_reader.get_max_read_size(),
|
||||
);
|
||||
|
||||
let prev_read = current_read.replace(next_read);
|
||||
|
||||
if let Some(read) = prev_read {
|
||||
reads.push(read);
|
||||
}
|
||||
}
|
||||
|
||||
prev_idx = Some((key, offset));
|
||||
true
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
@@ -485,28 +543,39 @@ impl ImageLayerInner {
|
||||
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
|
||||
}
|
||||
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build();
|
||||
let mut buf = Some(BytesMut::with_capacity(
|
||||
self.vectored_blob_reader.get_max_read_size(),
|
||||
));
|
||||
for read in reads.into_iter().rev() {
|
||||
let res = self
|
||||
.vectored_blob_reader
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"))
|
||||
.await;
|
||||
|
||||
let cursor = file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (key, offset) in blocks {
|
||||
let res = cursor.read_blob_into_buf(offset, &mut buf, ctx).await;
|
||||
if let Err(e) = res {
|
||||
reconstruct_state.on_key_error(
|
||||
key,
|
||||
PageReconstructError::from(anyhow!(e).context(format!(
|
||||
"Failed to read blob from virtual file {}",
|
||||
file.file.path
|
||||
))),
|
||||
);
|
||||
let blobs = match res {
|
||||
Ok(blobs) => blobs,
|
||||
Err(err) => {
|
||||
let kind = err.kind();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
reconstruct_state.on_key_error(
|
||||
blob_meta.key,
|
||||
PageReconstructError::from(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
file.file.path,
|
||||
kind
|
||||
)),
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
continue;
|
||||
for meta in blobs.metas.iter().rev() {
|
||||
let img_buf = Bytes::copy_from_slice(&blobs.buf[meta.start..meta.end]);
|
||||
reconstruct_state.update_key(&meta.meta.key, self.lsn, Value::Image(img_buf));
|
||||
}
|
||||
|
||||
let blob = Bytes::copy_from_slice(buf.as_slice());
|
||||
reconstruct_state.update_key(&key, self.lsn, Value::Image(blob));
|
||||
buf = Some(blobs.buf);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1310,7 +1310,7 @@ impl DownloadedLayer {
|
||||
owner.desc.key_range.clone(),
|
||||
lsn,
|
||||
));
|
||||
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
|
||||
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, owner.conf.max_vectored_read_size, ctx)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Image))
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user