mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-04 17:00:37 +00:00
Compare commits
10 Commits
RemoteExte
...
problame/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
190c786310 | ||
|
|
c49ceb807d | ||
|
|
1ea42041dc | ||
|
|
695c05520b | ||
|
|
c128343858 | ||
|
|
2f01d0ed26 | ||
|
|
2f2f3c92c1 | ||
|
|
7ce8ad6d7a | ||
|
|
6a8cd9757b | ||
|
|
73acc17bf8 |
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -4223,8 +4223,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio"
|
name = "tokio"
|
||||||
version = "1.28.1"
|
version = "1.28.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "git+https://github.com/problame/tokio.git?branch=problame/distinguish-core-and-worker-by-thread-name#d88791686cfc7fc7d010889ad7638d09646b3de7"
|
||||||
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"autocfg",
|
"autocfg",
|
||||||
"bytes",
|
"bytes",
|
||||||
@@ -4251,8 +4250,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-macros"
|
name = "tokio-macros"
|
||||||
version = "2.1.0"
|
version = "2.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "git+https://github.com/problame/tokio.git?branch=problame/distinguish-core-and-worker-by-thread-name#d88791686cfc7fc7d010889ad7638d09646b3de7"
|
||||||
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
|||||||
@@ -187,6 +187,8 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", re
|
|||||||
# until async safekeepers patch is merged to the main.
|
# until async safekeepers patch is merged to the main.
|
||||||
sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" }
|
sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" }
|
||||||
|
|
||||||
|
tokio = { git = "https://github.com/problame/tokio.git", branch="problame/distinguish-core-and-worker-by-thread-name" }
|
||||||
|
|
||||||
################# Binary contents sections
|
################# Binary contents sections
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
|
|||||||
@@ -130,6 +130,66 @@ pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
|
|||||||
.expect("failed to define a metric")
|
.expect("failed to define a metric")
|
||||||
});
|
});
|
||||||
|
|
||||||
|
pub static PAGE_CACHE_READ_ACCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||||
|
register_int_counter_vec!(
|
||||||
|
"pageserver_page_cache_read_accesses_total",
|
||||||
|
"Number of read accesses to the page cache",
|
||||||
|
&["key_kind"]
|
||||||
|
)
|
||||||
|
.expect("failed to define a metric")
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static PAGE_CACHE_READ_ACCESSES_MATERIALIZED_PAGE: Lazy<IntCounter> = Lazy::new(|| {
|
||||||
|
PAGE_CACHE_READ_ACCESSES
|
||||||
|
.get_metric_with_label_values(&["materialized_page"])
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static PAGE_CACHE_READ_ACCESSES_EPHEMERAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||||
|
PAGE_CACHE_READ_ACCESSES
|
||||||
|
.get_metric_with_label_values(&["ephemeral"])
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static PAGE_CACHE_READ_ACCESSES_IMMUTABLE: Lazy<IntCounter> = Lazy::new(|| {
|
||||||
|
PAGE_CACHE_READ_ACCESSES
|
||||||
|
.get_metric_with_label_values(&["immutable"])
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static PAGE_CACHE_READ_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||||
|
register_int_counter_vec!(
|
||||||
|
"pageserver_page_cache_read_hits_total",
|
||||||
|
"Number of read accesses to the page cache that hit",
|
||||||
|
&["key_kind", "hit_kind"]
|
||||||
|
)
|
||||||
|
.expect("failed to define a metric")
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static PAGE_CACHE_READ_HITS_EPHEMERAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||||
|
PAGE_CACHE_READ_HITS
|
||||||
|
.get_metric_with_label_values(&["ephemeral", "-"])
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static PAGE_CACHE_READ_HITS_IMMUTABLE: Lazy<IntCounter> = Lazy::new(|| {
|
||||||
|
PAGE_CACHE_READ_HITS
|
||||||
|
.get_metric_with_label_values(&["immutable", "-"])
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_EXACT: Lazy<IntCounter> = Lazy::new(|| {
|
||||||
|
PAGE_CACHE_READ_HITS
|
||||||
|
.get_metric_with_label_values(&["materialized_page", "exact"])
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_OLDER_LSN: Lazy<IntCounter> = Lazy::new(|| {
|
||||||
|
PAGE_CACHE_READ_HITS
|
||||||
|
.get_metric_with_label_values(&["materialized_page", "older_lsn"])
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||||
register_histogram_vec!(
|
register_histogram_vec!(
|
||||||
"pageserver_wait_lsn_seconds",
|
"pageserver_wait_lsn_seconds",
|
||||||
@@ -614,6 +674,79 @@ pub static WALRECEIVER_CANDIDATES_ADDED: Lazy<IntCounter> =
|
|||||||
pub static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
|
pub static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
|
||||||
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
|
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
|
||||||
|
|
||||||
|
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_STARTED_COUNT: Lazy<IntCounter> =
|
||||||
|
Lazy::new(|| {
|
||||||
|
register_int_counter!(
|
||||||
|
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_started_count",
|
||||||
|
"Number of spawn_blocking calls made in Layer::get_value_reconstruct_data"
|
||||||
|
)
|
||||||
|
.expect("failed to define a metric")
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE: Lazy<IntGauge> =
|
||||||
|
Lazy::new(|| {
|
||||||
|
register_int_gauge!(
|
||||||
|
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_active_gauge",
|
||||||
|
"Number of spawn_blocking calls active in Layer::get_value_reconstruct_data"
|
||||||
|
)
|
||||||
|
.expect("failed to define a metric")
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_QUEUE_DELAY: Lazy<Histogram> = Lazy::new(
|
||||||
|
|| {
|
||||||
|
register_histogram!(
|
||||||
|
"pageserver_layer_get_value_reconstruct_data_spawn_blocking_queue_delay_seconds",
|
||||||
|
"Time a Layer::get_value_reconstruct_data call spends in spawn_blocking queue until the first line of blockign code runs inside spawn_blocking",
|
||||||
|
vec![
|
||||||
|
0.000_005, 0.000_010, 0.000_025, 0.000_050, 0.000_100, 0.000_250, 0.000_500, 0.001_000,
|
||||||
|
0.002_500, 0.005_000, 0.010_000, 0.025_000, 0.050_000, 0.100_000, 0.250_000, 0.500_000,
|
||||||
|
1.000_000, 2.000_000, 5.000_000, 10.000_000, 25.000_000, 50.000_000, 100.000_000,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.expect("failed to define a metric")
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||||
|
register_histogram_vec!(
|
||||||
|
"pageserver_layer_get_value_reconstruct_data_completion_time_seconds",
|
||||||
|
"Time a Layer::get_value_reconstruct_data call takes to complete",
|
||||||
|
&["result"],
|
||||||
|
vec![
|
||||||
|
0.000_005,
|
||||||
|
0.000_010,
|
||||||
|
0.000_025,
|
||||||
|
0.000_050,
|
||||||
|
0.000_100,
|
||||||
|
0.000_250,
|
||||||
|
0.000_500,
|
||||||
|
0.001_000,
|
||||||
|
0.002_500,
|
||||||
|
0.005_000,
|
||||||
|
0.010_000,
|
||||||
|
0.025_000,
|
||||||
|
0.050_000,
|
||||||
|
0.100_000,
|
||||||
|
0.250_000,
|
||||||
|
0.500_000,
|
||||||
|
1.000_000,
|
||||||
|
2.000_000,
|
||||||
|
5.000_000,
|
||||||
|
10.000_000,
|
||||||
|
25.000_000,
|
||||||
|
50.000_000,
|
||||||
|
100.000_000,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
.expect("failed to define a metric")
|
||||||
|
});
|
||||||
|
|
||||||
|
/// Pre-resolved child of [`LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME`]
/// for `result="ok"`.
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_OK: Lazy<Histogram> = Lazy::new(|| {
    let family = &LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME;
    family.with_label_values(&["ok"])
});
|
||||||
|
|
||||||
|
/// Pre-resolved child of [`LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME`]
/// for `result="error"`.
pub static LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_ERROR: Lazy<Histogram> = Lazy::new(|| {
    let family = &LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME;
    family.with_label_values(&["error"])
});
|
||||||
|
|
||||||
// Metrics collected on WAL redo operations
|
// Metrics collected on WAL redo operations
|
||||||
//
|
//
|
||||||
// We collect the time spent in actual WAL redo ('redo'), and time waiting
|
// We collect the time spent in actual WAL redo ('redo'), and time waiting
|
||||||
|
|||||||
@@ -313,6 +313,8 @@ impl PageCache {
|
|||||||
key: &Key,
|
key: &Key,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
) -> Option<(Lsn, PageReadGuard)> {
|
) -> Option<(Lsn, PageReadGuard)> {
|
||||||
|
crate::metrics::PAGE_CACHE_READ_ACCESSES_MATERIALIZED_PAGE.inc();
|
||||||
|
|
||||||
let mut cache_key = CacheKey::MaterializedPage {
|
let mut cache_key = CacheKey::MaterializedPage {
|
||||||
hash_key: MaterializedPageHashKey {
|
hash_key: MaterializedPageHashKey {
|
||||||
tenant_id,
|
tenant_id,
|
||||||
@@ -323,8 +325,17 @@ impl PageCache {
|
|||||||
};
|
};
|
||||||
|
|
||||||
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
|
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
|
||||||
if let CacheKey::MaterializedPage { hash_key: _, lsn } = cache_key {
|
if let CacheKey::MaterializedPage {
|
||||||
Some((lsn, guard))
|
hash_key: _,
|
||||||
|
lsn: available_lsn,
|
||||||
|
} = cache_key
|
||||||
|
{
|
||||||
|
if available_lsn == lsn {
|
||||||
|
crate::metrics::PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_EXACT.inc();
|
||||||
|
} else {
|
||||||
|
crate::metrics::PAGE_CACHE_READ_HITS_MATERIALIZED_PAGE_OLDER_LSN.inc();
|
||||||
|
}
|
||||||
|
Some((available_lsn, guard))
|
||||||
} else {
|
} else {
|
||||||
panic!("unexpected key type in slot");
|
panic!("unexpected key type in slot");
|
||||||
}
|
}
|
||||||
@@ -499,11 +510,31 @@ impl PageCache {
|
|||||||
/// ```
|
/// ```
|
||||||
///
|
///
|
||||||
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
|
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
|
||||||
|
let (read_access, hit) = match cache_key {
|
||||||
|
CacheKey::MaterializedPage { .. } => {
|
||||||
|
unreachable!("Materialized pages use lookup_materialized_page")
|
||||||
|
}
|
||||||
|
CacheKey::EphemeralPage { .. } => (
|
||||||
|
&crate::metrics::PAGE_CACHE_READ_ACCESSES_EPHEMERAL,
|
||||||
|
&crate::metrics::PAGE_CACHE_READ_HITS_EPHEMERAL,
|
||||||
|
),
|
||||||
|
CacheKey::ImmutableFilePage { .. } => (
|
||||||
|
&crate::metrics::PAGE_CACHE_READ_ACCESSES_IMMUTABLE,
|
||||||
|
&crate::metrics::PAGE_CACHE_READ_HITS_IMMUTABLE,
|
||||||
|
),
|
||||||
|
};
|
||||||
|
read_access.inc();
|
||||||
|
|
||||||
|
let mut is_first_iteration = true;
|
||||||
loop {
|
loop {
|
||||||
// First check if the key already exists in the cache.
|
// First check if the key already exists in the cache.
|
||||||
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
|
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
|
||||||
|
if is_first_iteration {
|
||||||
|
hit.inc();
|
||||||
|
}
|
||||||
return Ok(ReadBufResult::Found(read_guard));
|
return Ok(ReadBufResult::Found(read_guard));
|
||||||
}
|
}
|
||||||
|
is_first_iteration = false;
|
||||||
|
|
||||||
// Not found. Find a victim buffer
|
// Not found. Find a victim buffer
|
||||||
let (slot_idx, mut inner) =
|
let (slot_idx, mut inner) =
|
||||||
|
|||||||
@@ -102,11 +102,33 @@ use crate::shutdown_pageserver;
|
|||||||
// It's also good to avoid hogging all threads that would be needed to process
|
// It's also good to avoid hogging all threads that would be needed to process
|
||||||
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
|
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
|
||||||
// happen, but still.
|
// happen, but still.
|
||||||
//
|
|
||||||
|
static PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE: Lazy<Option<usize>> = Lazy::new(|| {
|
||||||
|
let env_var: String = match std::env::var("PAGESERVER_TOKIO_MAX_BLOCKING_THREADS") {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(std::env::VarError::NotPresent) => {
|
||||||
|
debug!("env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS not set, using default");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Err(std::env::VarError::NotUnicode(_)) => {
|
||||||
|
panic!("env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS is not valid UTF-8");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let pool_size = match env_var.parse() {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
panic!("Failed to parse PAGESERVER_TOKIO_MAX_BLOCKING_THREADS: {e:?}");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
eprintln!("using spawn_blocking pool size override from env var PAGESERVER_TOKIO_MAX_BLOCKING_THREADS: {pool_size:?}");
|
||||||
|
Some(pool_size)
|
||||||
|
});
|
||||||
|
|
||||||
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||||
tokio::runtime::Builder::new_multi_thread()
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
.thread_name("compute request worker")
|
.thread_name("compute request worker")
|
||||||
.enable_all()
|
.enable_all()
|
||||||
|
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
|
||||||
.build()
|
.build()
|
||||||
.expect("Failed to create compute request runtime")
|
.expect("Failed to create compute request runtime")
|
||||||
});
|
});
|
||||||
@@ -115,6 +137,7 @@ pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
|||||||
tokio::runtime::Builder::new_multi_thread()
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
.thread_name("mgmt request worker")
|
.thread_name("mgmt request worker")
|
||||||
.enable_all()
|
.enable_all()
|
||||||
|
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
|
||||||
.build()
|
.build()
|
||||||
.expect("Failed to create mgmt request runtime")
|
.expect("Failed to create mgmt request runtime")
|
||||||
});
|
});
|
||||||
@@ -123,6 +146,7 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
|||||||
tokio::runtime::Builder::new_multi_thread()
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
.thread_name("walreceiver worker")
|
.thread_name("walreceiver worker")
|
||||||
.enable_all()
|
.enable_all()
|
||||||
|
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
|
||||||
.build()
|
.build()
|
||||||
.expect("Failed to create walreceiver runtime")
|
.expect("Failed to create walreceiver runtime")
|
||||||
});
|
});
|
||||||
@@ -131,6 +155,7 @@ pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
|||||||
tokio::runtime::Builder::new_multi_thread()
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
.thread_name("background op worker")
|
.thread_name("background op worker")
|
||||||
.enable_all()
|
.enable_all()
|
||||||
|
.max_blocking_threads((*PAGESERVER_TOKIO_MAX_BLOCKING_THREADS_OVERRIDE).unwrap_or(512))
|
||||||
.build()
|
.build()
|
||||||
.expect("Failed to create background op runtime")
|
.expect("Failed to create background op runtime")
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ use crate::context::RequestContext;
|
|||||||
use crate::repository::{Key, Value};
|
use crate::repository::{Key, Value};
|
||||||
use crate::task_mgr::TaskKind;
|
use crate::task_mgr::TaskKind;
|
||||||
use crate::walrecord::NeonWalRecord;
|
use crate::walrecord::NeonWalRecord;
|
||||||
use anyhow::Result;
|
use anyhow::{Context, Result};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use enum_map::EnumMap;
|
use enum_map::EnumMap;
|
||||||
use enumset::EnumSet;
|
use enumset::EnumSet;
|
||||||
@@ -24,7 +24,7 @@ use pageserver_api::models::{
|
|||||||
use std::ops::Range;
|
use std::ops::Range;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
use utils::history_buffer::HistoryBufferWithDropCounter;
|
use utils::history_buffer::HistoryBufferWithDropCounter;
|
||||||
use utils::rate_limit::RateLimit;
|
use utils::rate_limit::RateLimit;
|
||||||
@@ -335,7 +335,8 @@ impl LayerAccessStats {
|
|||||||
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
|
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
|
||||||
/// timeline names, because those are known in the context of which the layers
|
/// timeline names, because those are known in the context of which the layers
|
||||||
/// are used in (timeline).
|
/// are used in (timeline).
|
||||||
pub trait Layer: std::fmt::Debug + Send + Sync {
|
#[async_trait::async_trait]
|
||||||
|
pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
|
||||||
/// Range of keys that this layer covers
|
/// Range of keys that this layer covers
|
||||||
fn get_key_range(&self) -> Range<Key>;
|
fn get_key_range(&self) -> Range<Key>;
|
||||||
|
|
||||||
@@ -365,13 +366,74 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
|
|||||||
/// is available. If this returns ValueReconstructResult::Continue, look up
|
/// is available. If this returns ValueReconstructResult::Continue, look up
|
||||||
/// the predecessor layer and call again with the same 'reconstruct_data' to
|
/// the predecessor layer and call again with the same 'reconstruct_data' to
|
||||||
/// collect more data.
|
/// collect more data.
|
||||||
fn get_value_reconstruct_data(
|
fn get_value_reconstruct_data_blocking(
|
||||||
&self,
|
&self,
|
||||||
key: Key,
|
key: Key,
|
||||||
lsn_range: Range<Lsn>,
|
lsn_range: Range<Lsn>,
|
||||||
reconstruct_data: &mut ValueReconstructState,
|
reconstruct_data: ValueReconstructState,
|
||||||
ctx: &RequestContext,
|
ctx: RequestContext,
|
||||||
) -> Result<ValueReconstructResult>;
|
) -> Result<(ValueReconstructState, ValueReconstructResult)>;
|
||||||
|
|
||||||
|
/// CANCEL SAFETY: if the returned future is dropped,
|
||||||
|
/// the wrapped closure still run to completion and the return value discarded.
|
||||||
|
/// For the case of get_value_reconstruct_data, we expect the closure to not
|
||||||
|
/// have any side effects, as it only attempts to read a layer (and stuff like
|
||||||
|
/// page cache isn't considered a real side effect).
|
||||||
|
/// But, ...
|
||||||
|
/// TRACING:
|
||||||
|
/// If the returned future is cancelled, the spawn_blocking span can outlive
|
||||||
|
/// the caller's span.
|
||||||
|
/// So, technically, we should be using `parent: None` and `follows_from: current`
|
||||||
|
/// instead. However, in practice, the advantage of maintaining the span stack
|
||||||
|
/// in logs outweighs the disadvantage of having a dangling span in a case that
|
||||||
|
/// is not expected to happen because in pageserver we generally don't drop pending futures.
|
||||||
|
async fn get_value_reconstruct_data(
|
||||||
|
self: Arc<Self>,
|
||||||
|
key: Key,
|
||||||
|
lsn_range: Range<Lsn>,
|
||||||
|
reconstruct_data: ValueReconstructState,
|
||||||
|
ctx: RequestContext,
|
||||||
|
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||||
|
let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking");
|
||||||
|
static USE_SPAWN_BLOCKING: Lazy<bool> = Lazy::new(|| {
|
||||||
|
let val = std::env::var("PAGESERVER_LAYER_GET_RECONSTRUCT_DATA_USE_SPAWN_BLOCKING")
|
||||||
|
.map(|s| s == "1")
|
||||||
|
.unwrap_or(false);
|
||||||
|
tracing::info!("PAGESERVER_LAYER_GET_RECONSTRUCT_DATA_USE_SPAWN_BLOCKING={val}");
|
||||||
|
val
|
||||||
|
});
|
||||||
|
let use_spawn_blocking = *USE_SPAWN_BLOCKING;
|
||||||
|
let start = Instant::now();
|
||||||
|
let res = if !use_spawn_blocking {
|
||||||
|
anyhow::Ok(self.get_value_reconstruct_data_blocking(
|
||||||
|
key,
|
||||||
|
lsn_range,
|
||||||
|
reconstruct_data,
|
||||||
|
ctx,
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_STARTED_COUNT.inc();
|
||||||
|
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE.inc();
|
||||||
|
let res = tokio::task::spawn_blocking(move || {
|
||||||
|
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_QUEUE_DELAY
|
||||||
|
.observe(start.elapsed().as_secs_f64());
|
||||||
|
let _enter = span.enter();
|
||||||
|
self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.context("spawn_blocking");
|
||||||
|
crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_SPAWN_BLOCKING_ACTIVE_GAUGE.dec();
|
||||||
|
res
|
||||||
|
};
|
||||||
|
let histo = match &res {
|
||||||
|
Ok(Ok(_)) => &crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_OK,
|
||||||
|
Ok(Err(_)) | Err(_) => {
|
||||||
|
&crate::metrics::LAYER_GET_VALUE_RECONSTRUCT_DATA_COMPLETION_TIME_ERROR
|
||||||
|
}
|
||||||
|
};
|
||||||
|
histo.observe(start.elapsed().as_secs_f64());
|
||||||
|
res?
|
||||||
|
}
|
||||||
|
|
||||||
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
|
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
|
||||||
fn short_id(&self) -> String;
|
fn short_id(&self) -> String;
|
||||||
@@ -483,17 +545,8 @@ pub mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
impl Layer for LayerDescriptor {
|
impl Layer for LayerDescriptor {
|
||||||
fn get_value_reconstruct_data(
|
|
||||||
&self,
|
|
||||||
_key: Key,
|
|
||||||
_lsn_range: Range<Lsn>,
|
|
||||||
_reconstruct_data: &mut ValueReconstructState,
|
|
||||||
_ctx: &RequestContext,
|
|
||||||
) -> Result<ValueReconstructResult> {
|
|
||||||
todo!("This method shouldn't be part of the Layer trait")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
@@ -508,6 +561,16 @@ pub mod tests {
|
|||||||
self.layer_desc().lsn_range.clone()
|
self.layer_desc().lsn_range.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_value_reconstruct_data_blocking(
|
||||||
|
&self,
|
||||||
|
_key: Key,
|
||||||
|
_lsn_range: Range<Lsn>,
|
||||||
|
_reconstruct_data: ValueReconstructState,
|
||||||
|
_ctx: RequestContext,
|
||||||
|
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||||
|
todo!("This method shouldn't be part of the Layer trait")
|
||||||
|
}
|
||||||
|
|
||||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||||
fn is_incremental(&self) -> bool {
|
fn is_incremental(&self) -> bool {
|
||||||
self.layer_desc().is_incremental
|
self.layer_desc().is_incremental
|
||||||
|
|||||||
@@ -218,6 +218,7 @@ impl std::fmt::Debug for DeltaLayerInner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
impl Layer for DeltaLayer {
|
impl Layer for DeltaLayer {
|
||||||
/// debugging function to print out the contents of the layer
|
/// debugging function to print out the contents of the layer
|
||||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||||
@@ -294,13 +295,13 @@ impl Layer for DeltaLayer {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_value_reconstruct_data(
|
fn get_value_reconstruct_data_blocking(
|
||||||
&self,
|
&self,
|
||||||
key: Key,
|
key: Key,
|
||||||
lsn_range: Range<Lsn>,
|
lsn_range: Range<Lsn>,
|
||||||
reconstruct_state: &mut ValueReconstructState,
|
mut reconstruct_state: ValueReconstructState,
|
||||||
ctx: &RequestContext,
|
ctx: RequestContext,
|
||||||
) -> anyhow::Result<ValueReconstructResult> {
|
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||||
ensure!(lsn_range.start >= self.desc.lsn_range.start);
|
ensure!(lsn_range.start >= self.desc.lsn_range.start);
|
||||||
let mut need_image = true;
|
let mut need_image = true;
|
||||||
|
|
||||||
@@ -308,7 +309,7 @@ impl Layer for DeltaLayer {
|
|||||||
|
|
||||||
{
|
{
|
||||||
// Open the file and lock the metadata in memory
|
// Open the file and lock the metadata in memory
|
||||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
|
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
|
||||||
|
|
||||||
// Scan the page versions backwards, starting from `lsn`.
|
// Scan the page versions backwards, starting from `lsn`.
|
||||||
let file = &inner.file;
|
let file = &inner.file;
|
||||||
@@ -374,9 +375,9 @@ impl Layer for DeltaLayer {
|
|||||||
// If an older page image is needed to reconstruct the page, let the
|
// If an older page image is needed to reconstruct the page, let the
|
||||||
// caller know.
|
// caller know.
|
||||||
if need_image {
|
if need_image {
|
||||||
Ok(ValueReconstructResult::Continue)
|
Ok((reconstruct_state, ValueReconstructResult::Continue))
|
||||||
} else {
|
} else {
|
||||||
Ok(ValueReconstructResult::Complete)
|
Ok((reconstruct_state, ValueReconstructResult::Complete))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -149,6 +149,7 @@ impl std::fmt::Debug for ImageLayerInner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
impl Layer for ImageLayer {
|
impl Layer for ImageLayer {
|
||||||
/// debugging function to print out the contents of the layer
|
/// debugging function to print out the contents of the layer
|
||||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||||
@@ -181,18 +182,18 @@ impl Layer for ImageLayer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Look up given page in the file
|
/// Look up given page in the file
|
||||||
fn get_value_reconstruct_data(
|
fn get_value_reconstruct_data_blocking(
|
||||||
&self,
|
&self,
|
||||||
key: Key,
|
key: Key,
|
||||||
lsn_range: Range<Lsn>,
|
lsn_range: Range<Lsn>,
|
||||||
reconstruct_state: &mut ValueReconstructState,
|
mut reconstruct_state: ValueReconstructState,
|
||||||
ctx: &RequestContext,
|
ctx: RequestContext,
|
||||||
) -> anyhow::Result<ValueReconstructResult> {
|
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||||
assert!(self.desc.key_range.contains(&key));
|
assert!(self.desc.key_range.contains(&key));
|
||||||
assert!(lsn_range.start >= self.lsn);
|
assert!(lsn_range.start >= self.lsn);
|
||||||
assert!(lsn_range.end >= self.lsn);
|
assert!(lsn_range.end >= self.lsn);
|
||||||
|
|
||||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
|
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
|
||||||
|
|
||||||
let file = inner.file.as_ref().unwrap();
|
let file = inner.file.as_ref().unwrap();
|
||||||
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
|
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
|
||||||
@@ -210,9 +211,9 @@ impl Layer for ImageLayer {
|
|||||||
let value = Bytes::from(blob);
|
let value = Bytes::from(blob);
|
||||||
|
|
||||||
reconstruct_state.img = Some((self.lsn, value));
|
reconstruct_state.img = Some((self.lsn, value));
|
||||||
Ok(ValueReconstructResult::Complete)
|
Ok((reconstruct_state, ValueReconstructResult::Complete))
|
||||||
} else {
|
} else {
|
||||||
Ok(ValueReconstructResult::Missing)
|
Ok((reconstruct_state, ValueReconstructResult::Missing))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -110,6 +110,7 @@ impl InMemoryLayer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
impl Layer for InMemoryLayer {
|
impl Layer for InMemoryLayer {
|
||||||
fn get_key_range(&self) -> Range<Key> {
|
fn get_key_range(&self) -> Range<Key> {
|
||||||
Key::MIN..Key::MAX
|
Key::MIN..Key::MAX
|
||||||
@@ -190,13 +191,13 @@ impl Layer for InMemoryLayer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Look up given value in the layer.
|
/// Look up given value in the layer.
|
||||||
fn get_value_reconstruct_data(
|
fn get_value_reconstruct_data_blocking(
|
||||||
&self,
|
&self,
|
||||||
key: Key,
|
key: Key,
|
||||||
lsn_range: Range<Lsn>,
|
lsn_range: Range<Lsn>,
|
||||||
reconstruct_state: &mut ValueReconstructState,
|
mut reconstruct_state: ValueReconstructState,
|
||||||
_ctx: &RequestContext,
|
_ctx: RequestContext,
|
||||||
) -> anyhow::Result<ValueReconstructResult> {
|
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||||
ensure!(lsn_range.start >= self.start_lsn);
|
ensure!(lsn_range.start >= self.start_lsn);
|
||||||
let mut need_image = true;
|
let mut need_image = true;
|
||||||
|
|
||||||
@@ -213,7 +214,7 @@ impl Layer for InMemoryLayer {
|
|||||||
match value {
|
match value {
|
||||||
Value::Image(img) => {
|
Value::Image(img) => {
|
||||||
reconstruct_state.img = Some((*entry_lsn, img));
|
reconstruct_state.img = Some((*entry_lsn, img));
|
||||||
return Ok(ValueReconstructResult::Complete);
|
return Ok((reconstruct_state, ValueReconstructResult::Complete));
|
||||||
}
|
}
|
||||||
Value::WalRecord(rec) => {
|
Value::WalRecord(rec) => {
|
||||||
let will_init = rec.will_init();
|
let will_init = rec.will_init();
|
||||||
@@ -233,9 +234,9 @@ impl Layer for InMemoryLayer {
|
|||||||
// If an older page image is needed to reconstruct the page, let the
|
// If an older page image is needed to reconstruct the page, let the
|
||||||
// caller know.
|
// caller know.
|
||||||
if need_image {
|
if need_image {
|
||||||
Ok(ValueReconstructResult::Continue)
|
Ok((reconstruct_state, ValueReconstructResult::Continue))
|
||||||
} else {
|
} else {
|
||||||
Ok(ValueReconstructResult::Complete)
|
Ok((reconstruct_state, ValueReconstructResult::Complete))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ use crate::context::RequestContext;
|
|||||||
use crate::repository::Key;
|
use crate::repository::Key;
|
||||||
use crate::tenant::layer_map::BatchedUpdates;
|
use crate::tenant::layer_map::BatchedUpdates;
|
||||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, Result};
|
||||||
use pageserver_api::models::HistoricLayerInfo;
|
use pageserver_api::models::HistoricLayerInfo;
|
||||||
use std::ops::Range;
|
use std::ops::Range;
|
||||||
@@ -21,7 +21,7 @@ use utils::{
|
|||||||
use super::filename::{DeltaFileName, ImageFileName};
|
use super::filename::{DeltaFileName, ImageFileName};
|
||||||
use super::{
|
use super::{
|
||||||
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
|
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
|
||||||
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
|
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
|
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
|
||||||
@@ -63,14 +63,15 @@ impl std::fmt::Debug for RemoteLayer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
impl Layer for RemoteLayer {
|
impl Layer for RemoteLayer {
|
||||||
fn get_value_reconstruct_data(
|
fn get_value_reconstruct_data_blocking(
|
||||||
&self,
|
&self,
|
||||||
_key: Key,
|
_key: Key,
|
||||||
_lsn_range: Range<Lsn>,
|
_lsn_range: Range<Lsn>,
|
||||||
_reconstruct_state: &mut ValueReconstructState,
|
_reconstruct_state: ValueReconstructState,
|
||||||
_ctx: &RequestContext,
|
_ctx: RequestContext,
|
||||||
) -> Result<ValueReconstructResult> {
|
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
|
||||||
bail!(
|
bail!(
|
||||||
"layer {} needs to be downloaded",
|
"layer {} needs to be downloaded",
|
||||||
self.filename().file_name()
|
self.filename().file_name()
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use bytes::Bytes;
|
|||||||
use fail::fail_point;
|
use fail::fail_point;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::{Lazy, OnceCell};
|
||||||
use pageserver_api::models::{
|
use pageserver_api::models::{
|
||||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
||||||
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
|
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
|
||||||
@@ -660,16 +660,33 @@ impl Timeline {
|
|||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut reconstruct_state = ValueReconstructState {
|
let reconstruct_state = ValueReconstructState {
|
||||||
records: Vec::new(),
|
records: Vec::new(),
|
||||||
img: cached_page_img,
|
img: cached_page_img,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static GET_RECONSTRUCT_DATA_CONCURRENCY: Lazy<Option<usize>> = Lazy::new(|| {
|
||||||
|
std::env::var("PAGESERVER_TIMELINE_GET_RECONSTRUCT_DATA_CONCURRENCY_LIMIT")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
});
|
||||||
|
static GET_RECONSTRUCT_DATA_SEMAPHORE: Lazy<Option<Semaphore>> =
|
||||||
|
Lazy::new(|| (*GET_RECONSTRUCT_DATA_CONCURRENCY).map(Semaphore::new));
|
||||||
|
|
||||||
|
let permit = if let Some(sem) = GET_RECONSTRUCT_DATA_SEMAPHORE.as_ref() {
|
||||||
|
Some(sem.acquire().await)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
|
let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
|
||||||
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
let reconstruct_state = self
|
||||||
|
.get_reconstruct_data(key, lsn, reconstruct_state, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
timer.stop_and_record();
|
timer.stop_and_record();
|
||||||
|
|
||||||
|
drop(permit);
|
||||||
|
|
||||||
RECONSTRUCT_TIME
|
RECONSTRUCT_TIME
|
||||||
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
|
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
|
||||||
}
|
}
|
||||||
@@ -2495,9 +2512,9 @@ impl Timeline {
|
|||||||
&self,
|
&self,
|
||||||
key: Key,
|
key: Key,
|
||||||
request_lsn: Lsn,
|
request_lsn: Lsn,
|
||||||
reconstruct_state: &mut ValueReconstructState,
|
mut reconstruct_state: ValueReconstructState,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<(), PageReconstructError> {
|
) -> Result<ValueReconstructState, PageReconstructError> {
|
||||||
// Start from the current timeline.
|
// Start from the current timeline.
|
||||||
let mut timeline_owned;
|
let mut timeline_owned;
|
||||||
let mut timeline = self;
|
let mut timeline = self;
|
||||||
@@ -2527,12 +2544,12 @@ impl Timeline {
|
|||||||
// The function should have updated 'state'
|
// The function should have updated 'state'
|
||||||
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
|
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
|
||||||
match result {
|
match result {
|
||||||
ValueReconstructResult::Complete => return Ok(()),
|
ValueReconstructResult::Complete => return Ok(reconstruct_state),
|
||||||
ValueReconstructResult::Continue => {
|
ValueReconstructResult::Continue => {
|
||||||
// If we reached an earlier cached page image, we're done.
|
// If we reached an earlier cached page image, we're done.
|
||||||
if cont_lsn == cached_lsn + 1 {
|
if cont_lsn == cached_lsn + 1 {
|
||||||
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
|
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
|
||||||
return Ok(());
|
return Ok(reconstruct_state);
|
||||||
}
|
}
|
||||||
if prev_lsn <= cont_lsn {
|
if prev_lsn <= cont_lsn {
|
||||||
// Didn't make any progress in last iteration. Error out to avoid
|
// Didn't make any progress in last iteration. Error out to avoid
|
||||||
@@ -2637,13 +2654,19 @@ impl Timeline {
|
|||||||
// Get all the data needed to reconstruct the page version from this layer.
|
// Get all the data needed to reconstruct the page version from this layer.
|
||||||
// But if we have an older cached page image, no need to go past that.
|
// But if we have an older cached page image, no need to go past that.
|
||||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||||
result = match open_layer.get_value_reconstruct_data(
|
result = match Arc::clone(open_layer)
|
||||||
key,
|
.get_value_reconstruct_data(
|
||||||
lsn_floor..cont_lsn,
|
key,
|
||||||
reconstruct_state,
|
lsn_floor..cont_lsn,
|
||||||
ctx,
|
reconstruct_state,
|
||||||
) {
|
ctx.attached_child(),
|
||||||
Ok(result) => result,
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok((new_reconstruct_state, result)) => {
|
||||||
|
reconstruct_state = new_reconstruct_state;
|
||||||
|
result
|
||||||
|
}
|
||||||
Err(e) => return Err(PageReconstructError::from(e)),
|
Err(e) => return Err(PageReconstructError::from(e)),
|
||||||
};
|
};
|
||||||
cont_lsn = lsn_floor;
|
cont_lsn = lsn_floor;
|
||||||
@@ -2664,13 +2687,19 @@ impl Timeline {
|
|||||||
if cont_lsn > start_lsn {
|
if cont_lsn > start_lsn {
|
||||||
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
|
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
|
||||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||||
result = match frozen_layer.get_value_reconstruct_data(
|
result = match Arc::clone(frozen_layer)
|
||||||
key,
|
.get_value_reconstruct_data(
|
||||||
lsn_floor..cont_lsn,
|
key,
|
||||||
reconstruct_state,
|
lsn_floor..cont_lsn,
|
||||||
ctx,
|
reconstruct_state,
|
||||||
) {
|
ctx.attached_child(),
|
||||||
Ok(result) => result,
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok((new_reconstruct_state, result)) => {
|
||||||
|
reconstruct_state = new_reconstruct_state;
|
||||||
|
result
|
||||||
|
}
|
||||||
Err(e) => return Err(PageReconstructError::from(e)),
|
Err(e) => return Err(PageReconstructError::from(e)),
|
||||||
};
|
};
|
||||||
cont_lsn = lsn_floor;
|
cont_lsn = lsn_floor;
|
||||||
@@ -2700,13 +2729,19 @@ impl Timeline {
|
|||||||
// Get all the data needed to reconstruct the page version from this layer.
|
// Get all the data needed to reconstruct the page version from this layer.
|
||||||
// But if we have an older cached page image, no need to go past that.
|
// But if we have an older cached page image, no need to go past that.
|
||||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||||
result = match layer.get_value_reconstruct_data(
|
result = match Arc::clone(&layer)
|
||||||
key,
|
.get_value_reconstruct_data(
|
||||||
lsn_floor..cont_lsn,
|
key,
|
||||||
reconstruct_state,
|
lsn_floor..cont_lsn,
|
||||||
ctx,
|
reconstruct_state,
|
||||||
) {
|
ctx.attached_child(),
|
||||||
Ok(result) => result,
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok((new_reconstruct_state, result)) => {
|
||||||
|
reconstruct_state = new_reconstruct_state;
|
||||||
|
result
|
||||||
|
}
|
||||||
Err(e) => return Err(PageReconstructError::from(e)),
|
Err(e) => return Err(PageReconstructError::from(e)),
|
||||||
};
|
};
|
||||||
cont_lsn = lsn_floor;
|
cont_lsn = lsn_floor;
|
||||||
|
|||||||
Reference in New Issue
Block a user