Compare commits

...

27 Commits

Author SHA1 Message Date
Vlad Lazar
013e44f528 wip: remove gratuitous log 2024-04-23 11:55:42 +01:00
Vlad Lazar
c719f902a1 wip: fix new stats 2024-04-23 10:37:28 +01:00
Vlad Lazar
bb2215ef0c wip: even more stats 2024-04-23 10:24:49 +01:00
Vlad Lazar
294a2b2d6f wip: reset stats 2024-04-19 16:52:37 +01:00
Vlad Lazar
7f8a6702fc wip: reset stats 2024-04-19 16:51:33 +01:00
Vlad Lazar
76502b96bd wip: more data 2024-04-19 15:41:28 +01:00
Vlad Lazar
a67f496b09 wip: track read path stats in context 2024-04-19 14:26:13 +01:00
Vlad Lazar
761f211f55 sq: error parity 2024-04-19 12:16:18 +01:00
Vlad Lazar
dc2c6367af sq: error parity 2024-04-19 10:50:02 +01:00
Vlad Lazar
a65f067799 sq: improve error parity 2024-04-19 10:50:02 +01:00
Vlad Lazar
f36561746e tests: handle not found errors from both read paths in lsn mapping test 2024-04-19 10:50:02 +01:00
Vlad Lazar
c121d3a3c7 sq validate 2024-04-19 10:50:02 +01:00
Vlad Lazar
76abb4e7a4 pageserver: cargo fmt 2024-04-19 10:50:02 +01:00
Vlad Lazar
d2c806fa40 pageserver: skip unnecessary fringe update on vectored read path 2024-04-19 10:50:02 +01:00
Vlad Lazar
def9b17b15 pageserver: achieve parity between read paths for invalid lsns 2024-04-19 10:50:02 +01:00
Vlad Lazar
0e8f9b82aa tests: unset new config for forward compat test 2024-04-19 10:50:02 +01:00
Vlad Lazar
2cd9b3cab5 pageserver: improve error parity between read paths 2024-04-19 10:50:02 +01:00
Vlad Lazar
e489fd7a1d test: use vectored impl for singular gets in tests 2024-04-19 10:50:01 +01:00
Vlad Lazar
d45eca992f pageserver: validate singular vectored get results 2024-04-19 10:50:01 +01:00
Vlad Lazar
02f82ebd79 pageserver: use vectored get for singular gets when configured 2024-04-19 10:50:01 +01:00
Vlad Lazar
5a1dce5e3d pageserver: fix vectored read path cache handling 2024-04-19 10:50:01 +01:00
Vlad Lazar
7fa1915229 pageserver/config: add config for singular get impl 2024-04-19 10:50:01 +01:00
Vlad Lazar
fc8714215c pageserver/timeline: replicate metrics on vectored get path 2024-04-19 10:50:01 +01:00
Vlad Lazar
632613273f pageserver/metrics: add get kind label for get reconstruct data time 2024-04-19 10:50:01 +01:00
Vlad Lazar
96eb1631d3 pageserver/metrics: add get kind label for reconstruct time 2024-04-19 10:50:01 +01:00
Vlad Lazar
7f674c83e1 pageserver: fix aux key retrieval for vectored get
Problem
Vectored get would descend into ancestor timelines for aux files.
This is not the behaviour of the legacy read path and blocks cutting
over to the vectored read path.

Summary of Changes
Treat non-inherited keys specially in vectored get. At the point when
we want to descend into the ancestor mark all pending non-inherited keys
as errored out at the key level. Note that this diverges from the
standard vectored get behaviour for missing keys which is a top level
error. This divergence is required to avoid blocking compaction in case
such an error is encountered when compacting aux file keys. I'm pretty
sure the bug I just described predates the vectored get implementation,
but it's still worth fixing.
2024-04-19 10:50:01 +01:00
Vlad Lazar
e5411ee556 pageserver_api: return removed overlapping keyspace 2024-04-19 10:50:01 +01:00
22 changed files with 853 additions and 110 deletions

View File

@@ -477,6 +477,7 @@ jobs:
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_IMPL: vectored
PAGESERVER_GET_IMPL: vectored
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540

View File

@@ -129,6 +129,7 @@ pub struct PageServerConf {
pub(crate) virtual_file_io_engine: Option<String>,
pub(crate) get_vectored_impl: Option<String>,
pub(crate) get_impl: Option<String>,
}
impl Default for PageServerConf {
@@ -141,6 +142,7 @@ impl Default for PageServerConf {
http_auth_type: AuthType::Trust,
virtual_file_io_engine: None,
get_vectored_impl: None,
get_impl: None,
}
}
}

View File

@@ -92,6 +92,7 @@ impl PageServerNode {
http_auth_type,
virtual_file_io_engine,
get_vectored_impl,
get_impl,
} = &self.conf;
let id = format!("id={}", id);
@@ -111,6 +112,11 @@ impl PageServerNode {
} else {
String::new()
};
let get_impl = if let Some(get_impl) = get_impl {
format!("get_impl='{get_impl}'")
} else {
String::new()
};
let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
@@ -124,6 +130,7 @@ impl PageServerNode {
broker_endpoint_param,
virtual_file_io_engine,
get_vectored_impl,
get_impl,
];
if let Some(control_plane_api) = &self.env.control_plane_api {

View File

@@ -48,11 +48,11 @@ impl Key {
}
}
pub fn next(&self) -> Key {
pub const fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
pub const fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
@@ -483,6 +483,8 @@ pub fn is_inherited_key(key: Key) -> bool {
key != AUX_FILES_KEY
}
pub const NON_INHERITED_RANGE: Range<Key> = AUX_FILES_KEY..AUX_FILES_KEY.next();
#[inline(always)]
pub fn is_rel_fsm_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff

View File

@@ -94,12 +94,13 @@ impl KeySpace {
/// Remove all keys in `other` from `self`.
/// This can involve splitting or removing of existing ranges.
pub fn remove_overlapping_with(&mut self, other: &KeySpace) {
/// Returns the removed keyspace
pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
let (self_start, self_end) = match (self.start(), self.end()) {
(Some(start), Some(end)) => (start, end),
_ => {
// self is empty
return;
return KeySpace::default();
}
};
@@ -112,30 +113,37 @@ impl KeySpace {
.skip_while(|range| self_start >= range.end)
.take_while(|range| self_end > range.start);
let mut removed_accum = KeySpaceRandomAccum::new();
for range in other_ranges {
while let Some(overlap_at) = self.overlaps_at(range) {
let overlapped = self.ranges[overlap_at].clone();
if overlapped.start < range.start && overlapped.end <= range.end {
// Higher part of the range is completely overlapped.
removed_accum.add_range(range.start..self.ranges[overlap_at].end);
self.ranges[overlap_at].end = range.start;
}
if overlapped.start >= range.start && overlapped.end > range.end {
// Lower part of the range is completely overlapped.
removed_accum.add_range(self.ranges[overlap_at].start..range.end);
self.ranges[overlap_at].start = range.end;
}
if overlapped.start < range.start && overlapped.end > range.end {
// Middle part of the range is overlapped.
removed_accum.add_range(range.clone());
self.ranges[overlap_at].end = range.start;
self.ranges
.insert(overlap_at + 1, range.end..overlapped.end);
}
if overlapped.start >= range.start && overlapped.end <= range.end {
// Whole range is overlapped
removed_accum.add_range(self.ranges[overlap_at].clone());
self.ranges.remove(overlap_at);
}
}
}
removed_accum.to_keyspace()
}
pub fn start(&self) -> Option<Key> {
@@ -170,6 +178,11 @@ impl KeySpace {
pub fn overlaps(&self, range: &Range<Key>) -> bool {
self.overlaps_at(range).is_some()
}
/// Check if the keyspace contains a key
pub fn contains(&self, key: &Key) -> bool {
self.overlaps(&(*key..key.next()))
}
}
///
@@ -553,7 +566,16 @@ mod tests {
Key::from_i128(11)..Key::from_i128(13),
],
};
key_space1.remove_overlapping_with(&key_space2);
let removed = key_space1.remove_overlapping_with(&key_space2);
let removed_expected = KeySpace {
ranges: vec![
Key::from_i128(2)..Key::from_i128(3),
Key::from_i128(6)..Key::from_i128(7),
Key::from_i128(11)..Key::from_i128(12),
],
};
assert_eq!(removed, removed_expected);
assert_eq!(
key_space1.ranges,
vec![
@@ -583,7 +605,17 @@ mod tests {
Key::from_i128(14)..Key::from_i128(17),
],
};
key_space1.remove_overlapping_with(&key_space2);
let removed = key_space1.remove_overlapping_with(&key_space2);
let removed_expected = KeySpace {
ranges: vec![
Key::from_i128(3)..Key::from_i128(5),
Key::from_i128(8)..Key::from_i128(10),
Key::from_i128(14)..Key::from_i128(15),
],
};
assert_eq!(removed, removed_expected);
assert_eq!(
key_space1.ranges,
vec![
@@ -610,7 +642,11 @@ mod tests {
Key::from_i128(15)..Key::from_i128(17),
],
};
key_space1.remove_overlapping_with(&key_space2);
let removed = key_space1.remove_overlapping_with(&key_space2);
let removed_expected = KeySpace::default();
assert_eq!(removed, removed_expected);
assert_eq!(
key_space1.ranges,
vec![
@@ -637,7 +673,17 @@ mod tests {
let key_space2 = KeySpace {
ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
};
key_space1.remove_overlapping_with(&key_space2);
let removed = key_space1.remove_overlapping_with(&key_space2);
let removed_expected = KeySpace {
ranges: vec![
Key::from_i128(9)..Key::from_i128(10),
Key::from_i128(12)..Key::from_i128(15),
Key::from_i128(17)..Key::from_i128(19),
],
};
assert_eq!(removed, removed_expected);
assert_eq!(
key_space1.ranges,
vec![

View File

@@ -121,8 +121,10 @@ fn main() -> anyhow::Result<()> {
&[("node_id", &conf.id.to_string())],
);
// after setting up logging, log the effective IO engine choice
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.get_impl, "starting with get page implementation");
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {

View File

@@ -30,9 +30,9 @@ use utils::{
logging::LogFormat,
};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
@@ -91,6 +91,8 @@ pub mod defaults {
pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
pub const DEFAULT_GET_IMPL: &str = "legacy";
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
@@ -138,6 +140,8 @@ pub mod defaults {
#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'
#get_impl = '{DEFAULT_GET_IMPL}'
#max_vectored_read_bytes = '{DEFAULT_MAX_VECTORED_READ_BYTES}'
#validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'
@@ -284,6 +288,8 @@ pub struct PageServerConf {
pub get_vectored_impl: GetVectoredImpl,
pub get_impl: GetImpl,
pub max_vectored_read_bytes: MaxVectoredReadBytes,
pub validate_vectored_get: bool,
@@ -414,6 +420,8 @@ struct PageServerConfigBuilder {
get_vectored_impl: BuilderValue<GetVectoredImpl>,
get_impl: BuilderValue<GetImpl>,
max_vectored_read_bytes: BuilderValue<MaxVectoredReadBytes>,
validate_vectored_get: BuilderValue<bool>,
@@ -503,6 +511,7 @@ impl PageServerConfigBuilder {
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
get_impl: Set(DEFAULT_GET_IMPL.parse().unwrap()),
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
@@ -681,6 +690,10 @@ impl PageServerConfigBuilder {
self.get_vectored_impl = BuilderValue::Set(value);
}
pub fn get_impl(&mut self, value: GetImpl) {
self.get_impl = BuilderValue::Set(value);
}
pub fn get_max_vectored_read_bytes(&mut self, value: MaxVectoredReadBytes) {
self.max_vectored_read_bytes = BuilderValue::Set(value);
}
@@ -750,6 +763,7 @@ impl PageServerConfigBuilder {
secondary_download_concurrency,
ingest_batch_size,
get_vectored_impl,
get_impl,
max_vectored_read_bytes,
validate_vectored_get,
ephemeral_bytes_per_memory_kb,
@@ -1035,6 +1049,9 @@ impl PageServerConf {
"get_vectored_impl" => {
builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
}
"get_impl" => {
builder.get_impl(parse_toml_from_str("get_impl", item)?)
}
"max_vectored_read_bytes" => {
let bytes = parse_toml_u64("max_vectored_read_bytes", item)? as usize;
builder.get_max_vectored_read_bytes(
@@ -1126,6 +1143,7 @@ impl PageServerConf {
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
max_vectored_read_bytes: MaxVectoredReadBytes(
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant"),
@@ -1365,6 +1383,7 @@ background_task_maximum_delay = '334 s'
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
max_vectored_read_bytes: MaxVectoredReadBytes(
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant")
@@ -1438,6 +1457,7 @@ background_task_maximum_delay = '334 s'
ingest_batch_size: 100,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
max_vectored_read_bytes: MaxVectoredReadBytes(
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant")

View File

@@ -86,6 +86,11 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use std::{
sync::{atomic::AtomicU32, Arc},
time::Duration,
};
use crate::task_mgr::TaskKind;
pub(crate) mod optional_counter;
@@ -98,6 +103,156 @@ pub struct RequestContext {
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32,
pub read_path_stats: ReadPathStats,
}
#[derive(Debug, Default, Clone)]
pub struct ReadPathStats {
pub inner: Arc<ReadPathStatsInner>,
}
#[derive(Debug, Default)]
pub struct ReadPathStatsInner {
pub get_reconstruct_data_time: AtomicU32,
pub plan_read_time: AtomicU32,
pub read_time: AtomicU32,
pub sort_reconstruct_data_time: AtomicU32,
pub layer_search_time: AtomicU32,
pub keyspace_manipulation_time: AtomicU32,
pub in_mem_layer_find_time: AtomicU32,
pub fringe_fondle_time: AtomicU32,
pub layer_map_search_time: AtomicU32,
pub buffer_cache_hits: AtomicU32,
pub layers_visited: AtomicU32,
}
impl std::fmt::Display for ReadPathStatsInner {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"get_reconstruct_data_time={} plan_read_time={} read_time={} sort_time={} layer_search_time={} keyspace_manipulation_time={} in_mem_layer_find_time={} fringe_fondle_time={} layer_map_search_time={} buffer_cache_hits={} layers_visited={}",
self.get_reconstruct_data_time.load(std::sync::atomic::Ordering::Relaxed),
self.plan_read_time
.load(std::sync::atomic::Ordering::Relaxed),
self.read_time.load(std::sync::atomic::Ordering::Relaxed),
self.sort_reconstruct_data_time
.load(std::sync::atomic::Ordering::Relaxed),
self.layer_search_time
.load(std::sync::atomic::Ordering::Relaxed),
self.keyspace_manipulation_time
.load(std::sync::atomic::Ordering::Relaxed),
self.in_mem_layer_find_time
.load(std::sync::atomic::Ordering::Relaxed),
self.fringe_fondle_time
.load(std::sync::atomic::Ordering::Relaxed),
self.layer_map_search_time
.load(std::sync::atomic::Ordering::Relaxed),
self.buffer_cache_hits
.load(std::sync::atomic::Ordering::Relaxed),
self.layers_visited
.load(std::sync::atomic::Ordering::Relaxed)
)
}
}
impl ReadPathStatsInner {
pub fn reset(&self) {
self.get_reconstruct_data_time
.store(0, std::sync::atomic::Ordering::Relaxed);
self.plan_read_time
.store(0, std::sync::atomic::Ordering::Relaxed);
self.read_time
.store(0, std::sync::atomic::Ordering::Relaxed);
self.sort_reconstruct_data_time
.store(0, std::sync::atomic::Ordering::Relaxed);
self.layer_search_time
.store(0, std::sync::atomic::Ordering::Relaxed);
self.keyspace_manipulation_time
.store(0, std::sync::atomic::Ordering::Relaxed);
self.in_mem_layer_find_time
.store(0, std::sync::atomic::Ordering::Relaxed);
self.fringe_fondle_time
.store(0, std::sync::atomic::Ordering::Relaxed);
self.layer_map_search_time
.store(0, std::sync::atomic::Ordering::Relaxed);
self.buffer_cache_hits
.store(0, std::sync::atomic::Ordering::Relaxed);
self.layers_visited
.store(0, std::sync::atomic::Ordering::Relaxed);
}
pub fn add_get_reconstruct_data_time(&self, dur: Duration) {
self.get_reconstruct_data_time.fetch_add(
dur.as_micros().try_into().unwrap(),
std::sync::atomic::Ordering::Relaxed,
);
}
pub fn add_plan_read_time(&self, dur: Duration) {
self.plan_read_time.fetch_add(
dur.as_micros().try_into().unwrap(),
std::sync::atomic::Ordering::Relaxed,
);
}
pub fn add_read_time(&self, dur: Duration) {
self.read_time.fetch_add(
dur.as_micros().try_into().unwrap(),
std::sync::atomic::Ordering::Relaxed,
);
}
pub fn add_sort_reconstruct_data_time(&self, dur: Duration) {
self.sort_reconstruct_data_time.fetch_add(
dur.as_micros().try_into().unwrap(),
std::sync::atomic::Ordering::Relaxed,
);
}
pub fn add_layer_search_time(&self, dur: Duration) {
self.layer_search_time.fetch_add(
dur.as_micros().try_into().unwrap(),
std::sync::atomic::Ordering::Relaxed,
);
}
pub fn add_keyspace_manipulation_timer(&self, dur: Duration) {
self.keyspace_manipulation_time.fetch_add(
dur.as_micros().try_into().unwrap(),
std::sync::atomic::Ordering::Relaxed,
);
}
pub fn add_in_mem_layer_find_timer(&self, dur: Duration) {
self.in_mem_layer_find_time.fetch_add(
dur.as_micros().try_into().unwrap(),
std::sync::atomic::Ordering::Relaxed,
);
}
pub fn add_fringe_fondle_time(&self, dur: Duration) {
self.fringe_fondle_time.fetch_add(
dur.as_micros().try_into().unwrap(),
std::sync::atomic::Ordering::Relaxed,
);
}
pub fn add_layer_map_search_time(&self, dur: Duration) {
self.layer_map_search_time.fetch_add(
dur.as_micros().try_into().unwrap(),
std::sync::atomic::Ordering::Relaxed,
);
}
pub fn inc_layer_visited(&self) {
self.layers_visited
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn inc_buffer_cache_hits(&self) {
self.buffer_cache_hits
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
/// The kind of access to the page cache.
@@ -154,6 +309,7 @@ impl RequestContextBuilder {
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
micros_spent_throttled: Default::default(),
read_path_stats: Default::default(),
},
}
}
@@ -168,6 +324,7 @@ impl RequestContextBuilder {
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
micros_spent_throttled: Default::default(),
read_path_stats: original.read_path_stats.clone(),
},
}
}
@@ -291,4 +448,12 @@ impl RequestContext {
pub(crate) fn page_content_kind(&self) -> PageContentKind {
self.page_content_kind
}
pub fn report_stats(&self) {
tracing::info!("Read path stats: {}", *self.read_path_stats.inner);
}
pub fn reset_stats(&self) {
self.read_path_stats.inner.reset();
}
}

View File

@@ -72,6 +72,7 @@ impl MicroSecondsCounterU32 {
Err(_) => Err("add(): duration conversion error"),
}
}
pub fn close_and_checked_sub_from(&self, from: Duration) -> Result<Duration, &'static str> {
let val = self.inner.close()?;
let val = Duration::from_micros(val as u64);

View File

@@ -96,31 +96,44 @@ pub(crate) static READ_NUM_FS_LAYERS: Lazy<Histogram> = Lazy::new(|| {
});
// Metrics collected on operations on the storage repository.
#[derive(
Clone,
Copy,
enum_map::Enum,
strum_macros::EnumString,
strum_macros::Display,
strum_macros::IntoStaticStr,
)]
pub(crate) enum GetKind {
Singular,
Vectored,
}
pub(crate) struct ReconstructTimeMetrics {
ok: Histogram,
err: Histogram,
singular: Histogram,
vectored: Histogram,
}
pub(crate) static RECONSTRUCT_TIME: Lazy<ReconstructTimeMetrics> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_getpage_reconstruct_seconds",
"Time spent in reconstruct_value (reconstruct a page from deltas)",
&["result"],
&["get_kind"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric");
ReconstructTimeMetrics {
ok: inner.get_metric_with_label_values(&["ok"]).unwrap(),
err: inner.get_metric_with_label_values(&["err"]).unwrap(),
singular: inner.with_label_values(&[GetKind::Singular.into()]),
vectored: inner.with_label_values(&[GetKind::Vectored.into()]),
}
});
impl ReconstructTimeMetrics {
pub(crate) fn for_result<T, E>(&self, result: &Result<T, E>) -> &Histogram {
match result {
Ok(_) => &self.ok,
Err(_) => &self.err,
pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram {
match get_kind {
GetKind::Singular => &self.singular,
GetKind::Vectored => &self.vectored,
}
}
}
@@ -133,13 +146,33 @@ pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::n
.expect("failed to define a metric")
});
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
pub(crate) struct ReconstructDataTimeMetrics {
singular: Histogram,
vectored: Histogram,
}
impl ReconstructDataTimeMetrics {
pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram {
match get_kind {
GetKind::Singular => &self.singular,
GetKind::Vectored => &self.vectored,
}
}
}
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<ReconstructDataTimeMetrics> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_getpage_get_reconstruct_data_seconds",
"Time spent in get_reconstruct_value_data",
&["get_kind"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
.expect("failed to define a metric");
ReconstructDataTimeMetrics {
singular: inner.with_label_values(&[GetKind::Singular.into()]),
vectored: inner.with_label_values(&[GetKind::Vectored.into()]),
}
});
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {

View File

@@ -3858,8 +3858,10 @@ mod tests {
use crate::DEFAULT_PG_VERSION;
use bytes::BytesMut;
use hex_literal::hex;
use pageserver_api::key::NON_INHERITED_RANGE;
use pageserver_api::keyspace::KeySpace;
use rand::{thread_rng, Rng};
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
static TEST_KEY: Lazy<Key> =
@@ -4648,7 +4650,9 @@ mod tests {
for read in reads {
info!("Doing vectored read on {:?}", read);
let vectored_res = tline.get_vectored_impl(read.clone(), reads_lsn, &ctx).await;
let vectored_res = tline
.get_vectored_impl(read.clone(), reads_lsn, ValuesReconstructState::new(), &ctx)
.await;
tline
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
.await;
@@ -4657,6 +4661,64 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_get_vectored_aux_files() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored_aux_files")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = tline.raw_timeline().unwrap();
let mut modification = tline.begin_modification(Lsn(0x1000));
modification.put_file("foo/bar1", b"content1", &ctx).await?;
modification.set_lsn(Lsn(0x1008))?;
modification.put_file("foo/bar2", b"content2", &ctx).await?;
modification.commit(&ctx).await?;
let child_timeline_id = TimelineId::generate();
tenant
.branch_timeline_test(
tline,
child_timeline_id,
Some(tline.get_last_record_lsn()),
&ctx,
)
.await?;
let child_timeline = tenant
.get_timeline(child_timeline_id, true)
.expect("Should have the branched timeline");
let aux_keyspace = KeySpace {
ranges: vec![NON_INHERITED_RANGE],
};
let read_lsn = child_timeline.get_last_record_lsn();
let vectored_res = child_timeline
.get_vectored_impl(
aux_keyspace.clone(),
read_lsn,
ValuesReconstructState::new(),
&ctx,
)
.await;
child_timeline
.validate_get_vectored_impl(&vectored_res, aux_keyspace, read_lsn, &ctx)
.await;
let images = vectored_res?;
let mut key = NON_INHERITED_RANGE.start;
while key < NON_INHERITED_RANGE.end {
assert!(matches!(images[&key], Err(PageReconstructError::Other(_))));
key = key.next();
}
Ok(())
}
// Test that vectored get handles layer gaps correctly
// by advancing into the next ancestor timeline if required.
//
@@ -4785,7 +4847,12 @@ mod tests {
ranges: vec![key_near_gap..gap_at_key.next(), key_near_end..current_key],
};
let results = child_timeline
.get_vectored_impl(read.clone(), current_lsn, &ctx)
.get_vectored_impl(
read.clone(),
current_lsn,
ValuesReconstructState::new(),
&ctx,
)
.await?;
for (key, img_res) in results {
@@ -4918,6 +4985,7 @@ mod tests {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
ValuesReconstructState::new(),
&ctx,
)
.await;

View File

@@ -203,7 +203,10 @@ impl<'a> FileBlockReader<'a> {
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::Found(guard) => {
ctx.read_path_stats.inner.inc_buffer_cache_hits();
Ok(guard.into())
}
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
let write_guard = self.fill_buffer(write_guard, blknum).await?;

View File

@@ -138,6 +138,29 @@ impl ValuesReconstructState {
}
}
/// This function is called after reading a keyspace from a layer.
/// It checks if the read path has now moved past the cached Lsn for any keys.
///
/// Implementation note: We intentionally iterate over the keys for which we've
/// already collected some reconstruct data. This avoids scaling complexity with
/// the size of the search space.
pub(crate) fn on_lsn_advanced(&mut self, keyspace: &KeySpace, advanced_to: Lsn) {
for (key, value) in self.keys.iter_mut() {
if !keyspace.contains(key) {
continue;
}
if let Ok(state) = value {
if state.situation != ValueReconstructSituation::Complete
&& state.get_cached_lsn() >= Some(advanced_to)
{
state.situation = ValueReconstructSituation::Complete;
self.keys_done.add_key(*key);
}
}
}
}
/// Update the state collected for a given key.
/// Returns true if this was the last value needed for the key and false otherwise.
///
@@ -162,11 +185,18 @@ impl ValuesReconstructState {
true
}
Value::WalRecord(rec) => {
let reached_cache =
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
debug_assert!(
Some(lsn) > state.get_cached_lsn(),
"Attempt to collect a record below cached LSN for walredo: {} < {}",
lsn,
state
.get_cached_lsn()
.expect("Assertion can only fire if a cached lsn is present")
);
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init || reached_cache
will_init
}
},
};
@@ -349,11 +379,28 @@ impl ReadableLayer {
) -> Result<(), GetVectoredError> {
match self {
ReadableLayer::PersistentLayer(layer) => {
if keyspace.total_size() == 1 {
tracing::info!(
"Vectored path for {}: {} @ {:?}",
keyspace.start().unwrap(),
layer.local_path(),
lsn_range
);
}
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
.await
}
ReadableLayer::InMemoryLayer(layer) => {
if keyspace.total_size() == 1 {
tracing::info!(
"Vectored path for {}: in mem layer @ {:?}",
keyspace.start().unwrap(),
lsn_range
);
}
layer
.get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
.await

View File

@@ -58,6 +58,7 @@ use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::OnceCell;
use tracing::*;
@@ -217,6 +218,7 @@ pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
lsn_range: Range<Lsn>,
file: VirtualFile,
file_id: FileId,
@@ -742,6 +744,7 @@ impl DeltaLayerInner {
file_id,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn_range: actual_summary.lsn_range,
max_vectored_read_bytes,
}))
}
@@ -753,6 +756,9 @@ impl DeltaLayerInner {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ctx.read_path_stats.inner.inc_layer_visited();
let plan_timer = Instant::now();
let mut need_image = true;
// Scan the page versions backwards, starting from `lsn`.
let block_reader = FileBlockReader::new(&self.file, self.file_id);
@@ -788,6 +794,12 @@ impl DeltaLayerInner {
)
.await?;
ctx.read_path_stats
.inner
.add_plan_read_time(plan_timer.elapsed());
let read_timer = Instant::now();
let ctx = &RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerValue)
.build();
@@ -826,6 +838,10 @@ impl DeltaLayerInner {
}
}
ctx.read_path_stats
.inner
.add_read_time(read_timer.elapsed());
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
@@ -849,6 +865,9 @@ impl DeltaLayerInner {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
ctx.read_path_stats.inner.inc_layer_visited();
let plan_timer = Instant::now();
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
@@ -866,7 +885,7 @@ impl DeltaLayerInner {
let data_end_offset = self.index_start_offset();
let reads = Self::plan_reads(
keyspace,
&keyspace,
lsn_range,
data_end_offset,
index_reader,
@@ -877,14 +896,26 @@ impl DeltaLayerInner {
.await
.map_err(GetVectoredError::Other)?;
ctx.read_path_stats
.inner
.add_plan_read_time(plan_timer.elapsed());
let read_timer = Instant::now();
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
ctx.read_path_stats
.inner
.add_read_time(read_timer.elapsed());
Ok(())
}
async fn plan_reads<Reader>(
keyspace: KeySpace,
keyspace: &KeySpace,
lsn_range: Range<Lsn>,
data_end_offset: u64,
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
@@ -1532,7 +1563,7 @@ mod test {
// Plan and validate
let vectored_reads = DeltaLayerInner::plan_reads(
keyspace.clone(),
&keyspace,
lsn_range.clone(),
disk_offset,
reader,
@@ -1784,7 +1815,7 @@ mod test {
let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
let vectored_reads = DeltaLayerInner::plan_reads(
keyspace.clone(),
&keyspace,
entries_meta.lsn_range.clone(),
data_end_offset,
index_reader,

View File

@@ -55,6 +55,7 @@ use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tracing::*;
@@ -422,6 +423,9 @@ impl ImageLayerInner {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ctx.read_path_stats.inner.inc_layer_visited();
let plan_timer = Instant::now();
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
@@ -437,6 +441,12 @@ impl ImageLayerInner {
)
.await?
{
ctx.read_path_stats
.inner
.add_plan_read_time(plan_timer.elapsed());
let read_timer = Instant::now();
let blob = block_reader
.block_cursor()
.read_blob(
@@ -450,6 +460,11 @@ impl ImageLayerInner {
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
ctx.read_path_stats
.inner
.add_read_time(read_timer.elapsed());
Ok(ValueReconstructResult::Complete)
} else {
Ok(ValueReconstructResult::Missing)
@@ -464,14 +479,27 @@ impl ImageLayerInner {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
ctx.read_path_stats.inner.inc_layer_visited();
let plan_timer = Instant::now();
let reads = self
.plan_reads(keyspace, ctx)
.await
.map_err(GetVectoredError::Other)?;
ctx.read_path_stats
.inner
.add_plan_read_time(plan_timer.elapsed());
let read_timer = Instant::now();
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
ctx.read_path_stats
.inner
.add_read_time(read_timer.elapsed());
Ok(())
}

View File

@@ -298,6 +298,9 @@ impl InMemoryLayer {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ctx.read_path_stats.inner.inc_layer_visited();
let read_timer = Instant::now();
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
@@ -333,6 +336,10 @@ impl InMemoryLayer {
}
}
ctx.read_path_stats
.inner
.add_read_time(read_timer.elapsed());
// release lock on 'inner'
// If an older page image is needed to reconstruct the page, let the
@@ -355,6 +362,9 @@ impl InMemoryLayer {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
ctx.read_path_stats.inner.inc_layer_visited();
let plan_timer = Instant::now();
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
@@ -394,6 +404,12 @@ impl InMemoryLayer {
}
}
ctx.read_path_stats
.inner
.add_plan_read_time(plan_timer.elapsed());
let read_timer = Instant::now();
let keyspace_size = keyspace.total_size();
let mut completed_keys = HashSet::new();
@@ -426,6 +442,12 @@ impl InMemoryLayer {
}
}
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
ctx.read_path_stats
.inner
.add_read_time(read_timer.elapsed());
Ok(())
}
}

View File

@@ -336,6 +336,12 @@ impl Layer {
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
.await
.map_err(|err| match err {
GetVectoredError::Other(err) => GetVectoredError::Other(
err.context(format!("get_values_reconstruct_data for layer {self}")),
),
err => err,
})
}
/// Download the layer if evicted.

View File

@@ -16,7 +16,7 @@ use enumset::EnumSet;
use fail::fail_point;
use once_cell::sync::Lazy;
use pageserver_api::{
key::AUX_FILES_KEY,
key::{AUX_FILES_KEY, NON_INHERITED_RANGE},
keyspace::KeySpaceAccum,
models::{
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
@@ -119,8 +119,8 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{config::TenantConf, storage_layer::VectoredValueReconstructState};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
@@ -586,6 +586,19 @@ impl From<GetVectoredError> for CreateImageLayersError {
}
}
/// Maps vectored-read-path errors onto the singular read path's error type so
/// that callers of `Timeline::get` observe equivalent failures regardless of
/// which `GetImpl` served the request.
impl From<GetVectoredError> for PageReconstructError {
    fn from(e: GetVectoredError) -> Self {
        match e {
            GetVectoredError::Cancelled => PageReconstructError::Cancelled,
            // Keep the message identical to the legacy read path ("Invalid LSN")
            // so error-parity validation between the two paths passes.
            GetVectoredError::InvalidLsn(_) => PageReconstructError::Other(anyhow!("Invalid LSN")),
            err @ GetVectoredError::Oversized(_) => PageReconstructError::Other(err.into()),
            err @ GetVectoredError::MissingKey(_) => PageReconstructError::Other(err.into()),
            // BUGFIX: the previous arm bound the whole `GetVectoredError`
            // (`err @ GetVectoredError::GetReadyAncestorError(_)`) and passed it
            // back into `PageReconstructError::from`, which resolves to *this*
            // impl and recurses forever (stack overflow). Destructure the inner
            // ancestor error and convert it via
            // `impl From<GetReadyAncestorError> for PageReconstructError`.
            GetVectoredError::GetReadyAncestorError(ancestor_err) => {
                PageReconstructError::from(ancestor_err)
            }
            GetVectoredError::Other(err) => PageReconstructError::Other(err),
        }
    }
}
impl From<GetReadyAncestorError> for PageReconstructError {
fn from(e: GetReadyAncestorError) -> Self {
use GetReadyAncestorError::*;
@@ -615,6 +628,23 @@ pub enum GetVectoredImpl {
Vectored,
}
/// Selects which implementation serves singular (single-key) get requests on a
/// `Timeline` (see the `match self.conf.get_impl` dispatch in `Timeline::get`).
///
/// Parsed from / rendered to kebab-case strings ("legacy", "vectored") via
/// strum, and (de)serialized through those strings via serde_with, so the
/// variant can be selected directly from the pageserver config file.
#[derive(
    Eq,
    PartialEq,
    Debug,
    Copy,
    Clone,
    strum_macros::EnumString,
    strum_macros::Display,
    serde_with::DeserializeFromStr,
    serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetImpl {
    /// Original single-key read path (`Timeline::get_impl`).
    Legacy,
    /// Route singular gets through the vectored read path
    /// (`Timeline::get_vectored_impl` with a one-key keyspace).
    Vectored,
}
pub(crate) enum WaitLsnWaiter<'a> {
Timeline(&'a Timeline),
Tenant,
@@ -676,16 +706,6 @@ impl Timeline {
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
self.timeline_get_throttle.throttle(ctx, 1).await;
self.get_impl(key, lsn, ctx).await
}
/// Not subject to [`Self::timeline_get_throttle`].
async fn get_impl(
&self,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if !lsn.is_valid() {
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
@@ -696,13 +716,7 @@ impl Timeline {
// page_service.
debug_assert!(!self.shard_identity.is_key_disposable(&key));
// XXX: structured stats collection for layer eviction here.
trace!(
"get page request for {}@{} from task kind {:?}",
key,
lsn,
ctx.task_kind()
);
self.timeline_get_throttle.throttle(ctx, 1).await;
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
// The cached image can be returned directly if there is no WAL between the cached image
@@ -725,22 +739,107 @@ impl Timeline {
None => None,
};
let mut reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
let res = match self.conf.get_impl {
GetImpl::Legacy => {
let reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
};
self.get_impl(key, lsn, reconstruct_state, ctx).await
}
GetImpl::Vectored => {
let keyspace = KeySpace {
ranges: vec![key..key.next()],
};
// Initialise the reconstruct state for the key with the cache
// entry returned above.
let mut reconstruct_state = ValuesReconstructState::new();
let mut key_state = VectoredValueReconstructState::default();
key_state.img = cached_page_img;
reconstruct_state.keys.insert(key, Ok(key_state));
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, reconstruct_state, ctx)
.await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
}
let key_value = vectored_res?.pop_first();
match key_value {
Some((got_key, value)) => {
if got_key != key {
error!(
"Expected {}, but singular vectored get returned {}",
key, got_key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get returned wrong key"
)))
} else {
value
}
}
None => {
error!(
"Expected {}, but singular vectored get returned nothing",
key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get did not return a value for {}",
key
)))
}
}
}
};
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
if key.to_i128() % 100 < 10 {
ctx.report_stats();
}
ctx.reset_stats();
res
}
/// Not subject to [`Self::timeline_get_throttle`].
async fn get_impl(
&self,
key: Key,
lsn: Lsn,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
// XXX: structured stats collection for layer eviction here.
trace!(
"get page request for {}@{} from task kind {:?}",
key,
lsn,
ctx.task_kind()
);
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(crate::metrics::GetKind::Singular)
.start_timer();
let get_reconstruct_data_timer = Instant::now();
let path = self
.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
.await?;
ctx.read_path_stats
.inner
.add_get_reconstruct_data_time(get_reconstruct_data_timer.elapsed());
timer.stop_and_record();
let start = Instant::now();
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
let elapsed = start.elapsed();
crate::metrics::RECONSTRUCT_TIME
.for_result(&res)
.for_get_kind(crate::metrics::GetKind::Singular)
.observe(elapsed.as_secs_f64());
if cfg!(feature = "testing") && res.is_err() {
@@ -819,7 +918,9 @@ impl Timeline {
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
}
GetVectoredImpl::Vectored => {
let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, ValuesReconstructState::new(), ctx)
.await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
@@ -865,16 +966,38 @@ impl Timeline {
for range in keyspace.ranges {
let mut key = range.start;
while key != range.end {
let block = self.get_impl(key, lsn, ctx).await;
let block = self
.get_impl(key, lsn, ValueReconstructState::default(), ctx)
.await;
use PageReconstructError::*;
match block {
Err(Cancelled | AncestorStopping(_)) => {
return Err(GetVectoredError::Cancelled)
}
Err(Other(err)) if err.to_string().contains("could not find data for key") => {
Err(Other(err))
if err.to_string().contains("could not find data for key")
&& !NON_INHERITED_RANGE.contains(&key) =>
{
return Err(GetVectoredError::MissingKey(key))
}
Err(Other(err))
if err
.to_string()
.contains("downloading evicted layer file failed") =>
{
return Err(GetVectoredError::Other(err))
}
Err(Other(err))
if err
.chain()
.any(|cause| cause.to_string().contains("layer loading failed")) =>
{
// The intent here is to achieve error parity with the vectored read path.
// When vectored read fails to load a layer it fails the whole read, hence
// we mimic this behaviour here to keep the validation happy.
return Err(GetVectoredError::Other(err));
}
_ => {
values.insert(key, block);
key = key.next();
@@ -890,13 +1013,31 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
mut reconstruct_state: ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
use crate::metrics::GetKind;
let get_kind = if keyspace.total_size() == 1 {
GetKind::Singular
} else {
GetKind::Vectored
};
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(get_kind)
.start_timer();
let timer = Instant::now();
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
.await?;
ctx.read_path_stats
.inner
.add_get_reconstruct_data_time(timer.elapsed());
get_data_timer.stop_and_record();
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
.for_get_kind(get_kind)
.start_timer();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
for (key, res) in reconstruct_state.keys {
match res {
@@ -904,13 +1045,18 @@ impl Timeline {
results.insert(key, Err(err));
}
Ok(state) => {
let timer = Instant::now();
let state = ValueReconstructState::from(state);
ctx.read_path_stats
.inner
.add_sort_reconstruct_data_time(timer.elapsed());
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
results.insert(key, reconstruct_res);
}
}
}
reconstruct_timer.stop_and_record();
Ok(results)
}
@@ -946,10 +1092,10 @@ impl Timeline {
panic!(concat!("Sequential get failed with {}, but vectored get did not",
" - keyspace={:?} lsn={}"),
seq_err, keyspace, lsn) },
(Ok(_), Err(vec_err)) => {
(Ok(seq_ok), Err(vec_err)) => {
panic!(concat!("Vectored get failed with {}, but sequential get did not",
" - keyspace={:?} lsn={}"),
vec_err, keyspace, lsn) },
" - keyspace={:?} lsn={} seq_ok={:?}"),
vec_err, keyspace, lsn, seq_ok) },
(Err(seq_err), Err(vec_err)) => {
assert!(errors_match(seq_err, vec_err),
"Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
@@ -2823,6 +2969,7 @@ impl Timeline {
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
let in_mem_layer_find_timer = Instant::now();
if let Some(open_layer) = &layers.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
@@ -2834,6 +2981,10 @@ impl Timeline {
let open_layer = open_layer.clone();
drop(guard);
ctx.read_path_stats
.inner
.add_in_mem_layer_find_timer(in_mem_layer_find_timer.elapsed());
result = match open_layer
.get_value_reconstruct_data(
key,
@@ -2865,6 +3016,10 @@ impl Timeline {
let frozen_layer = frozen_layer.clone();
drop(guard);
ctx.read_path_stats
.inner
.add_in_mem_layer_find_timer(in_mem_layer_find_timer.elapsed());
result = match frozen_layer
.get_value_reconstruct_data(
key,
@@ -2888,7 +3043,12 @@ impl Timeline {
}
}
let layer_map_search_timer = Instant::now();
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
ctx.read_path_stats
.inner
.add_layer_map_search_time(layer_map_search_timer.elapsed());
let layer = guard.get_from_desc(&layer);
drop(guard);
@@ -2914,11 +3074,19 @@ impl Timeline {
));
continue 'outer;
} else if timeline.ancestor_timeline.is_some() {
ctx.read_path_stats
.inner
.add_layer_map_search_time(layer_map_search_timer.elapsed());
// Nothing on this timeline. Traverse to parent
result = ValueReconstructResult::Continue;
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
continue 'outer;
} else {
ctx.read_path_stats
.inner
.add_layer_map_search_time(layer_map_search_timer.elapsed());
// Nothing found
result = ValueReconstructResult::Missing;
continue 'outer;
@@ -2964,6 +3132,29 @@ impl Timeline {
.await?;
keyspace.remove_overlapping_with(&completed);
// Do not descend into the ancestor timeline for aux files.
// We don't return a blanket [`GetVectoredError::MissingKey`] to avoid
// stalling compaction.
// TODO(chi): this will need to be updated for aux files v2 storage
if keyspace.overlaps(&NON_INHERITED_RANGE) {
let removed = keyspace.remove_overlapping_with(&KeySpace {
ranges: vec![NON_INHERITED_RANGE],
});
for range in removed.ranges {
let mut key = range.start;
while key < range.end {
reconstruct_state.on_key_error(
key,
// TODO: use PageReconstructError::Missing once #7393 merges
PageReconstructError::Other(anyhow!("Value for {} not found", key)),
);
key = key.next();
}
}
}
if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
break;
}
@@ -3015,59 +3206,95 @@ impl Timeline {
return Err(GetVectoredError::Cancelled);
}
let keyspace_manip_timer = Instant::now();
let keys_done_last_step = reconstruct_state.consume_done_keys();
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
completed_keyspace.merge(&keys_done_last_step);
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
ctx.read_path_stats
.inner
.add_keyspace_manipulation_timer(keyspace_manip_timer.elapsed());
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
cont_lsn > start_lsn
});
if unmapped_keyspace.start().is_some() {
let layer_search_timer = Instant::now();
match in_memory_layer {
Some(l) => {
let lsn_range = l.get_lsn_range().start..cont_lsn;
fringe.update(
ReadableLayer::InMemoryLayer(l),
unmapped_keyspace.clone(),
lsn_range,
);
}
None => {
for range in unmapped_keyspace.ranges.iter() {
let results = layers.range_search(range.clone(), cont_lsn);
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
keyspace_accum.to_keyspace(),
lsn_floor..cont_lsn,
)
})
.for_each(|(layer, keyspace, lsn_range)| {
fringe.update(layer, keyspace, lsn_range)
});
let in_mem_layer_find_timer = Instant::now();
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
cont_lsn > start_lsn
});
ctx.read_path_stats
.inner
.add_in_mem_layer_find_timer(in_mem_layer_find_timer.elapsed());
match in_memory_layer {
Some(l) => {
let lsn_range = l.get_lsn_range().start..cont_lsn;
let fringe_fondle_timer = Instant::now();
fringe.update(
ReadableLayer::InMemoryLayer(l),
unmapped_keyspace.clone(),
lsn_range,
);
ctx.read_path_stats
.inner
.add_fringe_fondle_time(fringe_fondle_timer.elapsed());
}
None => {
for range in unmapped_keyspace.ranges.iter() {
let layer_map_search_timer = Instant::now();
let results = layers.range_search(range.clone(), cont_lsn);
ctx.read_path_stats
.inner
.add_layer_map_search_time(layer_map_search_timer.elapsed());
let fringe_fondle_timer = Instant::now();
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
keyspace_accum.to_keyspace(),
lsn_floor..cont_lsn,
)
})
.for_each(|(layer, keyspace, lsn_range)| {
fringe.update(layer, keyspace, lsn_range)
});
ctx.read_path_stats
.inner
.add_fringe_fondle_time(fringe_fondle_timer.elapsed());
}
}
}
}
// It's safe to drop the layer map lock after planning the next round of reads.
// The fringe keeps readable handles for the layers which are safe to read even
// if layers were compacted or flushed.
//
// The more interesting consideration is: "Why is the read algorithm still correct
// if the layer map changes while it is operating?". Doing a vectored read on a
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
// covered by the read. The layer map tells us how to move the lsn downwards for a
// range at *a particular point in time*. It is fine for the answer to be different
// at two different time points.
drop(guard);
// It's safe to drop the layer map lock after planning the next round of reads.
// The fringe keeps readable handles for the layers which are safe to read even
// if layers were compacted or flushed.
//
// The more interesting consideration is: "Why is the read algorithm still correct
// if the layer map changes while it is operating?". Doing a vectored read on a
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
// covered by the read. The layer map tells us how to move the lsn downwards for a
// range at *a particular point in time*. It is fine for the answer to be different
// at two different time points.
drop(guard);
ctx.read_path_stats
.inner
.add_layer_search_time(layer_search_timer.elapsed());
}
if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
let next_cont_lsn = lsn_range.start;
@@ -3842,6 +4069,15 @@ impl Timeline {
{
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else if NON_INHERITED_RANGE.contains(&img_key)
&& matches!(err, PageReconstructError::Other(_))
{
// Non inherited keys (i.e. aux files) are special. We
// might fail to reconstruct the image since we are
// prohibited from visiting the ancestor timelines.
// We don't want to stall compaction, so we simply ignore
// these failures.
continue;
} else {
return Err(CreateImageLayersError::PageReconstructError(
err,

View File

@@ -507,6 +507,11 @@ class NeonEnvBuilder:
self.pageserver_get_vectored_impl = "vectored"
log.debug('Overriding pageserver get_vectored_impl config to "vectored"')
self.pageserver_get_impl: Optional[str] = None
if os.getenv("PAGESERVER_GET_IMPL", "") == "vectored":
self.pageserver_get_impl = "vectored"
log.debug('Overriding pageserver get_impl config to "vectored"')
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -1078,6 +1083,8 @@ class NeonEnv:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if config.pageserver_get_vectored_impl is not None:
ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl
if config.pageserver_get_impl is not None:
ps_cfg["get_impl"] = config.pageserver_get_impl
# Create a corresponding NeonPageserver object
self.pageservers.append(

View File

@@ -17,11 +17,16 @@ from fixtures.types import TenantId, TimelineId
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_local_corruption(neon_env_builder: NeonEnvBuilder):
if neon_env_builder.pageserver_get_impl == "vectored":
reconstruct_function_name = "get_values_reconstruct_data"
else:
reconstruct_function_name = "get_value_reconstruct_data"
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[
".*get_value_reconstruct_data for layer .*",
f".*{reconstruct_function_name} for layer .*",
".*could not find data for key.*",
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
@@ -84,7 +89,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
# (We don't check layer file contents on startup, when loading the timeline)
#
# This will change when we implement checksums for layers
with pytest.raises(Exception, match="get_value_reconstruct_data for layer ") as err:
with pytest.raises(Exception, match=f"{reconstruct_function_name} for layer ") as err:
pg2.start()
log.info(
f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}"

View File

@@ -226,6 +226,11 @@ def test_forward_compatibility(
)
try:
# Previous version neon_local and pageserver are not aware
# of the new config.
# TODO: remove this once the code reaches main
neon_env_builder.pageserver_get_impl = None
neon_env_builder.num_safekeepers = 3
neon_local_binpath = neon_env_builder.neon_binpath
env = neon_env_builder.from_repo_dir(

View File

@@ -1,4 +1,5 @@
import time
import re
from datetime import datetime, timedelta, timezone
from fixtures.log_helper import log
@@ -109,6 +110,11 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Test pageserver get_timestamp_of_lsn API
def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
if neon_env_builder.pageserver_get_impl == "vectored":
key_not_found_error = r".*Requested key.*not found,*"
else:
key_not_found_error = r".*could not find data for key.*"
env = neon_env_builder.init_start()
new_timeline_id = env.neon_cli.create_branch("test_ts_of_lsn_api")
@@ -177,8 +183,8 @@ def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
raise RuntimeError("there should have been an 'could not find data for key' error")
except PageserverApiException as error:
assert error.status_code == 500
assert str(error).startswith("could not find data for key")
env.pageserver.allowed_errors.append(".*could not find data for key.*")
assert re.match(key_not_found_error, str(error))
env.pageserver.allowed_errors.append(key_not_found_error)
# Probe a bunch of timestamps in the valid range
step_size = 100