Compare commits

...

13 Commits

Author SHA1 Message Date
Alex Chi Z
aba9eb3944 fix tests
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-15 13:28:07 -04:00
Alex Chi Z
e43d5c83bc add encryption key id
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-15 13:08:15 -04:00
Alex Chi Z
53aadcefa0 fix clippy warnings
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-14 15:59:17 -04:00
Alex Chi Z
92af8aeeb3 feat(pageserver): add initial encrypt key fields
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-14 15:18:07 -04:00
Vlad Lazar
8cce27bedb pageserver: add a randomized read path test (#11519)
## Problem

Every time we make changes to the read path to fix a bug or add a
feature,
we end up adding another incomprehensible test.

## Summary of changes

Add some generic infrastructure for generating a layer map from a type
spec
and use that for a read path test. The test is randomized but uses a
fixed seed
by default. A fuzzing mode is available for confidence building.

See [Notion
page](https://www.notion.so/neondatabase/Read-Path-Unit-Testing-Fuzzing-1d1f189e0047806c8e5cd37781b0a350?pvs=4)
for a diagram of the layer map
used.

Just for fun I tried removing [this
commit](9990199cb4)
from https://github.com/neondatabase/neon/pull/11494
and it caught the bug in the normal mode (no fuzzing required).
2025-04-14 15:31:32 +00:00
Vlad Lazar
90b706cd96 tests: save pageserver metrics at the end of the test (#11559)
## Problem

Sometimes it's useful to see the pageserver metrics after a test in
order to debug stuff.
For example, for https://github.com/neondatabase/neon/issues/11465 I'd
like to know
what the remote storage latencies are from the client.

## Summary of changes

When stopping the env, record the pageserver metrics into a file in the
pageserver's workdir.
2025-04-14 15:13:20 +00:00
Alex Chi Z.
057ce115de fix(test): allow stale generation errors (1/2) (#11531)
## Problem

Part of https://github.com/neondatabase/neon/issues/11486

## Summary of changes

50% of the test instability of `test_create_churn_during_restart` are
due to error message gets changed. Allow the new error message.

Still need to fix other errors due to failure to acquire semaphore in
this or the next patch.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-14 14:51:17 +00:00
Vlad Lazar
e85607eed8 tests: remove config tweak allowing old versions to start with a batching config (#11560)
## Problem

Pageservers now ignore unknown config fields, so this config tweaking is
no longer needed.

## Summary of changes

Get rid of the hack.

Closes https://github.com/neondatabase/neon/issues/11524
2025-04-14 14:42:35 +00:00
Tristan Partin
437071888e Fix logging in nightly physical replication benchmarks (#11541)
Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-14 13:57:33 +00:00
Vlad Lazar
148b3701cf pageserver: add metrics for get page batch breaking reasons (#11545)
## Problem

https://github.com/neondatabase/neon/pull/11494 changes the batching
logic, but we don't have a way to evaluate it.

## Summary of changes

This PR introduces a global and per timeline metric which tracks the
reason for
which a batch was broken.
2025-04-14 13:24:47 +00:00
Christian Schwarz
daebe50e19 refactor: plumb gate and cancellation down to to blob_io::BlobWriter (#11543)
In #10063 we will switch BlobWriter to use the owned buffers IO buffered
writer, which implements double-buffering by virtue of a background task
that performs the flushing.

That task's lifecylce must be contained within the Timeline lifecycle,
so, it must hold the timeline gate open and respect Timeline::cancel.

This PR does the noisy plumbing to reduce the #10063 diff.

Refs
- extracted from https://github.com/neondatabase/neon/pull/10063
- epic https://github.com/neondatabase/neon/issues/9868
2025-04-14 11:51:01 +00:00
Arpad Müller
e0ee6fbeff Remove deprecated --compute-hook-url storcon param (#11551)
We have already migrated the storage controller to
`--control-plane-url`, added in #11173. The new param was added to
support also safekeeper specific endpoints. See the docs changes in
#11195 for further details.

Part of #11163
2025-04-14 10:36:40 +00:00
Konstantin Knizhnik
307fa2ceb7 Remove unused n_synced variable from HandleSafekeeperResponse (#11553)
## Problem

clang produce warning about unused variable `n_synced` in
HandleSafekeeperResponse

## Summary of changes

Remove local variable.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-14 09:45:13 +00:00
26 changed files with 1248 additions and 217 deletions

View File

@@ -10,6 +10,8 @@ default = []
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
fuzz-read-path = ["testing"]
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true

View File

@@ -126,7 +126,7 @@ async fn ingest(
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
let (_desc, path) = layer
.write_to_disk(&ctx, None, l0_flush_state.inner())
.write_to_disk(&ctx, None, l0_flush_state.inner(), &gate, cancel.clone())
.await?
.unwrap();
tokio::fs::remove_file(path).await?;

View File

@@ -45,6 +45,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) {
shard: ShardIndex::new(ShardNumber(1), ShardCount(2)),
generation: Generation::Valid(1),
file_size: 0,
encryption_key: None,
};
// Construct the (initial and uploaded) index with layer0.

View File

@@ -1714,6 +1714,28 @@ pub enum SmgrQueryType {
Test,
}
#[derive(
Debug,
Clone,
Copy,
IntoStaticStr,
strum_macros::EnumCount,
strum_macros::EnumIter,
strum_macros::FromRepr,
enum_map::Enum,
)]
#[strum(serialize_all = "snake_case")]
pub enum GetPageBatchBreakReason {
BatchFull,
NonBatchableRequest,
NonUniformLsn,
SamePageAtDifferentLsn,
NonUniformTimeline,
ExecutorSteal,
#[cfg(feature = "testing")]
NonUniformKey,
}
pub(crate) struct SmgrQueryTimePerTimeline {
global_started: [IntCounter; SmgrQueryType::COUNT],
global_latency: [Histogram; SmgrQueryType::COUNT],
@@ -1725,6 +1747,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
per_timeline_flush_in_progress_micros: IntCounter,
global_batch_wait_time: Histogram,
per_timeline_batch_wait_time: Histogram,
global_batch_break_reason: [IntCounter; GetPageBatchBreakReason::COUNT],
per_timeline_batch_break_reason: GetPageBatchBreakReasonTimelineMetrics,
throttling: Arc<tenant_throttling::Pagestream>,
}
@@ -1858,6 +1882,49 @@ static PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::n
.expect("failed to define a metric")
});
static PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
"pageserver_page_service_batch_break_reason_global",
"Reason for breaking batches of get page requests",
&["reason"],
)
.expect("failed to define a metric")
});
struct GetPageBatchBreakReasonTimelineMetrics {
map: EnumMap<GetPageBatchBreakReason, IntCounter>,
}
impl GetPageBatchBreakReasonTimelineMetrics {
fn new(tenant_id: &str, shard_slug: &str, timeline_id: &str) -> Self {
GetPageBatchBreakReasonTimelineMetrics {
map: EnumMap::from_array(std::array::from_fn(|reason_idx| {
let reason = GetPageBatchBreakReason::from_usize(reason_idx);
PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE.with_label_values(&[
tenant_id,
shard_slug,
timeline_id,
reason.into(),
])
})),
}
}
fn inc(&self, reason: GetPageBatchBreakReason) {
self.map[reason].inc()
}
}
static PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_service_batch_break_reason",
"Reason for breaking batches of get page requests",
&["tenant_id", "shard_id", "timeline_id", "reason"],
)
.expect("failed to define a metric")
});
pub(crate) static PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_page_service_config_max_batch_size",
@@ -1985,6 +2052,15 @@ impl SmgrQueryTimePerTimeline {
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();
let global_batch_break_reason = std::array::from_fn(|i| {
let reason = GetPageBatchBreakReason::from_usize(i);
PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL
.get_metric_with_label_values(&[reason.into()])
.unwrap()
});
let per_timeline_batch_break_reason =
GetPageBatchBreakReasonTimelineMetrics::new(&tenant_id, &shard_slug, &timeline_id);
let global_flush_in_progress_micros =
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
@@ -2002,6 +2078,8 @@ impl SmgrQueryTimePerTimeline {
per_timeline_flush_in_progress_micros,
global_batch_wait_time,
per_timeline_batch_wait_time,
global_batch_break_reason,
per_timeline_batch_break_reason,
throttling: pagestream_throttle_metrics,
}
}
@@ -2030,9 +2108,16 @@ impl SmgrQueryTimePerTimeline {
}
/// TODO: do something about this? seems odd, we have a similar call on SmgrOpTimer
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
pub(crate) fn observe_getpage_batch_start(
&self,
batch_size: usize,
break_reason: GetPageBatchBreakReason,
) {
self.global_batch_size.observe(batch_size as f64);
self.per_timeline_batch_size.observe(batch_size as f64);
self.global_batch_break_reason[break_reason.into_usize()].inc();
self.per_timeline_batch_break_reason.inc(break_reason);
}
}
@@ -3398,6 +3483,15 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
for reason in GetPageBatchBreakReason::iter() {
let _ = PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
reason.into(),
]);
}
}
}
@@ -4276,6 +4370,7 @@ pub fn preinitialize_metrics(
[
&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT,
&SMGR_QUERY_STARTED_GLOBAL,
&PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL,
]
.into_iter()
.for_each(|c| {

View File

@@ -58,8 +58,8 @@ use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
TimelineMetrics,
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
SmgrOpTimer, TimelineMetrics,
};
use crate::pgdatadir_mapping::Version;
use crate::span::{
@@ -672,6 +672,7 @@ enum BatchedFeMessage {
span: Span,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
batch_break_reason: GetPageBatchBreakReason,
},
DbSize {
span: Span,
@@ -724,6 +725,119 @@ impl BatchedFeMessage {
BatchedFeMessage::RespondError { .. } => {}
}
}
fn should_break_batch(
&self,
other: &BatchedFeMessage,
max_batch_size: NonZeroUsize,
batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
) -> Option<GetPageBatchBreakReason> {
match (self, other) {
(
BatchedFeMessage::GetPage {
shard: accum_shard,
pages: accum_pages,
..
},
BatchedFeMessage::GetPage {
shard: this_shard,
pages: this_pages,
..
},
) => {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), max_batch_size.get());
return Some(GetPageBatchBreakReason::BatchFull);
}
if !accum_shard.is_same_handle_as(this_shard) {
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return Some(GetPageBatchBreakReason::NonUniformTimeline);
}
match batching_strategy {
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
if let Some(last_in_batch) = accum_pages.last() {
if last_in_batch.effective_request_lsn
!= this_pages[0].effective_request_lsn
{
trace!(
accum_lsn = %last_in_batch.effective_request_lsn,
this_lsn = %this_pages[0].effective_request_lsn,
"stopping batching because LSN changed"
);
return Some(GetPageBatchBreakReason::NonUniformLsn);
}
}
}
PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
// The read path doesn't curently support serving the same page at different LSNs.
// While technically possible, it's uncertain if the complexity is worth it.
// Break the batch if such a case is encountered.
let same_page_different_lsn = accum_pages.iter().any(|batched| {
batched.req.rel == this_pages[0].req.rel
&& batched.req.blkno == this_pages[0].req.blkno
&& batched.effective_request_lsn
!= this_pages[0].effective_request_lsn
});
if same_page_different_lsn {
trace!(
rel=%this_pages[0].req.rel,
blkno=%this_pages[0].req.blkno,
lsn=%this_pages[0].effective_request_lsn,
"stopping batching because same page was requested at different LSNs"
);
return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
}
}
}
None
}
#[cfg(feature = "testing")]
(
BatchedFeMessage::Test {
shard: accum_shard,
requests: accum_requests,
..
},
BatchedFeMessage::Test {
shard: this_shard,
requests: this_requests,
..
},
) => {
assert!(this_requests.len() == 1);
if accum_requests.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_requests.len(), max_batch_size.get());
return Some(GetPageBatchBreakReason::BatchFull);
}
if !accum_shard.is_same_handle_as(this_shard) {
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return Some(GetPageBatchBreakReason::NonUniformTimeline);
}
let this_batch_key = this_requests[0].req.batch_key;
let accum_batch_key = accum_requests[0].req.batch_key;
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
return Some(GetPageBatchBreakReason::NonUniformKey);
}
None
}
(_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
}
}
}
impl PageServerHandler {
@@ -1047,6 +1161,10 @@ impl PageServerHandler {
effective_request_lsn,
ctx,
}],
// The executor grabs the batch when it becomes idle.
// Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
// default reason for breaking the batch.
batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
}
}
#[cfg(feature = "testing")]
@@ -1084,118 +1202,59 @@ impl PageServerHandler {
Err(e) => return Err(Err(e)),
};
match (&mut *batch, this_msg) {
// something batched already, let's see if we can add this message to the batch
(
Ok(BatchedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: accum_pages,
}),
BatchedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
},
) if (|| {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), max_batch_size.get());
return false;
}
if !accum_shard.is_same_handle_as(&this_shard) {
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
match batching_strategy {
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
if let Some(last_in_batch) = accum_pages.last() {
if last_in_batch.effective_request_lsn
!= this_pages[0].effective_request_lsn
{
return false;
}
}
}
PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
// The read path doesn't curently support serving the same page at different LSNs.
// While technically possible, it's uncertain if the complexity is worth it.
// Break the batch if such a case is encountered.
//
// TODO(vlad): Include a metric for batch breaks with a reason label.
let same_page_different_lsn = accum_pages.iter().any(|batched| {
batched.req.rel == this_pages[0].req.rel
&& batched.req.blkno == this_pages[0].req.blkno
&& batched.effective_request_lsn
!= this_pages[0].effective_request_lsn
});
if same_page_different_lsn {
trace!(
rel=%this_pages[0].req.rel,
blkno=%this_pages[0].req.blkno,
lsn=%this_pages[0].effective_request_lsn,
"stopping batching because same page was requested at different LSNs"
);
return false;
}
}
}
true
})() =>
{
// ok to batch
accum_pages.extend(this_pages);
Ok(())
let eligible_batch = match batch {
Ok(b) => b,
Err(_) => {
return Err(Ok(this_msg));
}
#[cfg(feature = "testing")]
(
Ok(BatchedFeMessage::Test {
shard: accum_shard,
requests: accum_requests,
..
}),
BatchedFeMessage::Test {
shard: this_shard,
requests: this_requests,
..
},
) if (|| {
assert!(this_requests.len() == 1);
if accum_requests.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_requests.len(), max_batch_size.get());
return false;
};
let batch_break =
eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
match batch_break {
Some(reason) => {
if let BatchedFeMessage::GetPage {
batch_break_reason, ..
} = eligible_batch
{
*batch_break_reason = reason;
}
if !accum_shard.is_same_handle_as(&this_shard) {
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
let this_batch_key = this_requests[0].req.batch_key;
let accum_batch_key = accum_requests[0].req.batch_key;
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
return false;
}
true
})() =>
{
// ok to batch
accum_requests.extend(this_requests);
Ok(())
}
// something batched already but this message is unbatchable
(_, this_msg) => {
// by default, don't continue batching
Err(Ok(this_msg))
}
None => {
// ok to batch
match (eligible_batch, this_msg) {
(
BatchedFeMessage::GetPage {
pages: accum_pages, ..
},
BatchedFeMessage::GetPage {
pages: this_pages, ..
},
) => {
accum_pages.extend(this_pages);
Ok(())
}
#[cfg(feature = "testing")]
(
BatchedFeMessage::Test {
requests: accum_requests,
..
},
BatchedFeMessage::Test {
requests: this_requests,
..
},
) => {
accum_requests.extend(this_requests);
Ok(())
}
// Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
_ => unreachable!(),
}
}
}
}
@@ -1413,7 +1472,12 @@ impl PageServerHandler {
span,
)
}
BatchedFeMessage::GetPage { span, shard, pages } => {
BatchedFeMessage::GetPage {
span,
shard,
pages,
batch_break_reason,
} => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
(
@@ -1425,6 +1489,7 @@ impl PageServerHandler {
&shard,
pages,
io_concurrency,
batch_break_reason,
&ctx,
)
.instrument(span.clone())
@@ -2113,13 +2178,14 @@ impl PageServerHandler {
timeline: &Timeline,
requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
io_concurrency: IoConcurrency,
batch_break_reason: GetPageBatchBreakReason,
ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
timeline
.query_metrics
.observe_getpage_batch_start(requests.len());
.observe_getpage_batch_start(requests.len(), batch_break_reason);
// If a page trace is running, submit an event for this request.
if let Some(page_trace) = timeline.page_trace.load().as_ref() {

View File

@@ -5933,12 +5933,20 @@ mod tests {
use models::CompactLsnRange;
use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX};
use pageserver_api::keyspace::KeySpace;
#[cfg(feature = "testing")]
use pageserver_api::keyspace::KeySpaceRandomAccum;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
#[cfg(feature = "testing")]
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
use pageserver_compaction::helpers::overlaps_with;
#[cfg(feature = "testing")]
use rand::SeedableRng;
#[cfg(feature = "testing")]
use rand::rngs::StdRng;
use rand::{Rng, thread_rng};
#[cfg(feature = "testing")]
use std::ops::Range;
use storage_layer::{IoConcurrency, PersistentLayerKey};
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
@@ -5960,6 +5968,318 @@ mod tests {
static TEST_KEY: Lazy<Key> =
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
#[cfg(feature = "testing")]
struct TestTimelineSpecification {
start_lsn: Lsn,
last_record_lsn: Lsn,
in_memory_layers_shape: Vec<(Range<Key>, Range<Lsn>)>,
delta_layers_shape: Vec<(Range<Key>, Range<Lsn>)>,
image_layers_shape: Vec<(Range<Key>, Lsn)>,
gap_chance: u8,
will_init_chance: u8,
}
#[cfg(feature = "testing")]
struct Storage {
storage: HashMap<(Key, Lsn), Value>,
start_lsn: Lsn,
}
#[cfg(feature = "testing")]
impl Storage {
fn get(&self, key: Key, lsn: Lsn) -> Bytes {
use bytes::BufMut;
let mut crnt_lsn = lsn;
let mut got_base = false;
let mut acc = Vec::new();
while crnt_lsn >= self.start_lsn {
if let Some(value) = self.storage.get(&(key, crnt_lsn)) {
acc.push(value.clone());
match value {
Value::WalRecord(NeonWalRecord::Test { will_init, .. }) => {
if *will_init {
got_base = true;
break;
}
}
Value::Image(_) => {
got_base = true;
break;
}
_ => unreachable!(),
}
}
crnt_lsn = crnt_lsn.checked_sub(1u64).unwrap();
}
assert!(
got_base,
"Input data was incorrect. No base image for {key}@{lsn}"
);
tracing::debug!("Wal redo depth for {key}@{lsn} is {}", acc.len());
let mut blob = BytesMut::new();
for value in acc.into_iter().rev() {
match value {
Value::WalRecord(NeonWalRecord::Test { append, .. }) => {
blob.extend_from_slice(append.as_bytes());
}
Value::Image(img) => {
blob.put(img);
}
_ => unreachable!(),
}
}
blob.into()
}
}
#[cfg(feature = "testing")]
#[allow(clippy::too_many_arguments)]
async fn randomize_timeline(
tenant: &Arc<Tenant>,
new_timeline_id: TimelineId,
pg_version: u32,
spec: TestTimelineSpecification,
random: &mut rand::rngs::StdRng,
ctx: &RequestContext,
) -> anyhow::Result<(Arc<Timeline>, Storage, Vec<Lsn>)> {
let mut storage: HashMap<(Key, Lsn), Value> = HashMap::default();
let mut interesting_lsns = vec![spec.last_record_lsn];
for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() {
let mut lsn = lsn_range.start;
while lsn < lsn_range.end {
let mut key = key_range.start;
while key < key_range.end {
let gap = random.gen_range(1..=100) <= spec.gap_chance;
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
if gap {
continue;
}
let record = if will_init {
Value::WalRecord(NeonWalRecord::wal_init(format!("[wil_init {key}@{lsn}]")))
} else {
Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]")))
};
storage.insert((key, lsn), record);
key = key.next();
}
lsn = Lsn(lsn.0 + 1);
}
// Stash some interesting LSN for future use
for offset in [0, 5, 100].iter() {
if *offset == 0 {
interesting_lsns.push(lsn_range.start);
} else {
let below = lsn_range.start.checked_sub(*offset);
match below {
Some(v) if v >= spec.start_lsn => {
interesting_lsns.push(v);
}
_ => {}
}
let above = Lsn(lsn_range.start.0 + offset);
interesting_lsns.push(above);
}
}
}
for (key_range, lsn_range) in spec.delta_layers_shape.iter() {
let mut lsn = lsn_range.start;
while lsn < lsn_range.end {
let mut key = key_range.start;
while key < key_range.end {
let gap = random.gen_range(1..=100) <= spec.gap_chance;
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
if gap {
continue;
}
let record = if will_init {
Value::WalRecord(NeonWalRecord::wal_init(format!("[wil_init {key}@{lsn}]")))
} else {
Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]")))
};
storage.insert((key, lsn), record);
key = key.next();
}
lsn = Lsn(lsn.0 + 1);
}
// Stash some interesting LSN for future use
for offset in [0, 5, 100].iter() {
if *offset == 0 {
interesting_lsns.push(lsn_range.start);
} else {
let below = lsn_range.start.checked_sub(*offset);
match below {
Some(v) if v >= spec.start_lsn => {
interesting_lsns.push(v);
}
_ => {}
}
let above = Lsn(lsn_range.start.0 + offset);
interesting_lsns.push(above);
}
}
}
for (key_range, lsn) in spec.image_layers_shape.iter() {
let mut key = key_range.start;
while key < key_range.end {
let blob = Bytes::from(format!("[image {key}@{lsn}]"));
let record = Value::Image(blob.clone());
storage.insert((key, *lsn), record);
key = key.next();
}
// Stash some interesting LSN for future use
for offset in [0, 5, 100].iter() {
if *offset == 0 {
interesting_lsns.push(*lsn);
} else {
let below = lsn.checked_sub(*offset);
match below {
Some(v) if v >= spec.start_lsn => {
interesting_lsns.push(v);
}
_ => {}
}
let above = Lsn(lsn.0 + offset);
interesting_lsns.push(above);
}
}
}
let in_memory_test_layers = {
let mut acc = Vec::new();
for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() {
let mut data = Vec::new();
let mut lsn = lsn_range.start;
while lsn < lsn_range.end {
let mut key = key_range.start;
while key < key_range.end {
if let Some(record) = storage.get(&(key, lsn)) {
data.push((key, lsn, record.clone()));
}
key = key.next();
}
lsn = Lsn(lsn.0 + 1);
}
acc.push(InMemoryLayerTestDesc {
data,
lsn_range: lsn_range.clone(),
is_open: false,
})
}
acc
};
let delta_test_layers = {
let mut acc = Vec::new();
for (key_range, lsn_range) in spec.delta_layers_shape.iter() {
let mut data = Vec::new();
let mut lsn = lsn_range.start;
while lsn < lsn_range.end {
let mut key = key_range.start;
while key < key_range.end {
if let Some(record) = storage.get(&(key, lsn)) {
data.push((key, lsn, record.clone()));
}
key = key.next();
}
lsn = Lsn(lsn.0 + 1);
}
acc.push(DeltaLayerTestDesc {
data,
lsn_range: lsn_range.clone(),
key_range: key_range.clone(),
})
}
acc
};
let image_test_layers = {
let mut acc = Vec::new();
for (key_range, lsn) in spec.image_layers_shape.iter() {
let mut data = Vec::new();
let mut key = key_range.start;
while key < key_range.end {
if let Some(record) = storage.get(&(key, *lsn)) {
let blob = match record {
Value::Image(blob) => blob.clone(),
_ => unreachable!(),
};
data.push((key, blob));
}
key = key.next();
}
acc.push((*lsn, data));
}
acc
};
let tline = tenant
.create_test_timeline_with_layers(
new_timeline_id,
spec.start_lsn,
pg_version,
ctx,
in_memory_test_layers,
delta_test_layers,
image_test_layers,
spec.last_record_lsn,
)
.await?;
Ok((
tline,
Storage {
storage,
start_lsn: spec.start_lsn,
},
interesting_lsns,
))
}
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_basic").await?.load().await;
@@ -10543,6 +10863,214 @@ mod tests {
Ok(())
}
// A randomized read path test. Generates a layer map according to a deterministic
// specification. Fills the (key, LSN) space in random manner and then performs
// random scattered queries validating the results against in-memory storage.
//
// See this internal Notion page for a diagram of the layer map:
// https://www.notion.so/neondatabase/Read-Path-Unit-Testing-Fuzzing-1d1f189e0047806c8e5cd37781b0a350?pvs=4
//
// A fuzzing mode is also supported. In this mode, the test will use a random
// seed instead of a hardcoded one. Use it in conjunction with `cargo stress`
// to run multiple instances in parallel:
//
// $ RUST_BACKTRACE=1 RUST_LOG=INFO \
// cargo stress --package=pageserver --features=testing,fuzz-read-path --release -- test_read_path
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_read_path() -> anyhow::Result<()> {
use rand::seq::SliceRandom;
let seed = if cfg!(feature = "fuzz-read-path") {
let seed: u64 = thread_rng().r#gen();
seed
} else {
// Use a hard-coded seed when not in fuzzing mode.
// Note that with the current approach results are not reproducible
// accross platforms and Rust releases.
const SEED: u64 = 0;
SEED
};
let mut random = StdRng::seed_from_u64(seed);
let (queries, will_init_chance, gap_chance) = if cfg!(feature = "fuzz-read-path") {
const QUERIES: u64 = 5000;
let will_init_chance: u8 = random.gen_range(0..=10);
let gap_chance: u8 = random.gen_range(0..=50);
(QUERIES, will_init_chance, gap_chance)
} else {
const QUERIES: u64 = 1000;
const WILL_INIT_CHANCE: u8 = 1;
const GAP_CHANCE: u8 = 5;
(QUERIES, WILL_INIT_CHANCE, GAP_CHANCE)
};
let harness = TenantHarness::create("test_read_path").await?;
let (tenant, ctx) = harness.load().await;
tracing::info!("Using random seed: {seed}");
tracing::info!(%will_init_chance, %gap_chance, "Fill params");
// Define the layer map shape. Note that this part is not randomized.
const KEY_DIMENSION_SIZE: u32 = 99;
let start_key = Key::from_hex("110000000033333333444444445500000000").unwrap();
let end_key = start_key.add(KEY_DIMENSION_SIZE);
let total_key_range = start_key..end_key;
let total_key_range_size = end_key.to_i128() - start_key.to_i128();
let total_start_lsn = Lsn(104);
let last_record_lsn = Lsn(504);
assert!(total_key_range_size % 3 == 0);
let in_memory_layers_shape = vec![
(total_key_range.clone(), Lsn(304)..Lsn(400)),
(total_key_range.clone(), Lsn(400)..last_record_lsn),
];
let delta_layers_shape = vec![
(
start_key..(start_key.add((total_key_range_size / 3) as u32)),
Lsn(200)..Lsn(304),
),
(
(start_key.add((total_key_range_size / 3) as u32))
..(start_key.add((total_key_range_size * 2 / 3) as u32)),
Lsn(200)..Lsn(304),
),
(
(start_key.add((total_key_range_size * 2 / 3) as u32))
..(start_key.add(total_key_range_size as u32)),
Lsn(200)..Lsn(304),
),
];
let image_layers_shape = vec![
(
start_key.add((total_key_range_size * 2 / 3 - 10) as u32)
..start_key.add((total_key_range_size * 2 / 3 + 10) as u32),
Lsn(456),
),
(
start_key.add((total_key_range_size / 3 - 10) as u32)
..start_key.add((total_key_range_size / 3 + 10) as u32),
Lsn(256),
),
(total_key_range.clone(), total_start_lsn),
];
let specification = TestTimelineSpecification {
start_lsn: total_start_lsn,
last_record_lsn,
in_memory_layers_shape,
delta_layers_shape,
image_layers_shape,
gap_chance,
will_init_chance,
};
// Create and randomly fill in the layers according to the specification
let (tline, storage, interesting_lsns) = randomize_timeline(
&tenant,
TIMELINE_ID,
DEFAULT_PG_VERSION,
specification,
&mut random,
&ctx,
)
.await?;
// Now generate queries based on the interesting lsns that we've collected.
//
// While there's still room in the query, pick and interesting LSN and a random
// key. Then roll the dice to see if the next key should also be included in
// the query. When the roll fails, break the "batch" and pick another point in the
// (key, LSN) space.
const PICK_NEXT_CHANCE: u8 = 50;
for _ in 0..queries {
let query = {
let mut keyspaces_at_lsn: HashMap<Lsn, KeySpaceRandomAccum> = HashMap::default();
let mut used_keys: HashSet<Key> = HashSet::default();
while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
let selected_lsn = interesting_lsns.choose(&mut random).expect("not empty");
let mut selected_key = start_key.add(random.gen_range(0..KEY_DIMENSION_SIZE));
while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
if used_keys.contains(&selected_key)
|| selected_key >= start_key.add(KEY_DIMENSION_SIZE)
{
break;
}
keyspaces_at_lsn
.entry(*selected_lsn)
.or_default()
.add_key(selected_key);
used_keys.insert(selected_key);
let pick_next = random.gen_range(0..=100) <= PICK_NEXT_CHANCE;
if pick_next {
selected_key = selected_key.next();
} else {
break;
}
}
}
VersionedKeySpaceQuery::scattered(
keyspaces_at_lsn
.into_iter()
.map(|(lsn, acc)| (lsn, acc.to_keyspace()))
.collect(),
)
};
// Run the query and validate the results
let results = tline
.get_vectored(query.clone(), IoConcurrency::Sequential, &ctx)
.await;
let blobs = match results {
Ok(ok) => ok,
Err(err) => {
panic!("seed={seed} Error returned for query {query}: {err}");
}
};
for (key, key_res) in blobs.into_iter() {
match key_res {
Ok(blob) => {
let requested_at_lsn = query.map_key_to_lsn(&key);
let expected = storage.get(key, requested_at_lsn);
if blob != expected {
tracing::error!(
"seed={seed} Mismatch for {key}@{requested_at_lsn} from query: {query}"
);
}
assert_eq!(blob, expected);
}
Err(err) => {
let requested_at_lsn = query.map_key_to_lsn(&key);
panic!(
"seed={seed} Error returned for {key}@{requested_at_lsn} from query {query}: {err}"
);
}
}
}
}
Ok(())
}
fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering {
(
k1.is_delta,

View File

@@ -22,6 +22,7 @@ use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::warn;
use crate::context::RequestContext;
@@ -169,7 +170,13 @@ pub struct BlobWriter<const BUFFERED: bool> {
}
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
pub fn new(
inner: VirtualFile,
start_offset: u64,
_gate: &utils::sync::gate::Gate,
_cancel: CancellationToken,
_ctx: &RequestContext,
) -> Self {
Self {
inner,
offset: start_offset,
@@ -432,12 +439,14 @@ pub(crate) mod tests {
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr

View File

@@ -10,6 +10,8 @@ use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::RelSizeMigration;
use pageserver_api::shard::ShardIndex;
use serde::{Deserialize, Serialize};
use serde_with::base64::Base64;
use serde_with::serde_as;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -114,6 +116,29 @@ pub struct IndexPart {
/// The timestamp when the timeline was marked invisible in synthetic size calculations.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
/// The encryption key used to encrypt the timeline layer files.
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) keys: Vec<EncryptionKey>,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct KeyVersion(u32);
/// An identifier for an encryption key. The scope of the key is the timeline (TBD).
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct EncryptionKeyId {
version: KeyVersion,
generation: Generation,
}
#[serde_as]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct EncryptionKey {
#[serde_as(as = "Base64")]
pub key: Vec<u8>,
pub id: EncryptionKeyId,
pub created_at: NaiveDateTime,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
@@ -142,10 +167,12 @@ impl IndexPart {
/// - 12: +l2_lsn
/// - 13: +gc_compaction
/// - 14: +marked_invisible_at
const LATEST_VERSION: usize = 14;
/// - 15: +keys and encryption_key in layer_metadata
const LATEST_VERSION: usize = 15;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];
pub const KNOWN_VERSIONS: &'static [usize] =
&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -165,6 +192,7 @@ impl IndexPart {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
}
}
@@ -222,7 +250,7 @@ impl IndexPart {
///
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LayerFileMetadata {
pub file_size: u64,
@@ -233,6 +261,9 @@ pub struct LayerFileMetadata {
#[serde(default = "ShardIndex::unsharded")]
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
pub shard: ShardIndex,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub encryption_key: Option<EncryptionKeyId>,
}
impl LayerFileMetadata {
@@ -241,6 +272,7 @@ impl LayerFileMetadata {
file_size,
generation,
shard,
encryption_key: None,
}
}
/// Helper to get both generation and file size in a tuple
@@ -453,14 +485,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -475,6 +509,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -502,14 +537,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -524,6 +561,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -552,14 +590,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -574,6 +614,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -627,6 +668,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
@@ -653,14 +695,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -675,6 +719,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -703,11 +748,13 @@ mod tests {
file_size: 23289856,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), LayerFileMetadata {
file_size: 1015808,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: Lsn::from_str("0/15A7618").unwrap(),
@@ -726,6 +773,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -756,14 +804,16 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -782,6 +832,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -815,12 +866,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -843,6 +896,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -877,12 +931,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -905,6 +961,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -941,12 +998,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -972,6 +1031,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1017,12 +1077,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1052,6 +1114,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1098,12 +1161,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1133,6 +1198,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1183,12 +1249,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1220,6 +1288,7 @@ mod tests {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1271,12 +1340,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1308,6 +1379,139 @@ mod tests {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v15_keys_are_parsed() {
let example = r#"{
"version": 15,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000, "encryption_key": { "version": 1, "generation": 5 } },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001, "encryption_key": { "version": 2, "generation": 6 } }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"gc_blocking": {
"started_at": "2024-07-19T09:00:00.123",
"reasons": ["DetachAncestor"]
},
"import_pgdata": {
"V1": {
"Done": {
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
"started_at": "2024-11-13T09:23:42.123",
"finished_at": "2024-11-13T09:42:23.123"
}
}
},
"rel_size_migration": "legacy",
"l2_lsn": "0/16960E8",
"gc_compaction": {
"last_completed_lsn": "0/16960E8"
},
"marked_invisible_at": "2023-07-31T09:00:00.123",
"keys": [
{
"key": "dGVzdF9rZXk=",
"id": {
"version": 1,
"generation": 5
},
"created_at": "2024-07-19T09:00:00.123"
},
{
"key": "dGVzdF9rZXlfMg==",
"id": {
"version": 2,
"generation": 6
},
"created_at": "2024-07-19T10:00:00.123"
}
]
}"#;
let expected = IndexPart {
version: 15,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: Some(EncryptionKeyId {
version: KeyVersion(1),
generation: Generation::Valid(5),
}),
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: Some(EncryptionKeyId {
version: KeyVersion(2),
generation: Generation::Valid(6),
}),
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
gc_blocking: Some(GcBlocking {
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
}),
last_aux_file_policy: Default::default(),
archived_at: None,
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
}))),
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
gc_compaction: Some(GcCompactionState {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
keys: vec![
EncryptionKey {
key: "test_key".as_bytes().to_vec(),
id: EncryptionKeyId {
version: KeyVersion(1),
generation: Generation::Valid(5),
},
created_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
},
EncryptionKey {
key: "test_key_2".as_bytes().to_vec(),
id: EncryptionKeyId {
version: KeyVersion(2),
generation: Generation::Valid(6),
},
created_at: parse_naive_datetime("2024-07-19T10:00:00.123000000"),
}
],
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use bytes::Bytes;
use pageserver_api::key::{KEY_SIZE, Key};
use pageserver_api::value::Value;
use tokio_util::sync::CancellationToken;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
@@ -179,7 +180,7 @@ impl BatchLayerWriter {
/// An image writer that takes images and produces multiple image layers.
#[must_use]
pub struct SplitImageLayerWriter {
pub struct SplitImageLayerWriter<'a> {
inner: ImageLayerWriter,
target_layer_size: u64,
lsn: Lsn,
@@ -188,9 +189,12 @@ pub struct SplitImageLayerWriter {
tenant_shard_id: TenantShardId,
batches: BatchLayerWriter,
start_key: Key,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
}
impl SplitImageLayerWriter {
impl<'a> SplitImageLayerWriter<'a> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
@@ -198,6 +202,8 @@ impl SplitImageLayerWriter {
start_key: Key,
lsn: Lsn,
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -208,6 +214,8 @@ impl SplitImageLayerWriter {
tenant_shard_id,
&(start_key..Key::MAX),
lsn,
gate,
cancel.clone(),
ctx,
)
.await?,
@@ -217,6 +225,8 @@ impl SplitImageLayerWriter {
batches: BatchLayerWriter::new(conf).await?,
lsn,
start_key,
gate,
cancel,
})
}
@@ -239,6 +249,8 @@ impl SplitImageLayerWriter {
self.tenant_shard_id,
&(key..Key::MAX),
self.lsn,
self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -291,7 +303,7 @@ impl SplitImageLayerWriter {
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
pub struct SplitDeltaLayerWriter<'a> {
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
conf: &'static PageServerConf,
@@ -300,15 +312,19 @@ pub struct SplitDeltaLayerWriter {
lsn_range: Range<Lsn>,
last_key_written: Key,
batches: BatchLayerWriter,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
}
impl SplitDeltaLayerWriter {
impl<'a> SplitDeltaLayerWriter<'a> {
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
@@ -319,6 +335,8 @@ impl SplitDeltaLayerWriter {
lsn_range,
last_key_written: Key::MIN,
batches: BatchLayerWriter::new(conf).await?,
gate,
cancel,
})
}
@@ -344,6 +362,8 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
self.gate,
self.cancel.clone(),
ctx,
)
.await?,
@@ -362,6 +382,8 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -469,6 +491,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -480,6 +504,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -546,6 +572,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -556,6 +584,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -643,6 +673,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -654,6 +686,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -730,6 +764,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();

View File

@@ -50,6 +50,7 @@ use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
@@ -400,12 +401,15 @@ impl DeltaLayerWriterInner {
///
/// Start building a new delta layer.
///
#[allow(clippy::too_many_arguments)]
async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. We don't know
@@ -420,7 +424,7 @@ impl DeltaLayerWriterInner {
let mut file = VirtualFile::create(&path, ctx).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -628,12 +632,15 @@ impl DeltaLayerWriter {
///
/// Start building a new delta layer.
///
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -644,6 +651,8 @@ impl DeltaLayerWriter {
tenant_shard_id,
key_start,
lsn_range,
gate,
cancel,
ctx,
)
.await?,
@@ -1885,6 +1894,8 @@ pub(crate) mod test {
harness.tenant_shard_id,
entries_meta.key_range.start,
entries_meta.lsn_range.clone(),
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await?;
@@ -2079,6 +2090,8 @@ pub(crate) mod test {
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
&branch.gate,
branch.cancel.clone(),
ctx,
)
.await
@@ -2213,6 +2226,8 @@ pub(crate) mod test {
tenant.tenant_shard_id,
*key_start,
(*lsn_min)..lsn_end,
&tline.gate,
tline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -48,6 +48,7 @@ use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
@@ -748,12 +749,15 @@ impl ImageLayerWriterInner {
///
/// Start building a new image layer.
///
#[allow(clippy::too_many_arguments)]
async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename.
@@ -780,7 +784,7 @@ impl ImageLayerWriterInner {
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -988,18 +992,30 @@ impl ImageLayerWriter {
///
/// Start building a new image layer.
///
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<ImageLayerWriter> {
Ok(Self {
inner: Some(
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
.await?,
ImageLayerWriterInner::new(
conf,
timeline_id,
tenant_shard_id,
key_range,
lsn,
gate,
cancel,
ctx,
)
.await?,
),
})
}
@@ -1203,6 +1219,8 @@ mod test {
harness.tenant_shard_id,
&range,
lsn,
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await
@@ -1268,6 +1286,8 @@ mod test {
harness.tenant_shard_id,
&range,
lsn,
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await
@@ -1346,6 +1366,8 @@ mod test {
tenant.tenant_shard_id,
&key_range,
lsn,
&tline.gate,
tline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -719,6 +719,8 @@ impl InMemoryLayer {
ctx: &RequestContext,
key_range: Option<Range<Key>>,
l0_flush_global_state: &l0_flush::Inner,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
@@ -759,6 +761,8 @@ impl InMemoryLayer {
self.tenant_shard_id,
Key::MIN,
self.start_lsn..end_lsn,
gate,
cancel,
ctx,
)
.await?;

View File

@@ -4026,7 +4026,7 @@ impl VersionedKeySpaceQuery {
/// Returns LSN for a specific key.
///
/// Invariant: requested key must be part of [`Self::total_keyspace`]
fn map_key_to_lsn(&self, key: &Key) -> Lsn {
pub(super) fn map_key_to_lsn(&self, key: &Key) -> Lsn {
match self {
Self::Uniform { lsn, .. } => *lsn,
Self::Scattered { keyspaces_at_lsn } => {
@@ -4986,7 +4986,13 @@ impl Timeline {
let ctx = ctx.attached_child();
let work = async move {
let Some((desc, path)) = frozen_layer
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
.write_to_disk(
&ctx,
key_range,
self_clone.l0_flush_global_state.inner(),
&self_clone.gate,
self_clone.cancel.clone(),
)
.await?
else {
return Ok(None);
@@ -5526,6 +5532,8 @@ impl Timeline {
self.tenant_shard_id,
&img_range,
lsn,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -6890,6 +6898,8 @@ impl Timeline {
self.tenant_shard_id,
&(min_key..end_key),
lsn,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -6951,6 +6961,8 @@ impl Timeline {
self.tenant_shard_id,
deltas.key_range.start,
deltas.lsn_range,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;

View File

@@ -749,8 +749,8 @@ impl KeyHistoryRetention {
async fn pipe_to(
self,
key: Key,
delta_writer: &mut SplitDeltaLayerWriter,
mut image_writer: Option<&mut SplitImageLayerWriter>,
delta_writer: &mut SplitDeltaLayerWriter<'_>,
mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
stat: &mut CompactionStatistics,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -1394,6 +1394,8 @@ impl Timeline {
self.tenant_shard_id,
&layer.layer_desc().key_range,
layer.layer_desc().image_layer_lsn(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -2033,6 +2035,8 @@ impl Timeline {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3232,6 +3236,8 @@ impl Timeline {
job_desc.compaction_key_range.start,
lowest_retain_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3248,6 +3254,8 @@ impl Timeline {
self.tenant_shard_id,
lowest_retain_lsn..end_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
)
.await
.context("failed to create delta layer writer")
@@ -3344,6 +3352,8 @@ impl Timeline {
self.tenant_shard_id,
desc.key_range.start,
desc.lsn_range.clone(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3361,6 +3371,8 @@ impl Timeline {
self.tenant_shard_id,
job_desc.compaction_key_range.end,
desc.lsn_range.clone(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3932,6 +3944,8 @@ impl CompactionJobExecutor for TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range.start,
lsn_range.clone(),
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;
@@ -4007,6 +4021,8 @@ impl TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range,
lsn,
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -228,6 +228,8 @@ async fn generate_tombstone_image_layer(
detached.tenant_shard_id,
&key_range,
image_lsn,
&detached.gate,
detached.cancel.clone(),
ctx,
)
.await
@@ -776,6 +778,8 @@ async fn copy_lsn_prefix(
target_timeline.tenant_shard_id,
layer.layer_desc().key_range.start,
layer.layer_desc().lsn_range.start..end_lsn,
&target_timeline.gate,
target_timeline.cancel.clone(),
ctx,
)
.await

View File

@@ -738,6 +738,8 @@ impl ChunkProcessingJob {
self.timeline.tenant_shard_id,
&self.range,
self.pgdata_lsn,
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -641,6 +641,7 @@ mod tests {
generation: timeline.generation,
shard: timeline.get_shard_index(),
file_size: size as u64,
encryption_key: None,
};
make_layer_with_metadata(timeline, name, metadata)
}
@@ -1378,6 +1379,7 @@ mod tests {
shard,
generation: Generation::Valid(generation),
file_size: 0,
encryption_key: None,
};
make_layer_with_metadata(&tli, name, metadata)
};

View File

@@ -2118,9 +2118,6 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
*/
if (wp->config->syncSafekeepers)
{
int n_synced;
n_synced = 0;
for (int i = 0; i < wp->n_safekeepers; i++)
{
Safekeeper *sk = &wp->safekeeper[i];
@@ -2129,8 +2126,6 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
/* alive safekeeper which is not synced yet; wait for it */
if (sk->state != SS_OFFLINE && !synced)
return;
if (synced)
n_synced++;
}
if (newCommitLsn >= wp->propTermStartLsn)

View File

@@ -629,15 +629,13 @@ impl ComputeHook {
};
let result = if !self.config.use_local_compute_notifications {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let compute_hook_url =
self.config
.control_plane_url
.as_ref()
.map(|control_plane_url| {
format!("{}/notify-attach", control_plane_url.trim_end_matches('/'))
});
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();

View File

@@ -86,10 +86,6 @@ struct Cli {
#[arg(long)]
peer_jwt_token: Option<String>,
/// URL to control plane compute notification endpoint
#[arg(long)]
compute_hook_url: Option<String>,
/// URL to control plane storage API prefix
#[arg(long)]
control_plane_url: Option<String>,
@@ -360,13 +356,11 @@ async fn async_main() -> anyhow::Result<()> {
"Insecure config! One or more secrets is not set. This is only permitted in `--dev` mode"
);
}
StrictMode::Strict
if args.compute_hook_url.is_none() && args.control_plane_url.is_none() =>
{
StrictMode::Strict if args.control_plane_url.is_none() => {
// Production systems should always have a control plane URL set, to prevent falling
// back to trying to use neon_local.
anyhow::bail!(
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
"`--control-plane-url` is not set: this is only permitted in `--dev` mode"
);
}
StrictMode::Strict if args.use_local_compute_notifications => {
@@ -394,7 +388,6 @@ async fn async_main() -> anyhow::Result<()> {
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
control_plane_jwt_token: secrets.control_plane_jwt_token,
peer_jwt_token: secrets.peer_jwt_token,
compute_hook_url: args.compute_hook_url,
control_plane_url: args.control_plane_url,
max_offline_interval: args
.max_offline_interval

View File

@@ -357,18 +357,10 @@ pub struct Config {
// This JWT token will be used to authenticate with other storage controller instances
pub peer_jwt_token: Option<String>,
/// Where the compute hook should send notifications of pageserver attachment locations
/// (this URL points to the control plane in prod). If this is None, the compute hook will
/// assume it is running in a test environment and try to update neon_local.
pub compute_hook_url: Option<String>,
/// Prefix for storage API endpoints of the control plane. We use this prefix to compute
/// URLs that we use to send pageserver and safekeeper attachment locations.
/// If this is None, the compute hook will assume it is running in a test environment
/// and try to invoke neon_local instead.
///
/// For now, there is also `compute_hook_url` which allows configuration of the pageserver
/// specific endpoint, but it is in the process of being phased out.
pub control_plane_url: Option<String>,
/// Grace period within which a pageserver does not respond to heartbeats, but is still

View File

@@ -194,6 +194,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
counter("pageserver_wait_lsn_started_count"),
counter("pageserver_wait_lsn_finished_count"),
counter("pageserver_wait_ondemand_download_seconds_sum"),
counter("pageserver_page_service_batch_break_reason"),
*histogram("pageserver_page_service_batch_size"),
*histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"),
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,

View File

@@ -947,6 +947,8 @@ class NeonEnvBuilder:
continue
if SMALL_DB_FILE_NAME_REGEX.fullmatch(test_file.name):
continue
if FINAL_METRICS_FILE_NAME == test_file.name:
continue
log.debug(f"Removing large database {test_file} file")
test_file.unlink()
elif test_entry.is_dir():
@@ -1322,10 +1324,6 @@ class NeonEnv:
log.info("test may use old binaries, ignoring warnings about unknown config items")
ps.allowed_errors.append(".*ignoring unknown configuration item.*")
# Allow old software to start until https://github.com/neondatabase/neon/pull/11275
# lands in the compatiblity snapshot.
ps_cfg["page_service_pipelining"].pop("batching")
self.pageservers.append(ps)
cfg["pageservers"].append(ps_cfg)
@@ -1461,6 +1459,12 @@ class NeonEnv:
except Exception as e:
metric_errors.append(e)
log.error(f"metric validation failed on {pageserver.id}: {e}")
try:
pageserver.snapshot_final_metrics()
except Exception as e:
log.error(f"metric snapshot failed on {pageserver.id}: {e}")
try:
pageserver.stop(immediate=immediate)
except RuntimeError:
@@ -2976,6 +2980,20 @@ class NeonPageserver(PgProtocol, LogUtils):
value = self.http_client().get_metric_value(metric)
assert value == 0, f"Nonzero {metric} == {value}"
def snapshot_final_metrics(self):
"""
Take a snapshot of this pageserver's metrics and stash in its work directory.
"""
if not self.running:
log.info(f"Skipping metrics snapshot on pageserver {self.id}, it is not running")
return
metrics = self.http_client().get_metrics_str()
metrics_snapshot_path = self.workdir / FINAL_METRICS_FILE_NAME
with open(metrics_snapshot_path, "w") as f:
f.write(metrics)
def tenant_attach(
self,
tenant_id: TenantId,
@@ -5138,6 +5156,8 @@ SMALL_DB_FILE_NAME_REGEX: re.Pattern[str] = re.compile(
r"config-v1|heatmap-v1|tenant-manifest|metadata|.+\.(?:toml|pid|json|sql|conf)"
)
FINAL_METRICS_FILE_NAME: str = "final_metrics.txt"
SKIP_DIRS = frozenset(
(

View File

@@ -1,7 +1,6 @@
import concurrent.futures
import dataclasses
import json
import re
import threading
import time
from dataclasses import dataclass
@@ -170,6 +169,7 @@ def test_throughput(
time: float
pageserver_batch_size_histo_sum: float
pageserver_batch_size_histo_count: float
pageserver_batch_breaks_reason_count: dict[str, int]
compute_getpage_count: float
pageserver_cpu_seconds_total: float
@@ -183,6 +183,10 @@ def test_throughput(
compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
- other.pageserver_cpu_seconds_total,
pageserver_batch_breaks_reason_count={
reason: count - other.pageserver_batch_breaks_reason_count.get(reason, 0)
for reason, count in self.pageserver_batch_breaks_reason_count.items()
},
)
def normalize(self, by) -> "Metrics":
@@ -192,6 +196,10 @@ def test_throughput(
pageserver_batch_size_histo_count=self.pageserver_batch_size_histo_count / by,
compute_getpage_count=self.compute_getpage_count / by,
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
pageserver_batch_breaks_reason_count={
reason: count / by
for reason, count in self.pageserver_batch_breaks_reason_count.items()
},
)
def get_metrics() -> Metrics:
@@ -201,6 +209,20 @@ def test_throughput(
)
compute_getpage_count = cur.fetchall()[0][0]
pageserver_metrics = ps_http.get_metrics()
for name, samples in pageserver_metrics.metrics.items():
for sample in samples:
log.info(f"{name=} labels={sample.labels} {sample.value}")
raw_batch_break_reason_count = pageserver_metrics.query_all(
"pageserver_page_service_batch_break_reason_total",
filter={"timeline_id": str(env.initial_timeline)},
)
batch_break_reason_count = {
sample.labels["reason"]: int(sample.value)
for sample in raw_batch_break_reason_count
}
return Metrics(
time=time.time(),
pageserver_batch_size_histo_sum=pageserver_metrics.query_one(
@@ -209,6 +231,7 @@ def test_throughput(
pageserver_batch_size_histo_count=pageserver_metrics.query_one(
"pageserver_page_service_batch_size_count"
).value,
pageserver_batch_breaks_reason_count=batch_break_reason_count,
compute_getpage_count=compute_getpage_count,
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
"libmetrics_process_cpu_seconds_highres"
@@ -263,25 +286,6 @@ def test_throughput(
log.info("Results: %s", metrics)
since_last_start: list[str] = []
for line in env.pageserver.logfile.read_text().splitlines():
if "git:" in line:
since_last_start = []
since_last_start.append(line)
stopping_batching_because_re = re.compile(
r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
)
reasons_for_stopping_batching = {}
for line in since_last_start:
match = stopping_batching_because_re.search(line)
if match:
if match.group(1) not in reasons_for_stopping_batching:
reasons_for_stopping_batching[match.group(1)] = 0
reasons_for_stopping_batching[match.group(1)] += 1
log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
#
# Sanity-checks on the collected data
#
@@ -295,7 +299,16 @@ def test_throughput(
#
for metric, value in dataclasses.asdict(metrics).items():
zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)
if metric == "pageserver_batch_breaks_reason_count":
assert isinstance(value, dict)
for reason, count in value.items():
zenbenchmark.record(
f"counters.{metric}_{reason}", count, unit="", report=MetricReport.TEST_PARAM
)
else:
zenbenchmark.record(
f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM
)
zenbenchmark.record(
"perfmetric.batching_factor",

View File

@@ -64,8 +64,8 @@ def test_ro_replica_lag(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: {}", project_id)
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
error_occurred = False
try:
@@ -81,7 +81,7 @@ def test_ro_replica_lag(
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
)
log.info("Replica endpoint ID: {}", replica["endpoint"]["id"])
log.info("Replica endpoint ID: %s", replica["endpoint"]["id"])
replica_env = master_env.copy()
replica_env["PGHOST"] = replica["endpoint"]["host"]
neon_api.wait_for_operation_to_finish(project_id)
@@ -197,8 +197,8 @@ def test_replication_start_stop(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: {}", project_id)
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
try:
branch_id = project["branch"]["id"]
@@ -215,7 +215,7 @@ def test_replication_start_stop(
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
)
log.info("Replica {} endpoint ID: {}", i + 1, replica["endpoint"]["id"])
log.info("Replica %d endpoint ID: %s", i + 1, replica["endpoint"]["id"])
replicas.append(replica)
neon_api.wait_for_operation_to_finish(project_id)

View File

@@ -390,6 +390,7 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
# Tenant creation requests which arrive out of order will generate complaints about
# generation nubmers out of order.
env.pageserver.allowed_errors.append(".*Generation .+ is less than existing .+")
env.pageserver.allowed_errors.append(".*due to stale generation.+")
# Timeline::flush_and_shutdown cannot tell if it is hitting a failure because of
# an incomplete attach, or some other problem. In the field this should be rare,