pageserver: coalesce read paths (#7477)
## Problem

We are currently supporting two read paths. No bueno.

## Summary of changes

High level: use the vectored read path to serve get page requests, gated by the `get_impl` config option (a sketch of the dispatch follows this message).

Low level:

1. Add a pageserver config option, `get_impl`, to specify which read path to use when serving get page requests.
2. Fix handling of cached base images for the vectored read path. This was subtly broken: previously we would not mark a key as complete once the read had moved past its cached LSN. This is a self-standing change which could be its own PR, but it is included here because writing separate tests for it is tricky.
3. Fork get page to use either the legacy or the vectored implementation.
4. Validate the vectored read path against the legacy implementation when serving get page requests, controlled by the `validate_vectored_get` pageserver config option.
5. Use the vectored read path to serve get page requests in tests (with validation).

## Note

Since the vectored read path does not go through the page cache to read buffers, this change also amounts to a removal of the buffer page cache. The materialized page cache is still used.
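To make the fork concrete, here is a minimal, self-contained sketch of the dispatch described above. Every name in it (`Conf`, `legacy_get`, `vectored_get`, the `u64`-backed `Key` and `Lsn`) is a hypothetical stand-in rather than the pageserver's real API; only the control flow mirrors the change: dispatch on `get_impl`, a singular get expressed as a vectored get over a one-key keyspace, and optional cross-validation against the legacy path.

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Key(u64);

#[derive(Clone, Copy, Debug)]
struct Lsn(u64);

#[derive(Clone, Copy)]
enum GetImpl {
    Legacy,
    Vectored,
}

struct Conf {
    get_impl: GetImpl,
    validate_vectored_get: bool,
}

// Stand-in for the legacy single-key read path.
fn legacy_get(key: Key, lsn: Lsn) -> Result<Vec<u8>, String> {
    Ok(format!("page {key:?}@{lsn:?}").into_bytes())
}

// Stand-in for the vectored read path; here it just delegates per key so
// that both paths agree by construction.
fn vectored_get(keys: &[Key], lsn: Lsn) -> BTreeMap<Key, Result<Vec<u8>, String>> {
    keys.iter().map(|&k| (k, legacy_get(k, lsn))).collect()
}

// Serve one get-page request, forked on the configured implementation.
fn get(conf: &Conf, key: Key, lsn: Lsn) -> Result<Vec<u8>, String> {
    match conf.get_impl {
        GetImpl::Legacy => legacy_get(key, lsn),
        GetImpl::Vectored => {
            // A singular get becomes a vectored get over a one-key keyspace.
            let mut results = vectored_get(&[key], lsn);

            // Mirrors the `validate_vectored_get` flag: cross-check the
            // vectored result against the legacy implementation.
            if conf.validate_vectored_get {
                assert_eq!(results.get(&key), Some(&legacy_get(key, lsn)));
            }

            results
                .remove(&key)
                .unwrap_or_else(|| Err(format!("no value returned for {key:?}")))
        }
    }
}

fn main() {
    for get_impl in [GetImpl::Legacy, GetImpl::Vectored] {
        let conf = Conf { get_impl, validate_vectored_get: true };
        let page = get(&conf, Key(7), Lsn(42)).expect("read failed");
        println!("{}", String::from_utf8(page).unwrap());
    }
}
```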
Changed file: .github/workflows/build_and_test.yml (vendored)
@@ -477,6 +477,7 @@ jobs:
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_IMPL: vectored
+ PAGESERVER_GET_IMPL: vectored

# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540
@@ -129,6 +129,7 @@ pub struct PageServerConf {
pub(crate) virtual_file_io_engine: Option<String>,
pub(crate) get_vectored_impl: Option<String>,
+ pub(crate) get_impl: Option<String>,
}

impl Default for PageServerConf {
@@ -141,6 +142,7 @@ impl Default for PageServerConf {
http_auth_type: AuthType::Trust,
virtual_file_io_engine: None,
get_vectored_impl: None,
+ get_impl: None,
}
}
}
@@ -92,6 +92,7 @@ impl PageServerNode {
http_auth_type,
virtual_file_io_engine,
get_vectored_impl,
+ get_impl,
} = &self.conf;

let id = format!("id={}", id);
@@ -111,6 +112,11 @@ impl PageServerNode {
} else {
String::new()
};
+ let get_impl = if let Some(get_impl) = get_impl {
+ format!("get_impl='{get_impl}'")
+ } else {
+ String::new()
+ };

let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
@@ -124,6 +130,7 @@ impl PageServerNode {
broker_endpoint_param,
virtual_file_io_engine,
get_vectored_impl,
+ get_impl,
];

if let Some(control_plane_api) = &self.env.control_plane_api {
@@ -182,6 +182,11 @@ impl KeySpace {
pub fn overlaps(&self, range: &Range<Key>) -> bool {
self.overlaps_at(range).is_some()
}

+ /// Check if the keyspace contains a key
+ pub fn contains(&self, key: &Key) -> bool {
+ self.overlaps(&(*key..key.next()))
+ }
}

///
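To illustrate the new helper's semantics: containment of a key reduces to an overlap test against the half-open singleton range `key..key.next()`. Below is a sketch with a simplified `KeySpace` over bare `u64` keys (a stand-in for the real `Key` type, with `key + 1` playing the role of `key.next()`).

```rust
use std::ops::Range;

struct KeySpace {
    // Sorted, non-overlapping, end-exclusive key ranges.
    ranges: Vec<Range<u64>>,
}

impl KeySpace {
    // Two half-open ranges overlap iff each starts before the other ends.
    fn overlaps(&self, probe: &Range<u64>) -> bool {
        self.ranges
            .iter()
            .any(|r| r.start < probe.end && probe.start < r.end)
    }

    // Check if the keyspace contains a key: overlap with `key..key + 1`.
    fn contains(&self, key: u64) -> bool {
        self.overlaps(&(key..key + 1))
    }
}

fn main() {
    let ks = KeySpace { ranges: vec![10..20, 35..40] };
    assert!(ks.contains(10));
    assert!(!ks.contains(20)); // ranges are end-exclusive
    assert!(ks.contains(39));
    assert!(!ks.contains(25)); // falls in the gap between ranges
    println!("containment checks passed");
}
```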
@@ -121,8 +121,10 @@ fn main() -> anyhow::Result<()> {
&[("node_id", &conf.id.to_string())],
);

- // after setting up logging, log the effective IO engine choice
+ // after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
+ info!(?conf.get_impl, "starting with get page implementation");
+ info!(?conf.get_vectored_impl, "starting with vectored get page implementation");

let tenants_path = conf.tenants_path();
if !tenants_path.exists() {
@@ -30,9 +30,9 @@ use utils::{
logging::LogFormat,
};

- use crate::tenant::config::TenantConfOpt;
use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
+ use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
@@ -91,6 +91,8 @@ pub mod defaults {

pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";

+ pub const DEFAULT_GET_IMPL: &str = "legacy";
+
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB

pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
@@ -138,6 +140,8 @@ pub mod defaults {
#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'

+ #get_impl = '{DEFAULT_GET_IMPL}'
+
#max_vectored_read_bytes = '{DEFAULT_MAX_VECTORED_READ_BYTES}'

#validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'
@@ -284,6 +288,8 @@ pub struct PageServerConf {
pub get_vectored_impl: GetVectoredImpl,

+ pub get_impl: GetImpl,
+
pub max_vectored_read_bytes: MaxVectoredReadBytes,

pub validate_vectored_get: bool,
@@ -414,6 +420,8 @@ struct PageServerConfigBuilder {
get_vectored_impl: BuilderValue<GetVectoredImpl>,

+ get_impl: BuilderValue<GetImpl>,
+
max_vectored_read_bytes: BuilderValue<MaxVectoredReadBytes>,

validate_vectored_get: BuilderValue<bool>,
@@ -503,6 +511,7 @@ impl PageServerConfigBuilder {
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),

get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
+ get_impl: Set(DEFAULT_GET_IMPL.parse().unwrap()),
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
@@ -681,6 +690,10 @@ impl PageServerConfigBuilder {
self.get_vectored_impl = BuilderValue::Set(value);
}

+ pub fn get_impl(&mut self, value: GetImpl) {
+ self.get_impl = BuilderValue::Set(value);
+ }
+
pub fn get_max_vectored_read_bytes(&mut self, value: MaxVectoredReadBytes) {
self.max_vectored_read_bytes = BuilderValue::Set(value);
}
@@ -750,6 +763,7 @@ impl PageServerConfigBuilder {
secondary_download_concurrency,
ingest_batch_size,
get_vectored_impl,
+ get_impl,
max_vectored_read_bytes,
validate_vectored_get,
ephemeral_bytes_per_memory_kb,
@@ -1035,6 +1049,9 @@ impl PageServerConf {
"get_vectored_impl" => {
builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
}
+ "get_impl" => {
+ builder.get_impl(parse_toml_from_str("get_impl", item)?)
+ }
"max_vectored_read_bytes" => {
let bytes = parse_toml_u64("max_vectored_read_bytes", item)? as usize;
builder.get_max_vectored_read_bytes(
@@ -1126,6 +1143,7 @@ impl PageServerConf {
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
+ get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
max_vectored_read_bytes: MaxVectoredReadBytes(
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant"),
@@ -1365,6 +1383,7 @@ background_task_maximum_delay = '334 s'
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
+ get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
max_vectored_read_bytes: MaxVectoredReadBytes(
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant")
@@ -1438,6 +1457,7 @@ background_task_maximum_delay = '334 s'
ingest_batch_size: 100,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
+ get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
max_vectored_read_bytes: MaxVectoredReadBytes(
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant")
@@ -3865,6 +3865,7 @@ mod tests {
use pageserver_api::key::NON_INHERITED_RANGE;
use pageserver_api::keyspace::KeySpace;
use rand::{thread_rng, Rng};
+ use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};

static TEST_KEY: Lazy<Key> =
@@ -4653,7 +4654,9 @@ mod tests {
for read in reads {
info!("Doing vectored read on {:?}", read);

- let vectored_res = tline.get_vectored_impl(read.clone(), reads_lsn, &ctx).await;
+ let vectored_res = tline
+ .get_vectored_impl(read.clone(), reads_lsn, ValuesReconstructState::new(), &ctx)
+ .await;
tline
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
.await;
@@ -4698,7 +4701,12 @@ mod tests {
let read_lsn = child_timeline.get_last_record_lsn();

let vectored_res = child_timeline
- .get_vectored_impl(aux_keyspace.clone(), read_lsn, &ctx)
+ .get_vectored_impl(
+ aux_keyspace.clone(),
+ read_lsn,
+ ValuesReconstructState::new(),
+ &ctx,
+ )
.await;

child_timeline
@@ -4846,7 +4854,12 @@ mod tests {
ranges: vec![key_near_gap..gap_at_key.next(), key_near_end..current_key],
};
let results = child_timeline
- .get_vectored_impl(read.clone(), current_lsn, &ctx)
+ .get_vectored_impl(
+ read.clone(),
+ current_lsn,
+ ValuesReconstructState::new(),
+ &ctx,
+ )
.await?;

for (key, img_res) in results {
@@ -4979,6 +4992,7 @@ mod tests {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
+ ValuesReconstructState::new(),
&ctx,
)
.await;
@@ -148,6 +148,29 @@ impl ValuesReconstructState {
self.layers_visited
}

+ /// This function is called after reading a keyspace from a layer.
+ /// It checks if the read path has now moved past the cached Lsn for any keys.
+ ///
+ /// Implementation note: We intentionally iterate over the keys for which we've
+ /// already collected some reconstruct data. This avoids scaling complexity with
+ /// the size of the search space.
+ pub(crate) fn on_lsn_advanced(&mut self, keyspace: &KeySpace, advanced_to: Lsn) {
+ for (key, value) in self.keys.iter_mut() {
+ if !keyspace.contains(key) {
+ continue;
+ }
+
+ if let Ok(state) = value {
+ if state.situation != ValueReconstructSituation::Complete
+ && state.get_cached_lsn() >= Some(advanced_to)
+ {
+ state.situation = ValueReconstructSituation::Complete;
+ self.keys_done.add_key(*key);
+ }
+ }
+ }
+ }
+
/// Update the state collected for a given key.
/// Returns true if this was the last value needed for the key and false otherwise.
///
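Before the next hunk, a minimal model of the rule `on_lsn_advanced` implements, using hypothetical simplified types (`u64` keys and LSNs, no layers or WAL records): once the read path has descended past a key's cached LSN, nothing older than the cached image is needed, so any still-pending key cached at or above that LSN can be marked complete.

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum Situation {
    Continue,
    Complete,
}

struct KeyState {
    cached_lsn: Option<u64>,
    situation: Situation,
}

struct ValuesReconstructState {
    keys: BTreeMap<u64, KeyState>,
}

impl ValuesReconstructState {
    /// Called after finishing a layer: the search has advanced down to
    /// `advanced_to`, so keys cached at or above it need no more data.
    fn on_lsn_advanced(&mut self, keyspace: &[u64], advanced_to: u64) {
        // Iterate only over keys we already hold state for, not over the
        // whole search space.
        for (key, state) in self.keys.iter_mut() {
            if !keyspace.contains(key) {
                continue;
            }
            if state.situation != Situation::Complete
                && state.cached_lsn >= Some(advanced_to)
            {
                state.situation = Situation::Complete;
            }
        }
    }
}

fn main() {
    let mut st = ValuesReconstructState { keys: BTreeMap::new() };
    // Key 1 has a cached image at LSN 100; key 2 has no cached image.
    st.keys.insert(1, KeyState { cached_lsn: Some(100), situation: Situation::Continue });
    st.keys.insert(2, KeyState { cached_lsn: None, situation: Situation::Continue });

    // After reading a layer whose LSN range starts at 90, the read has
    // moved past key 1's cached LSN: key 1 is complete, key 2 is not.
    st.on_lsn_advanced(&[1, 2], 90);
    assert_eq!(st.keys[&1].situation, Situation::Complete);
    assert_eq!(st.keys[&2].situation, Situation::Continue);
}
```

This is exactly the subtle breakage called out in the summary: without this hook, a key whose reconstruction relied on a cached image was never marked complete once the read moved below its cached LSN.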
@@ -172,11 +195,18 @@ impl ValuesReconstructState {
true
}
Value::WalRecord(rec) => {
- let reached_cache =
- state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
+ debug_assert!(
+ Some(lsn) > state.get_cached_lsn(),
+ "Attempt to collect a record below cached LSN for walredo: {} < {}",
+ lsn,
+ state
+ .get_cached_lsn()
+ .expect("Assertion can only fire if a cached lsn is present")
+ );
+
let will_init = rec.will_init();
state.records.push((lsn, rec));
- will_init || reached_cache
+ will_init
}
},
};
@@ -217,6 +217,7 @@ pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
+ lsn_range: Range<Lsn>,

file: VirtualFile,
file_id: FileId,

@@ -745,6 +746,7 @@ impl DeltaLayerInner {
file_id,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
+ lsn_range: actual_summary.lsn_range,
max_vectored_read_bytes,
}))
}
@@ -869,7 +871,7 @@ impl DeltaLayerInner {
let data_end_offset = self.index_start_offset();

let reads = Self::plan_reads(
- keyspace,
+ &keyspace,
lsn_range,
data_end_offset,
index_reader,

@@ -883,11 +885,13 @@ impl DeltaLayerInner {
self.do_reads_and_update_state(reads, reconstruct_state)
.await;

+ reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
+
Ok(())
}

async fn plan_reads<Reader>(
- keyspace: KeySpace,
+ keyspace: &KeySpace,
lsn_range: Range<Lsn>,
data_end_offset: u64,
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
@@ -1535,7 +1539,7 @@ mod test {

// Plan and validate
let vectored_reads = DeltaLayerInner::plan_reads(
- keyspace.clone(),
+ &keyspace,
lsn_range.clone(),
disk_offset,
reader,
@@ -1787,7 +1791,7 @@ mod test {
let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;

let vectored_reads = DeltaLayerInner::plan_reads(
- keyspace.clone(),
+ &keyspace,
entries_meta.lsn_range.clone(),
data_end_offset,
index_reader,
@@ -438,6 +438,8 @@ impl InMemoryLayer {
}
}

+ reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
+
Ok(())
}
}
@@ -336,6 +336,12 @@ impl Layer {
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
.await
+ .map_err(|err| match err {
+ GetVectoredError::Other(err) => GetVectoredError::Other(
+ err.context(format!("get_values_reconstruct_data for layer {self}")),
+ ),
+ err => err,
+ })
}

/// Download the layer if evicted.
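The `.map_err` above threads layer identity into the error with `anyhow` context, which is also the message shape the Python tests further down start matching on. A tiny self-contained illustration of that chaining (the messages and layer name here are made up; the `anyhow` calls are real):

```rust
use anyhow::anyhow;

fn main() {
    let cause = anyhow!("could not find data for key");
    // Wrap the error the way the hunk above does: the context becomes the
    // outermost message while the original cause stays in the chain.
    let wrapped = cause.context("get_values_reconstruct_data for layer delta-000");
    assert!(wrapped.to_string().starts_with("get_values_reconstruct_data"));
    assert!(wrapped
        .chain()
        .any(|c| c.to_string().contains("could not find data")));
    println!("{wrapped:#}");
}
```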
@@ -119,8 +119,8 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};

- use super::config::TenantConf;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
+ use super::{config::TenantConf, storage_layer::VectoredValueReconstructState};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
@@ -653,6 +653,19 @@ impl From<GetVectoredError> for CreateImageLayersError {
}
}

+ impl From<GetVectoredError> for PageReconstructError {
+ fn from(e: GetVectoredError) -> Self {
+ match e {
+ GetVectoredError::Cancelled => PageReconstructError::Cancelled,
+ GetVectoredError::InvalidLsn(_) => PageReconstructError::Other(anyhow!("Invalid LSN")),
+ err @ GetVectoredError::Oversized(_) => PageReconstructError::Other(err.into()),
+ err @ GetVectoredError::MissingKey(_) => PageReconstructError::Other(err.into()),
+ GetVectoredError::GetReadyAncestorError(err) => PageReconstructError::from(err),
+ GetVectoredError::Other(err) => PageReconstructError::Other(err),
+ }
+ }
+ }
+
impl From<GetReadyAncestorError> for PageReconstructError {
fn from(e: GetReadyAncestorError) -> Self {
use GetReadyAncestorError::*;
@@ -682,6 +695,23 @@ pub enum GetVectoredImpl {
Vectored,
}

+ #[derive(
+ Eq,
+ PartialEq,
+ Debug,
+ Copy,
+ Clone,
+ strum_macros::EnumString,
+ strum_macros::Display,
+ serde_with::DeserializeFromStr,
+ serde_with::SerializeDisplay,
+ )]
+ #[strum(serialize_all = "kebab-case")]
+ pub enum GetImpl {
+ Legacy,
+ Vectored,
+ }
+
pub(crate) enum WaitLsnWaiter<'a> {
Timeline(&'a Timeline),
Tenant,
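For reference, a dependency-free sketch of what those derives provide: hand-rolled equivalents of strum's `EnumString`/`Display` under `serialize_all = "kebab-case"`, showing the string round trip that parsing `get_impl = 'legacy'` or `'vectored'` relies on.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Eq, PartialEq, Debug, Copy, Clone)]
enum GetImpl {
    Legacy,
    Vectored,
}

// What `strum_macros::EnumString` with kebab-case serialization amounts to.
impl FromStr for GetImpl {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "legacy" => Ok(GetImpl::Legacy),
            "vectored" => Ok(GetImpl::Vectored),
            other => Err(format!("unknown get_impl: {other}")),
        }
    }
}

// What `strum_macros::Display` with kebab-case serialization amounts to.
impl fmt::Display for GetImpl {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            GetImpl::Legacy => "legacy",
            GetImpl::Vectored => "vectored",
        })
    }
}

fn main() {
    // The round trip the config code relies on, e.g.
    // `DEFAULT_GET_IMPL.parse().unwrap()` and `parse_toml_from_str`.
    let v: GetImpl = "vectored".parse().unwrap();
    assert_eq!(v, GetImpl::Vectored);
    assert_eq!(v.to_string(), "vectored");
    assert!("Vectored".parse::<GetImpl>().is_err()); // matching is exact
}
```

The `serde_with::DeserializeFromStr` and `SerializeDisplay` derives then reuse exactly this `FromStr`/`Display` pair for (de)serialization.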
@@ -743,16 +773,6 @@ impl Timeline {
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
- self.timeline_get_throttle.throttle(ctx, 1).await;
- self.get_impl(key, lsn, ctx).await
- }
- /// Not subject to [`Self::timeline_get_throttle`].
- async fn get_impl(
- &self,
- key: Key,
- lsn: Lsn,
- ctx: &RequestContext,
- ) -> Result<Bytes, PageReconstructError> {
if !lsn.is_valid() {
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
@@ -763,13 +783,7 @@ impl Timeline {
// page_service.
debug_assert!(!self.shard_identity.is_key_disposable(&key));

- // XXX: structured stats collection for layer eviction here.
- trace!(
- "get page request for {}@{} from task kind {:?}",
- key,
- lsn,
- ctx.task_kind()
- );
+ self.timeline_get_throttle.throttle(ctx, 1).await;

// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
// The cached image can be returned directly if there is no WAL between the cached image
@@ -792,10 +806,81 @@ impl Timeline {
None => None,
};

- let mut reconstruct_state = ValueReconstructState {
- records: Vec::new(),
- img: cached_page_img,
- };
+ match self.conf.get_impl {
+ GetImpl::Legacy => {
+ let reconstruct_state = ValueReconstructState {
+ records: Vec::new(),
+ img: cached_page_img,
+ };
+
+ self.get_impl(key, lsn, reconstruct_state, ctx).await
+ }
+ GetImpl::Vectored => {
+ let keyspace = KeySpace {
+ ranges: vec![key..key.next()],
+ };
+
+ // Initialise the reconstruct state for the key with the cache
+ // entry returned above.
+ let mut reconstruct_state = ValuesReconstructState::new();
+ let mut key_state = VectoredValueReconstructState::default();
+ key_state.img = cached_page_img;
+ reconstruct_state.keys.insert(key, Ok(key_state));
+
+ let vectored_res = self
+ .get_vectored_impl(keyspace.clone(), lsn, reconstruct_state, ctx)
+ .await;
+
+ if self.conf.validate_vectored_get {
+ self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
+ .await;
+ }
+
+ let key_value = vectored_res?.pop_first();
+ match key_value {
+ Some((got_key, value)) => {
+ if got_key != key {
+ error!(
+ "Expected {}, but singular vectored get returned {}",
+ key, got_key
+ );
+ Err(PageReconstructError::Other(anyhow!(
+ "Singular vectored get returned wrong key"
+ )))
+ } else {
+ value
+ }
+ }
+ None => {
+ error!(
+ "Expected {}, but singular vectored get returned nothing",
+ key
+ );
+ Err(PageReconstructError::Other(anyhow!(
+ "Singular vectored get did not return a value for {}",
+ key
+ )))
+ }
+ }
+ }
+ }
+ }
+
+ /// Not subject to [`Self::timeline_get_throttle`].
+ async fn get_impl(
+ &self,
+ key: Key,
+ lsn: Lsn,
+ mut reconstruct_state: ValueReconstructState,
+ ctx: &RequestContext,
+ ) -> Result<Bytes, PageReconstructError> {
+ // XXX: structured stats collection for layer eviction here.
+ trace!(
+ "get page request for {}@{} from task kind {:?}",
+ key,
+ lsn,
+ ctx.task_kind()
+ );

let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(GetKind::Singular)
@@ -888,7 +973,9 @@ impl Timeline {
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
}
GetVectoredImpl::Vectored => {
- let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
+ let vectored_res = self
+ .get_vectored_impl(keyspace.clone(), lsn, ValuesReconstructState::new(), ctx)
+ .await;

if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)

@@ -934,7 +1021,9 @@ impl Timeline {
for range in keyspace.ranges {
let mut key = range.start;
while key != range.end {
- let block = self.get_impl(key, lsn, ctx).await;
+ let block = self
+ .get_impl(key, lsn, ValueReconstructState::default(), ctx)
+ .await;

use PageReconstructError::*;
match block {

@@ -952,6 +1041,23 @@ impl Timeline {
// level error.
return Err(GetVectoredError::MissingKey(key));
}
+ Err(Other(err))
+ if err
+ .to_string()
+ .contains("downloading evicted layer file failed") =>
+ {
+ return Err(GetVectoredError::Other(err))
+ }
+ Err(Other(err))
+ if err
+ .chain()
+ .any(|cause| cause.to_string().contains("layer loading failed")) =>
+ {
+ // The intent here is to achieve error parity with the vectored read path.
+ // When vectored read fails to load a layer it fails the whole read, hence
+ // we mimic this behaviour here to keep the validation happy.
+ return Err(GetVectoredError::Other(err));
+ }
_ => {
values.insert(key, block);
key = key.next();
@@ -967,10 +1073,9 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
+ mut reconstruct_state: ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
- let mut reconstruct_state = ValuesReconstructState::new();
-
let get_kind = if keyspace.total_size() == 1 {
GetKind::Singular
} else {
@@ -507,6 +507,11 @@ class NeonEnvBuilder:
self.pageserver_get_vectored_impl = "vectored"
log.debug('Overriding pageserver get_vectored_impl config to "vectored"')

+ self.pageserver_get_impl: Optional[str] = None
+ if os.getenv("PAGESERVER_GET_IMPL", "") == "vectored":
+ self.pageserver_get_impl = "vectored"
+ log.debug('Overriding pageserver get_impl config to "vectored"')
+
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"

@@ -1078,6 +1083,8 @@ class NeonEnv:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if config.pageserver_get_vectored_impl is not None:
ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl
+ if config.pageserver_get_impl is not None:
+ ps_cfg["get_impl"] = config.pageserver_get_impl

# Create a corresponding NeonPageserver object
self.pageservers.append(
@@ -17,11 +17,16 @@ from fixtures.types import TenantId, TimelineId
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_local_corruption(neon_env_builder: NeonEnvBuilder):
+ if neon_env_builder.pageserver_get_impl == "vectored":
+ reconstruct_function_name = "get_values_reconstruct_data"
+ else:
+ reconstruct_function_name = "get_value_reconstruct_data"
+
env = neon_env_builder.init_start()

env.pageserver.allowed_errors.extend(
[
- ".*get_value_reconstruct_data for layer .*",
+ f".*{reconstruct_function_name} for layer .*",
".*could not find data for key.*",
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",

@@ -84,7 +89,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
# (We don't check layer file contents on startup, when loading the timeline)
#
# This will change when we implement checksums for layers
- with pytest.raises(Exception, match="get_value_reconstruct_data for layer ") as err:
+ with pytest.raises(Exception, match=f"{reconstruct_function_name} for layer ") as err:
pg2.start()
log.info(
f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}"
@@ -226,6 +226,11 @@ def test_forward_compatibility(
)

try:
+ # Previous version neon_local and pageserver are not aware
+ # of the new config.
+ # TODO: remove this once the code reaches main
+ neon_env_builder.pageserver_get_impl = None
+
neon_env_builder.num_safekeepers = 3
neon_local_binpath = neon_env_builder.neon_binpath
env = neon_env_builder.from_repo_dir(
@@ -4,16 +4,21 @@ import threading
import time
from typing import List

- from fixtures.neon_fixtures import NeonEnv
+ from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder
from fixtures.utils import query_scalar


- def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
- env = neon_simple_env
+ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: str):
+ if build_type == "debug":
+ # Disable vectored read path cross validation since it makes the test time out.
+ neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
+
+ env = neon_env_builder.init_start()

cache_dir = os.path.join(env.repo_dir, "file_cache")
os.mkdir(cache_dir)

env.neon_cli.create_branch("empty", ancestor_branch_name=DEFAULT_BRANCH_NAME)
env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")

endpoint = env.endpoints.create_start(
@@ -1,3 +1,4 @@
+ import re
import time
from datetime import datetime, timedelta, timezone
@@ -109,6 +110,11 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):

# Test pageserver get_timestamp_of_lsn API
def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
+ if neon_env_builder.pageserver_get_impl == "vectored":
+ key_not_found_error = r".*Requested key.*not found,*"
+ else:
+ key_not_found_error = r".*could not find data for key.*"
+
env = neon_env_builder.init_start()

new_timeline_id = env.neon_cli.create_branch("test_ts_of_lsn_api")
@@ -177,8 +183,8 @@ def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
raise RuntimeError("there should have been an 'could not find data for key' error")
except PageserverApiException as error:
assert error.status_code == 500
- assert str(error).startswith("could not find data for key")
- env.pageserver.allowed_errors.append(".*could not find data for key.*")
+ assert re.match(key_not_found_error, str(error))
+ env.pageserver.allowed_errors.append(key_not_found_error)

# Probe a bunch of timestamps in the valid range
step_size = 100
@@ -18,6 +18,7 @@ from fixtures.remote_storage import s3_storage
def test_pg_regress(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
+ build_type: str,
pg_bin,
capsys,
base_dir: Path,

@@ -30,6 +31,11 @@ def test_pg_regress(
"""
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count

+ if build_type == "debug":
+ # Disable vectored read path cross validation since it makes the test time out.
+ neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
+
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)