mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
10 Commits
arpad/log_
...
problame/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
190c786310 | ||
|
|
c49ceb807d | ||
|
|
1ea42041dc | ||
|
|
695c05520b | ||
|
|
c128343858 | ||
|
|
2f01d0ed26 | ||
|
|
2f2f3c92c1 | ||
|
|
7ce8ad6d7a | ||
|
|
6a8cd9757b | ||
|
|
73acc17bf8 |
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -4223,8 +4223,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.28.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
|
||||
source = "git+https://github.com/problame/tokio.git?branch=problame/distinguish-core-and-worker-by-thread-name#d88791686cfc7fc7d010889ad7638d09646b3de7"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"bytes",
|
||||
@@ -4251,8 +4250,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-macros"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
|
||||
source = "git+https://github.com/problame/tokio.git?branch=problame/distinguish-core-and-worker-by-thread-name#d88791686cfc7fc7d010889ad7638d09646b3de7"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
||||
@@ -187,6 +187,8 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", re
|
||||
# until async safekeepers patch is merged to the main.
|
||||
sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" }
|
||||
|
||||
tokio = { git = "https://github.com/problame/tokio.git", branch="problame/distinguish-core-and-worker-by-thread-name" }
|
||||
|
||||
################# Binary contents sections
|
||||
|
||||
[profile.release]
|
||||
|
||||
@@ -130,6 +130,66 @@ pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE_READ_ACCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_cache_read_accesses_total",
|
||||
"Number of read accesses to the page cache",
|
||||
&["key_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE_READ_ACCESSES_MATERIALIZED_PAGE: Lazy<IntCounter> = Lazy::new(|| {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&["materialized_page"])
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE_READ_ACCESSES_EPHEMERAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&["ephemeral"])
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE_READ_ACCESSES_IMMUTABLE: Lazy<IntCounter> = Lazy::new(|| {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&["immutable"])
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE_READ_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_cache_read_hits_total",
|
||||
"Number of read accesses to the page cache that hit",
|
||||
&["key_kind", "hit_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE_READ_HITS_EPHEMERAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&["ephemeral", "-"])
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE_READ_HITS_IMMUTABLE: Lazy<IntCounter> = Lazy::new(|| {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&["immutable", "-"])
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_EXACT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&["materialized_page", "exact"])
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_OLDER_LSN: Lazy<IntCounter> = Lazy::new(|| {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&["materialized_page", "older_lsn"])
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_wait_lsn_seconds",
|
||||
@@ -614,6 +674,79 @@ pub static WALRECEIVER_CANDIDATES_ADDED: Lazy<IntCounter> =
|
||||
pub static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
|
||||
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
|
||||
|
||||
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_STARTED_COUNT: Lazy<IntCounter> =
|
||||
Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_started_count",
|
||||
"Number of spawn_blocking calls made in Layer::get_value_reconstruct_data"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE: Lazy<IntGauge> =
|
||||
Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_active_gauge",
|
||||
"Number of spawn_blocking calls active in Layer::get_value_reconstruct_data"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_QUEUE_DELAY: Lazy<Histogram> = Lazy::new(
|
||||
|| {
|
||||
register_histogram!(
|
||||
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_queue_delay_seconds",
|
||||
"Time a Layer::get_value_reconstruct_data call spends in spawn_blocking queue until the first line of blockign code runs inside spawn_blocking",
|
||||
vec![
|
||||
0.000_005, 0.000_010, 0.000_025, 0.000_050, 0.000_100, 0.000_250, 0.000_500, 0.001_000,
|
||||
0.002_500, 0.005_000, 0.010_000, 0.025_000, 0.050_000, 0.100_000, 0.250_000, 0.500_000,
|
||||
1.000_000, 2.000_000, 5.000_000, 10.000_000, 25.000_000, 50.000_000, 100.000_000,
|
||||
],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
},
|
||||
);
|
||||
|
||||
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_layer_get_value_reconstruct_data_completion_time_seconds",
|
||||
"Time a Layer::get_value_reconstruct_data call takes to complete",
|
||||
&["result"],
|
||||
vec![
|
||||
0.000_005,
|
||||
0.000_010,
|
||||
0.000_025,
|
||||
0.000_050,
|
||||
0.000_100,
|
||||
0.000_250,
|
||||
0.000_500,
|
||||
0.001_000,
|
||||
0.002_500,
|
||||
0.005_000,
|
||||
0.010_000,
|
||||
0.025_000,
|
||||
0.050_000,
|
||||
0.100_000,
|
||||
0.250_000,
|
||||
0.500_000,
|
||||
1.000_000,
|
||||
2.000_000,
|
||||
5.000_000,
|
||||
10.000_000,
|
||||
25.000_000,
|
||||
50.000_000,
|
||||
100.000_000,
|
||||
]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_OK: Lazy<Histogram> =
|
||||
Lazy::new(|| LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME.with_label_values(&["ok"]));
|
||||
|
||||
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_ERROR: Lazy<Histogram> =
|
||||
Lazy::new(|| LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME.with_label_values(&["error"]));
|
||||
|
||||
// Metrics collected on WAL redo operations
|
||||
//
|
||||
// We collect the time spent in actual WAL redo ('redo'), and time waiting
|
||||
|
||||
@@ -313,6 +313,8 @@ impl PageCache {
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
) -> Option<(Lsn, PageReadGuard)> {
|
||||
crate::metrics::PAGE_CACHE_READ_ACCESSES_MATERIALIZED_PAGE.inc();
|
||||
|
||||
let mut cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_id,
|
||||
@@ -323,8 +325,17 @@ impl PageCache {
|
||||
};
|
||||
|
||||
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
|
||||
if let CacheKey::MaterializedPage { hash_key: _, lsn } = cache_key {
|
||||
Some((lsn, guard))
|
||||
if let CacheKey::MaterializedPage {
|
||||
hash_key: _,
|
||||
lsn: available_lsn,
|
||||
} = cache_key
|
||||
{
|
||||
if available_lsn == lsn {
|
||||
crate::metrics::PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_EXACT.inc();
|
||||
} else {
|
||||
crate::metrics::PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_OLDER_LSN.inc();
|
||||
}
|
||||
Some((available_lsn, guard))
|
||||
} else {
|
||||
panic!("unexpected key type in slot");
|
||||
}
|
||||
@@ -499,11 +510,31 @@ impl PageCache {
|
||||
/// ```
|
||||
///
|
||||
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
unreachable!("Materialized pages use lookup_materialized_page")
|
||||
}
|
||||
CacheKey::EphemeralPage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE_READ_ACCESSES_EPHEMERAL,
|
||||
&crate::metrics::PAGE_CACHE_READ_HITS_EPHEMERAL,
|
||||
),
|
||||
CacheKey::ImmutableFilePage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE_READ_ACCESSES_IMMUTABLE,
|
||||
&crate::metrics::PAGE_CACHE_READ_HITS_IMMUTABLE,
|
||||
),
|
||||
};
|
||||
read_access.inc();
|
||||
|
||||
let mut is_first_iteration = true;
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
|
||||
if is_first_iteration {
|
||||
hit.inc();
|
||||
}
|
||||
return Ok(ReadBufResult::Found(read_guard));
|
||||
}
|
||||
is_first_iteration = false;
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) =
|
||||
|
||||
@@ -102,11 +102,33 @@ use crate::shutdown_pageserver;
|
||||
// It's also good to avoid hogging all threads that would be needed to process
|
||||
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
|
||||
// happen, but still.
|
||||
//
|
||||
|
||||
static PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE: Lazy<Option<usize>> = Lazy::new(|| {
|
||||
let env_var: String = match std::env::var("PAGESERVER_TOKIO_MAX_BLOCKING_THREADS") {
|
||||
Ok(v) => v,
|
||||
Err(std::env::VarError::NotPresent) => {
|
||||
debug!("env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS not set, using default");
|
||||
return None;
|
||||
}
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS is not valid UTF-8");
|
||||
}
|
||||
};
|
||||
let pool_size = match env_var.parse() {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
panic!("Failed to parse PAGESERVER_TOKIO_MAX_BLOCKING_THREADS: {e:?}");
|
||||
}
|
||||
};
|
||||
eprintln!("using spawn_blocking pool size override from env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS: {pool_size:?}");
|
||||
Some(pool_size)
|
||||
});
|
||||
|
||||
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("compute request worker")
|
||||
.enable_all()
|
||||
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
|
||||
.build()
|
||||
.expect("Failed to create compute request runtime")
|
||||
});
|
||||
@@ -115,6 +137,7 @@ pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("mgmt request worker")
|
||||
.enable_all()
|
||||
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
|
||||
.build()
|
||||
.expect("Failed to create mgmt request runtime")
|
||||
});
|
||||
@@ -123,6 +146,7 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("walreceiver worker")
|
||||
.enable_all()
|
||||
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
|
||||
.build()
|
||||
.expect("Failed to create walreceiver runtime")
|
||||
});
|
||||
@@ -131,6 +155,7 @@ pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("background op worker")
|
||||
.enable_all()
|
||||
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
|
||||
.build()
|
||||
.expect("Failed to create background op runtime")
|
||||
});
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::context::RequestContext;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::Result;
|
||||
use anyhow::{Context, Result};
|
||||
use bytes::Bytes;
|
||||
use enum_map::EnumMap;
|
||||
use enumset::EnumSet;
|
||||
@@ -24,7 +24,7 @@ use pageserver_api::models::{
|
||||
use std::ops::Range;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
use tracing::warn;
|
||||
use utils::history_buffer::HistoryBufferWithDropCounter;
|
||||
use utils::rate_limit::RateLimit;
|
||||
@@ -335,7 +335,8 @@ impl LayerAccessStats {
|
||||
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
|
||||
/// timeline names, because those are known in the context of which the layers
|
||||
/// are used in (timeline).
|
||||
pub trait Layer: std::fmt::Debug + Send + Sync {
|
||||
#[async_trait::async_trait]
|
||||
pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
|
||||
/// Range of keys that this layer covers
|
||||
fn get_key_range(&self) -> Range<Key>;
|
||||
|
||||
@@ -365,13 +366,74 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
|
||||
/// is available. If this returns ValueReconstructResult::Continue, look up
|
||||
/// the predecessor layer and call again with the same 'reconstruct_data' to
|
||||
/// collect more data.
|
||||
fn get_value_reconstruct_data(
|
||||
fn get_value_reconstruct_data_blocking(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_data: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult>;
|
||||
reconstruct_data: ValueReconstructState,
|
||||
ctx: RequestContext,
|
||||
) -> Result<(ValueReconstructState, ValueReconstructResult)>;
|
||||
|
||||
/// CANCEL SAFETY: if the returned future is dropped,
|
||||
/// the wrapped closure still run to completion and the return value discarded.
|
||||
/// For the case of get_value_reconstruct_data, we expect the closure to not
|
||||
/// have any side effects, as it only attempts to read a layer (and stuff like
|
||||
/// page cache isn't considered a real side effect).
|
||||
/// But, ...
|
||||
/// TRACING:
|
||||
/// If the returned future is cancelled, the spawn_blocking span can outlive
|
||||
/// the caller's span.
|
||||
/// So, technically, we should be using `parent: None` and `follows_from: current`
|
||||
/// instead. However, in practice, the advantage of maintaining the span stack
|
||||
/// in logs outweighs the disadvantage of having a dangling span in a case that
|
||||
/// is not expected to happen because in pageserver we generally don't drop pending futures.
|
||||
async fn get_value_reconstruct_data(
|
||||
self: Arc<Self>,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_data: ValueReconstructState,
|
||||
ctx: RequestContext,
|
||||
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||
let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking");
|
||||
static USE_SPAWN_BLOCKING: Lazy<bool> = Lazy::new(|| {
|
||||
let val = std::env::var("PAGESERVER_LAYER_GET_RECONSTRUCT_DATA_USE_SPAWN_BLOCKING")
|
||||
.map(|s| s == "1")
|
||||
.unwrap_or(false);
|
||||
tracing::info!("PAGESERVER_LAYER_GET_RECONSTRUCT_DATA_USE_SPAWN_BLOCKING={val}");
|
||||
val
|
||||
});
|
||||
let use_spawn_blocking = *USE_SPAWN_BLOCKING;
|
||||
let start = Instant::now();
|
||||
let res = if !use_spawn_blocking {
|
||||
anyhow::Ok(self.get_value_reconstruct_data_blocking(
|
||||
key,
|
||||
lsn_range,
|
||||
reconstruct_data,
|
||||
ctx,
|
||||
))
|
||||
} else {
|
||||
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_STARTED_COUNT.inc();
|
||||
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE.inc();
|
||||
let res = tokio::task::spawn_blocking(move || {
|
||||
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_QUEUE_DELAY
|
||||
.observe(start.elapsed().as_secs_f64());
|
||||
let _enter = span.enter();
|
||||
self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx)
|
||||
})
|
||||
.await
|
||||
.context("spawn_blocking");
|
||||
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE.dec();
|
||||
res
|
||||
};
|
||||
let histo = match &res {
|
||||
Ok(Ok(_)) => &crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_OK,
|
||||
Ok(Err(_)) | Err(_) => {
|
||||
&crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_ERROR
|
||||
}
|
||||
};
|
||||
histo.observe(start.elapsed().as_secs_f64());
|
||||
res?
|
||||
}
|
||||
|
||||
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
|
||||
fn short_id(&self) -> String;
|
||||
@@ -483,17 +545,8 @@ pub mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for LayerDescriptor {
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
_reconstruct_data: &mut ValueReconstructState,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult> {
|
||||
todo!("This method shouldn't be part of the Layer trait")
|
||||
}
|
||||
|
||||
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
@@ -508,6 +561,16 @@ pub mod tests {
|
||||
self.layer_desc().lsn_range.clone()
|
||||
}
|
||||
|
||||
fn get_value_reconstruct_data_blocking(
|
||||
&self,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
_reconstruct_data: ValueReconstructState,
|
||||
_ctx: RequestContext,
|
||||
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||
todo!("This method shouldn't be part of the Layer trait")
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn is_incremental(&self) -> bool {
|
||||
self.layer_desc().is_incremental
|
||||
|
||||
@@ -218,6 +218,7 @@ impl std::fmt::Debug for DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for DeltaLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
@@ -294,13 +295,13 @@ impl Layer for DeltaLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_value_reconstruct_data(
|
||||
fn get_value_reconstruct_data_blocking(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||
ensure!(lsn_range.start >= self.desc.lsn_range.start);
|
||||
let mut need_image = true;
|
||||
|
||||
@@ -308,7 +309,7 @@ impl Layer for DeltaLayer {
|
||||
|
||||
{
|
||||
// Open the file and lock the metadata in memory
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
|
||||
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let file = &inner.file;
|
||||
@@ -374,9 +375,9 @@ impl Layer for DeltaLayer {
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
Ok(ValueReconstructResult::Continue)
|
||||
Ok((reconstruct_state, ValueReconstructResult::Continue))
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
Ok((reconstruct_state, ValueReconstructResult::Complete))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -149,6 +149,7 @@ impl std::fmt::Debug for ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for ImageLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
@@ -181,18 +182,18 @@ impl Layer for ImageLayer {
|
||||
}
|
||||
|
||||
/// Look up given page in the file
|
||||
fn get_value_reconstruct_data(
|
||||
fn get_value_reconstruct_data_blocking(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||
assert!(self.desc.key_range.contains(&key));
|
||||
assert!(lsn_range.start >= self.lsn);
|
||||
assert!(lsn_range.end >= self.lsn);
|
||||
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
|
||||
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
|
||||
@@ -210,9 +211,9 @@ impl Layer for ImageLayer {
|
||||
let value = Bytes::from(blob);
|
||||
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
Ok((reconstruct_state, ValueReconstructResult::Complete))
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Missing)
|
||||
Ok((reconstruct_state, ValueReconstructResult::Missing))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -110,6 +110,7 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for InMemoryLayer {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
Key::MIN..Key::MAX
|
||||
@@ -190,13 +191,13 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
|
||||
/// Look up given value in the layer.
|
||||
fn get_value_reconstruct_data(
|
||||
fn get_value_reconstruct_data_blocking(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
_ctx: RequestContext,
|
||||
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||
ensure!(lsn_range.start >= self.start_lsn);
|
||||
let mut need_image = true;
|
||||
|
||||
@@ -213,7 +214,7 @@ impl Layer for InMemoryLayer {
|
||||
match value {
|
||||
Value::Image(img) => {
|
||||
reconstruct_state.img = Some((*entry_lsn, img));
|
||||
return Ok(ValueReconstructResult::Complete);
|
||||
return Ok((reconstruct_state, ValueReconstructResult::Complete));
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
@@ -233,9 +234,9 @@ impl Layer for InMemoryLayer {
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
Ok(ValueReconstructResult::Continue)
|
||||
Ok((reconstruct_state, ValueReconstructResult::Continue))
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
Ok((reconstruct_state, ValueReconstructResult::Complete))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::context::RequestContext;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::layer_map::BatchedUpdates;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
|
||||
use anyhow::{bail, Result};
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use std::ops::Range;
|
||||
@@ -21,7 +21,7 @@ use utils::{
|
||||
use super::filename::{DeltaFileName, ImageFileName};
|
||||
use super::{
|
||||
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
|
||||
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
|
||||
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult,
|
||||
};
|
||||
|
||||
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
|
||||
@@ -63,14 +63,15 @@ impl std::fmt::Debug for RemoteLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for RemoteLayer {
|
||||
fn get_value_reconstruct_data(
|
||||
fn get_value_reconstruct_data_blocking(
|
||||
&self,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
_reconstruct_state: &mut ValueReconstructState,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult> {
|
||||
_reconstruct_state: ValueReconstructState,
|
||||
_ctx: RequestContext,
|
||||
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||
bail!(
|
||||
"layer {} needs to be downloaded",
|
||||
self.filename().file_name()
|
||||
|
||||
@@ -8,7 +8,7 @@ use bytes::Bytes;
|
||||
use fail::fail_point;
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use once_cell::sync::{Lazy, OnceCell};
|
||||
use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
||||
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
@@ -660,16 +660,33 @@ impl Timeline {
|
||||
None => None,
|
||||
};
|
||||
|
||||
let mut reconstruct_state = ValueReconstructState {
|
||||
let reconstruct_state = ValueReconstructState {
|
||||
records: Vec::new(),
|
||||
img: cached_page_img,
|
||||
};
|
||||
|
||||
static GET_RECONSTRUCT_DATA_CONCURRENCY: Lazy<Option<usize>> = Lazy::new(|| {
|
||||
std::env::var("PAGESERVER_TIMELINE_GET_RECONSTRUCT_DATA_CONCURRENCY_LIMIT")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
});
|
||||
static GET_RECONSTRUCT_DATA_SEMAPHORE: Lazy<Option<Semaphore>> =
|
||||
Lazy::new(|| (*GET_RECONSTRUCT_DATA_CONCURRENCY).map(Semaphore::new));
|
||||
|
||||
let permit = if let Some(sem) = GET_RECONSTRUCT_DATA_SEMAPHORE.as_ref() {
|
||||
Some(sem.acquire().await)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
|
||||
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
let reconstruct_state = self
|
||||
.get_reconstruct_data(key, lsn, reconstruct_state, ctx)
|
||||
.await?;
|
||||
timer.stop_and_record();
|
||||
|
||||
drop(permit);
|
||||
|
||||
RECONSTRUCT_TIME
|
||||
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
|
||||
}
|
||||
@@ -2495,9 +2512,9 @@ impl Timeline {
|
||||
&self,
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), PageReconstructError> {
|
||||
) -> Result<ValueReconstructState, PageReconstructError> {
|
||||
// Start from the current timeline.
|
||||
let mut timeline_owned;
|
||||
let mut timeline = self;
|
||||
@@ -2527,12 +2544,12 @@ impl Timeline {
|
||||
// The function should have updated 'state'
|
||||
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
|
||||
match result {
|
||||
ValueReconstructResult::Complete => return Ok(()),
|
||||
ValueReconstructResult::Complete => return Ok(reconstruct_state),
|
||||
ValueReconstructResult::Continue => {
|
||||
// If we reached an earlier cached page image, we're done.
|
||||
if cont_lsn == cached_lsn + 1 {
|
||||
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
|
||||
return Ok(());
|
||||
return Ok(reconstruct_state);
|
||||
}
|
||||
if prev_lsn <= cont_lsn {
|
||||
// Didn't make any progress in last iteration. Error out to avoid
|
||||
@@ -2637,13 +2654,19 @@ impl Timeline {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match open_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
result = match Arc::clone(open_layer)
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx.attached_child(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((new_reconstruct_state, result)) => {
|
||||
reconstruct_state = new_reconstruct_state;
|
||||
result
|
||||
}
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
@@ -2664,13 +2687,19 @@ impl Timeline {
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match frozen_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
result = match Arc::clone(frozen_layer)
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx.attached_child(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((new_reconstruct_state, result)) => {
|
||||
reconstruct_state = new_reconstruct_state;
|
||||
result
|
||||
}
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
@@ -2700,13 +2729,19 @@ impl Timeline {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
result = match layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
result = match Arc::clone(&layer)
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx.attached_child(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((new_reconstruct_state, result)) => {
|
||||
reconstruct_state = new_reconstruct_state;
|
||||
result
|
||||
}
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
|
||||
Reference in New Issue
Block a user