run Layer::get_value_reconstruct_data in spawn_blocking (#4498)

This PR concludes the "async `Layer::get_value_reconstruct_data`"
project.

The problem we're solving is that, before this patch, we'd execute
`Layer::get_value_reconstruct_data` on the tokio executor threads.
This function is IO- and/or CPU-intensive.
The IO goes through VirtualFile / std::fs and is therefore blocking.
This results in unfairness towards other tokio tasks, especially under
(disk) load.
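
To make the failure mode concrete, here is a minimal sketch (not
pageserver code; names are hypothetical): a synchronous `std::fs` read
inside an async fn pins an executor thread for the whole IO, while
`spawn_blocking` moves it to tokio's dedicated blocking pool:

```rust
use tokio::task::spawn_blocking;

// Unfair: the synchronous read occupies the executor thread for the
// entire disk IO, starving every other task scheduled on that thread.
async fn read_layer_unfair(path: std::path::PathBuf) -> std::io::Result<Vec<u8>> {
    std::fs::read(path) // blocking std::fs call on an executor thread
}

// Fair: the blocking read runs on tokio's blocking thread pool, and the
// executor thread stays free to poll other tasks while we await the handle.
async fn read_layer_fair(path: std::path::PathBuf) -> std::io::Result<Vec<u8>> {
    spawn_blocking(move || std::fs::read(path))
        .await
        .expect("spawn_blocking task panicked")
}
```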

Some context can be found in
https://github.com/neondatabase/neon/issues/4154
where I suspect (but can't prove) that load spikes from logical size
calculation cause heavy eviction skew.

Sadly, we don't have tokio runtime/scheduler metrics to quantify the
unfairness.
But generally, we know that blocking executor threads on std::fs IO is
bad.
So, let's make this change and watch out for severe perf regressions in
staging & during rollout.

## Changes

* rename `Layer::get_value_reconstruct_data` to
`Layer::get_value_reconstruct_data_blocking`
* add a new `async_trait` method `Layer::get_value_reconstruct_data`
with a blanket default implementation that runs
`get_value_reconstruct_data_blocking` inside `spawn_blocking` (sketched
below)
* `spawn_blocking` requires the captured variables to be `'static`;
hence I had to change the data flow to _move_ the
`ValueReconstructState` into and back out of
`get_value_reconstruct_data` instead of passing a reference. It's a
small struct, so I don't expect a big performance penalty.
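
Condensed, the new trait shape looks roughly like this (a simplified
sketch of the diff below, with the real pageserver types replaced by
stand-ins and the tracing span elided):

```rust
use std::ops::Range;
use std::sync::Arc;

use anyhow::Context;

// Stand-ins for the real pageserver types, to keep the sketch self-contained.
pub struct Key;
pub struct Lsn;
pub struct RequestContext;
pub struct ValueReconstructState;
pub enum ValueReconstructResult {
    Complete,
    Continue,
    Missing,
}

#[async_trait::async_trait]
pub trait Layer: Send + Sync + 'static {
    // Implementors provide the synchronous version ...
    fn get_value_reconstruct_data_blocking(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_data: ValueReconstructState, // moved in ...
        ctx: RequestContext,
    ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)>; // ... and moved back out

    // ... and get the async version for free through this default method,
    // which shifts the blocking work onto tokio's blocking thread pool.
    async fn get_value_reconstruct_data(
        self: Arc<Self>, // owned receiver: spawn_blocking requires 'static captures
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_data: ValueReconstructState,
        ctx: RequestContext,
    ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
        tokio::task::spawn_blocking(move || {
            self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx)
        })
        .await
        .context("spawn_blocking")? // JoinError -> anyhow; the inner Result is the return value
    }
}
```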

## Performance

Fundamentally, the code changes cause the following performance-relevant
changes:

* Latency & allocations: each `get_value_reconstruct_data` call now
makes a short-lived allocation because `async_trait` is just sugar for
boxed futures under the hood
* Latency: `spawn_blocking` adds some latency because it needs to move
the work to a thread pool
* using `spawn_blocking` around the existing synchronous code is
probably more efficient than switching all the synchronous code to
tokio::fs, because _each_ tokio::fs call does `spawn_blocking` under
the hood (see the sketch after this list).
* Throughput: the `spawn_blocking` thread pool is much larger than the
async executor thread pool. Hence, as long as the disks can keep up,
which they should according to AWS specs, we will be able to deliver
higher `get_value_reconstruct_data` throughput.
* Disk IOPS utilization: we will see higher disk utilization if we get
more throughput. Not a problem because the disks in prod are currently
under-utilized, according to node_exporter metrics & the AWS specs.
* CPU utilization: at higher throughput, CPU utilization will be higher.
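
To see why wrapping the existing synchronous code beats converting it
to tokio::fs (a hypothetical sketch, not pageserver code): a loop of
`tokio::fs` calls pays one thread-pool handoff per call, while a single
`spawn_blocking` around the whole synchronous loop pays it once:

```rust
// N reads through tokio::fs: each call is a spawn_blocking internally,
// so this pays N handoffs to the blocking pool and back.
async fn read_all_tokio_fs(paths: Vec<std::path::PathBuf>) -> std::io::Result<Vec<Vec<u8>>> {
    let mut out = Vec::with_capacity(paths.len());
    for path in paths {
        out.push(tokio::fs::read(path).await?); // one pool round trip per file
    }
    Ok(out)
}

// N reads through one spawn_blocking: a single handoff amortized over all reads.
async fn read_all_spawn_blocking(paths: Vec<std::path::PathBuf>) -> std::io::Result<Vec<Vec<u8>>> {
    tokio::task::spawn_blocking(move || {
        paths.into_iter().map(std::fs::read).collect::<std::io::Result<Vec<_>>>()
    })
    .await
    .expect("spawn_blocking task panicked")
}
```

The throughput point rests on pool sizes: tokio's blocking pool
defaults to a cap of 512 threads, while the async worker pool defaults
to one thread per core, so many `get_value_reconstruct_data` calls can
now block on disk concurrently.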

Slightly higher latency under regular load is acceptable given the
throughput gains and the expected improvement in fairness during disk
load peaks, such as the logical size calculation peaks uncovered in #4154.


## Full Stack Of Preliminary PRs

This PR builds on top of the following preliminary PRs:

1. Clean-ups
  * https://github.com/neondatabase/neon/pull/4316
  * https://github.com/neondatabase/neon/pull/4317
  * https://github.com/neondatabase/neon/pull/4318
  * https://github.com/neondatabase/neon/pull/4319
  * https://github.com/neondatabase/neon/pull/4321
* Note: these were mostly about finding an alternative to #4291, which
I thought we'd need for my original plan of converting
`Tenant::timelines` into an async locking primitive (#4333). In review,
we walked away from that approach, but the cleanups were still quite useful.
2. https://github.com/neondatabase/neon/pull/4364
3. https://github.com/neondatabase/neon/pull/4472
4. https://github.com/neondatabase/neon/pull/4476
5. https://github.com/neondatabase/neon/pull/4477
6. https://github.com/neondatabase/neon/pull/4485
7. https://github.com/neondatabase/neon/pull/4441

Commit 1faf69a698 (parent 44a441080d)
Author: Christian Schwarz
Date: 2023-06-26 11:43:11 +02:00 (committed by GitHub)
6 changed files with 118 additions and 64 deletions


@@ -12,7 +12,7 @@ use crate::context::RequestContext;
 use crate::repository::{Key, Value};
 use crate::task_mgr::TaskKind;
 use crate::walrecord::NeonWalRecord;
-use anyhow::Result;
+use anyhow::{Context, Result};
 use bytes::Bytes;
 use enum_map::EnumMap;
 use enumset::EnumSet;
@@ -343,7 +343,8 @@ impl LayerAccessStats {
 /// All layers should implement a minimal `std::fmt::Debug` without tenant or
 /// timeline names, because those are known in the context of which the layers
 /// are used in (timeline).
-pub trait Layer: std::fmt::Debug + Send + Sync {
+#[async_trait::async_trait]
+pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
 
     /// Range of keys that this layer covers
     fn get_key_range(&self) -> Range<Key>;
@@ -373,13 +374,42 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
     /// is available. If this returns ValueReconstructResult::Continue, look up
     /// the predecessor layer and call again with the same 'reconstruct_data' to
     /// collect more data.
-    fn get_value_reconstruct_data(
+    fn get_value_reconstruct_data_blocking(
         &self,
         key: Key,
         lsn_range: Range<Lsn>,
-        reconstruct_data: &mut ValueReconstructState,
-        ctx: &RequestContext,
-    ) -> Result<ValueReconstructResult>;
+        reconstruct_data: ValueReconstructState,
+        ctx: RequestContext,
+    ) -> Result<(ValueReconstructState, ValueReconstructResult)>;
+
+    /// CANCEL SAFETY: if the returned future is dropped,
+    /// the wrapped closure still runs to completion and the return value is discarded.
+    /// For the case of get_value_reconstruct_data, we expect the closure to not
+    /// have any side effects, as it only attempts to read a layer (and stuff like
+    /// page cache isn't considered a real side effect).
+    /// But, ...
+    /// TRACING:
+    /// If the returned future is cancelled, the spawn_blocking span can outlive
+    /// the caller's span.
+    /// So, technically, we should be using `parent: None` and `follows_from: current`
+    /// instead. However, in practice, the advantage of maintaining the span stack
+    /// in logs outweighs the disadvantage of having a dangling span in a case that
+    /// is not expected to happen because in pageserver we generally don't drop pending futures.
+    async fn get_value_reconstruct_data(
+        self: Arc<Self>,
+        key: Key,
+        lsn_range: Range<Lsn>,
+        reconstruct_data: ValueReconstructState,
+        ctx: RequestContext,
+    ) -> Result<(ValueReconstructState, ValueReconstructResult)> {
+        let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking");
+        tokio::task::spawn_blocking(move || {
+            let _enter = span.enter();
+            self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx)
+        })
+        .await
+        .context("spawn_blocking")?
+    }
 
     /// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
     fn short_id(&self) -> String;
@@ -499,6 +529,7 @@ impl LayerDescriptor {
     }
 }
 
+#[async_trait::async_trait]
 impl Layer for LayerDescriptor {
     fn get_key_range(&self) -> Range<Key> {
         self.key.clone()
@@ -512,13 +543,13 @@ impl Layer for LayerDescriptor {
         self.is_incremental
     }
 
-    fn get_value_reconstruct_data(
+    fn get_value_reconstruct_data_blocking(
         &self,
         _key: Key,
         _lsn_range: Range<Lsn>,
-        _reconstruct_data: &mut ValueReconstructState,
-        _ctx: &RequestContext,
-    ) -> Result<ValueReconstructResult> {
+        _reconstruct_data: ValueReconstructState,
+        _ctx: RequestContext,
+    ) -> Result<(ValueReconstructState, ValueReconstructResult)> {
         todo!("This method shouldn't be part of the Layer trait")
     }

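The CANCEL SAFETY note above can be observed in a standalone snippet
(an illustrative sketch, not part of the commit): dropping the
`JoinHandle` detaches the blocking task instead of aborting it:

```rust
#[tokio::main]
async fn main() {
    let handle = tokio::task::spawn_blocking(|| {
        std::thread::sleep(std::time::Duration::from_millis(100));
        println!("blocking closure still ran to completion");
    });
    // Dropping the handle does NOT cancel the closure: spawn_blocking work
    // cannot be aborted; the closure keeps running on the blocking pool and
    // its return value is discarded.
    drop(handle);
    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
```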

@@ -218,6 +218,7 @@ impl std::fmt::Debug for DeltaLayerInner {
     }
 }
 
+#[async_trait::async_trait]
 impl Layer for DeltaLayer {
     /// debugging function to print out the contents of the layer
     fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
@@ -294,13 +295,13 @@ impl Layer for DeltaLayer {
         Ok(())
     }
 
-    fn get_value_reconstruct_data(
+    fn get_value_reconstruct_data_blocking(
         &self,
         key: Key,
         lsn_range: Range<Lsn>,
-        reconstruct_state: &mut ValueReconstructState,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<ValueReconstructResult> {
+        mut reconstruct_state: ValueReconstructState,
+        ctx: RequestContext,
+    ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
         ensure!(lsn_range.start >= self.desc.lsn_range.start);
 
         let mut need_image = true;
@@ -308,7 +309,7 @@
 
         {
             // Open the file and lock the metadata in memory
-            let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
+            let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
 
            // Scan the page versions backwards, starting from `lsn`.
             let file = &inner.file;
@@ -374,9 +375,9 @@
         // If an older page image is needed to reconstruct the page, let the
         // caller know.
         if need_image {
-            Ok(ValueReconstructResult::Continue)
+            Ok((reconstruct_state, ValueReconstructResult::Continue))
         } else {
-            Ok(ValueReconstructResult::Complete)
+            Ok((reconstruct_state, ValueReconstructResult::Complete))
         }
     }


@@ -149,6 +149,7 @@ impl std::fmt::Debug for ImageLayerInner {
     }
 }
 
+#[async_trait::async_trait]
 impl Layer for ImageLayer {
     /// debugging function to print out the contents of the layer
     fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
@@ -181,18 +182,18 @@ impl Layer for ImageLayer {
     }
 
     /// Look up given page in the file
-    fn get_value_reconstruct_data(
+    fn get_value_reconstruct_data_blocking(
         &self,
         key: Key,
         lsn_range: Range<Lsn>,
-        reconstruct_state: &mut ValueReconstructState,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<ValueReconstructResult> {
+        mut reconstruct_state: ValueReconstructState,
+        ctx: RequestContext,
+    ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
         assert!(self.desc.key_range.contains(&key));
         assert!(lsn_range.start >= self.lsn);
         assert!(lsn_range.end >= self.lsn);
 
-        let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
+        let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
 
         let file = inner.file.as_ref().unwrap();
         let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -210,9 +211,9 @@
             let value = Bytes::from(blob);
             reconstruct_state.img = Some((self.lsn, value));
-            Ok(ValueReconstructResult::Complete)
+            Ok((reconstruct_state, ValueReconstructResult::Complete))
         } else {
-            Ok(ValueReconstructResult::Missing)
+            Ok((reconstruct_state, ValueReconstructResult::Missing))
         }
     }


@@ -110,6 +110,7 @@ impl InMemoryLayer {
     }
 }
 
+#[async_trait::async_trait]
 impl Layer for InMemoryLayer {
     fn get_key_range(&self) -> Range<Key> {
         Key::MIN..Key::MAX
@@ -190,13 +191,13 @@ impl Layer for InMemoryLayer {
     }
 
     /// Look up given value in the layer.
-    fn get_value_reconstruct_data(
+    fn get_value_reconstruct_data_blocking(
         &self,
         key: Key,
         lsn_range: Range<Lsn>,
-        reconstruct_state: &mut ValueReconstructState,
-        _ctx: &RequestContext,
-    ) -> anyhow::Result<ValueReconstructResult> {
+        mut reconstruct_state: ValueReconstructState,
+        _ctx: RequestContext,
+    ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
         ensure!(lsn_range.start >= self.start_lsn);
 
         let mut need_image = true;
@@ -213,7 +214,7 @@
                 match value {
                     Value::Image(img) => {
                         reconstruct_state.img = Some((*entry_lsn, img));
-                        return Ok(ValueReconstructResult::Complete);
+                        return Ok((reconstruct_state, ValueReconstructResult::Complete));
                     }
                     Value::WalRecord(rec) => {
                         let will_init = rec.will_init();
@@ -233,9 +234,9 @@
         // If an older page image is needed to reconstruct the page, let the
         // caller know.
         if need_image {
-            Ok(ValueReconstructResult::Continue)
+            Ok((reconstruct_state, ValueReconstructResult::Continue))
         } else {
-            Ok(ValueReconstructResult::Complete)
+            Ok((reconstruct_state, ValueReconstructResult::Complete))
         }
     }
 }


@@ -6,7 +6,7 @@ use crate::context::RequestContext;
 use crate::repository::Key;
 use crate::tenant::layer_map::BatchedUpdates;
 use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
-use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
+use crate::tenant::storage_layer::{Layer, ValueReconstructState};
 use anyhow::{bail, Result};
 use pageserver_api::models::HistoricLayerInfo;
 use std::ops::Range;
@@ -21,7 +21,7 @@ use utils::{
 use super::filename::{DeltaFileName, ImageFileName};
 use super::{
     DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
-    LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
+    LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult,
 };
 
 /// RemoteLayer is a not yet downloaded [`ImageLayer`] or
@@ -63,14 +63,15 @@ impl std::fmt::Debug for RemoteLayer {
     }
 }
 
+#[async_trait::async_trait]
 impl Layer for RemoteLayer {
-    fn get_value_reconstruct_data(
+    fn get_value_reconstruct_data_blocking(
         &self,
         _key: Key,
         _lsn_range: Range<Lsn>,
-        _reconstruct_state: &mut ValueReconstructState,
-        _ctx: &RequestContext,
-    ) -> Result<ValueReconstructResult> {
+        _reconstruct_state: ValueReconstructState,
+        _ctx: RequestContext,
+    ) -> Result<(ValueReconstructState, ValueReconstructResult)> {
         bail!(
             "layer {} needs to be downloaded",
             self.filename().file_name()


@@ -555,13 +555,14 @@ impl Timeline {
             None => None,
         };
 
-        let mut reconstruct_state = ValueReconstructState {
+        let reconstruct_state = ValueReconstructState {
             records: Vec::new(),
             img: cached_page_img,
         };
 
         let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
-        self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
+        let reconstruct_state = self
+            .get_reconstruct_data(key, lsn, reconstruct_state, ctx)
             .await?;
         timer.stop_and_record();
@@ -2352,9 +2353,9 @@
         &self,
         key: Key,
         request_lsn: Lsn,
-        reconstruct_state: &mut ValueReconstructState,
+        mut reconstruct_state: ValueReconstructState,
         ctx: &RequestContext,
-    ) -> Result<(), PageReconstructError> {
+    ) -> Result<ValueReconstructState, PageReconstructError> {
         // Start from the current timeline.
         let mut timeline_owned;
         let mut timeline = self;
@@ -2384,12 +2385,12 @@
             // The function should have updated 'state'
             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
             match result {
-                ValueReconstructResult::Complete => return Ok(()),
+                ValueReconstructResult::Complete => return Ok(reconstruct_state),
                 ValueReconstructResult::Continue => {
                     // If we reached an earlier cached page image, we're done.
                     if cont_lsn == cached_lsn + 1 {
                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
-                        return Ok(());
+                        return Ok(reconstruct_state);
                     }
                     if prev_lsn <= cont_lsn {
                         // Didn't make any progress in last iteration. Error out to avoid
@@ -2493,13 +2494,19 @@
                 // Get all the data needed to reconstruct the page version from this layer.
                 // But if we have an older cached page image, no need to go past that.
                 let lsn_floor = max(cached_lsn + 1, start_lsn);
-                result = match open_layer.get_value_reconstruct_data(
-                    key,
-                    lsn_floor..cont_lsn,
-                    reconstruct_state,
-                    ctx,
-                ) {
-                    Ok(result) => result,
+                result = match Arc::clone(open_layer)
+                    .get_value_reconstruct_data(
+                        key,
+                        lsn_floor..cont_lsn,
+                        reconstruct_state,
+                        ctx.attached_child(),
+                    )
+                    .await
+                {
+                    Ok((new_reconstruct_state, result)) => {
+                        reconstruct_state = new_reconstruct_state;
+                        result
+                    }
                     Err(e) => return Err(PageReconstructError::from(e)),
                 };
                 cont_lsn = lsn_floor;
@@ -2520,13 +2527,19 @@
             if cont_lsn > start_lsn {
                 //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
                 let lsn_floor = max(cached_lsn + 1, start_lsn);
-                result = match frozen_layer.get_value_reconstruct_data(
-                    key,
-                    lsn_floor..cont_lsn,
-                    reconstruct_state,
-                    ctx,
-                ) {
-                    Ok(result) => result,
+                result = match Arc::clone(frozen_layer)
+                    .get_value_reconstruct_data(
+                        key,
+                        lsn_floor..cont_lsn,
+                        reconstruct_state,
+                        ctx.attached_child(),
+                    )
+                    .await
+                {
+                    Ok((new_reconstruct_state, result)) => {
+                        reconstruct_state = new_reconstruct_state;
+                        result
+                    }
                     Err(e) => return Err(PageReconstructError::from(e)),
                 };
                 cont_lsn = lsn_floor;
@@ -2555,13 +2568,19 @@
                 // Get all the data needed to reconstruct the page version from this layer.
                 // But if we have an older cached page image, no need to go past that.
                 let lsn_floor = max(cached_lsn + 1, lsn_floor);
-                result = match layer.get_value_reconstruct_data(
-                    key,
-                    lsn_floor..cont_lsn,
-                    reconstruct_state,
-                    ctx,
-                ) {
-                    Ok(result) => result,
+                result = match Arc::clone(&layer)
+                    .get_value_reconstruct_data(
+                        key,
+                        lsn_floor..cont_lsn,
+                        reconstruct_state,
+                        ctx.attached_child(),
+                    )
+                    .await
+                {
+                    Ok((new_reconstruct_state, result)) => {
+                        reconstruct_state = new_reconstruct_state;
+                        result
+                    }
                    Err(e) => return Err(PageReconstructError::from(e)),
                 };
                 cont_lsn = lsn_floor;