mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-13 19:20:36 +00:00
Compare commits
27 Commits
release-55
...
vlad/coale
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
013e44f528 | ||
|
|
c719f902a1 | ||
|
|
bb2215ef0c | ||
|
|
294a2b2d6f | ||
|
|
7f8a6702fc | ||
|
|
76502b96bd | ||
|
|
a67f496b09 | ||
|
|
761f211f55 | ||
|
|
dc2c6367af | ||
|
|
a65f067799 | ||
|
|
f36561746e | ||
|
|
c121d3a3c7 | ||
|
|
76abb4e7a4 | ||
|
|
d2c806fa40 | ||
|
|
def9b17b15 | ||
|
|
0e8f9b82aa | ||
|
|
2cd9b3cab5 | ||
|
|
e489fd7a1d | ||
|
|
d45eca992f | ||
|
|
02f82ebd79 | ||
|
|
5a1dce5e3d | ||
|
|
7fa1915229 | ||
|
|
fc8714215c | ||
|
|
632613273f | ||
|
|
96eb1631d3 | ||
|
|
7f674c83e1 | ||
|
|
e5411ee556 |
1
.github/workflows/build_and_test.yml
vendored
1
.github/workflows/build_and_test.yml
vendored
@@ -477,6 +477,7 @@ jobs:
|
||||
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
PAGESERVER_GET_VECTORED_IMPL: vectored
|
||||
PAGESERVER_GET_IMPL: vectored
|
||||
|
||||
# Temporary disable this step until we figure out why it's so flaky
|
||||
# Ref https://github.com/neondatabase/neon/issues/4540
|
||||
|
||||
@@ -129,6 +129,7 @@ pub struct PageServerConf {
|
||||
|
||||
pub(crate) virtual_file_io_engine: Option<String>,
|
||||
pub(crate) get_vectored_impl: Option<String>,
|
||||
pub(crate) get_impl: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConf {
|
||||
@@ -141,6 +142,7 @@ impl Default for PageServerConf {
|
||||
http_auth_type: AuthType::Trust,
|
||||
virtual_file_io_engine: None,
|
||||
get_vectored_impl: None,
|
||||
get_impl: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +92,7 @@ impl PageServerNode {
|
||||
http_auth_type,
|
||||
virtual_file_io_engine,
|
||||
get_vectored_impl,
|
||||
get_impl,
|
||||
} = &self.conf;
|
||||
|
||||
let id = format!("id={}", id);
|
||||
@@ -111,6 +112,11 @@ impl PageServerNode {
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
let get_impl = if let Some(get_impl) = get_impl {
|
||||
format!("get_impl='{get_impl}'")
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
|
||||
let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
|
||||
|
||||
@@ -124,6 +130,7 @@ impl PageServerNode {
|
||||
broker_endpoint_param,
|
||||
virtual_file_io_engine,
|
||||
get_vectored_impl,
|
||||
get_impl,
|
||||
];
|
||||
|
||||
if let Some(control_plane_api) = &self.env.control_plane_api {
|
||||
|
||||
@@ -48,11 +48,11 @@ impl Key {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn next(&self) -> Key {
|
||||
pub const fn next(&self) -> Key {
|
||||
self.add(1)
|
||||
}
|
||||
|
||||
pub fn add(&self, x: u32) -> Key {
|
||||
pub const fn add(&self, x: u32) -> Key {
|
||||
let mut key = *self;
|
||||
|
||||
let r = key.field6.overflowing_add(x);
|
||||
@@ -483,6 +483,8 @@ pub fn is_inherited_key(key: Key) -> bool {
|
||||
key != AUX_FILES_KEY
|
||||
}
|
||||
|
||||
pub const NON_INHERITED_RANGE: Range<Key> = AUX_FILES_KEY..AUX_FILES_KEY.next();
|
||||
|
||||
#[inline(always)]
|
||||
pub fn is_rel_fsm_block_key(key: Key) -> bool {
|
||||
key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
|
||||
|
||||
@@ -94,12 +94,13 @@ impl KeySpace {
|
||||
|
||||
/// Remove all keys in `other` from `self`.
|
||||
/// This can involve splitting or removing of existing ranges.
|
||||
pub fn remove_overlapping_with(&mut self, other: &KeySpace) {
|
||||
/// Returns the removed keyspace
|
||||
pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
|
||||
let (self_start, self_end) = match (self.start(), self.end()) {
|
||||
(Some(start), Some(end)) => (start, end),
|
||||
_ => {
|
||||
// self is empty
|
||||
return;
|
||||
return KeySpace::default();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -112,30 +113,37 @@ impl KeySpace {
|
||||
.skip_while(|range| self_start >= range.end)
|
||||
.take_while(|range| self_end > range.start);
|
||||
|
||||
let mut removed_accum = KeySpaceRandomAccum::new();
|
||||
for range in other_ranges {
|
||||
while let Some(overlap_at) = self.overlaps_at(range) {
|
||||
let overlapped = self.ranges[overlap_at].clone();
|
||||
|
||||
if overlapped.start < range.start && overlapped.end <= range.end {
|
||||
// Higher part of the range is completely overlapped.
|
||||
removed_accum.add_range(range.start..self.ranges[overlap_at].end);
|
||||
self.ranges[overlap_at].end = range.start;
|
||||
}
|
||||
if overlapped.start >= range.start && overlapped.end > range.end {
|
||||
// Lower part of the range is completely overlapped.
|
||||
removed_accum.add_range(self.ranges[overlap_at].start..range.end);
|
||||
self.ranges[overlap_at].start = range.end;
|
||||
}
|
||||
if overlapped.start < range.start && overlapped.end > range.end {
|
||||
// Middle part of the range is overlapped.
|
||||
removed_accum.add_range(range.clone());
|
||||
self.ranges[overlap_at].end = range.start;
|
||||
self.ranges
|
||||
.insert(overlap_at + 1, range.end..overlapped.end);
|
||||
}
|
||||
if overlapped.start >= range.start && overlapped.end <= range.end {
|
||||
// Whole range is overlapped
|
||||
removed_accum.add_range(self.ranges[overlap_at].clone());
|
||||
self.ranges.remove(overlap_at);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
removed_accum.to_keyspace()
|
||||
}
|
||||
|
||||
pub fn start(&self) -> Option<Key> {
|
||||
@@ -170,6 +178,11 @@ impl KeySpace {
|
||||
pub fn overlaps(&self, range: &Range<Key>) -> bool {
|
||||
self.overlaps_at(range).is_some()
|
||||
}
|
||||
|
||||
/// Check if the keyspace contains a key
|
||||
pub fn contains(&self, key: &Key) -> bool {
|
||||
self.overlaps(&(*key..key.next()))
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
@@ -553,7 +566,16 @@ mod tests {
|
||||
Key::from_i128(11)..Key::from_i128(13),
|
||||
],
|
||||
};
|
||||
key_space1.remove_overlapping_with(&key_space2);
|
||||
let removed = key_space1.remove_overlapping_with(&key_space2);
|
||||
let removed_expected = KeySpace {
|
||||
ranges: vec![
|
||||
Key::from_i128(2)..Key::from_i128(3),
|
||||
Key::from_i128(6)..Key::from_i128(7),
|
||||
Key::from_i128(11)..Key::from_i128(12),
|
||||
],
|
||||
};
|
||||
assert_eq!(removed, removed_expected);
|
||||
|
||||
assert_eq!(
|
||||
key_space1.ranges,
|
||||
vec![
|
||||
@@ -583,7 +605,17 @@ mod tests {
|
||||
Key::from_i128(14)..Key::from_i128(17),
|
||||
],
|
||||
};
|
||||
key_space1.remove_overlapping_with(&key_space2);
|
||||
|
||||
let removed = key_space1.remove_overlapping_with(&key_space2);
|
||||
let removed_expected = KeySpace {
|
||||
ranges: vec![
|
||||
Key::from_i128(3)..Key::from_i128(5),
|
||||
Key::from_i128(8)..Key::from_i128(10),
|
||||
Key::from_i128(14)..Key::from_i128(15),
|
||||
],
|
||||
};
|
||||
assert_eq!(removed, removed_expected);
|
||||
|
||||
assert_eq!(
|
||||
key_space1.ranges,
|
||||
vec![
|
||||
@@ -610,7 +642,11 @@ mod tests {
|
||||
Key::from_i128(15)..Key::from_i128(17),
|
||||
],
|
||||
};
|
||||
key_space1.remove_overlapping_with(&key_space2);
|
||||
|
||||
let removed = key_space1.remove_overlapping_with(&key_space2);
|
||||
let removed_expected = KeySpace::default();
|
||||
assert_eq!(removed, removed_expected);
|
||||
|
||||
assert_eq!(
|
||||
key_space1.ranges,
|
||||
vec![
|
||||
@@ -637,7 +673,17 @@ mod tests {
|
||||
let key_space2 = KeySpace {
|
||||
ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
|
||||
};
|
||||
key_space1.remove_overlapping_with(&key_space2);
|
||||
|
||||
let removed = key_space1.remove_overlapping_with(&key_space2);
|
||||
let removed_expected = KeySpace {
|
||||
ranges: vec![
|
||||
Key::from_i128(9)..Key::from_i128(10),
|
||||
Key::from_i128(12)..Key::from_i128(15),
|
||||
Key::from_i128(17)..Key::from_i128(19),
|
||||
],
|
||||
};
|
||||
assert_eq!(removed, removed_expected);
|
||||
|
||||
assert_eq!(
|
||||
key_space1.ranges,
|
||||
vec![
|
||||
|
||||
@@ -121,8 +121,10 @@ fn main() -> anyhow::Result<()> {
|
||||
&[("node_id", &conf.id.to_string())],
|
||||
);
|
||||
|
||||
// after setting up logging, log the effective IO engine choice
|
||||
// after setting up logging, log the effective IO engine choice and read path implementations
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.get_impl, "starting with get page implementation");
|
||||
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
|
||||
|
||||
let tenants_path = conf.tenants_path();
|
||||
if !tenants_path.exists() {
|
||||
|
||||
@@ -30,9 +30,9 @@ use utils::{
|
||||
logging::LogFormat,
|
||||
};
|
||||
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::timeline::GetVectoredImpl;
|
||||
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
|
||||
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
|
||||
use crate::tenant::{
|
||||
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
|
||||
};
|
||||
@@ -91,6 +91,8 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
|
||||
|
||||
pub const DEFAULT_GET_IMPL: &str = "legacy";
|
||||
|
||||
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
|
||||
|
||||
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
|
||||
@@ -138,6 +140,8 @@ pub mod defaults {
|
||||
|
||||
#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'
|
||||
|
||||
#get_impl = '{DEFAULT_GET_IMPL}'
|
||||
|
||||
#max_vectored_read_bytes = '{DEFAULT_MAX_VECTORED_READ_BYTES}'
|
||||
|
||||
#validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'
|
||||
@@ -284,6 +288,8 @@ pub struct PageServerConf {
|
||||
|
||||
pub get_vectored_impl: GetVectoredImpl,
|
||||
|
||||
pub get_impl: GetImpl,
|
||||
|
||||
pub max_vectored_read_bytes: MaxVectoredReadBytes,
|
||||
|
||||
pub validate_vectored_get: bool,
|
||||
@@ -414,6 +420,8 @@ struct PageServerConfigBuilder {
|
||||
|
||||
get_vectored_impl: BuilderValue<GetVectoredImpl>,
|
||||
|
||||
get_impl: BuilderValue<GetImpl>,
|
||||
|
||||
max_vectored_read_bytes: BuilderValue<MaxVectoredReadBytes>,
|
||||
|
||||
validate_vectored_get: BuilderValue<bool>,
|
||||
@@ -503,6 +511,7 @@ impl PageServerConfigBuilder {
|
||||
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
|
||||
|
||||
get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
|
||||
get_impl: Set(DEFAULT_GET_IMPL.parse().unwrap()),
|
||||
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
|
||||
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
|
||||
)),
|
||||
@@ -681,6 +690,10 @@ impl PageServerConfigBuilder {
|
||||
self.get_vectored_impl = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn get_impl(&mut self, value: GetImpl) {
|
||||
self.get_impl = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn get_max_vectored_read_bytes(&mut self, value: MaxVectoredReadBytes) {
|
||||
self.max_vectored_read_bytes = BuilderValue::Set(value);
|
||||
}
|
||||
@@ -750,6 +763,7 @@ impl PageServerConfigBuilder {
|
||||
secondary_download_concurrency,
|
||||
ingest_batch_size,
|
||||
get_vectored_impl,
|
||||
get_impl,
|
||||
max_vectored_read_bytes,
|
||||
validate_vectored_get,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
@@ -1035,6 +1049,9 @@ impl PageServerConf {
|
||||
"get_vectored_impl" => {
|
||||
builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
|
||||
}
|
||||
"get_impl" => {
|
||||
builder.get_impl(parse_toml_from_str("get_impl", item)?)
|
||||
}
|
||||
"max_vectored_read_bytes" => {
|
||||
let bytes = parse_toml_u64("max_vectored_read_bytes", item)? as usize;
|
||||
builder.get_max_vectored_read_bytes(
|
||||
@@ -1126,6 +1143,7 @@ impl PageServerConf {
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
|
||||
get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
|
||||
max_vectored_read_bytes: MaxVectoredReadBytes(
|
||||
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
|
||||
.expect("Invalid default constant"),
|
||||
@@ -1365,6 +1383,7 @@ background_task_maximum_delay = '334 s'
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
|
||||
get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
|
||||
max_vectored_read_bytes: MaxVectoredReadBytes(
|
||||
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
|
||||
.expect("Invalid default constant")
|
||||
@@ -1438,6 +1457,7 @@ background_task_maximum_delay = '334 s'
|
||||
ingest_batch_size: 100,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
|
||||
get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
|
||||
max_vectored_read_bytes: MaxVectoredReadBytes(
|
||||
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
|
||||
.expect("Invalid default constant")
|
||||
|
||||
@@ -86,6 +86,11 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use std::{
|
||||
sync::{atomic::AtomicU32, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
pub(crate) mod optional_counter;
|
||||
@@ -98,6 +103,156 @@ pub struct RequestContext {
|
||||
access_stats_behavior: AccessStatsBehavior,
|
||||
page_content_kind: PageContentKind,
|
||||
pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32,
|
||||
pub read_path_stats: ReadPathStats,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ReadPathStats {
|
||||
pub inner: Arc<ReadPathStatsInner>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ReadPathStatsInner {
|
||||
pub get_reconstruct_data_time: AtomicU32,
|
||||
pub plan_read_time: AtomicU32,
|
||||
pub read_time: AtomicU32,
|
||||
pub sort_reconstruct_data_time: AtomicU32,
|
||||
pub layer_search_time: AtomicU32,
|
||||
pub keyspace_manipulation_time: AtomicU32,
|
||||
pub in_mem_layer_find_time: AtomicU32,
|
||||
pub fringe_fondle_time: AtomicU32,
|
||||
pub layer_map_search_time: AtomicU32,
|
||||
pub buffer_cache_hits: AtomicU32,
|
||||
pub layers_visited: AtomicU32,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ReadPathStatsInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"get_reconstruct_data_time={} plan_read_time={} read_time={} sort_time={} layer_search_time={} keyspace_manipulation_time={} in_mem_layer_find_time={} fringe_fondle_time={} layer_map_search_time={} buffer_cache_hits={} layers_visited={}",
|
||||
self.get_reconstruct_data_time.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.plan_read_time
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.read_time.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.sort_reconstruct_data_time
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.layer_search_time
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.keyspace_manipulation_time
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.in_mem_layer_find_time
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.fringe_fondle_time
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.layer_map_search_time
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.buffer_cache_hits
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.layers_visited
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl ReadPathStatsInner {
|
||||
pub fn reset(&self) {
|
||||
self.get_reconstruct_data_time
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
self.plan_read_time
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
self.read_time
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
self.sort_reconstruct_data_time
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
self.layer_search_time
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
self.keyspace_manipulation_time
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
self.in_mem_layer_find_time
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
self.fringe_fondle_time
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
self.layer_map_search_time
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
self.buffer_cache_hits
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
self.layers_visited
|
||||
.store(0, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn add_get_reconstruct_data_time(&self, dur: Duration) {
|
||||
self.get_reconstruct_data_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_plan_read_time(&self, dur: Duration) {
|
||||
self.plan_read_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_read_time(&self, dur: Duration) {
|
||||
self.read_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_sort_reconstruct_data_time(&self, dur: Duration) {
|
||||
self.sort_reconstruct_data_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_layer_search_time(&self, dur: Duration) {
|
||||
self.layer_search_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_keyspace_manipulation_timer(&self, dur: Duration) {
|
||||
self.keyspace_manipulation_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_in_mem_layer_find_timer(&self, dur: Duration) {
|
||||
self.in_mem_layer_find_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_fringe_fondle_time(&self, dur: Duration) {
|
||||
self.fringe_fondle_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_layer_map_search_time(&self, dur: Duration) {
|
||||
self.layer_map_search_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn inc_layer_visited(&self) {
|
||||
self.layers_visited
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn inc_buffer_cache_hits(&self) {
|
||||
self.buffer_cache_hits
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -154,6 +309,7 @@ impl RequestContextBuilder {
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
micros_spent_throttled: Default::default(),
|
||||
read_path_stats: Default::default(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -168,6 +324,7 @@ impl RequestContextBuilder {
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
micros_spent_throttled: Default::default(),
|
||||
read_path_stats: original.read_path_stats.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -291,4 +448,12 @@ impl RequestContext {
|
||||
pub(crate) fn page_content_kind(&self) -> PageContentKind {
|
||||
self.page_content_kind
|
||||
}
|
||||
|
||||
pub fn report_stats(&self) {
|
||||
tracing::info!("Read path stats: {}", *self.read_path_stats.inner);
|
||||
}
|
||||
|
||||
pub fn reset_stats(&self) {
|
||||
self.read_path_stats.inner.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,6 +72,7 @@ impl MicroSecondsCounterU32 {
|
||||
Err(_) => Err("add(): duration conversion error"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn close_and_checked_sub_from(&self, from: Duration) -> Result<Duration, &'static str> {
|
||||
let val = self.inner.close()?;
|
||||
let val = Duration::from_micros(val as u64);
|
||||
|
||||
@@ -96,31 +96,44 @@ pub(crate) static READ_NUM_FS_LAYERS: Lazy<Histogram> = Lazy::new(|| {
|
||||
});
|
||||
|
||||
// Metrics collected on operations on the storage repository.
|
||||
#[derive(
|
||||
Clone,
|
||||
Copy,
|
||||
enum_map::Enum,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
strum_macros::IntoStaticStr,
|
||||
)]
|
||||
pub(crate) enum GetKind {
|
||||
Singular,
|
||||
Vectored,
|
||||
}
|
||||
|
||||
pub(crate) struct ReconstructTimeMetrics {
|
||||
ok: Histogram,
|
||||
err: Histogram,
|
||||
singular: Histogram,
|
||||
vectored: Histogram,
|
||||
}
|
||||
|
||||
pub(crate) static RECONSTRUCT_TIME: Lazy<ReconstructTimeMetrics> = Lazy::new(|| {
|
||||
let inner = register_histogram_vec!(
|
||||
"pageserver_getpage_reconstruct_seconds",
|
||||
"Time spent in reconstruct_value (reconstruct a page from deltas)",
|
||||
&["result"],
|
||||
&["get_kind"],
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
|
||||
ReconstructTimeMetrics {
|
||||
ok: inner.get_metric_with_label_values(&["ok"]).unwrap(),
|
||||
err: inner.get_metric_with_label_values(&["err"]).unwrap(),
|
||||
singular: inner.with_label_values(&[GetKind::Singular.into()]),
|
||||
vectored: inner.with_label_values(&[GetKind::Vectored.into()]),
|
||||
}
|
||||
});
|
||||
|
||||
impl ReconstructTimeMetrics {
|
||||
pub(crate) fn for_result<T, E>(&self, result: &Result<T, E>) -> &Histogram {
|
||||
match result {
|
||||
Ok(_) => &self.ok,
|
||||
Err(_) => &self.err,
|
||||
pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram {
|
||||
match get_kind {
|
||||
GetKind::Singular => &self.singular,
|
||||
GetKind::Vectored => &self.vectored,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -133,13 +146,33 @@ pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::n
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
pub(crate) struct ReconstructDataTimeMetrics {
|
||||
singular: Histogram,
|
||||
vectored: Histogram,
|
||||
}
|
||||
|
||||
impl ReconstructDataTimeMetrics {
|
||||
pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram {
|
||||
match get_kind {
|
||||
GetKind::Singular => &self.singular,
|
||||
GetKind::Vectored => &self.vectored,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<ReconstructDataTimeMetrics> = Lazy::new(|| {
|
||||
let inner = register_histogram_vec!(
|
||||
"pageserver_getpage_get_reconstruct_data_seconds",
|
||||
"Time spent in get_reconstruct_value_data",
|
||||
&["get_kind"],
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
.expect("failed to define a metric");
|
||||
|
||||
ReconstructDataTimeMetrics {
|
||||
singular: inner.with_label_values(&[GetKind::Singular.into()]),
|
||||
vectored: inner.with_label_values(&[GetKind::Vectored.into()]),
|
||||
}
|
||||
});
|
||||
|
||||
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
|
||||
@@ -3858,8 +3858,10 @@ mod tests {
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
use bytes::BytesMut;
|
||||
use hex_literal::hex;
|
||||
use pageserver_api::key::NON_INHERITED_RANGE;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tests::storage_layer::ValuesReconstructState;
|
||||
use tests::timeline::{GetVectoredError, ShutdownMode};
|
||||
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
@@ -4648,7 +4650,9 @@ mod tests {
|
||||
for read in reads {
|
||||
info!("Doing vectored read on {:?}", read);
|
||||
|
||||
let vectored_res = tline.get_vectored_impl(read.clone(), reads_lsn, &ctx).await;
|
||||
let vectored_res = tline
|
||||
.get_vectored_impl(read.clone(), reads_lsn, ValuesReconstructState::new(), &ctx)
|
||||
.await;
|
||||
tline
|
||||
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
|
||||
.await;
|
||||
@@ -4657,6 +4661,64 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_vectored_aux_files() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_get_vectored_aux_files")?;
|
||||
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
let tline = tline.raw_timeline().unwrap();
|
||||
|
||||
let mut modification = tline.begin_modification(Lsn(0x1000));
|
||||
modification.put_file("foo/bar1", b"content1", &ctx).await?;
|
||||
modification.set_lsn(Lsn(0x1008))?;
|
||||
modification.put_file("foo/bar2", b"content2", &ctx).await?;
|
||||
modification.commit(&ctx).await?;
|
||||
|
||||
let child_timeline_id = TimelineId::generate();
|
||||
tenant
|
||||
.branch_timeline_test(
|
||||
tline,
|
||||
child_timeline_id,
|
||||
Some(tline.get_last_record_lsn()),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let child_timeline = tenant
|
||||
.get_timeline(child_timeline_id, true)
|
||||
.expect("Should have the branched timeline");
|
||||
|
||||
let aux_keyspace = KeySpace {
|
||||
ranges: vec![NON_INHERITED_RANGE],
|
||||
};
|
||||
let read_lsn = child_timeline.get_last_record_lsn();
|
||||
|
||||
let vectored_res = child_timeline
|
||||
.get_vectored_impl(
|
||||
aux_keyspace.clone(),
|
||||
read_lsn,
|
||||
ValuesReconstructState::new(),
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
child_timeline
|
||||
.validate_get_vectored_impl(&vectored_res, aux_keyspace, read_lsn, &ctx)
|
||||
.await;
|
||||
|
||||
let images = vectored_res?;
|
||||
let mut key = NON_INHERITED_RANGE.start;
|
||||
while key < NON_INHERITED_RANGE.end {
|
||||
assert!(matches!(images[&key], Err(PageReconstructError::Other(_))));
|
||||
key = key.next();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Test that vectored get handles layer gaps correctly
|
||||
// by advancing into the next ancestor timeline if required.
|
||||
//
|
||||
@@ -4785,7 +4847,12 @@ mod tests {
|
||||
ranges: vec![key_near_gap..gap_at_key.next(), key_near_end..current_key],
|
||||
};
|
||||
let results = child_timeline
|
||||
.get_vectored_impl(read.clone(), current_lsn, &ctx)
|
||||
.get_vectored_impl(
|
||||
read.clone(),
|
||||
current_lsn,
|
||||
ValuesReconstructState::new(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
for (key, img_res) in results {
|
||||
@@ -4918,6 +4985,7 @@ mod tests {
|
||||
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
|
||||
},
|
||||
query_lsn,
|
||||
ValuesReconstructState::new(),
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -203,7 +203,10 @@ impl<'a> FileBlockReader<'a> {
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => Ok(guard.into()),
|
||||
ReadBufResult::Found(guard) => {
|
||||
ctx.read_path_stats.inner.inc_buffer_cache_hits();
|
||||
Ok(guard.into())
|
||||
}
|
||||
ReadBufResult::NotFound(write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
let write_guard = self.fill_buffer(write_guard, blknum).await?;
|
||||
|
||||
@@ -138,6 +138,29 @@ impl ValuesReconstructState {
|
||||
}
|
||||
}
|
||||
|
||||
/// This function is called after reading a keyspace from a layer.
|
||||
/// It checks if the read path has now moved past the cached Lsn for any keys.
|
||||
///
|
||||
/// Implementation note: We intentionally iterate over the keys for which we've
|
||||
/// already collected some reconstruct data. This avoids scaling complexity with
|
||||
/// the size of the search space.
|
||||
pub(crate) fn on_lsn_advanced(&mut self, keyspace: &KeySpace, advanced_to: Lsn) {
|
||||
for (key, value) in self.keys.iter_mut() {
|
||||
if !keyspace.contains(key) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Ok(state) = value {
|
||||
if state.situation != ValueReconstructSituation::Complete
|
||||
&& state.get_cached_lsn() >= Some(advanced_to)
|
||||
{
|
||||
state.situation = ValueReconstructSituation::Complete;
|
||||
self.keys_done.add_key(*key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the state collected for a given key.
|
||||
/// Returns true if this was the last value needed for the key and false otherwise.
|
||||
///
|
||||
@@ -162,11 +185,18 @@ impl ValuesReconstructState {
|
||||
true
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let reached_cache =
|
||||
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
|
||||
debug_assert!(
|
||||
Some(lsn) > state.get_cached_lsn(),
|
||||
"Attempt to collect a record below cached LSN for walredo: {} < {}",
|
||||
lsn,
|
||||
state
|
||||
.get_cached_lsn()
|
||||
.expect("Assertion can only fire if a cached lsn is present")
|
||||
);
|
||||
|
||||
let will_init = rec.will_init();
|
||||
state.records.push((lsn, rec));
|
||||
will_init || reached_cache
|
||||
will_init
|
||||
}
|
||||
},
|
||||
};
|
||||
@@ -349,11 +379,28 @@ impl ReadableLayer {
|
||||
) -> Result<(), GetVectoredError> {
|
||||
match self {
|
||||
ReadableLayer::PersistentLayer(layer) => {
|
||||
if keyspace.total_size() == 1 {
|
||||
tracing::info!(
|
||||
"Vectored path for {}: {} @ {:?}",
|
||||
keyspace.start().unwrap(),
|
||||
layer.local_path(),
|
||||
lsn_range
|
||||
);
|
||||
}
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.await
|
||||
}
|
||||
ReadableLayer::InMemoryLayer(layer) => {
|
||||
if keyspace.total_size() == 1 {
|
||||
tracing::info!(
|
||||
"Vectored path for {}: in mem layer @ {:?}",
|
||||
keyspace.start().unwrap(),
|
||||
lsn_range
|
||||
);
|
||||
}
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
|
||||
.await
|
||||
|
||||
@@ -58,6 +58,7 @@ use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::OnceCell;
|
||||
use tracing::*;
|
||||
|
||||
@@ -217,6 +218,7 @@ pub struct DeltaLayerInner {
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
file: VirtualFile,
|
||||
file_id: FileId,
|
||||
@@ -742,6 +744,7 @@ impl DeltaLayerInner {
|
||||
file_id,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
lsn_range: actual_summary.lsn_range,
|
||||
max_vectored_read_bytes,
|
||||
}))
|
||||
}
|
||||
@@ -753,6 +756,9 @@ impl DeltaLayerInner {
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let plan_timer = Instant::now();
|
||||
|
||||
let mut need_image = true;
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
@@ -788,6 +794,12 @@ impl DeltaLayerInner {
|
||||
)
|
||||
.await?;
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_plan_read_time(plan_timer.elapsed());
|
||||
|
||||
let read_timer = Instant::now();
|
||||
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerValue)
|
||||
.build();
|
||||
@@ -826,6 +838,10 @@ impl DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
@@ -849,6 +865,9 @@ impl DeltaLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let plan_timer = Instant::now();
|
||||
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
@@ -866,7 +885,7 @@ impl DeltaLayerInner {
|
||||
let data_end_offset = self.index_start_offset();
|
||||
|
||||
let reads = Self::plan_reads(
|
||||
keyspace,
|
||||
&keyspace,
|
||||
lsn_range,
|
||||
data_end_offset,
|
||||
index_reader,
|
||||
@@ -877,14 +896,26 @@ impl DeltaLayerInner {
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_plan_read_time(plan_timer.elapsed());
|
||||
|
||||
let read_timer = Instant::now();
|
||||
|
||||
self.do_reads_and_update_state(reads, reconstruct_state)
|
||||
.await;
|
||||
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn plan_reads<Reader>(
|
||||
keyspace: KeySpace,
|
||||
keyspace: &KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
data_end_offset: u64,
|
||||
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
|
||||
@@ -1532,7 +1563,7 @@ mod test {
|
||||
|
||||
// Plan and validate
|
||||
let vectored_reads = DeltaLayerInner::plan_reads(
|
||||
keyspace.clone(),
|
||||
&keyspace,
|
||||
lsn_range.clone(),
|
||||
disk_offset,
|
||||
reader,
|
||||
@@ -1784,7 +1815,7 @@ mod test {
|
||||
let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
let vectored_reads = DeltaLayerInner::plan_reads(
|
||||
keyspace.clone(),
|
||||
&keyspace,
|
||||
entries_meta.lsn_range.clone(),
|
||||
data_end_offset,
|
||||
index_reader,
|
||||
|
||||
@@ -55,6 +55,7 @@ use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::*;
|
||||
@@ -422,6 +423,9 @@ impl ImageLayerInner {
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let plan_timer = Instant::now();
|
||||
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
|
||||
@@ -437,6 +441,12 @@ impl ImageLayerInner {
|
||||
)
|
||||
.await?
|
||||
{
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_plan_read_time(plan_timer.elapsed());
|
||||
|
||||
let read_timer = Instant::now();
|
||||
|
||||
let blob = block_reader
|
||||
.block_cursor()
|
||||
.read_blob(
|
||||
@@ -450,6 +460,11 @@ impl ImageLayerInner {
|
||||
let value = Bytes::from(blob);
|
||||
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Missing)
|
||||
@@ -464,14 +479,27 @@ impl ImageLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let plan_timer = Instant::now();
|
||||
|
||||
let reads = self
|
||||
.plan_reads(keyspace, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_plan_read_time(plan_timer.elapsed());
|
||||
|
||||
let read_timer = Instant::now();
|
||||
|
||||
self.do_reads_and_update_state(reads, reconstruct_state)
|
||||
.await;
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -298,6 +298,9 @@ impl InMemoryLayer {
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let read_timer = Instant::now();
|
||||
|
||||
ensure!(lsn_range.start >= self.start_lsn);
|
||||
let mut need_image = true;
|
||||
|
||||
@@ -333,6 +336,10 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
// release lock on 'inner'
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
@@ -355,6 +362,9 @@ impl InMemoryLayer {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let plan_timer = Instant::now();
|
||||
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
@@ -394,6 +404,12 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_plan_read_time(plan_timer.elapsed());
|
||||
|
||||
let read_timer = Instant::now();
|
||||
|
||||
let keyspace_size = keyspace.total_size();
|
||||
|
||||
let mut completed_keys = HashSet::new();
|
||||
@@ -426,6 +442,12 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -336,6 +336,12 @@ impl Layer {
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
|
||||
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
GetVectoredError::Other(err) => GetVectoredError::Other(
|
||||
err.context(format!("get_values_reconstruct_data for layer {self}")),
|
||||
),
|
||||
err => err,
|
||||
})
|
||||
}
|
||||
|
||||
/// Download the layer if evicted.
|
||||
|
||||
@@ -16,7 +16,7 @@ use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
key::AUX_FILES_KEY,
|
||||
key::{AUX_FILES_KEY, NON_INHERITED_RANGE},
|
||||
keyspace::KeySpaceAccum,
|
||||
models::{
|
||||
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
||||
@@ -119,8 +119,8 @@ use self::layer_manager::LayerManager;
|
||||
use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
|
||||
use super::{config::TenantConf, storage_layer::VectoredValueReconstructState};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
|
||||
@@ -586,6 +586,19 @@ impl From<GetVectoredError> for CreateImageLayersError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetVectoredError> for PageReconstructError {
|
||||
fn from(e: GetVectoredError) -> Self {
|
||||
match e {
|
||||
GetVectoredError::Cancelled => PageReconstructError::Cancelled,
|
||||
GetVectoredError::InvalidLsn(_) => PageReconstructError::Other(anyhow!("Invalid LSN")),
|
||||
err @ GetVectoredError::Oversized(_) => PageReconstructError::Other(err.into()),
|
||||
err @ GetVectoredError::MissingKey(_) => PageReconstructError::Other(err.into()),
|
||||
err @ GetVectoredError::GetReadyAncestorError(_) => PageReconstructError::from(err),
|
||||
GetVectoredError::Other(err) => PageReconstructError::Other(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetReadyAncestorError> for PageReconstructError {
|
||||
fn from(e: GetReadyAncestorError) -> Self {
|
||||
use GetReadyAncestorError::*;
|
||||
@@ -615,6 +628,23 @@ pub enum GetVectoredImpl {
|
||||
Vectored,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Eq,
|
||||
PartialEq,
|
||||
Debug,
|
||||
Copy,
|
||||
Clone,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum GetImpl {
|
||||
Legacy,
|
||||
Vectored,
|
||||
}
|
||||
|
||||
pub(crate) enum WaitLsnWaiter<'a> {
|
||||
Timeline(&'a Timeline),
|
||||
Tenant,
|
||||
@@ -676,16 +706,6 @@ impl Timeline {
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
self.timeline_get_throttle.throttle(ctx, 1).await;
|
||||
self.get_impl(key, lsn, ctx).await
|
||||
}
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
async fn get_impl(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
if !lsn.is_valid() {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
|
||||
@@ -696,13 +716,7 @@ impl Timeline {
|
||||
// page_service.
|
||||
debug_assert!(!self.shard_identity.is_key_disposable(&key));
|
||||
|
||||
// XXX: structured stats collection for layer eviction here.
|
||||
trace!(
|
||||
"get page request for {}@{} from task kind {:?}",
|
||||
key,
|
||||
lsn,
|
||||
ctx.task_kind()
|
||||
);
|
||||
self.timeline_get_throttle.throttle(ctx, 1).await;
|
||||
|
||||
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
|
||||
// The cached image can be returned directly if there is no WAL between the cached image
|
||||
@@ -725,22 +739,107 @@ impl Timeline {
|
||||
None => None,
|
||||
};
|
||||
|
||||
let mut reconstruct_state = ValueReconstructState {
|
||||
records: Vec::new(),
|
||||
img: cached_page_img,
|
||||
let res = match self.conf.get_impl {
|
||||
GetImpl::Legacy => {
|
||||
let reconstruct_state = ValueReconstructState {
|
||||
records: Vec::new(),
|
||||
img: cached_page_img,
|
||||
};
|
||||
|
||||
self.get_impl(key, lsn, reconstruct_state, ctx).await
|
||||
}
|
||||
GetImpl::Vectored => {
|
||||
let keyspace = KeySpace {
|
||||
ranges: vec![key..key.next()],
|
||||
};
|
||||
|
||||
// Initialise the reconstruct state for the key with the cache
|
||||
// entry returned above.
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
let mut key_state = VectoredValueReconstructState::default();
|
||||
key_state.img = cached_page_img;
|
||||
reconstruct_state.keys.insert(key, Ok(key_state));
|
||||
|
||||
let vectored_res = self
|
||||
.get_vectored_impl(keyspace.clone(), lsn, reconstruct_state, ctx)
|
||||
.await;
|
||||
|
||||
if self.conf.validate_vectored_get {
|
||||
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
|
||||
.await;
|
||||
}
|
||||
|
||||
let key_value = vectored_res?.pop_first();
|
||||
match key_value {
|
||||
Some((got_key, value)) => {
|
||||
if got_key != key {
|
||||
error!(
|
||||
"Expected {}, but singular vectored get returned {}",
|
||||
key, got_key
|
||||
);
|
||||
Err(PageReconstructError::Other(anyhow!(
|
||||
"Singular vectored get returned wrong key"
|
||||
)))
|
||||
} else {
|
||||
value
|
||||
}
|
||||
}
|
||||
None => {
|
||||
error!(
|
||||
"Expected {}, but singular vectored get returned nothing",
|
||||
key
|
||||
);
|
||||
Err(PageReconstructError::Other(anyhow!(
|
||||
"Singular vectored get did not return a value for {}",
|
||||
key
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
|
||||
if key.to_i128() % 100 < 10 {
|
||||
ctx.report_stats();
|
||||
}
|
||||
|
||||
ctx.reset_stats();
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
async fn get_impl(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
// XXX: structured stats collection for layer eviction here.
|
||||
trace!(
|
||||
"get page request for {}@{} from task kind {:?}",
|
||||
key,
|
||||
lsn,
|
||||
ctx.task_kind()
|
||||
);
|
||||
|
||||
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
|
||||
.for_get_kind(crate::metrics::GetKind::Singular)
|
||||
.start_timer();
|
||||
let get_reconstruct_data_timer = Instant::now();
|
||||
let path = self
|
||||
.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_get_reconstruct_data_time(get_reconstruct_data_timer.elapsed());
|
||||
timer.stop_and_record();
|
||||
|
||||
let start = Instant::now();
|
||||
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
|
||||
let elapsed = start.elapsed();
|
||||
crate::metrics::RECONSTRUCT_TIME
|
||||
.for_result(&res)
|
||||
.for_get_kind(crate::metrics::GetKind::Singular)
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if cfg!(feature = "testing") && res.is_err() {
|
||||
@@ -819,7 +918,9 @@ impl Timeline {
|
||||
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
|
||||
}
|
||||
GetVectoredImpl::Vectored => {
|
||||
let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
|
||||
let vectored_res = self
|
||||
.get_vectored_impl(keyspace.clone(), lsn, ValuesReconstructState::new(), ctx)
|
||||
.await;
|
||||
|
||||
if self.conf.validate_vectored_get {
|
||||
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
|
||||
@@ -865,16 +966,38 @@ impl Timeline {
|
||||
for range in keyspace.ranges {
|
||||
let mut key = range.start;
|
||||
while key != range.end {
|
||||
let block = self.get_impl(key, lsn, ctx).await;
|
||||
let block = self
|
||||
.get_impl(key, lsn, ValueReconstructState::default(), ctx)
|
||||
.await;
|
||||
|
||||
use PageReconstructError::*;
|
||||
match block {
|
||||
Err(Cancelled | AncestorStopping(_)) => {
|
||||
return Err(GetVectoredError::Cancelled)
|
||||
}
|
||||
Err(Other(err)) if err.to_string().contains("could not find data for key") => {
|
||||
Err(Other(err))
|
||||
if err.to_string().contains("could not find data for key")
|
||||
&& !NON_INHERITED_RANGE.contains(&key) =>
|
||||
{
|
||||
return Err(GetVectoredError::MissingKey(key))
|
||||
}
|
||||
Err(Other(err))
|
||||
if err
|
||||
.to_string()
|
||||
.contains("downloading evicted layer file failed") =>
|
||||
{
|
||||
return Err(GetVectoredError::Other(err))
|
||||
}
|
||||
Err(Other(err))
|
||||
if err
|
||||
.chain()
|
||||
.any(|cause| cause.to_string().contains("layer loading failed")) =>
|
||||
{
|
||||
// The intent here is to achieve error parity with the vectored read path.
|
||||
// When vectored read fails to load a layer it fails the whole read, hence
|
||||
// we mimic this behaviour here to keep the validation happy.
|
||||
return Err(GetVectoredError::Other(err));
|
||||
}
|
||||
_ => {
|
||||
values.insert(key, block);
|
||||
key = key.next();
|
||||
@@ -890,13 +1013,31 @@ impl Timeline {
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
lsn: Lsn,
|
||||
mut reconstruct_state: ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
use crate::metrics::GetKind;
|
||||
|
||||
let get_kind = if keyspace.total_size() == 1 {
|
||||
GetKind::Singular
|
||||
} else {
|
||||
GetKind::Vectored
|
||||
};
|
||||
|
||||
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
|
||||
.for_get_kind(get_kind)
|
||||
.start_timer();
|
||||
let timer = Instant::now();
|
||||
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_get_reconstruct_data_time(timer.elapsed());
|
||||
get_data_timer.stop_and_record();
|
||||
|
||||
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
|
||||
.for_get_kind(get_kind)
|
||||
.start_timer();
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
for (key, res) in reconstruct_state.keys {
|
||||
match res {
|
||||
@@ -904,13 +1045,18 @@ impl Timeline {
|
||||
results.insert(key, Err(err));
|
||||
}
|
||||
Ok(state) => {
|
||||
let timer = Instant::now();
|
||||
let state = ValueReconstructState::from(state);
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_sort_reconstruct_data_time(timer.elapsed());
|
||||
|
||||
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
|
||||
results.insert(key, reconstruct_res);
|
||||
}
|
||||
}
|
||||
}
|
||||
reconstruct_timer.stop_and_record();
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
@@ -946,10 +1092,10 @@ impl Timeline {
|
||||
panic!(concat!("Sequential get failed with {}, but vectored get did not",
|
||||
" - keyspace={:?} lsn={}"),
|
||||
seq_err, keyspace, lsn) },
|
||||
(Ok(_), Err(vec_err)) => {
|
||||
(Ok(seq_ok), Err(vec_err)) => {
|
||||
panic!(concat!("Vectored get failed with {}, but sequential get did not",
|
||||
" - keyspace={:?} lsn={}"),
|
||||
vec_err, keyspace, lsn) },
|
||||
" - keyspace={:?} lsn={} seq_ok={:?}"),
|
||||
vec_err, keyspace, lsn, seq_ok) },
|
||||
(Err(seq_err), Err(vec_err)) => {
|
||||
assert!(errors_match(seq_err, vec_err),
|
||||
"Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
|
||||
@@ -2823,6 +2969,7 @@ impl Timeline {
|
||||
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
let in_mem_layer_find_timer = Instant::now();
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let start_lsn = open_layer.get_lsn_range().start;
|
||||
if cont_lsn > start_lsn {
|
||||
@@ -2834,6 +2981,10 @@ impl Timeline {
|
||||
let open_layer = open_layer.clone();
|
||||
drop(guard);
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_in_mem_layer_find_timer(in_mem_layer_find_timer.elapsed());
|
||||
|
||||
result = match open_layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
@@ -2865,6 +3016,10 @@ impl Timeline {
|
||||
let frozen_layer = frozen_layer.clone();
|
||||
drop(guard);
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_in_mem_layer_find_timer(in_mem_layer_find_timer.elapsed());
|
||||
|
||||
result = match frozen_layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
@@ -2888,7 +3043,12 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let layer_map_search_timer = Instant::now();
|
||||
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_layer_map_search_time(layer_map_search_timer.elapsed());
|
||||
|
||||
let layer = guard.get_from_desc(&layer);
|
||||
drop(guard);
|
||||
|
||||
@@ -2914,11 +3074,19 @@ impl Timeline {
|
||||
));
|
||||
continue 'outer;
|
||||
} else if timeline.ancestor_timeline.is_some() {
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_layer_map_search_time(layer_map_search_timer.elapsed());
|
||||
|
||||
// Nothing on this timeline. Traverse to parent
|
||||
result = ValueReconstructResult::Continue;
|
||||
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
|
||||
continue 'outer;
|
||||
} else {
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_layer_map_search_time(layer_map_search_timer.elapsed());
|
||||
|
||||
// Nothing found
|
||||
result = ValueReconstructResult::Missing;
|
||||
continue 'outer;
|
||||
@@ -2964,6 +3132,29 @@ impl Timeline {
|
||||
.await?;
|
||||
|
||||
keyspace.remove_overlapping_with(&completed);
|
||||
|
||||
// Do not descend into the ancestor timeline for aux files.
|
||||
// We don't return a blanket [`GetVectoredError::MissingKey`] to avoid
|
||||
// stalling compaction.
|
||||
// TODO(chi): this will need to be updated for aux files v2 storage
|
||||
if keyspace.overlaps(&NON_INHERITED_RANGE) {
|
||||
let removed = keyspace.remove_overlapping_with(&KeySpace {
|
||||
ranges: vec![NON_INHERITED_RANGE],
|
||||
});
|
||||
|
||||
for range in removed.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
reconstruct_state.on_key_error(
|
||||
key,
|
||||
// TODO: use PageReconstructError::Missing once #7393 merges
|
||||
PageReconstructError::Other(anyhow!("Value for {} not found", key)),
|
||||
);
|
||||
key = key.next();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
|
||||
break;
|
||||
}
|
||||
@@ -3015,59 +3206,95 @@ impl Timeline {
|
||||
return Err(GetVectoredError::Cancelled);
|
||||
}
|
||||
|
||||
let keyspace_manip_timer = Instant::now();
|
||||
|
||||
let keys_done_last_step = reconstruct_state.consume_done_keys();
|
||||
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
|
||||
completed_keyspace.merge(&keys_done_last_step);
|
||||
|
||||
let guard = timeline.layers.read().await;
|
||||
let layers = guard.layer_map();
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_keyspace_manipulation_timer(keyspace_manip_timer.elapsed());
|
||||
|
||||
let in_memory_layer = layers.find_in_memory_layer(|l| {
|
||||
let start_lsn = l.get_lsn_range().start;
|
||||
cont_lsn > start_lsn
|
||||
});
|
||||
if unmapped_keyspace.start().is_some() {
|
||||
let layer_search_timer = Instant::now();
|
||||
|
||||
match in_memory_layer {
|
||||
Some(l) => {
|
||||
let lsn_range = l.get_lsn_range().start..cont_lsn;
|
||||
fringe.update(
|
||||
ReadableLayer::InMemoryLayer(l),
|
||||
unmapped_keyspace.clone(),
|
||||
lsn_range,
|
||||
);
|
||||
}
|
||||
None => {
|
||||
for range in unmapped_keyspace.ranges.iter() {
|
||||
let results = layers.range_search(range.clone(), cont_lsn);
|
||||
let guard = timeline.layers.read().await;
|
||||
let layers = guard.layer_map();
|
||||
|
||||
results
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
|
||||
(
|
||||
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
|
||||
keyspace_accum.to_keyspace(),
|
||||
lsn_floor..cont_lsn,
|
||||
)
|
||||
})
|
||||
.for_each(|(layer, keyspace, lsn_range)| {
|
||||
fringe.update(layer, keyspace, lsn_range)
|
||||
});
|
||||
let in_mem_layer_find_timer = Instant::now();
|
||||
let in_memory_layer = layers.find_in_memory_layer(|l| {
|
||||
let start_lsn = l.get_lsn_range().start;
|
||||
cont_lsn > start_lsn
|
||||
});
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_in_mem_layer_find_timer(in_mem_layer_find_timer.elapsed());
|
||||
|
||||
match in_memory_layer {
|
||||
Some(l) => {
|
||||
let lsn_range = l.get_lsn_range().start..cont_lsn;
|
||||
|
||||
let fringe_fondle_timer = Instant::now();
|
||||
|
||||
fringe.update(
|
||||
ReadableLayer::InMemoryLayer(l),
|
||||
unmapped_keyspace.clone(),
|
||||
lsn_range,
|
||||
);
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_fringe_fondle_time(fringe_fondle_timer.elapsed());
|
||||
}
|
||||
None => {
|
||||
for range in unmapped_keyspace.ranges.iter() {
|
||||
let layer_map_search_timer = Instant::now();
|
||||
|
||||
let results = layers.range_search(range.clone(), cont_lsn);
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_layer_map_search_time(layer_map_search_timer.elapsed());
|
||||
|
||||
let fringe_fondle_timer = Instant::now();
|
||||
|
||||
results
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
|
||||
(
|
||||
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
|
||||
keyspace_accum.to_keyspace(),
|
||||
lsn_floor..cont_lsn,
|
||||
)
|
||||
})
|
||||
.for_each(|(layer, keyspace, lsn_range)| {
|
||||
fringe.update(layer, keyspace, lsn_range)
|
||||
});
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_fringe_fondle_time(fringe_fondle_timer.elapsed());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// It's safe to drop the layer map lock after planning the next round of reads.
|
||||
// The fringe keeps readable handles for the layers which are safe to read even
|
||||
// if layers were compacted or flushed.
|
||||
//
|
||||
// The more interesting consideration is: "Why is the read algorithm still correct
|
||||
// if the layer map changes while it is operating?". Doing a vectored read on a
|
||||
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
|
||||
// covered by the read. The layer map tells us how to move the lsn downwards for a
|
||||
// range at *a particular point in time*. It is fine for the answer to be different
|
||||
// at two different time points.
|
||||
drop(guard);
|
||||
// It's safe to drop the layer map lock after planning the next round of reads.
|
||||
// The fringe keeps readable handles for the layers which are safe to read even
|
||||
// if layers were compacted or flushed.
|
||||
//
|
||||
// The more interesting consideration is: "Why is the read algorithm still correct
|
||||
// if the layer map changes while it is operating?". Doing a vectored read on a
|
||||
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
|
||||
// covered by the read. The layer map tells us how to move the lsn downwards for a
|
||||
// range at *a particular point in time*. It is fine for the answer to be different
|
||||
// at two different time points.
|
||||
drop(guard);
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_layer_search_time(layer_search_timer.elapsed());
|
||||
}
|
||||
|
||||
if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
|
||||
let next_cont_lsn = lsn_range.start;
|
||||
@@ -3842,6 +4069,15 @@ impl Timeline {
|
||||
{
|
||||
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
|
||||
ZERO_PAGE.clone()
|
||||
} else if NON_INHERITED_RANGE.contains(&img_key)
|
||||
&& matches!(err, PageReconstructError::Other(_))
|
||||
{
|
||||
// Non inherited keys (i.e. aux files) are special. We
|
||||
// might fail to reconstruct the image since we are
|
||||
// prohibited from visiting the ancestor timelines.
|
||||
// We don't want to stall compaction, so we simply ignore
|
||||
// these failures.
|
||||
continue;
|
||||
} else {
|
||||
return Err(CreateImageLayersError::PageReconstructError(
|
||||
err,
|
||||
|
||||
@@ -507,6 +507,11 @@ class NeonEnvBuilder:
|
||||
self.pageserver_get_vectored_impl = "vectored"
|
||||
log.debug('Overriding pageserver get_vectored_impl config to "vectored"')
|
||||
|
||||
self.pageserver_get_impl: Optional[str] = None
|
||||
if os.getenv("PAGESERVER_GET_IMPL", "") == "vectored":
|
||||
self.pageserver_get_impl = "vectored"
|
||||
log.debug('Overriding pageserver get_impl config to "vectored"')
|
||||
|
||||
assert test_name.startswith(
|
||||
"test_"
|
||||
), "Unexpectedly instantiated from outside a test function"
|
||||
@@ -1078,6 +1083,8 @@ class NeonEnv:
|
||||
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
|
||||
if config.pageserver_get_vectored_impl is not None:
|
||||
ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl
|
||||
if config.pageserver_get_impl is not None:
|
||||
ps_cfg["get_impl"] = config.pageserver_get_impl
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
self.pageservers.append(
|
||||
|
||||
@@ -17,11 +17,16 @@ from fixtures.types import TenantId, TimelineId
|
||||
# Test restarting page server, while safekeeper and compute node keep
|
||||
# running.
|
||||
def test_local_corruption(neon_env_builder: NeonEnvBuilder):
|
||||
if neon_env_builder.pageserver_get_impl == "vectored":
|
||||
reconstruct_function_name = "get_values_reconstruct_data"
|
||||
else:
|
||||
reconstruct_function_name = "get_value_reconstruct_data"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*get_value_reconstruct_data for layer .*",
|
||||
f".*{reconstruct_function_name} for layer .*",
|
||||
".*could not find data for key.*",
|
||||
".*is not active. Current state: Broken.*",
|
||||
".*will not become active. Current state: Broken.*",
|
||||
@@ -84,7 +89,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
|
||||
# (We don't check layer file contents on startup, when loading the timeline)
|
||||
#
|
||||
# This will change when we implement checksums for layers
|
||||
with pytest.raises(Exception, match="get_value_reconstruct_data for layer ") as err:
|
||||
with pytest.raises(Exception, match=f"{reconstruct_function_name} for layer ") as err:
|
||||
pg2.start()
|
||||
log.info(
|
||||
f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}"
|
||||
|
||||
@@ -226,6 +226,11 @@ def test_forward_compatibility(
|
||||
)
|
||||
|
||||
try:
|
||||
# Previous version neon_local and pageserver are not aware
|
||||
# of the new config.
|
||||
# TODO: remove this once the code reaches main
|
||||
neon_env_builder.pageserver_get_impl = None
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_local_binpath = neon_env_builder.neon_binpath
|
||||
env = neon_env_builder.from_repo_dir(
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import time
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from fixtures.log_helper import log
|
||||
@@ -109,6 +110,11 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Test pageserver get_timestamp_of_lsn API
|
||||
def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
|
||||
if neon_env_builder.pageserver_get_impl == "vectored":
|
||||
key_not_found_error = r".*Requested key.*not found,*"
|
||||
else:
|
||||
key_not_found_error = r".*could not find data for key.*"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
new_timeline_id = env.neon_cli.create_branch("test_ts_of_lsn_api")
|
||||
@@ -177,8 +183,8 @@ def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
|
||||
raise RuntimeError("there should have been an 'could not find data for key' error")
|
||||
except PageserverApiException as error:
|
||||
assert error.status_code == 500
|
||||
assert str(error).startswith("could not find data for key")
|
||||
env.pageserver.allowed_errors.append(".*could not find data for key.*")
|
||||
assert re.match(key_not_found_error, str(error))
|
||||
env.pageserver.allowed_errors.append(key_not_found_error)
|
||||
|
||||
# Probe a bunch of timestamps in the valid range
|
||||
step_size = 100
|
||||
|
||||
Reference in New Issue
Block a user