Compare commits

...

10 Commits

Author SHA1 Message Date
Christian Schwarz
190c786310 use patched tokio to get different thread names for executor and spawn-blocking threads 2023-07-04 16:36:01 +02:00
Christian Schwarz
c49ceb807d add guage to measure queue depth for spawn_blocking queue 2023-07-04 16:34:31 +02:00
Christian Schwarz
1ea42041dc add metric for spawn_blocking start count (useful to compute queue depth) 2023-07-01 00:58:19 +02:00
Christian Schwarz
695c05520b add env var to limit Timeline::get concurrency 2023-07-01 00:45:11 +02:00
Christian Schwarz
c128343858 make spawn_blocking use configurable via env var 2023-07-01 00:37:54 +02:00
Christian Schwarz
2f01d0ed26 add histogram for get_value_reconstruct_data call duration
doesn't account for time spent allocating the boxed future,
but it's nice to have everything in one place
2023-06-30 23:25:57 +02:00
Dmitry Rodionov
2f2f3c92c1 unroll the metrics 2023-06-30 22:32:41 +03:00
Christian Schwarz
7ce8ad6d7a add env var to override tokio runtime builder max_blocking_threads() 2023-06-30 21:18:23 +02:00
Christian Schwarz
6a8cd9757b WIP: page_cache Prometheus metrics 2023-06-30 21:01:59 +02:00
Christian Schwarz
73acc17bf8 V2: run Layer::get_value_reconstruct_data in spawn_blocking
This time with metrics for queue depth.
2023-06-30 20:38:02 +02:00
11 changed files with 370 additions and 79 deletions

6
Cargo.lock generated
View File

@@ -4223,8 +4223,7 @@ dependencies = [
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.28.1" version = "1.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/problame/tokio.git?branch=problame/distinguish-core-and-worker-by-thread-name#d88791686cfc7fc7d010889ad7638d09646b3de7"
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes",
@@ -4251,8 +4250,7 @@ dependencies = [
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.1.0" version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/problame/tokio.git?branch=problame/distinguish-core-and-worker-by-thread-name#d88791686cfc7fc7d010889ad7638d09646b3de7"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@@ -187,6 +187,8 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", re
# until async safekeepers patch is merged to the main. # until async safekeepers patch is merged to the main.
sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" } sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" }
tokio = { git = "https://github.com/problame/tokio.git", branch="problame/distinguish-core-and-worker-by-thread-name" }
################# Binary contents sections ################# Binary contents sections
[profile.release] [profile.release]

View File

@@ -130,6 +130,66 @@ pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric") .expect("failed to define a metric")
}); });
pub static PAGE_CACHE_READ_ACCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_read_accesses_total",
"Number of read accesses to the page cache",
&["key_kind"]
)
.expect("failed to define a metric")
});
pub static PAGE_CACHE_READ_ACCESSES_MATERIALIZED_PAGE: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["materialized_page"])
.unwrap()
});
pub static PAGE_CACHE_READ_ACCESSES_EPHEMERAL: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["ephemeral"])
.unwrap()
});
pub static PAGE_CACHE_READ_ACCESSES_IMMUTABLE: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["immutable"])
.unwrap()
});
pub static PAGE_CACHE_READ_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_read_hits_total",
"Number of read accesses to the page cache that hit",
&["key_kind", "hit_kind"]
)
.expect("failed to define a metric")
});
pub static PAGE_CACHE_READ_HITS_EPHEMERAL: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["ephemeral", "-"])
.unwrap()
});
pub static PAGE_CACHE_READ_HITS_IMMUTABLE: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["immutable", "-"])
.unwrap()
});
pub static PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_EXACT: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["materialized_page", "exact"])
.unwrap()
});
pub static PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_OLDER_LSN: Lazy<IntCounter> = Lazy::new(|| {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["materialized_page", "older_lsn"])
.unwrap()
});
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| { static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!( register_histogram_vec!(
"pageserver_wait_lsn_seconds", "pageserver_wait_lsn_seconds",
@@ -614,6 +674,79 @@ pub static WALRECEIVER_CANDIDATES_ADDED: Lazy<IntCounter> =
pub static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> = pub static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"])); Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_STARTED_COUNT: Lazy<IntCounter> =
Lazy::new(|| {
register_int_counter!(
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_started_count",
"Number of spawn_blocking calls made in Layer::get_value_reconstruct_data"
)
.expect("failed to define a metric")
});
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE: Lazy<IntGauge> =
Lazy::new(|| {
register_int_gauge!(
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_active_gauge",
"Number of spawn_blocking calls active in Layer::get_value_reconstruct_data"
)
.expect("failed to define a metric")
});
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_QUEUE_DELAY: Lazy<Histogram> = Lazy::new(
|| {
register_histogram!(
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_queue_delay_seconds",
            "Time a Layer::get_value_reconstruct_data call spends in spawn_blocking queue until the first line of blocking code runs inside spawn_blocking",
vec![
0.000_005, 0.000_010, 0.000_025, 0.000_050, 0.000_100, 0.000_250, 0.000_500, 0.001_000,
0.002_500, 0.005_000, 0.010_000, 0.025_000, 0.050_000, 0.100_000, 0.250_000, 0.500_000,
1.000_000, 2.000_000, 5.000_000, 10.000_000, 25.000_000, 50.000_000, 100.000_000,
],
)
.expect("failed to define a metric")
},
);
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_layer_get_value_reconstruct_data_completion_time_seconds",
"Time a Layer::get_value_reconstruct_data call takes to complete",
&["result"],
vec![
0.000_005,
0.000_010,
0.000_025,
0.000_050,
0.000_100,
0.000_250,
0.000_500,
0.001_000,
0.002_500,
0.005_000,
0.010_000,
0.025_000,
0.050_000,
0.100_000,
0.250_000,
0.500_000,
1.000_000,
2.000_000,
5.000_000,
10.000_000,
25.000_000,
50.000_000,
100.000_000,
]
)
.expect("failed to define a metric")
});
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_OK: Lazy<Histogram> =
Lazy::new(|| LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME.with_label_values(&["ok"]));
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_ERROR: Lazy<Histogram> =
Lazy::new(|| LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME.with_label_values(&["error"]));
// Metrics collected on WAL redo operations // Metrics collected on WAL redo operations
// //
// We collect the time spent in actual WAL redo ('redo'), and time waiting // We collect the time spent in actual WAL redo ('redo'), and time waiting

View File

@@ -313,6 +313,8 @@ impl PageCache {
key: &Key, key: &Key,
lsn: Lsn, lsn: Lsn,
) -> Option<(Lsn, PageReadGuard)> { ) -> Option<(Lsn, PageReadGuard)> {
crate::metrics::PAGE_CACHE_READ_ACCESSES_MATERIALIZED_PAGE.inc();
let mut cache_key = CacheKey::MaterializedPage { let mut cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey { hash_key: MaterializedPageHashKey {
tenant_id, tenant_id,
@@ -323,8 +325,17 @@ impl PageCache {
}; };
if let Some(guard) = self.try_lock_for_read(&mut cache_key) { if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
if let CacheKey::MaterializedPage { hash_key: _, lsn } = cache_key { if let CacheKey::MaterializedPage {
Some((lsn, guard)) hash_key: _,
lsn: available_lsn,
} = cache_key
{
if available_lsn == lsn {
crate::metrics::PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_EXACT.inc();
} else {
crate::metrics::PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_OLDER_LSN.inc();
}
Some((available_lsn, guard))
} else { } else {
panic!("unexpected key type in slot"); panic!("unexpected key type in slot");
} }
@@ -499,11 +510,31 @@ impl PageCache {
/// ``` /// ```
/// ///
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> { fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::EphemeralPage { .. } => (
&crate::metrics::PAGE_CACHE_READ_ACCESSES_EPHEMERAL,
&crate::metrics::PAGE_CACHE_READ_HITS_EPHEMERAL,
),
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE_READ_ACCESSES_IMMUTABLE,
&crate::metrics::PAGE_CACHE_READ_HITS_IMMUTABLE,
),
};
read_access.inc();
let mut is_first_iteration = true;
loop { loop {
// First check if the key already exists in the cache. // First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key) { if let Some(read_guard) = self.try_lock_for_read(cache_key) {
if is_first_iteration {
hit.inc();
}
return Ok(ReadBufResult::Found(read_guard)); return Ok(ReadBufResult::Found(read_guard));
} }
is_first_iteration = false;
// Not found. Find a victim buffer // Not found. Find a victim buffer
let (slot_idx, mut inner) = let (slot_idx, mut inner) =

View File

@@ -102,11 +102,33 @@ use crate::shutdown_pageserver;
// It's also good to avoid hogging all threads that would be needed to process // It's also good to avoid hogging all threads that would be needed to process
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't // other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
// happen, but still. // happen, but still.
//
static PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE: Lazy<Option<usize>> = Lazy::new(|| {
let env_var: String = match std::env::var("PAGESERVER_TOKIO_MAX_BLOCKING_THREADS") {
Ok(v) => v,
Err(std::env::VarError::NotPresent) => {
debug!("env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS not set, using default");
return None;
}
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS is not valid UTF-8");
}
};
let pool_size = match env_var.parse() {
Ok(v) => v,
Err(e) => {
panic!("Failed to parse PAGESERVER_TOKIO_MAX_BLOCKING_THREADS: {e:?}");
}
};
eprintln!("using spawn_blocking pool size override from env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS: {pool_size:?}");
Some(pool_size)
});
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| { pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.thread_name("compute request worker") .thread_name("compute request worker")
.enable_all() .enable_all()
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
.build() .build()
.expect("Failed to create compute request runtime") .expect("Failed to create compute request runtime")
}); });
@@ -115,6 +137,7 @@ pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.thread_name("mgmt request worker") .thread_name("mgmt request worker")
.enable_all() .enable_all()
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
.build() .build()
.expect("Failed to create mgmt request runtime") .expect("Failed to create mgmt request runtime")
}); });
@@ -123,6 +146,7 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.thread_name("walreceiver worker") .thread_name("walreceiver worker")
.enable_all() .enable_all()
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
.build() .build()
.expect("Failed to create walreceiver runtime") .expect("Failed to create walreceiver runtime")
}); });
@@ -131,6 +155,7 @@ pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.thread_name("background op worker") .thread_name("background op worker")
.enable_all() .enable_all()
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
.build() .build()
.expect("Failed to create background op runtime") .expect("Failed to create background op runtime")
}); });

View File

@@ -12,7 +12,7 @@ use crate::context::RequestContext;
use crate::repository::{Key, Value}; use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind; use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord; use crate::walrecord::NeonWalRecord;
use anyhow::Result; use anyhow::{Context, Result};
use bytes::Bytes; use bytes::Bytes;
use enum_map::EnumMap; use enum_map::EnumMap;
use enumset::EnumSet; use enumset::EnumSet;
@@ -24,7 +24,7 @@ use pageserver_api::models::{
use std::ops::Range; use std::ops::Range;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tracing::warn; use tracing::warn;
use utils::history_buffer::HistoryBufferWithDropCounter; use utils::history_buffer::HistoryBufferWithDropCounter;
use utils::rate_limit::RateLimit; use utils::rate_limit::RateLimit;
@@ -335,7 +335,8 @@ impl LayerAccessStats {
/// All layers should implement a minimal `std::fmt::Debug` without tenant or /// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known in the context of which the layers /// timeline names, because those are known in the context of which the layers
/// are used in (timeline). /// are used in (timeline).
pub trait Layer: std::fmt::Debug + Send + Sync { #[async_trait::async_trait]
pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
/// Range of keys that this layer covers /// Range of keys that this layer covers
fn get_key_range(&self) -> Range<Key>; fn get_key_range(&self) -> Range<Key>;
@@ -365,13 +366,74 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
/// is available. If this returns ValueReconstructResult::Continue, look up /// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to /// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data. /// collect more data.
fn get_value_reconstruct_data( fn get_value_reconstruct_data_blocking(
&self, &self,
key: Key, key: Key,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState, reconstruct_data: ValueReconstructState,
ctx: &RequestContext, ctx: RequestContext,
) -> Result<ValueReconstructResult>; ) -> Result<(ValueReconstructState, ValueReconstructResult)>;
/// CANCEL SAFETY: if the returned future is dropped,
/// the wrapped closure will still run to completion and its return value will be discarded.
/// For the case of get_value_reconstruct_data, we expect the closure to not
/// have any side effects, as it only attempts to read a layer (and stuff like
/// page cache isn't considered a real side effect).
/// But, ...
/// TRACING:
/// If the returned future is cancelled, the spawn_blocking span can outlive
/// the caller's span.
/// So, technically, we should be using `parent: None` and `follows_from: current`
/// instead. However, in practice, the advantage of maintaining the span stack
/// in logs outweighs the disadvantage of having a dangling span in a case that
/// is not expected to happen because in pageserver we generally don't drop pending futures.
async fn get_value_reconstruct_data(
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking");
static USE_SPAWN_BLOCKING: Lazy<bool> = Lazy::new(|| {
let val = std::env::var("PAGESERVER_LAYER_GET_RECONSTRUCT_DATA_USE_SPAWN_BLOCKING")
.map(|s| s == "1")
.unwrap_or(false);
tracing::info!("PAGESERVER_LAYER_GET_RECONSTRUCT_DATA_USE_SPAWN_BLOCKING={val}");
val
});
let use_spawn_blocking = *USE_SPAWN_BLOCKING;
let start = Instant::now();
let res = if !use_spawn_blocking {
anyhow::Ok(self.get_value_reconstruct_data_blocking(
key,
lsn_range,
reconstruct_data,
ctx,
))
} else {
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_STARTED_COUNT.inc();
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE.inc();
let res = tokio::task::spawn_blocking(move || {
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_QUEUE_DELAY
.observe(start.elapsed().as_secs_f64());
let _enter = span.enter();
self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx)
})
.await
.context("spawn_blocking");
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE.dec();
res
};
let histo = match &res {
Ok(Ok(_)) => &crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_OK,
Ok(Err(_)) | Err(_) => {
&crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_ERROR
}
};
histo.observe(start.elapsed().as_secs_f64());
res?
}
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`]. /// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String; fn short_id(&self) -> String;
@@ -483,17 +545,8 @@ pub mod tests {
} }
} }
#[async_trait::async_trait]
impl Layer for LayerDescriptor { impl Layer for LayerDescriptor {
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!() todo!()
} }
@@ -508,6 +561,16 @@ pub mod tests {
self.layer_desc().lsn_range.clone() self.layer_desc().lsn_range.clone()
} }
fn get_value_reconstruct_data_blocking(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
todo!("This method shouldn't be part of the Layer trait")
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers. /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn is_incremental(&self) -> bool { fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental self.layer_desc().is_incremental

View File

@@ -218,6 +218,7 @@ impl std::fmt::Debug for DeltaLayerInner {
} }
} }
#[async_trait::async_trait]
impl Layer for DeltaLayer { impl Layer for DeltaLayer {
/// debugging function to print out the contents of the layer /// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
@@ -294,13 +295,13 @@ impl Layer for DeltaLayer {
Ok(()) Ok(())
} }
fn get_value_reconstruct_data( fn get_value_reconstruct_data_blocking(
&self, &self,
key: Key, key: Key,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState, mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext, ctx: RequestContext,
) -> anyhow::Result<ValueReconstructResult> { ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
ensure!(lsn_range.start >= self.desc.lsn_range.start); ensure!(lsn_range.start >= self.desc.lsn_range.start);
let mut need_image = true; let mut need_image = true;
@@ -308,7 +309,7 @@ impl Layer for DeltaLayer {
{ {
// Open the file and lock the metadata in memory // Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
// Scan the page versions backwards, starting from `lsn`. // Scan the page versions backwards, starting from `lsn`.
let file = &inner.file; let file = &inner.file;
@@ -374,9 +375,9 @@ impl Layer for DeltaLayer {
// If an older page image is needed to reconstruct the page, let the // If an older page image is needed to reconstruct the page, let the
// caller know. // caller know.
if need_image { if need_image {
Ok(ValueReconstructResult::Continue) Ok((reconstruct_state, ValueReconstructResult::Continue))
} else { } else {
Ok(ValueReconstructResult::Complete) Ok((reconstruct_state, ValueReconstructResult::Complete))
} }
} }

View File

@@ -149,6 +149,7 @@ impl std::fmt::Debug for ImageLayerInner {
} }
} }
#[async_trait::async_trait]
impl Layer for ImageLayer { impl Layer for ImageLayer {
/// debugging function to print out the contents of the layer /// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
@@ -181,18 +182,18 @@ impl Layer for ImageLayer {
} }
/// Look up given page in the file /// Look up given page in the file
fn get_value_reconstruct_data( fn get_value_reconstruct_data_blocking(
&self, &self,
key: Key, key: Key,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState, mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext, ctx: RequestContext,
) -> anyhow::Result<ValueReconstructResult> { ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
assert!(self.desc.key_range.contains(&key)); assert!(self.desc.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn); assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn); assert!(lsn_range.end >= self.lsn);
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let file = inner.file.as_ref().unwrap(); let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file); let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -210,9 +211,9 @@ impl Layer for ImageLayer {
let value = Bytes::from(blob); let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value)); reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete) Ok((reconstruct_state, ValueReconstructResult::Complete))
} else { } else {
Ok(ValueReconstructResult::Missing) Ok((reconstruct_state, ValueReconstructResult::Missing))
} }
} }

View File

@@ -110,6 +110,7 @@ impl InMemoryLayer {
} }
} }
#[async_trait::async_trait]
impl Layer for InMemoryLayer { impl Layer for InMemoryLayer {
fn get_key_range(&self) -> Range<Key> { fn get_key_range(&self) -> Range<Key> {
Key::MIN..Key::MAX Key::MIN..Key::MAX
@@ -190,13 +191,13 @@ impl Layer for InMemoryLayer {
} }
/// Look up given value in the layer. /// Look up given value in the layer.
fn get_value_reconstruct_data( fn get_value_reconstruct_data_blocking(
&self, &self,
key: Key, key: Key,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState, mut reconstruct_state: ValueReconstructState,
_ctx: &RequestContext, _ctx: RequestContext,
) -> anyhow::Result<ValueReconstructResult> { ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
ensure!(lsn_range.start >= self.start_lsn); ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true; let mut need_image = true;
@@ -213,7 +214,7 @@ impl Layer for InMemoryLayer {
match value { match value {
Value::Image(img) => { Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img)); reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete); return Ok((reconstruct_state, ValueReconstructResult::Complete));
} }
Value::WalRecord(rec) => { Value::WalRecord(rec) => {
let will_init = rec.will_init(); let will_init = rec.will_init();
@@ -233,9 +234,9 @@ impl Layer for InMemoryLayer {
// If an older page image is needed to reconstruct the page, let the // If an older page image is needed to reconstruct the page, let the
// caller know. // caller know.
if need_image { if need_image {
Ok(ValueReconstructResult::Continue) Ok((reconstruct_state, ValueReconstructResult::Continue))
} else { } else {
Ok(ValueReconstructResult::Complete) Ok((reconstruct_state, ValueReconstructResult::Complete))
} }
} }
} }

View File

@@ -6,7 +6,7 @@ use crate::context::RequestContext;
use crate::repository::Key; use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates; use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::tenant::storage_layer::{Layer, ValueReconstructState};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo; use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range; use std::ops::Range;
@@ -21,7 +21,7 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName}; use super::filename::{DeltaFileName, ImageFileName};
use super::{ use super::{
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter, DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult,
}; };
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or /// RemoteLayer is a not yet downloaded [`ImageLayer`] or
@@ -63,14 +63,15 @@ impl std::fmt::Debug for RemoteLayer {
} }
} }
#[async_trait::async_trait]
impl Layer for RemoteLayer { impl Layer for RemoteLayer {
fn get_value_reconstruct_data( fn get_value_reconstruct_data_blocking(
&self, &self,
_key: Key, _key: Key,
_lsn_range: Range<Lsn>, _lsn_range: Range<Lsn>,
_reconstruct_state: &mut ValueReconstructState, _reconstruct_state: ValueReconstructState,
_ctx: &RequestContext, _ctx: RequestContext,
) -> Result<ValueReconstructResult> { ) -> Result<(ValueReconstructState, ValueReconstructResult)> {
bail!( bail!(
"layer {} needs to be downloaded", "layer {} needs to be downloaded",
self.filename().file_name() self.filename().file_name()

View File

@@ -8,7 +8,7 @@ use bytes::Bytes;
use fail::fail_point; use fail::fail_point;
use futures::StreamExt; use futures::StreamExt;
use itertools::Itertools; use itertools::Itertools;
use once_cell::sync::OnceCell; use once_cell::sync::{Lazy, OnceCell};
use pageserver_api::models::{ use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus, DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
@@ -660,16 +660,33 @@ impl Timeline {
None => None, None => None,
}; };
let mut reconstruct_state = ValueReconstructState { let reconstruct_state = ValueReconstructState {
records: Vec::new(), records: Vec::new(),
img: cached_page_img, img: cached_page_img,
}; };
static GET_RECONSTRUCT_DATA_CONCURRENCY: Lazy<Option<usize>> = Lazy::new(|| {
std::env::var("PAGESERVER_TIMELINE_GET_RECONSTRUCT_DATA_CONCURRENCY_LIMIT")
.ok()
.and_then(|s| s.parse().ok())
});
static GET_RECONSTRUCT_DATA_SEMAPHORE: Lazy<Option<Semaphore>> =
Lazy::new(|| (*GET_RECONSTRUCT_DATA_CONCURRENCY).map(Semaphore::new));
let permit = if let Some(sem) = GET_RECONSTRUCT_DATA_SEMAPHORE.as_ref() {
Some(sem.acquire().await)
} else {
None
};
let timer = self.metrics.get_reconstruct_data_time_histo.start_timer(); let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx) let reconstruct_state = self
.get_reconstruct_data(key, lsn, reconstruct_state, ctx)
.await?; .await?;
timer.stop_and_record(); timer.stop_and_record();
drop(permit);
RECONSTRUCT_TIME RECONSTRUCT_TIME
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state)) .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
} }
@@ -2495,9 +2512,9 @@ impl Timeline {
&self, &self,
key: Key, key: Key,
request_lsn: Lsn, request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState, mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<(), PageReconstructError> { ) -> Result<ValueReconstructState, PageReconstructError> {
// Start from the current timeline. // Start from the current timeline.
let mut timeline_owned; let mut timeline_owned;
let mut timeline = self; let mut timeline = self;
@@ -2527,12 +2544,12 @@ impl Timeline {
// The function should have updated 'state' // The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn); //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result { match result {
ValueReconstructResult::Complete => return Ok(()), ValueReconstructResult::Complete => return Ok(reconstruct_state),
ValueReconstructResult::Continue => { ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done. // If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 { if cont_lsn == cached_lsn + 1 {
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1); MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(()); return Ok(reconstruct_state);
} }
if prev_lsn <= cont_lsn { if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid // Didn't make any progress in last iteration. Error out to avoid
@@ -2637,13 +2654,19 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer. // Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that. // But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn); let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match open_layer.get_value_reconstruct_data( result = match Arc::clone(open_layer)
key, .get_value_reconstruct_data(
lsn_floor..cont_lsn, key,
reconstruct_state, lsn_floor..cont_lsn,
ctx, reconstruct_state,
) { ctx.attached_child(),
Ok(result) => result, )
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)), Err(e) => return Err(PageReconstructError::from(e)),
}; };
cont_lsn = lsn_floor; cont_lsn = lsn_floor;
@@ -2664,13 +2687,19 @@ impl Timeline {
if cont_lsn > start_lsn { if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display()); //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn); let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match frozen_layer.get_value_reconstruct_data( result = match Arc::clone(frozen_layer)
key, .get_value_reconstruct_data(
lsn_floor..cont_lsn, key,
reconstruct_state, lsn_floor..cont_lsn,
ctx, reconstruct_state,
) { ctx.attached_child(),
Ok(result) => result, )
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)), Err(e) => return Err(PageReconstructError::from(e)),
}; };
cont_lsn = lsn_floor; cont_lsn = lsn_floor;
@@ -2700,13 +2729,19 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer. // Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that. // But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor); let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer.get_value_reconstruct_data( result = match Arc::clone(&layer)
key, .get_value_reconstruct_data(
lsn_floor..cont_lsn, key,
reconstruct_state, lsn_floor..cont_lsn,
ctx, reconstruct_state,
) { ctx.attached_child(),
Ok(result) => result, )
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)), Err(e) => return Err(PageReconstructError::from(e)),
}; };
cont_lsn = lsn_floor; cont_lsn = lsn_floor;