mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 08:00:37 +00:00
Compare commits
13 Commits
skyzh/feat
...
skyzh/keys
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aba9eb3944 | ||
|
|
e43d5c83bc | ||
|
|
53aadcefa0 | ||
|
|
92af8aeeb3 | ||
|
|
8cce27bedb | ||
|
|
90b706cd96 | ||
|
|
057ce115de | ||
|
|
e85607eed8 | ||
|
|
437071888e | ||
|
|
148b3701cf | ||
|
|
daebe50e19 | ||
|
|
e0ee6fbeff | ||
|
|
307fa2ceb7 |
@@ -10,6 +10,8 @@ default = []
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
|
||||
|
||||
fuzz-read-path = ["testing"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
|
||||
@@ -126,7 +126,7 @@ async fn ingest(
|
||||
max_concurrency: NonZeroUsize::new(1).unwrap(),
|
||||
});
|
||||
let (_desc, path) = layer
|
||||
.write_to_disk(&ctx, None, l0_flush_state.inner())
|
||||
.write_to_disk(&ctx, None, l0_flush_state.inner(), &gate, cancel.clone())
|
||||
.await?
|
||||
.unwrap();
|
||||
tokio::fs::remove_file(path).await?;
|
||||
|
||||
@@ -45,6 +45,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) {
|
||||
shard: ShardIndex::new(ShardNumber(1), ShardCount(2)),
|
||||
generation: Generation::Valid(1),
|
||||
file_size: 0,
|
||||
encryption_key: None,
|
||||
};
|
||||
|
||||
// Construct the (initial and uploaded) index with layer0.
|
||||
|
||||
@@ -1714,6 +1714,28 @@ pub enum SmgrQueryType {
|
||||
Test,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
Copy,
|
||||
IntoStaticStr,
|
||||
strum_macros::EnumCount,
|
||||
strum_macros::EnumIter,
|
||||
strum_macros::FromRepr,
|
||||
enum_map::Enum,
|
||||
)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum GetPageBatchBreakReason {
|
||||
BatchFull,
|
||||
NonBatchableRequest,
|
||||
NonUniformLsn,
|
||||
SamePageAtDifferentLsn,
|
||||
NonUniformTimeline,
|
||||
ExecutorSteal,
|
||||
#[cfg(feature = "testing")]
|
||||
NonUniformKey,
|
||||
}
|
||||
|
||||
pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
global_started: [IntCounter; SmgrQueryType::COUNT],
|
||||
global_latency: [Histogram; SmgrQueryType::COUNT],
|
||||
@@ -1725,6 +1747,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
per_timeline_flush_in_progress_micros: IntCounter,
|
||||
global_batch_wait_time: Histogram,
|
||||
per_timeline_batch_wait_time: Histogram,
|
||||
global_batch_break_reason: [IntCounter; GetPageBatchBreakReason::COUNT],
|
||||
per_timeline_batch_break_reason: GetPageBatchBreakReasonTimelineMetrics,
|
||||
throttling: Arc<tenant_throttling::Pagestream>,
|
||||
}
|
||||
|
||||
@@ -1858,6 +1882,49 @@ static PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::n
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
|
||||
"pageserver_page_service_batch_break_reason_global",
|
||||
"Reason for breaking batches of get page requests",
|
||||
&["reason"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
struct GetPageBatchBreakReasonTimelineMetrics {
|
||||
map: EnumMap<GetPageBatchBreakReason, IntCounter>,
|
||||
}
|
||||
|
||||
impl GetPageBatchBreakReasonTimelineMetrics {
|
||||
fn new(tenant_id: &str, shard_slug: &str, timeline_id: &str) -> Self {
|
||||
GetPageBatchBreakReasonTimelineMetrics {
|
||||
map: EnumMap::from_array(std::array::from_fn(|reason_idx| {
|
||||
let reason = GetPageBatchBreakReason::from_usize(reason_idx);
|
||||
PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE.with_label_values(&[
|
||||
tenant_id,
|
||||
shard_slug,
|
||||
timeline_id,
|
||||
reason.into(),
|
||||
])
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
fn inc(&self, reason: GetPageBatchBreakReason) {
|
||||
self.map[reason].inc()
|
||||
}
|
||||
}
|
||||
|
||||
static PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_service_batch_break_reason",
|
||||
"Reason for breaking batches of get page requests",
|
||||
&["tenant_id", "shard_id", "timeline_id", "reason"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_page_service_config_max_batch_size",
|
||||
@@ -1985,6 +2052,15 @@ impl SmgrQueryTimePerTimeline {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let global_batch_break_reason = std::array::from_fn(|i| {
|
||||
let reason = GetPageBatchBreakReason::from_usize(i);
|
||||
PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL
|
||||
.get_metric_with_label_values(&[reason.into()])
|
||||
.unwrap()
|
||||
});
|
||||
let per_timeline_batch_break_reason =
|
||||
GetPageBatchBreakReasonTimelineMetrics::new(&tenant_id, &shard_slug, &timeline_id);
|
||||
|
||||
let global_flush_in_progress_micros =
|
||||
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
|
||||
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
|
||||
@@ -2002,6 +2078,8 @@ impl SmgrQueryTimePerTimeline {
|
||||
per_timeline_flush_in_progress_micros,
|
||||
global_batch_wait_time,
|
||||
per_timeline_batch_wait_time,
|
||||
global_batch_break_reason,
|
||||
per_timeline_batch_break_reason,
|
||||
throttling: pagestream_throttle_metrics,
|
||||
}
|
||||
}
|
||||
@@ -2030,9 +2108,16 @@ impl SmgrQueryTimePerTimeline {
|
||||
}
|
||||
|
||||
/// TODO: do something about this? seems odd, we have a similar call on SmgrOpTimer
|
||||
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
|
||||
pub(crate) fn observe_getpage_batch_start(
|
||||
&self,
|
||||
batch_size: usize,
|
||||
break_reason: GetPageBatchBreakReason,
|
||||
) {
|
||||
self.global_batch_size.observe(batch_size as f64);
|
||||
self.per_timeline_batch_size.observe(batch_size as f64);
|
||||
|
||||
self.global_batch_break_reason[break_reason.into_usize()].inc();
|
||||
self.per_timeline_batch_break_reason.inc(break_reason);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3398,6 +3483,15 @@ impl TimelineMetrics {
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
|
||||
for reason in GetPageBatchBreakReason::iter() {
|
||||
let _ = PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE.remove_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
reason.into(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4276,6 +4370,7 @@ pub fn preinitialize_metrics(
|
||||
[
|
||||
&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT,
|
||||
&SMGR_QUERY_STARTED_GLOBAL,
|
||||
&PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL,
|
||||
]
|
||||
.into_iter()
|
||||
.for_each(|c| {
|
||||
|
||||
@@ -58,8 +58,8 @@ use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::metrics::{
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
|
||||
TimelineMetrics,
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
|
||||
SmgrOpTimer, TimelineMetrics,
|
||||
};
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::span::{
|
||||
@@ -672,6 +672,7 @@ enum BatchedFeMessage {
|
||||
span: Span,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
batch_break_reason: GetPageBatchBreakReason,
|
||||
},
|
||||
DbSize {
|
||||
span: Span,
|
||||
@@ -724,6 +725,119 @@ impl BatchedFeMessage {
|
||||
BatchedFeMessage::RespondError { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn should_break_batch(
|
||||
&self,
|
||||
other: &BatchedFeMessage,
|
||||
max_batch_size: NonZeroUsize,
|
||||
batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
|
||||
) -> Option<GetPageBatchBreakReason> {
|
||||
match (self, other) {
|
||||
(
|
||||
BatchedFeMessage::GetPage {
|
||||
shard: accum_shard,
|
||||
pages: accum_pages,
|
||||
..
|
||||
},
|
||||
BatchedFeMessage::GetPage {
|
||||
shard: this_shard,
|
||||
pages: this_pages,
|
||||
..
|
||||
},
|
||||
) => {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
if accum_pages.len() >= max_batch_size.get() {
|
||||
trace!(%max_batch_size, "stopping batching because of batch size");
|
||||
assert_eq!(accum_pages.len(), max_batch_size.get());
|
||||
|
||||
return Some(GetPageBatchBreakReason::BatchFull);
|
||||
}
|
||||
if !accum_shard.is_same_handle_as(this_shard) {
|
||||
trace!("stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
|
||||
return Some(GetPageBatchBreakReason::NonUniformTimeline);
|
||||
}
|
||||
|
||||
match batching_strategy {
|
||||
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
|
||||
if let Some(last_in_batch) = accum_pages.last() {
|
||||
if last_in_batch.effective_request_lsn
|
||||
!= this_pages[0].effective_request_lsn
|
||||
{
|
||||
trace!(
|
||||
accum_lsn = %last_in_batch.effective_request_lsn,
|
||||
this_lsn = %this_pages[0].effective_request_lsn,
|
||||
"stopping batching because LSN changed"
|
||||
);
|
||||
|
||||
return Some(GetPageBatchBreakReason::NonUniformLsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
|
||||
// The read path doesn't curently support serving the same page at different LSNs.
|
||||
// While technically possible, it's uncertain if the complexity is worth it.
|
||||
// Break the batch if such a case is encountered.
|
||||
let same_page_different_lsn = accum_pages.iter().any(|batched| {
|
||||
batched.req.rel == this_pages[0].req.rel
|
||||
&& batched.req.blkno == this_pages[0].req.blkno
|
||||
&& batched.effective_request_lsn
|
||||
!= this_pages[0].effective_request_lsn
|
||||
});
|
||||
|
||||
if same_page_different_lsn {
|
||||
trace!(
|
||||
rel=%this_pages[0].req.rel,
|
||||
blkno=%this_pages[0].req.blkno,
|
||||
lsn=%this_pages[0].effective_request_lsn,
|
||||
"stopping batching because same page was requested at different LSNs"
|
||||
);
|
||||
|
||||
return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
(
|
||||
BatchedFeMessage::Test {
|
||||
shard: accum_shard,
|
||||
requests: accum_requests,
|
||||
..
|
||||
},
|
||||
BatchedFeMessage::Test {
|
||||
shard: this_shard,
|
||||
requests: this_requests,
|
||||
..
|
||||
},
|
||||
) => {
|
||||
assert!(this_requests.len() == 1);
|
||||
if accum_requests.len() >= max_batch_size.get() {
|
||||
trace!(%max_batch_size, "stopping batching because of batch size");
|
||||
assert_eq!(accum_requests.len(), max_batch_size.get());
|
||||
return Some(GetPageBatchBreakReason::BatchFull);
|
||||
}
|
||||
if !accum_shard.is_same_handle_as(this_shard) {
|
||||
trace!("stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
return Some(GetPageBatchBreakReason::NonUniformTimeline);
|
||||
}
|
||||
let this_batch_key = this_requests[0].req.batch_key;
|
||||
let accum_batch_key = accum_requests[0].req.batch_key;
|
||||
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
|
||||
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
|
||||
return Some(GetPageBatchBreakReason::NonUniformKey);
|
||||
}
|
||||
None
|
||||
}
|
||||
(_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
@@ -1047,6 +1161,10 @@ impl PageServerHandler {
|
||||
effective_request_lsn,
|
||||
ctx,
|
||||
}],
|
||||
// The executor grabs the batch when it becomes idle.
|
||||
// Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
|
||||
// default reason for breaking the batch.
|
||||
batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -1084,118 +1202,59 @@ impl PageServerHandler {
|
||||
Err(e) => return Err(Err(e)),
|
||||
};
|
||||
|
||||
match (&mut *batch, this_msg) {
|
||||
// something batched already, let's see if we can add this message to the batch
|
||||
(
|
||||
Ok(BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: accum_shard,
|
||||
pages: accum_pages,
|
||||
}),
|
||||
BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: this_shard,
|
||||
pages: this_pages,
|
||||
},
|
||||
) if (|| {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
if accum_pages.len() >= max_batch_size.get() {
|
||||
trace!(%max_batch_size, "stopping batching because of batch size");
|
||||
assert_eq!(accum_pages.len(), max_batch_size.get());
|
||||
return false;
|
||||
}
|
||||
if !accum_shard.is_same_handle_as(&this_shard) {
|
||||
trace!("stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
return false;
|
||||
}
|
||||
|
||||
match batching_strategy {
|
||||
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
|
||||
if let Some(last_in_batch) = accum_pages.last() {
|
||||
if last_in_batch.effective_request_lsn
|
||||
!= this_pages[0].effective_request_lsn
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
|
||||
// The read path doesn't curently support serving the same page at different LSNs.
|
||||
// While technically possible, it's uncertain if the complexity is worth it.
|
||||
// Break the batch if such a case is encountered.
|
||||
//
|
||||
// TODO(vlad): Include a metric for batch breaks with a reason label.
|
||||
let same_page_different_lsn = accum_pages.iter().any(|batched| {
|
||||
batched.req.rel == this_pages[0].req.rel
|
||||
&& batched.req.blkno == this_pages[0].req.blkno
|
||||
&& batched.effective_request_lsn
|
||||
!= this_pages[0].effective_request_lsn
|
||||
});
|
||||
|
||||
if same_page_different_lsn {
|
||||
trace!(
|
||||
rel=%this_pages[0].req.rel,
|
||||
blkno=%this_pages[0].req.blkno,
|
||||
lsn=%this_pages[0].effective_request_lsn,
|
||||
"stopping batching because same page was requested at different LSNs"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
})() =>
|
||||
{
|
||||
// ok to batch
|
||||
accum_pages.extend(this_pages);
|
||||
Ok(())
|
||||
let eligible_batch = match batch {
|
||||
Ok(b) => b,
|
||||
Err(_) => {
|
||||
return Err(Ok(this_msg));
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
(
|
||||
Ok(BatchedFeMessage::Test {
|
||||
shard: accum_shard,
|
||||
requests: accum_requests,
|
||||
..
|
||||
}),
|
||||
BatchedFeMessage::Test {
|
||||
shard: this_shard,
|
||||
requests: this_requests,
|
||||
..
|
||||
},
|
||||
) if (|| {
|
||||
assert!(this_requests.len() == 1);
|
||||
if accum_requests.len() >= max_batch_size.get() {
|
||||
trace!(%max_batch_size, "stopping batching because of batch size");
|
||||
assert_eq!(accum_requests.len(), max_batch_size.get());
|
||||
return false;
|
||||
};
|
||||
|
||||
let batch_break =
|
||||
eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
|
||||
|
||||
match batch_break {
|
||||
Some(reason) => {
|
||||
if let BatchedFeMessage::GetPage {
|
||||
batch_break_reason, ..
|
||||
} = eligible_batch
|
||||
{
|
||||
*batch_break_reason = reason;
|
||||
}
|
||||
if !accum_shard.is_same_handle_as(&this_shard) {
|
||||
trace!("stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
return false;
|
||||
}
|
||||
let this_batch_key = this_requests[0].req.batch_key;
|
||||
let accum_batch_key = accum_requests[0].req.batch_key;
|
||||
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
|
||||
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})() =>
|
||||
{
|
||||
// ok to batch
|
||||
accum_requests.extend(this_requests);
|
||||
Ok(())
|
||||
}
|
||||
// something batched already but this message is unbatchable
|
||||
(_, this_msg) => {
|
||||
// by default, don't continue batching
|
||||
|
||||
Err(Ok(this_msg))
|
||||
}
|
||||
None => {
|
||||
// ok to batch
|
||||
match (eligible_batch, this_msg) {
|
||||
(
|
||||
BatchedFeMessage::GetPage {
|
||||
pages: accum_pages, ..
|
||||
},
|
||||
BatchedFeMessage::GetPage {
|
||||
pages: this_pages, ..
|
||||
},
|
||||
) => {
|
||||
accum_pages.extend(this_pages);
|
||||
Ok(())
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
(
|
||||
BatchedFeMessage::Test {
|
||||
requests: accum_requests,
|
||||
..
|
||||
},
|
||||
BatchedFeMessage::Test {
|
||||
requests: this_requests,
|
||||
..
|
||||
},
|
||||
) => {
|
||||
accum_requests.extend(this_requests);
|
||||
Ok(())
|
||||
}
|
||||
// Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1413,7 +1472,12 @@ impl PageServerHandler {
|
||||
span,
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::GetPage { span, shard, pages } => {
|
||||
BatchedFeMessage::GetPage {
|
||||
span,
|
||||
shard,
|
||||
pages,
|
||||
batch_break_reason,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::getpage");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
@@ -1425,6 +1489,7 @@ impl PageServerHandler {
|
||||
&shard,
|
||||
pages,
|
||||
io_concurrency,
|
||||
batch_break_reason,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(span.clone())
|
||||
@@ -2113,13 +2178,14 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
io_concurrency: IoConcurrency,
|
||||
batch_break_reason: GetPageBatchBreakReason,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
timeline
|
||||
.query_metrics
|
||||
.observe_getpage_batch_start(requests.len());
|
||||
.observe_getpage_batch_start(requests.len(), batch_break_reason);
|
||||
|
||||
// If a page trace is running, submit an event for this request.
|
||||
if let Some(page_trace) = timeline.page_trace.load().as_ref() {
|
||||
|
||||
@@ -5933,12 +5933,20 @@ mod tests {
|
||||
use models::CompactLsnRange;
|
||||
use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
#[cfg(feature = "testing")]
|
||||
use pageserver_api::keyspace::KeySpaceRandomAccum;
|
||||
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
|
||||
#[cfg(feature = "testing")]
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::value::Value;
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
#[cfg(feature = "testing")]
|
||||
use rand::SeedableRng;
|
||||
#[cfg(feature = "testing")]
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, thread_rng};
|
||||
#[cfg(feature = "testing")]
|
||||
use std::ops::Range;
|
||||
use storage_layer::{IoConcurrency, PersistentLayerKey};
|
||||
use tests::storage_layer::ValuesReconstructState;
|
||||
use tests::timeline::{GetVectoredError, ShutdownMode};
|
||||
@@ -5960,6 +5968,318 @@ mod tests {
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
struct TestTimelineSpecification {
|
||||
start_lsn: Lsn,
|
||||
last_record_lsn: Lsn,
|
||||
|
||||
in_memory_layers_shape: Vec<(Range<Key>, Range<Lsn>)>,
|
||||
delta_layers_shape: Vec<(Range<Key>, Range<Lsn>)>,
|
||||
image_layers_shape: Vec<(Range<Key>, Lsn)>,
|
||||
|
||||
gap_chance: u8,
|
||||
will_init_chance: u8,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
struct Storage {
|
||||
storage: HashMap<(Key, Lsn), Value>,
|
||||
start_lsn: Lsn,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
impl Storage {
|
||||
fn get(&self, key: Key, lsn: Lsn) -> Bytes {
|
||||
use bytes::BufMut;
|
||||
|
||||
let mut crnt_lsn = lsn;
|
||||
let mut got_base = false;
|
||||
|
||||
let mut acc = Vec::new();
|
||||
|
||||
while crnt_lsn >= self.start_lsn {
|
||||
if let Some(value) = self.storage.get(&(key, crnt_lsn)) {
|
||||
acc.push(value.clone());
|
||||
|
||||
match value {
|
||||
Value::WalRecord(NeonWalRecord::Test { will_init, .. }) => {
|
||||
if *will_init {
|
||||
got_base = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Value::Image(_) => {
|
||||
got_base = true;
|
||||
break;
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
crnt_lsn = crnt_lsn.checked_sub(1u64).unwrap();
|
||||
}
|
||||
|
||||
assert!(
|
||||
got_base,
|
||||
"Input data was incorrect. No base image for {key}@{lsn}"
|
||||
);
|
||||
|
||||
tracing::debug!("Wal redo depth for {key}@{lsn} is {}", acc.len());
|
||||
|
||||
let mut blob = BytesMut::new();
|
||||
for value in acc.into_iter().rev() {
|
||||
match value {
|
||||
Value::WalRecord(NeonWalRecord::Test { append, .. }) => {
|
||||
blob.extend_from_slice(append.as_bytes());
|
||||
}
|
||||
Value::Image(img) => {
|
||||
blob.put(img);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
blob.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn randomize_timeline(
|
||||
tenant: &Arc<Tenant>,
|
||||
new_timeline_id: TimelineId,
|
||||
pg_version: u32,
|
||||
spec: TestTimelineSpecification,
|
||||
random: &mut rand::rngs::StdRng,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(Arc<Timeline>, Storage, Vec<Lsn>)> {
|
||||
let mut storage: HashMap<(Key, Lsn), Value> = HashMap::default();
|
||||
let mut interesting_lsns = vec![spec.last_record_lsn];
|
||||
|
||||
for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() {
|
||||
let mut lsn = lsn_range.start;
|
||||
while lsn < lsn_range.end {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
let gap = random.gen_range(1..=100) <= spec.gap_chance;
|
||||
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
|
||||
|
||||
if gap {
|
||||
continue;
|
||||
}
|
||||
|
||||
let record = if will_init {
|
||||
Value::WalRecord(NeonWalRecord::wal_init(format!("[wil_init {key}@{lsn}]")))
|
||||
} else {
|
||||
Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]")))
|
||||
};
|
||||
|
||||
storage.insert((key, lsn), record);
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
lsn = Lsn(lsn.0 + 1);
|
||||
}
|
||||
|
||||
// Stash some interesting LSN for future use
|
||||
for offset in [0, 5, 100].iter() {
|
||||
if *offset == 0 {
|
||||
interesting_lsns.push(lsn_range.start);
|
||||
} else {
|
||||
let below = lsn_range.start.checked_sub(*offset);
|
||||
match below {
|
||||
Some(v) if v >= spec.start_lsn => {
|
||||
interesting_lsns.push(v);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let above = Lsn(lsn_range.start.0 + offset);
|
||||
interesting_lsns.push(above);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (key_range, lsn_range) in spec.delta_layers_shape.iter() {
|
||||
let mut lsn = lsn_range.start;
|
||||
while lsn < lsn_range.end {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
let gap = random.gen_range(1..=100) <= spec.gap_chance;
|
||||
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
|
||||
|
||||
if gap {
|
||||
continue;
|
||||
}
|
||||
|
||||
let record = if will_init {
|
||||
Value::WalRecord(NeonWalRecord::wal_init(format!("[wil_init {key}@{lsn}]")))
|
||||
} else {
|
||||
Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]")))
|
||||
};
|
||||
|
||||
storage.insert((key, lsn), record);
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
lsn = Lsn(lsn.0 + 1);
|
||||
}
|
||||
|
||||
// Stash some interesting LSN for future use
|
||||
for offset in [0, 5, 100].iter() {
|
||||
if *offset == 0 {
|
||||
interesting_lsns.push(lsn_range.start);
|
||||
} else {
|
||||
let below = lsn_range.start.checked_sub(*offset);
|
||||
match below {
|
||||
Some(v) if v >= spec.start_lsn => {
|
||||
interesting_lsns.push(v);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let above = Lsn(lsn_range.start.0 + offset);
|
||||
interesting_lsns.push(above);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (key_range, lsn) in spec.image_layers_shape.iter() {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
let blob = Bytes::from(format!("[image {key}@{lsn}]"));
|
||||
let record = Value::Image(blob.clone());
|
||||
storage.insert((key, *lsn), record);
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
|
||||
// Stash some interesting LSN for future use
|
||||
for offset in [0, 5, 100].iter() {
|
||||
if *offset == 0 {
|
||||
interesting_lsns.push(*lsn);
|
||||
} else {
|
||||
let below = lsn.checked_sub(*offset);
|
||||
match below {
|
||||
Some(v) if v >= spec.start_lsn => {
|
||||
interesting_lsns.push(v);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let above = Lsn(lsn.0 + offset);
|
||||
interesting_lsns.push(above);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let in_memory_test_layers = {
|
||||
let mut acc = Vec::new();
|
||||
|
||||
for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() {
|
||||
let mut data = Vec::new();
|
||||
|
||||
let mut lsn = lsn_range.start;
|
||||
while lsn < lsn_range.end {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
if let Some(record) = storage.get(&(key, lsn)) {
|
||||
data.push((key, lsn, record.clone()));
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
lsn = Lsn(lsn.0 + 1);
|
||||
}
|
||||
|
||||
acc.push(InMemoryLayerTestDesc {
|
||||
data,
|
||||
lsn_range: lsn_range.clone(),
|
||||
is_open: false,
|
||||
})
|
||||
}
|
||||
|
||||
acc
|
||||
};
|
||||
|
||||
let delta_test_layers = {
|
||||
let mut acc = Vec::new();
|
||||
|
||||
for (key_range, lsn_range) in spec.delta_layers_shape.iter() {
|
||||
let mut data = Vec::new();
|
||||
|
||||
let mut lsn = lsn_range.start;
|
||||
while lsn < lsn_range.end {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
if let Some(record) = storage.get(&(key, lsn)) {
|
||||
data.push((key, lsn, record.clone()));
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
lsn = Lsn(lsn.0 + 1);
|
||||
}
|
||||
|
||||
acc.push(DeltaLayerTestDesc {
|
||||
data,
|
||||
lsn_range: lsn_range.clone(),
|
||||
key_range: key_range.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
acc
|
||||
};
|
||||
|
||||
let image_test_layers = {
|
||||
let mut acc = Vec::new();
|
||||
|
||||
for (key_range, lsn) in spec.image_layers_shape.iter() {
|
||||
let mut data = Vec::new();
|
||||
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
if let Some(record) = storage.get(&(key, *lsn)) {
|
||||
let blob = match record {
|
||||
Value::Image(blob) => blob.clone(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
data.push((key, blob));
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
|
||||
acc.push((*lsn, data));
|
||||
}
|
||||
|
||||
acc
|
||||
};
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
new_timeline_id,
|
||||
spec.start_lsn,
|
||||
pg_version,
|
||||
ctx,
|
||||
in_memory_test_layers,
|
||||
delta_test_layers,
|
||||
image_test_layers,
|
||||
spec.last_record_lsn,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok((
|
||||
tline,
|
||||
Storage {
|
||||
storage,
|
||||
start_lsn: spec.start_lsn,
|
||||
},
|
||||
interesting_lsns,
|
||||
))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_basic").await?.load().await;
|
||||
@@ -10543,6 +10863,214 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// A randomized read path test. Generates a layer map according to a deterministic
|
||||
// specification. Fills the (key, LSN) space in random manner and then performs
|
||||
// random scattered queries validating the results against in-memory storage.
|
||||
//
|
||||
// See this internal Notion page for a diagram of the layer map:
|
||||
// https://www.notion.so/neondatabase/Read-Path-Unit-Testing-Fuzzing-1d1f189e0047806c8e5cd37781b0a350?pvs=4
|
||||
//
|
||||
// A fuzzing mode is also supported. In this mode, the test will use a random
|
||||
// seed instead of a hardcoded one. Use it in conjunction with `cargo stress`
|
||||
// to run multiple instances in parallel:
|
||||
//
|
||||
// $ RUST_BACKTRACE=1 RUST_LOG=INFO \
|
||||
// cargo stress --package=pageserver --features=testing,fuzz-read-path --release -- test_read_path
|
||||
#[cfg(feature = "testing")]
|
||||
#[tokio::test]
|
||||
async fn test_read_path() -> anyhow::Result<()> {
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
let seed = if cfg!(feature = "fuzz-read-path") {
|
||||
let seed: u64 = thread_rng().r#gen();
|
||||
seed
|
||||
} else {
|
||||
// Use a hard-coded seed when not in fuzzing mode.
|
||||
// Note that with the current approach results are not reproducible
|
||||
// accross platforms and Rust releases.
|
||||
const SEED: u64 = 0;
|
||||
SEED
|
||||
};
|
||||
|
||||
let mut random = StdRng::seed_from_u64(seed);
|
||||
|
||||
let (queries, will_init_chance, gap_chance) = if cfg!(feature = "fuzz-read-path") {
|
||||
const QUERIES: u64 = 5000;
|
||||
let will_init_chance: u8 = random.gen_range(0..=10);
|
||||
let gap_chance: u8 = random.gen_range(0..=50);
|
||||
|
||||
(QUERIES, will_init_chance, gap_chance)
|
||||
} else {
|
||||
const QUERIES: u64 = 1000;
|
||||
const WILL_INIT_CHANCE: u8 = 1;
|
||||
const GAP_CHANCE: u8 = 5;
|
||||
|
||||
(QUERIES, WILL_INIT_CHANCE, GAP_CHANCE)
|
||||
};
|
||||
|
||||
let harness = TenantHarness::create("test_read_path").await?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
tracing::info!("Using random seed: {seed}");
|
||||
tracing::info!(%will_init_chance, %gap_chance, "Fill params");
|
||||
|
||||
// Define the layer map shape. Note that this part is not randomized.
|
||||
|
||||
const KEY_DIMENSION_SIZE: u32 = 99;
|
||||
let start_key = Key::from_hex("110000000033333333444444445500000000").unwrap();
|
||||
let end_key = start_key.add(KEY_DIMENSION_SIZE);
|
||||
let total_key_range = start_key..end_key;
|
||||
let total_key_range_size = end_key.to_i128() - start_key.to_i128();
|
||||
let total_start_lsn = Lsn(104);
|
||||
let last_record_lsn = Lsn(504);
|
||||
|
||||
assert!(total_key_range_size % 3 == 0);
|
||||
|
||||
let in_memory_layers_shape = vec![
|
||||
(total_key_range.clone(), Lsn(304)..Lsn(400)),
|
||||
(total_key_range.clone(), Lsn(400)..last_record_lsn),
|
||||
];
|
||||
|
||||
let delta_layers_shape = vec![
|
||||
(
|
||||
start_key..(start_key.add((total_key_range_size / 3) as u32)),
|
||||
Lsn(200)..Lsn(304),
|
||||
),
|
||||
(
|
||||
(start_key.add((total_key_range_size / 3) as u32))
|
||||
..(start_key.add((total_key_range_size * 2 / 3) as u32)),
|
||||
Lsn(200)..Lsn(304),
|
||||
),
|
||||
(
|
||||
(start_key.add((total_key_range_size * 2 / 3) as u32))
|
||||
..(start_key.add(total_key_range_size as u32)),
|
||||
Lsn(200)..Lsn(304),
|
||||
),
|
||||
];
|
||||
|
||||
let image_layers_shape = vec![
|
||||
(
|
||||
start_key.add((total_key_range_size * 2 / 3 - 10) as u32)
|
||||
..start_key.add((total_key_range_size * 2 / 3 + 10) as u32),
|
||||
Lsn(456),
|
||||
),
|
||||
(
|
||||
start_key.add((total_key_range_size / 3 - 10) as u32)
|
||||
..start_key.add((total_key_range_size / 3 + 10) as u32),
|
||||
Lsn(256),
|
||||
),
|
||||
(total_key_range.clone(), total_start_lsn),
|
||||
];
|
||||
|
||||
let specification = TestTimelineSpecification {
|
||||
start_lsn: total_start_lsn,
|
||||
last_record_lsn,
|
||||
in_memory_layers_shape,
|
||||
delta_layers_shape,
|
||||
image_layers_shape,
|
||||
gap_chance,
|
||||
will_init_chance,
|
||||
};
|
||||
|
||||
// Create and randomly fill in the layers according to the specification
|
||||
let (tline, storage, interesting_lsns) = randomize_timeline(
|
||||
&tenant,
|
||||
TIMELINE_ID,
|
||||
DEFAULT_PG_VERSION,
|
||||
specification,
|
||||
&mut random,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Now generate queries based on the interesting lsns that we've collected.
|
||||
//
|
||||
// While there's still room in the query, pick and interesting LSN and a random
|
||||
// key. Then roll the dice to see if the next key should also be included in
|
||||
// the query. When the roll fails, break the "batch" and pick another point in the
|
||||
// (key, LSN) space.
|
||||
|
||||
const PICK_NEXT_CHANCE: u8 = 50;
|
||||
for _ in 0..queries {
|
||||
let query = {
|
||||
let mut keyspaces_at_lsn: HashMap<Lsn, KeySpaceRandomAccum> = HashMap::default();
|
||||
let mut used_keys: HashSet<Key> = HashSet::default();
|
||||
|
||||
while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
|
||||
let selected_lsn = interesting_lsns.choose(&mut random).expect("not empty");
|
||||
let mut selected_key = start_key.add(random.gen_range(0..KEY_DIMENSION_SIZE));
|
||||
|
||||
while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
|
||||
if used_keys.contains(&selected_key)
|
||||
|| selected_key >= start_key.add(KEY_DIMENSION_SIZE)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
keyspaces_at_lsn
|
||||
.entry(*selected_lsn)
|
||||
.or_default()
|
||||
.add_key(selected_key);
|
||||
used_keys.insert(selected_key);
|
||||
|
||||
let pick_next = random.gen_range(0..=100) <= PICK_NEXT_CHANCE;
|
||||
if pick_next {
|
||||
selected_key = selected_key.next();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
VersionedKeySpaceQuery::scattered(
|
||||
keyspaces_at_lsn
|
||||
.into_iter()
|
||||
.map(|(lsn, acc)| (lsn, acc.to_keyspace()))
|
||||
.collect(),
|
||||
)
|
||||
};
|
||||
|
||||
// Run the query and validate the results
|
||||
|
||||
let results = tline
|
||||
.get_vectored(query.clone(), IoConcurrency::Sequential, &ctx)
|
||||
.await;
|
||||
|
||||
let blobs = match results {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
panic!("seed={seed} Error returned for query {query}: {err}");
|
||||
}
|
||||
};
|
||||
|
||||
for (key, key_res) in blobs.into_iter() {
|
||||
match key_res {
|
||||
Ok(blob) => {
|
||||
let requested_at_lsn = query.map_key_to_lsn(&key);
|
||||
let expected = storage.get(key, requested_at_lsn);
|
||||
|
||||
if blob != expected {
|
||||
tracing::error!(
|
||||
"seed={seed} Mismatch for {key}@{requested_at_lsn} from query: {query}"
|
||||
);
|
||||
}
|
||||
|
||||
assert_eq!(blob, expected);
|
||||
}
|
||||
Err(err) => {
|
||||
let requested_at_lsn = query.map_key_to_lsn(&key);
|
||||
|
||||
panic!(
|
||||
"seed={seed} Error returned for {key}@{requested_at_lsn} from query {query}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering {
|
||||
(
|
||||
k1.is_delta,
|
||||
|
||||
@@ -22,6 +22,7 @@ use bytes::{BufMut, BytesMut};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
@@ -169,7 +170,13 @@ pub struct BlobWriter<const BUFFERED: bool> {
|
||||
}
|
||||
|
||||
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
|
||||
pub fn new(
|
||||
inner: VirtualFile,
|
||||
start_offset: u64,
|
||||
_gate: &utils::sync::gate::Gate,
|
||||
_cancel: CancellationToken,
|
||||
_ctx: &RequestContext,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
offset: start_offset,
|
||||
@@ -432,12 +439,14 @@ pub(crate) mod tests {
|
||||
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
|
||||
let temp_dir = camino_tempfile::tempdir()?;
|
||||
let pathbuf = temp_dir.path().join("file");
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Write part (in block to drop the file)
|
||||
let mut offsets = Vec::new();
|
||||
{
|
||||
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = if compression {
|
||||
let res = wtr
|
||||
|
||||
@@ -10,6 +10,8 @@ use pageserver_api::models::AuxFilePolicy;
|
||||
use pageserver_api::models::RelSizeMigration;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::base64::Base64;
|
||||
use serde_with::serde_as;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -114,6 +116,29 @@ pub struct IndexPart {
|
||||
/// The timestamp when the timeline was marked invisible in synthetic size calculations.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
|
||||
|
||||
/// The encryption key used to encrypt the timeline layer files.
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub(crate) keys: Vec<EncryptionKey>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct KeyVersion(u32);
|
||||
|
||||
/// An identifier for an encryption key. The scope of the key is the timeline (TBD).
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct EncryptionKeyId {
|
||||
version: KeyVersion,
|
||||
generation: Generation,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct EncryptionKey {
|
||||
#[serde_as(as = "Base64")]
|
||||
pub key: Vec<u8>,
|
||||
pub id: EncryptionKeyId,
|
||||
pub created_at: NaiveDateTime,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
@@ -142,10 +167,12 @@ impl IndexPart {
|
||||
/// - 12: +l2_lsn
|
||||
/// - 13: +gc_compaction
|
||||
/// - 14: +marked_invisible_at
|
||||
const LATEST_VERSION: usize = 14;
|
||||
/// - 15: +keys and encryption_key in layer_metadata
|
||||
const LATEST_VERSION: usize = 15;
|
||||
|
||||
// Versions we may see when reading from a bucket.
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];
|
||||
pub const KNOWN_VERSIONS: &'static [usize] =
|
||||
&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
|
||||
|
||||
pub const FILE_NAME: &'static str = "index_part.json";
|
||||
|
||||
@@ -165,6 +192,7 @@ impl IndexPart {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,7 +250,7 @@ impl IndexPart {
|
||||
///
|
||||
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
|
||||
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct LayerFileMetadata {
|
||||
pub file_size: u64,
|
||||
|
||||
@@ -233,6 +261,9 @@ pub struct LayerFileMetadata {
|
||||
#[serde(default = "ShardIndex::unsharded")]
|
||||
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
|
||||
pub shard: ShardIndex,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub encryption_key: Option<EncryptionKeyId>,
|
||||
}
|
||||
|
||||
impl LayerFileMetadata {
|
||||
@@ -241,6 +272,7 @@ impl LayerFileMetadata {
|
||||
file_size,
|
||||
generation,
|
||||
shard,
|
||||
encryption_key: None,
|
||||
}
|
||||
}
|
||||
/// Helper to get both generation and file size in a tuple
|
||||
@@ -453,14 +485,16 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -475,6 +509,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -502,14 +537,16 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -524,6 +561,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -552,14 +590,16 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -574,6 +614,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -627,6 +668,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
|
||||
@@ -653,14 +695,16 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -675,6 +719,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -703,11 +748,13 @@ mod tests {
|
||||
file_size: 23289856,
|
||||
generation: Generation::new(1),
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 1015808,
|
||||
generation: Generation::new(1),
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: Lsn::from_str("0/15A7618").unwrap(),
|
||||
@@ -726,6 +773,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -756,14 +804,16 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -782,6 +832,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -815,12 +866,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -843,6 +896,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -877,12 +931,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -905,6 +961,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -941,12 +998,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -972,6 +1031,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1017,12 +1077,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -1052,6 +1114,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1098,12 +1161,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -1133,6 +1198,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1183,12 +1249,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -1220,6 +1288,7 @@ mod tests {
|
||||
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
}),
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1271,12 +1340,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -1308,6 +1379,139 @@ mod tests {
|
||||
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
}),
|
||||
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn v15_keys_are_parsed() {
|
||||
let example = r#"{
|
||||
"version": 15,
|
||||
"layer_metadata":{
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000, "encryption_key": { "version": 1, "generation": 5 } },
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001, "encryption_key": { "version": 2, "generation": 6 } }
|
||||
},
|
||||
"disk_consistent_lsn":"0/16960E8",
|
||||
"metadata": {
|
||||
"disk_consistent_lsn": "0/16960E8",
|
||||
"prev_record_lsn": "0/1696070",
|
||||
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
|
||||
"ancestor_lsn": "0/0",
|
||||
"latest_gc_cutoff_lsn": "0/1696070",
|
||||
"initdb_lsn": "0/1696070",
|
||||
"pg_version": 14
|
||||
},
|
||||
"gc_blocking": {
|
||||
"started_at": "2024-07-19T09:00:00.123",
|
||||
"reasons": ["DetachAncestor"]
|
||||
},
|
||||
"import_pgdata": {
|
||||
"V1": {
|
||||
"Done": {
|
||||
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
|
||||
"started_at": "2024-11-13T09:23:42.123",
|
||||
"finished_at": "2024-11-13T09:42:23.123"
|
||||
}
|
||||
}
|
||||
},
|
||||
"rel_size_migration": "legacy",
|
||||
"l2_lsn": "0/16960E8",
|
||||
"gc_compaction": {
|
||||
"last_completed_lsn": "0/16960E8"
|
||||
},
|
||||
"marked_invisible_at": "2023-07-31T09:00:00.123",
|
||||
"keys": [
|
||||
{
|
||||
"key": "dGVzdF9rZXk=",
|
||||
"id": {
|
||||
"version": 1,
|
||||
"generation": 5
|
||||
},
|
||||
"created_at": "2024-07-19T09:00:00.123"
|
||||
},
|
||||
{
|
||||
"key": "dGVzdF9rZXlfMg==",
|
||||
"id": {
|
||||
"version": 2,
|
||||
"generation": 6
|
||||
},
|
||||
"created_at": "2024-07-19T10:00:00.123"
|
||||
}
|
||||
]
|
||||
}"#;
|
||||
|
||||
let expected = IndexPart {
|
||||
version: 15,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: Some(EncryptionKeyId {
|
||||
version: KeyVersion(1),
|
||||
generation: Generation::Valid(5),
|
||||
}),
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: Some(EncryptionKeyId {
|
||||
version: KeyVersion(2),
|
||||
generation: Generation::Valid(6),
|
||||
}),
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::new(
|
||||
Lsn::from_str("0/16960E8").unwrap(),
|
||||
Some(Lsn::from_str("0/1696070").unwrap()),
|
||||
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
|
||||
Lsn::INVALID,
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
14,
|
||||
).with_recalculated_checksum().unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Default::default(),
|
||||
gc_blocking: Some(GcBlocking {
|
||||
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
|
||||
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
|
||||
}),
|
||||
last_aux_file_policy: Default::default(),
|
||||
archived_at: None,
|
||||
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
|
||||
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
|
||||
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
|
||||
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
|
||||
}))),
|
||||
rel_size_migration: Some(RelSizeMigration::Legacy),
|
||||
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
|
||||
gc_compaction: Some(GcCompactionState {
|
||||
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
}),
|
||||
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
keys: vec![
|
||||
EncryptionKey {
|
||||
key: "test_key".as_bytes().to_vec(),
|
||||
id: EncryptionKeyId {
|
||||
version: KeyVersion(1),
|
||||
generation: Generation::Valid(5),
|
||||
},
|
||||
created_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
|
||||
},
|
||||
EncryptionKey {
|
||||
key: "test_key_2".as_bytes().to_vec(),
|
||||
id: EncryptionKeyId {
|
||||
version: KeyVersion(2),
|
||||
generation: Generation::Valid(6),
|
||||
},
|
||||
created_at: parse_naive_datetime("2024-07-19T10:00:00.123000000"),
|
||||
}
|
||||
],
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
use pageserver_api::value::Value;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
@@ -179,7 +180,7 @@ impl BatchLayerWriter {
|
||||
|
||||
/// An image writer that takes images and produces multiple image layers.
|
||||
#[must_use]
|
||||
pub struct SplitImageLayerWriter {
|
||||
pub struct SplitImageLayerWriter<'a> {
|
||||
inner: ImageLayerWriter,
|
||||
target_layer_size: u64,
|
||||
lsn: Lsn,
|
||||
@@ -188,9 +189,12 @@ pub struct SplitImageLayerWriter {
|
||||
tenant_shard_id: TenantShardId,
|
||||
batches: BatchLayerWriter,
|
||||
start_key: Key,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl SplitImageLayerWriter {
|
||||
impl<'a> SplitImageLayerWriter<'a> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
@@ -198,6 +202,8 @@ impl SplitImageLayerWriter {
|
||||
start_key: Key,
|
||||
lsn: Lsn,
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
@@ -208,6 +214,8 @@ impl SplitImageLayerWriter {
|
||||
tenant_shard_id,
|
||||
&(start_key..Key::MAX),
|
||||
lsn,
|
||||
gate,
|
||||
cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -217,6 +225,8 @@ impl SplitImageLayerWriter {
|
||||
batches: BatchLayerWriter::new(conf).await?,
|
||||
lsn,
|
||||
start_key,
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -239,6 +249,8 @@ impl SplitImageLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
&(key..Key::MAX),
|
||||
self.lsn,
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -291,7 +303,7 @@ impl SplitImageLayerWriter {
|
||||
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
|
||||
/// will split them into multiple files based on size.
|
||||
#[must_use]
|
||||
pub struct SplitDeltaLayerWriter {
|
||||
pub struct SplitDeltaLayerWriter<'a> {
|
||||
inner: Option<(Key, DeltaLayerWriter)>,
|
||||
target_layer_size: u64,
|
||||
conf: &'static PageServerConf,
|
||||
@@ -300,15 +312,19 @@ pub struct SplitDeltaLayerWriter {
|
||||
lsn_range: Range<Lsn>,
|
||||
last_key_written: Key,
|
||||
batches: BatchLayerWriter,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl SplitDeltaLayerWriter {
|
||||
impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn_range: Range<Lsn>,
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
target_layer_size,
|
||||
@@ -319,6 +335,8 @@ impl SplitDeltaLayerWriter {
|
||||
lsn_range,
|
||||
last_key_written: Key::MIN,
|
||||
batches: BatchLayerWriter::new(conf).await?,
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -344,6 +362,8 @@ impl SplitDeltaLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -362,6 +382,8 @@ impl SplitDeltaLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -469,6 +491,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -480,6 +504,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -546,6 +572,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -556,6 +584,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -643,6 +673,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -654,6 +686,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -730,6 +764,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -50,6 +50,7 @@ use rand::distributions::Alphanumeric;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -400,12 +401,15 @@ impl DeltaLayerWriterInner {
|
||||
///
|
||||
/// Start building a new delta layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename. We don't know
|
||||
@@ -420,7 +424,7 @@ impl DeltaLayerWriterInner {
|
||||
let mut file = VirtualFile::create(&path, ctx).await?;
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -628,12 +632,15 @@ impl DeltaLayerWriter {
|
||||
///
|
||||
/// Start building a new delta layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
@@ -644,6 +651,8 @@ impl DeltaLayerWriter {
|
||||
tenant_shard_id,
|
||||
key_start,
|
||||
lsn_range,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -1885,6 +1894,8 @@ pub(crate) mod test {
|
||||
harness.tenant_shard_id,
|
||||
entries_meta.key_range.start,
|
||||
entries_meta.lsn_range.clone(),
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -2079,6 +2090,8 @@ pub(crate) mod test {
|
||||
tenant.tenant_shard_id,
|
||||
Key::MIN,
|
||||
Lsn(0x11)..truncate_at,
|
||||
&branch.gate,
|
||||
branch.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -2213,6 +2226,8 @@ pub(crate) mod test {
|
||||
tenant.tenant_shard_id,
|
||||
*key_start,
|
||||
(*lsn_min)..lsn_end,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -48,6 +48,7 @@ use rand::distributions::Alphanumeric;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -748,12 +749,15 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename.
|
||||
@@ -780,7 +784,7 @@ impl ImageLayerWriterInner {
|
||||
};
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -988,18 +992,30 @@ impl ImageLayerWriter {
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ImageLayerWriter> {
|
||||
Ok(Self {
|
||||
inner: Some(
|
||||
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
|
||||
.await?,
|
||||
ImageLayerWriterInner::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
key_range,
|
||||
lsn,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
),
|
||||
})
|
||||
}
|
||||
@@ -1203,6 +1219,8 @@ mod test {
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1268,6 +1286,8 @@ mod test {
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1346,6 +1366,8 @@ mod test {
|
||||
tenant.tenant_shard_id,
|
||||
&key_range,
|
||||
lsn,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -719,6 +719,8 @@ impl InMemoryLayer {
|
||||
ctx: &RequestContext,
|
||||
key_range: Option<Range<Key>>,
|
||||
l0_flush_global_state: &l0_flush::Inner,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
|
||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||
// layer is not writeable anymore, no one should be trying to acquire the
|
||||
@@ -759,6 +761,8 @@ impl InMemoryLayer {
|
||||
self.tenant_shard_id,
|
||||
Key::MIN,
|
||||
self.start_lsn..end_lsn,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -4026,7 +4026,7 @@ impl VersionedKeySpaceQuery {
|
||||
/// Returns LSN for a specific key.
|
||||
///
|
||||
/// Invariant: requested key must be part of [`Self::total_keyspace`]
|
||||
fn map_key_to_lsn(&self, key: &Key) -> Lsn {
|
||||
pub(super) fn map_key_to_lsn(&self, key: &Key) -> Lsn {
|
||||
match self {
|
||||
Self::Uniform { lsn, .. } => *lsn,
|
||||
Self::Scattered { keyspaces_at_lsn } => {
|
||||
@@ -4986,7 +4986,13 @@ impl Timeline {
|
||||
let ctx = ctx.attached_child();
|
||||
let work = async move {
|
||||
let Some((desc, path)) = frozen_layer
|
||||
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
|
||||
.write_to_disk(
|
||||
&ctx,
|
||||
key_range,
|
||||
self_clone.l0_flush_global_state.inner(),
|
||||
&self_clone.gate,
|
||||
self_clone.cancel.clone(),
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
@@ -5526,6 +5532,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&img_range,
|
||||
lsn,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -6890,6 +6898,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&(min_key..end_key),
|
||||
lsn,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -6951,6 +6961,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
deltas.key_range.start,
|
||||
deltas.lsn_range,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -749,8 +749,8 @@ impl KeyHistoryRetention {
|
||||
async fn pipe_to(
|
||||
self,
|
||||
key: Key,
|
||||
delta_writer: &mut SplitDeltaLayerWriter,
|
||||
mut image_writer: Option<&mut SplitImageLayerWriter>,
|
||||
delta_writer: &mut SplitDeltaLayerWriter<'_>,
|
||||
mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
|
||||
stat: &mut CompactionStatistics,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -1394,6 +1394,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&layer.layer_desc().key_range,
|
||||
layer.layer_desc().image_layer_lsn(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -2033,6 +2035,8 @@ impl Timeline {
|
||||
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
|
||||
lsn_range.clone()
|
||||
},
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3232,6 +3236,8 @@ impl Timeline {
|
||||
job_desc.compaction_key_range.start,
|
||||
lowest_retain_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3248,6 +3254,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
lowest_retain_lsn..end_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.context("failed to create delta layer writer")
|
||||
@@ -3344,6 +3352,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
desc.key_range.start,
|
||||
desc.lsn_range.clone(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3361,6 +3371,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
job_desc.compaction_key_range.end,
|
||||
desc.lsn_range.clone(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3932,6 +3944,8 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
self.timeline.tenant_shard_id,
|
||||
key_range.start,
|
||||
lsn_range.clone(),
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -4007,6 +4021,8 @@ impl TimelineAdaptor {
|
||||
self.timeline.tenant_shard_id,
|
||||
key_range,
|
||||
lsn,
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -228,6 +228,8 @@ async fn generate_tombstone_image_layer(
|
||||
detached.tenant_shard_id,
|
||||
&key_range,
|
||||
image_lsn,
|
||||
&detached.gate,
|
||||
detached.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -776,6 +778,8 @@ async fn copy_lsn_prefix(
|
||||
target_timeline.tenant_shard_id,
|
||||
layer.layer_desc().key_range.start,
|
||||
layer.layer_desc().lsn_range.start..end_lsn,
|
||||
&target_timeline.gate,
|
||||
target_timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -738,6 +738,8 @@ impl ChunkProcessingJob {
|
||||
self.timeline.tenant_shard_id,
|
||||
&self.range,
|
||||
self.pgdata_lsn,
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -641,6 +641,7 @@ mod tests {
|
||||
generation: timeline.generation,
|
||||
shard: timeline.get_shard_index(),
|
||||
file_size: size as u64,
|
||||
encryption_key: None,
|
||||
};
|
||||
make_layer_with_metadata(timeline, name, metadata)
|
||||
}
|
||||
@@ -1378,6 +1379,7 @@ mod tests {
|
||||
shard,
|
||||
generation: Generation::Valid(generation),
|
||||
file_size: 0,
|
||||
encryption_key: None,
|
||||
};
|
||||
make_layer_with_metadata(&tli, name, metadata)
|
||||
};
|
||||
|
||||
@@ -2118,9 +2118,6 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
|
||||
*/
|
||||
if (wp->config->syncSafekeepers)
|
||||
{
|
||||
int n_synced;
|
||||
|
||||
n_synced = 0;
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
Safekeeper *sk = &wp->safekeeper[i];
|
||||
@@ -2129,8 +2126,6 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
|
||||
/* alive safekeeper which is not synced yet; wait for it */
|
||||
if (sk->state != SS_OFFLINE && !synced)
|
||||
return;
|
||||
if (synced)
|
||||
n_synced++;
|
||||
}
|
||||
|
||||
if (newCommitLsn >= wp->propTermStartLsn)
|
||||
|
||||
@@ -629,15 +629,13 @@ impl ComputeHook {
|
||||
};
|
||||
|
||||
let result = if !self.config.use_local_compute_notifications {
|
||||
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
|
||||
Some(if control_plane_url.ends_with('/') {
|
||||
format!("{control_plane_url}notify-attach")
|
||||
} else {
|
||||
format!("{control_plane_url}/notify-attach")
|
||||
})
|
||||
} else {
|
||||
self.config.compute_hook_url.clone()
|
||||
};
|
||||
let compute_hook_url =
|
||||
self.config
|
||||
.control_plane_url
|
||||
.as_ref()
|
||||
.map(|control_plane_url| {
|
||||
format!("{}/notify-attach", control_plane_url.trim_end_matches('/'))
|
||||
});
|
||||
|
||||
// We validate this at startup
|
||||
let notify_url = compute_hook_url.as_ref().unwrap();
|
||||
|
||||
@@ -86,10 +86,6 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
peer_jwt_token: Option<String>,
|
||||
|
||||
/// URL to control plane compute notification endpoint
|
||||
#[arg(long)]
|
||||
compute_hook_url: Option<String>,
|
||||
|
||||
/// URL to control plane storage API prefix
|
||||
#[arg(long)]
|
||||
control_plane_url: Option<String>,
|
||||
@@ -360,13 +356,11 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
"Insecure config! One or more secrets is not set. This is only permitted in `--dev` mode"
|
||||
);
|
||||
}
|
||||
StrictMode::Strict
|
||||
if args.compute_hook_url.is_none() && args.control_plane_url.is_none() =>
|
||||
{
|
||||
StrictMode::Strict if args.control_plane_url.is_none() => {
|
||||
// Production systems should always have a control plane URL set, to prevent falling
|
||||
// back to trying to use neon_local.
|
||||
anyhow::bail!(
|
||||
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
|
||||
"`--control-plane-url` is not set: this is only permitted in `--dev` mode"
|
||||
);
|
||||
}
|
||||
StrictMode::Strict if args.use_local_compute_notifications => {
|
||||
@@ -394,7 +388,6 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
|
||||
control_plane_jwt_token: secrets.control_plane_jwt_token,
|
||||
peer_jwt_token: secrets.peer_jwt_token,
|
||||
compute_hook_url: args.compute_hook_url,
|
||||
control_plane_url: args.control_plane_url,
|
||||
max_offline_interval: args
|
||||
.max_offline_interval
|
||||
|
||||
@@ -357,18 +357,10 @@ pub struct Config {
|
||||
// This JWT token will be used to authenticate with other storage controller instances
|
||||
pub peer_jwt_token: Option<String>,
|
||||
|
||||
/// Where the compute hook should send notifications of pageserver attachment locations
|
||||
/// (this URL points to the control plane in prod). If this is None, the compute hook will
|
||||
/// assume it is running in a test environment and try to update neon_local.
|
||||
pub compute_hook_url: Option<String>,
|
||||
|
||||
/// Prefix for storage API endpoints of the control plane. We use this prefix to compute
|
||||
/// URLs that we use to send pageserver and safekeeper attachment locations.
|
||||
/// If this is None, the compute hook will assume it is running in a test environment
|
||||
/// and try to invoke neon_local instead.
|
||||
///
|
||||
/// For now, there is also `compute_hook_url` which allows configuration of the pageserver
|
||||
/// specific endpoint, but it is in the process of being phased out.
|
||||
pub control_plane_url: Option<String>,
|
||||
|
||||
/// Grace period within which a pageserver does not respond to heartbeats, but is still
|
||||
|
||||
@@ -194,6 +194,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
|
||||
counter("pageserver_wait_lsn_started_count"),
|
||||
counter("pageserver_wait_lsn_finished_count"),
|
||||
counter("pageserver_wait_ondemand_download_seconds_sum"),
|
||||
counter("pageserver_page_service_batch_break_reason"),
|
||||
*histogram("pageserver_page_service_batch_size"),
|
||||
*histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"),
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
|
||||
@@ -947,6 +947,8 @@ class NeonEnvBuilder:
|
||||
continue
|
||||
if SMALL_DB_FILE_NAME_REGEX.fullmatch(test_file.name):
|
||||
continue
|
||||
if FINAL_METRICS_FILE_NAME == test_file.name:
|
||||
continue
|
||||
log.debug(f"Removing large database {test_file} file")
|
||||
test_file.unlink()
|
||||
elif test_entry.is_dir():
|
||||
@@ -1322,10 +1324,6 @@ class NeonEnv:
|
||||
log.info("test may use old binaries, ignoring warnings about unknown config items")
|
||||
ps.allowed_errors.append(".*ignoring unknown configuration item.*")
|
||||
|
||||
# Allow old software to start until https://github.com/neondatabase/neon/pull/11275
|
||||
# lands in the compatiblity snapshot.
|
||||
ps_cfg["page_service_pipelining"].pop("batching")
|
||||
|
||||
self.pageservers.append(ps)
|
||||
cfg["pageservers"].append(ps_cfg)
|
||||
|
||||
@@ -1461,6 +1459,12 @@ class NeonEnv:
|
||||
except Exception as e:
|
||||
metric_errors.append(e)
|
||||
log.error(f"metric validation failed on {pageserver.id}: {e}")
|
||||
|
||||
try:
|
||||
pageserver.snapshot_final_metrics()
|
||||
except Exception as e:
|
||||
log.error(f"metric snapshot failed on {pageserver.id}: {e}")
|
||||
|
||||
try:
|
||||
pageserver.stop(immediate=immediate)
|
||||
except RuntimeError:
|
||||
@@ -2976,6 +2980,20 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
value = self.http_client().get_metric_value(metric)
|
||||
assert value == 0, f"Nonzero {metric} == {value}"
|
||||
|
||||
def snapshot_final_metrics(self):
|
||||
"""
|
||||
Take a snapshot of this pageserver's metrics and stash in its work directory.
|
||||
"""
|
||||
if not self.running:
|
||||
log.info(f"Skipping metrics snapshot on pageserver {self.id}, it is not running")
|
||||
return
|
||||
|
||||
metrics = self.http_client().get_metrics_str()
|
||||
metrics_snapshot_path = self.workdir / FINAL_METRICS_FILE_NAME
|
||||
|
||||
with open(metrics_snapshot_path, "w") as f:
|
||||
f.write(metrics)
|
||||
|
||||
def tenant_attach(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
@@ -5138,6 +5156,8 @@ SMALL_DB_FILE_NAME_REGEX: re.Pattern[str] = re.compile(
|
||||
r"config-v1|heatmap-v1|tenant-manifest|metadata|.+\.(?:toml|pid|json|sql|conf)"
|
||||
)
|
||||
|
||||
FINAL_METRICS_FILE_NAME: str = "final_metrics.txt"
|
||||
|
||||
|
||||
SKIP_DIRS = frozenset(
|
||||
(
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import concurrent.futures
|
||||
import dataclasses
|
||||
import json
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
@@ -170,6 +169,7 @@ def test_throughput(
|
||||
time: float
|
||||
pageserver_batch_size_histo_sum: float
|
||||
pageserver_batch_size_histo_count: float
|
||||
pageserver_batch_breaks_reason_count: dict[str, int]
|
||||
compute_getpage_count: float
|
||||
pageserver_cpu_seconds_total: float
|
||||
|
||||
@@ -183,6 +183,10 @@ def test_throughput(
|
||||
compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
|
||||
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
|
||||
- other.pageserver_cpu_seconds_total,
|
||||
pageserver_batch_breaks_reason_count={
|
||||
reason: count - other.pageserver_batch_breaks_reason_count.get(reason, 0)
|
||||
for reason, count in self.pageserver_batch_breaks_reason_count.items()
|
||||
},
|
||||
)
|
||||
|
||||
def normalize(self, by) -> "Metrics":
|
||||
@@ -192,6 +196,10 @@ def test_throughput(
|
||||
pageserver_batch_size_histo_count=self.pageserver_batch_size_histo_count / by,
|
||||
compute_getpage_count=self.compute_getpage_count / by,
|
||||
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
|
||||
pageserver_batch_breaks_reason_count={
|
||||
reason: count / by
|
||||
for reason, count in self.pageserver_batch_breaks_reason_count.items()
|
||||
},
|
||||
)
|
||||
|
||||
def get_metrics() -> Metrics:
|
||||
@@ -201,6 +209,20 @@ def test_throughput(
|
||||
)
|
||||
compute_getpage_count = cur.fetchall()[0][0]
|
||||
pageserver_metrics = ps_http.get_metrics()
|
||||
for name, samples in pageserver_metrics.metrics.items():
|
||||
for sample in samples:
|
||||
log.info(f"{name=} labels={sample.labels} {sample.value}")
|
||||
|
||||
raw_batch_break_reason_count = pageserver_metrics.query_all(
|
||||
"pageserver_page_service_batch_break_reason_total",
|
||||
filter={"timeline_id": str(env.initial_timeline)},
|
||||
)
|
||||
|
||||
batch_break_reason_count = {
|
||||
sample.labels["reason"]: int(sample.value)
|
||||
for sample in raw_batch_break_reason_count
|
||||
}
|
||||
|
||||
return Metrics(
|
||||
time=time.time(),
|
||||
pageserver_batch_size_histo_sum=pageserver_metrics.query_one(
|
||||
@@ -209,6 +231,7 @@ def test_throughput(
|
||||
pageserver_batch_size_histo_count=pageserver_metrics.query_one(
|
||||
"pageserver_page_service_batch_size_count"
|
||||
).value,
|
||||
pageserver_batch_breaks_reason_count=batch_break_reason_count,
|
||||
compute_getpage_count=compute_getpage_count,
|
||||
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
|
||||
"libmetrics_process_cpu_seconds_highres"
|
||||
@@ -263,25 +286,6 @@ def test_throughput(
|
||||
|
||||
log.info("Results: %s", metrics)
|
||||
|
||||
since_last_start: list[str] = []
|
||||
for line in env.pageserver.logfile.read_text().splitlines():
|
||||
if "git:" in line:
|
||||
since_last_start = []
|
||||
since_last_start.append(line)
|
||||
|
||||
stopping_batching_because_re = re.compile(
|
||||
r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
|
||||
)
|
||||
reasons_for_stopping_batching = {}
|
||||
for line in since_last_start:
|
||||
match = stopping_batching_because_re.search(line)
|
||||
if match:
|
||||
if match.group(1) not in reasons_for_stopping_batching:
|
||||
reasons_for_stopping_batching[match.group(1)] = 0
|
||||
reasons_for_stopping_batching[match.group(1)] += 1
|
||||
|
||||
log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
|
||||
|
||||
#
|
||||
# Sanity-checks on the collected data
|
||||
#
|
||||
@@ -295,7 +299,16 @@ def test_throughput(
|
||||
#
|
||||
|
||||
for metric, value in dataclasses.asdict(metrics).items():
|
||||
zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)
|
||||
if metric == "pageserver_batch_breaks_reason_count":
|
||||
assert isinstance(value, dict)
|
||||
for reason, count in value.items():
|
||||
zenbenchmark.record(
|
||||
f"counters.{metric}_{reason}", count, unit="", report=MetricReport.TEST_PARAM
|
||||
)
|
||||
else:
|
||||
zenbenchmark.record(
|
||||
f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM
|
||||
)
|
||||
|
||||
zenbenchmark.record(
|
||||
"perfmetric.batching_factor",
|
||||
|
||||
@@ -64,8 +64,8 @@ def test_ro_replica_lag(
|
||||
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: {}", project_id)
|
||||
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
error_occurred = False
|
||||
try:
|
||||
@@ -81,7 +81,7 @@ def test_ro_replica_lag(
|
||||
endpoint_type="read_only",
|
||||
settings={"pg_settings": {"hot_standby_feedback": "on"}},
|
||||
)
|
||||
log.info("Replica endpoint ID: {}", replica["endpoint"]["id"])
|
||||
log.info("Replica endpoint ID: %s", replica["endpoint"]["id"])
|
||||
replica_env = master_env.copy()
|
||||
replica_env["PGHOST"] = replica["endpoint"]["host"]
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
@@ -197,8 +197,8 @@ def test_replication_start_stop(
|
||||
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: {}", project_id)
|
||||
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
try:
|
||||
branch_id = project["branch"]["id"]
|
||||
@@ -215,7 +215,7 @@ def test_replication_start_stop(
|
||||
endpoint_type="read_only",
|
||||
settings={"pg_settings": {"hot_standby_feedback": "on"}},
|
||||
)
|
||||
log.info("Replica {} endpoint ID: {}", i + 1, replica["endpoint"]["id"])
|
||||
log.info("Replica %d endpoint ID: %s", i + 1, replica["endpoint"]["id"])
|
||||
replicas.append(replica)
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
|
||||
|
||||
@@ -390,6 +390,7 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
|
||||
# Tenant creation requests which arrive out of order will generate complaints about
|
||||
# generation nubmers out of order.
|
||||
env.pageserver.allowed_errors.append(".*Generation .+ is less than existing .+")
|
||||
env.pageserver.allowed_errors.append(".*due to stale generation.+")
|
||||
|
||||
# Timeline::flush_and_shutdown cannot tell if it is hitting a failure because of
|
||||
# an incomplete attach, or some other problem. In the field this should be rare,
|
||||
|
||||
Reference in New Issue
Block a user