asyncify get_value_reconstruct_data (impls still use sync IO)

This commit is contained in:
Christian Schwarz
2023-05-10 20:25:47 +02:00
parent 84a4f48ec5
commit 6718cf0ae6
6 changed files with 218 additions and 182 deletions

View File

@@ -22,6 +22,7 @@ use pageserver_api::models::{
};
use std::ops::Range;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
@@ -335,6 +336,13 @@ impl LayerAccessStats {
}
}
pub(crate) type GetValueReconstructFuture = Pin<
Box<
dyn Send
+ std::future::Future<Output = Result<(ValueReconstructState, ValueReconstructResult)>>,
>,
>;
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
/// required by [`LayerMap`].
///
@@ -372,12 +380,12 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> GetValueReconstructFuture;
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
@@ -486,12 +494,12 @@ impl Layer for LayerDescriptor {
}
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
_reconstruct_data: ValueReconstructState,
_ctx: RequestContext,
) -> GetValueReconstructFuture {
todo!("This method shouldn't be part of the Layer trait")
}

View File

@@ -46,7 +46,7 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::*;
use utils::{
@@ -56,8 +56,8 @@ use utils::{
};
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, PathOrConf,
DeltaFileName, GetValueReconstructFuture, Layer, LayerAccessStats, LayerAccessStatsReset,
LayerFileName, LayerIter, LayerKeyIter, PathOrConf,
};
///
@@ -318,89 +318,91 @@ impl Layer for DeltaLayer {
}
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.lsn_range.start);
let mut need_image = true;
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
ensure!(lsn_range.start >= self.lsn_range.start);
let mut need_image = true;
ensure!(self.key_range.contains(&key));
ensure!(self.key_range.contains(&key));
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
// release metadata lock and close the file
}
// release metadata lock and close the file
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
})
}
}

View File

@@ -43,7 +43,7 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard};
use std::sync::{Arc, RwLock, RwLockReadGuard};
use tracing::*;
use utils::{
@@ -53,7 +53,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
use super::{GetValueReconstructFuture, Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -197,38 +197,41 @@ impl Layer for ImageLayer {
/// Look up given page in the file
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
assert!(self.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
assert!(self.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let file = inner.file.as_ref().unwrap();
let tree_reader =
DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf)? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let value = Bytes::from(blob);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf)? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
Ok(ValueReconstructResult::Missing)
}
reconstruct_state.img = Some((self.lsn, value));
Ok((reconstruct_state, ValueReconstructResult::Complete))
} else {
Ok((reconstruct_state, ValueReconstructResult::Missing))
}
})
}
}

View File

@@ -27,9 +27,9 @@ use utils::{
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::ops::Range;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use super::{DeltaLayer, DeltaLayerWriter, Layer};
use super::{DeltaLayer, DeltaLayerWriter, GetValueReconstructFuture, Layer};
thread_local! {
/// A buffer for serializing object during [`InMemoryLayer::put_value`].
@@ -191,52 +191,54 @@ impl Layer for InMemoryLayer {
/// Look up given value in the layer.
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
mut reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
let inner = self.inner.read().unwrap();
let inner = self.inner.read().unwrap();
let mut reader = inner.file.block_cursor();
let mut reader = inner.file.block_cursor();
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos)?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos)?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok((reconstruct_state, ValueReconstructResult::Complete));
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
}
}
// release lock on 'inner'
// release lock on 'inner'
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
})
}
}

View File

@@ -6,7 +6,7 @@ use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
@@ -21,8 +21,8 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName, LayerFileName};
use super::image_layer::ImageLayer;
use super::{
DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer,
DeltaLayer, GetValueReconstructFuture, LayerAccessStats, LayerAccessStatsReset, LayerIter,
LayerKeyIter, LayerResidenceStatus, PersistentLayer,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
@@ -83,16 +83,18 @@ impl Layer for RemoteLayer {
}
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()
);
_reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()
);
})
}
fn is_incremental(&self) -> bool {

View File

@@ -504,12 +504,13 @@ impl Timeline {
None => None,
};
let mut reconstruct_state = ValueReconstructState {
let reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
};
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
let reconstruct_state = self
.get_reconstruct_data(key, lsn, reconstruct_state, ctx)
.await?;
self.metrics
@@ -2146,9 +2147,9 @@ impl Timeline {
&self,
key: Key,
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
) -> Result<ValueReconstructState, PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
@@ -2175,12 +2176,12 @@ impl Timeline {
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(()),
ValueReconstructResult::Complete => return Ok(reconstruct_state),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
self.metrics.materialized_page_cache_hit_counter.inc_by(1);
return Ok(());
return Ok(reconstruct_state);
}
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
@@ -2237,13 +2238,19 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(open_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2263,13 +2270,19 @@ impl Timeline {
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match frozen_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(frozen_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2297,13 +2310,19 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(&layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;