mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
Compare commits
12 Commits
erik/histo
...
arpad/temp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2a84ea625c | ||
|
|
3a098a4eb2 | ||
|
|
ea1a21293d | ||
|
|
94ef73db85 | ||
|
|
9aa0cdee83 | ||
|
|
43ab1935f3 | ||
|
|
1edf84d6c7 | ||
|
|
a64711dec8 | ||
|
|
bb019f2bbb | ||
|
|
ab751abdbd | ||
|
|
5df8284961 | ||
|
|
f4150614d0 |
@@ -235,7 +235,7 @@ pub enum PageServiceProtocolPipelinedBatchingStrategy {
|
||||
ScatteredLsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case")]
|
||||
pub enum GetVectoredConcurrentIo {
|
||||
/// The read path is fully sequential: layers are visited
|
||||
|
||||
@@ -325,6 +325,7 @@ impl TimelineCreateRequest {
|
||||
match &self.mode {
|
||||
TimelineCreateRequestMode::Branch { .. } => "branch",
|
||||
TimelineCreateRequestMode::ImportPgdata { .. } => "import",
|
||||
TimelineCreateRequestMode::Template { .. } => "template",
|
||||
TimelineCreateRequestMode::Bootstrap { .. } => "bootstrap",
|
||||
}
|
||||
}
|
||||
@@ -406,6 +407,10 @@ pub enum TimelineCreateRequestMode {
|
||||
ImportPgdata {
|
||||
import_pgdata: TimelineCreateRequestModeImportPgdata,
|
||||
},
|
||||
Template {
|
||||
template_tenant_id: TenantShardId,
|
||||
template_timeline_id: TimelineId,
|
||||
},
|
||||
// NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
|
||||
// (serde picks the first matching enum variant, in declaration order).
|
||||
Bootstrap {
|
||||
|
||||
@@ -144,7 +144,7 @@ where
|
||||
replica,
|
||||
ctx,
|
||||
io_concurrency: IoConcurrency::spawn_from_conf(
|
||||
timeline.conf,
|
||||
timeline.conf.get_vectored_concurrent_io,
|
||||
timeline
|
||||
.gate
|
||||
.enter()
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::time::SystemTime;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use consumption_metrics::EventType;
|
||||
@@ -24,12 +24,6 @@ pub(super) enum Name {
|
||||
/// Timeline last_record_lsn, incremental
|
||||
#[serde(rename = "written_data_bytes_delta")]
|
||||
WrittenSizeDelta,
|
||||
/// Timeline's PITR cutoff LSN (now - pitr_interval). If PITR is disabled
|
||||
/// (pitr_interval = 0), this will equal WrittenSize.
|
||||
///
|
||||
/// Updated periodically during GC. Does not regress.
|
||||
#[serde(rename = "pitr_cutoff")]
|
||||
PitrCutoff,
|
||||
/// Timeline logical size
|
||||
#[serde(rename = "timeline_logical_size")]
|
||||
LogicalSize,
|
||||
@@ -163,18 +157,6 @@ impl MetricsKey {
|
||||
.incremental_values()
|
||||
}
|
||||
|
||||
/// [`GcCutoffs::time`] (i.e. `now` - `pitr_interval`)
|
||||
///
|
||||
/// [`GcCutoffs::time`]: crate::tenant::timeline::GcCutoffs::time
|
||||
const fn pitr_cutoff(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
|
||||
MetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
metric: Name::PitrCutoff,
|
||||
}
|
||||
.absolute_values()
|
||||
}
|
||||
|
||||
/// Exact [`Timeline::get_current_logical_size`].
|
||||
///
|
||||
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
|
||||
@@ -353,8 +335,6 @@ struct TimelineSnapshot {
|
||||
loaded_at: (Lsn, SystemTime),
|
||||
last_record_lsn: Lsn,
|
||||
current_exact_logical_size: Option<u64>,
|
||||
/// The PITR cutoff LSN. None if PITR is disabled.
|
||||
pitr_cutoff: Option<Lsn>,
|
||||
}
|
||||
|
||||
impl TimelineSnapshot {
|
||||
@@ -374,8 +354,6 @@ impl TimelineSnapshot {
|
||||
} else {
|
||||
let loaded_at = t.loaded_at;
|
||||
let last_record_lsn = t.get_last_record_lsn();
|
||||
let pitr_cutoff = (t.get_pitr_interval() != Duration::ZERO)
|
||||
.then(|| t.gc_info.read().unwrap().cutoffs.time);
|
||||
|
||||
let current_exact_logical_size = {
|
||||
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
|
||||
@@ -396,7 +374,6 @@ impl TimelineSnapshot {
|
||||
loaded_at,
|
||||
last_record_lsn,
|
||||
current_exact_logical_size,
|
||||
pitr_cutoff,
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -464,25 +441,6 @@ impl TimelineSnapshot {
|
||||
});
|
||||
}
|
||||
|
||||
// Compute the PITR cutoff. We have to make sure this doesn't regress:
|
||||
// on restart, GC may not run for a while, and the PITR cutoff will be
|
||||
// reported as 0 (which would bill the user for the entire history from
|
||||
// 0 to last_record_lsn). We therefore clamp this to the cached value
|
||||
// (which persists across restarts).
|
||||
//
|
||||
// If pitr_interval is disabled (pitr_cutoff is None), the cutoff is the
|
||||
// last record LSN, by definition. We don't use the written_size
|
||||
// clamping from above, because our own caching should result in
|
||||
// equivalent clamping.
|
||||
let pitr_cutoff_key = MetricsKey::pitr_cutoff(tenant_id, timeline_id);
|
||||
let mut pitr_cutoff = self
|
||||
.pitr_cutoff
|
||||
.unwrap_or(self.last_record_lsn /* PITR disabled */);
|
||||
if let Some(prev) = cache.get(pitr_cutoff_key.key()) {
|
||||
pitr_cutoff = pitr_cutoff.max(Lsn(prev.value)); // don't regress
|
||||
}
|
||||
metrics.push(pitr_cutoff_key.at(now, pitr_cutoff.into()));
|
||||
|
||||
{
|
||||
let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
|
||||
let current_or_previous = self
|
||||
|
||||
@@ -12,15 +12,12 @@ fn startup_collected_timeline_metrics_before_advancing() {
|
||||
let cache = HashMap::new();
|
||||
|
||||
let initdb_lsn = Lsn(0x10000);
|
||||
let pitr_cutoff = Lsn(0x11000);
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
let logical_size = 0x42000;
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, SystemTime::now()),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
current_exact_logical_size: Some(logical_size),
|
||||
pitr_cutoff: Some(pitr_cutoff),
|
||||
current_exact_logical_size: Some(0x42000),
|
||||
};
|
||||
|
||||
let now = DateTime::<Utc>::from(SystemTime::now());
|
||||
@@ -36,8 +33,7 @@ fn startup_collected_timeline_metrics_before_advancing() {
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::pitr_cutoff(tenant_id, timeline_id).at(now, pitr_cutoff.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -53,9 +49,7 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
let before = DateTime::<Utc>::from(before);
|
||||
|
||||
let initdb_lsn = Lsn(0x10000);
|
||||
let pitr_cutoff = Lsn(0x11000);
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
let logical_size = 0x42000;
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::from([MetricsKey::written_size(tenant_id, timeline_id)
|
||||
@@ -65,8 +59,7 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, init),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
current_exact_logical_size: Some(logical_size),
|
||||
pitr_cutoff: Some(pitr_cutoff),
|
||||
current_exact_logical_size: Some(0x42000),
|
||||
};
|
||||
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
@@ -76,8 +69,7 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::pitr_cutoff(tenant_id, timeline_id).at(now, pitr_cutoff.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -94,9 +86,7 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
let before = DateTime::<Utc>::from(before);
|
||||
|
||||
let initdb_lsn = Lsn(0x10000);
|
||||
let pitr_cutoff = Lsn(0x11000);
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
let logical_size = 0x42000;
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::from([
|
||||
@@ -113,8 +103,7 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, init),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
current_exact_logical_size: Some(logical_size),
|
||||
pitr_cutoff: Some(pitr_cutoff),
|
||||
current_exact_logical_size: Some(0x42000),
|
||||
};
|
||||
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
@@ -124,14 +113,13 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::pitr_cutoff(tenant_id, timeline_id).at(now, pitr_cutoff.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_pitr_cutoff() {
|
||||
fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
// it can happen that we lose the inmemorylayer but have previously sent metrics and we
|
||||
// should never go backwards
|
||||
|
||||
@@ -153,7 +141,6 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_pitr_cutoff()
|
||||
loaded_at: (Lsn(50), at_restart),
|
||||
last_record_lsn: Lsn(50),
|
||||
current_exact_logical_size: None,
|
||||
pitr_cutoff: Some(Lsn(20)),
|
||||
};
|
||||
|
||||
let mut cache = HashMap::from([
|
||||
@@ -168,9 +155,6 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_pitr_cutoff()
|
||||
999_999_999,
|
||||
)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::pitr_cutoff(tenant_id, timeline_id)
|
||||
.at(before_restart, 70)
|
||||
.to_kv_pair(),
|
||||
]);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
@@ -185,7 +169,6 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_pitr_cutoff()
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
|
||||
MetricsKey::pitr_cutoff(tenant_id, timeline_id).at(now, 70),
|
||||
]
|
||||
);
|
||||
|
||||
@@ -200,7 +183,6 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_pitr_cutoff()
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
|
||||
MetricsKey::pitr_cutoff(tenant_id, timeline_id).at(later, 70),
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -220,7 +202,6 @@ fn post_restart_current_exact_logical_size_uses_cached() {
|
||||
loaded_at: (Lsn(50), at_restart),
|
||||
last_record_lsn: Lsn(50),
|
||||
current_exact_logical_size: None,
|
||||
pitr_cutoff: None,
|
||||
};
|
||||
|
||||
let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)
|
||||
@@ -305,53 +286,16 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
|
||||
times
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pitr_cutoff_none_yields_last_record_lsn() {
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::new();
|
||||
|
||||
let initdb_lsn = Lsn(0x10000);
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, SystemTime::now()),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
current_exact_logical_size: None,
|
||||
pitr_cutoff: None,
|
||||
};
|
||||
|
||||
let now = DateTime::<Utc>::from(SystemTime::now());
|
||||
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
snap.loaded_at.1.into(),
|
||||
now,
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::pitr_cutoff(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) const fn metric_examples_old(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
before: DateTime<Utc>,
|
||||
) -> [RawMetric; 6] {
|
||||
) -> [RawMetric; 5] {
|
||||
[
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_until_old_format(before, now, 0),
|
||||
MetricsKey::pitr_cutoff(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::remote_storage_size(tenant_id).at_old_format(now, 0),
|
||||
MetricsKey::synthetic_size(tenant_id).at_old_format(now, 1),
|
||||
@@ -363,11 +307,10 @@ pub(crate) const fn metric_examples(
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
before: DateTime<Utc>,
|
||||
) -> [NewRawMetric; 6] {
|
||||
) -> [NewRawMetric; 5] {
|
||||
[
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
MetricsKey::pitr_cutoff(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
|
||||
MetricsKey::synthetic_size(tenant_id).at(now, 1),
|
||||
|
||||
@@ -513,10 +513,6 @@ mod tests {
|
||||
line!(),
|
||||
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"pitr_cutoff","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
@@ -564,7 +560,7 @@ mod tests {
|
||||
assert_eq!(upgraded_samples, new_samples);
|
||||
}
|
||||
|
||||
fn metric_samples_old() -> [RawMetric; 6] {
|
||||
fn metric_samples_old() -> [RawMetric; 5] {
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
@@ -576,7 +572,7 @@ mod tests {
|
||||
super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
|
||||
}
|
||||
|
||||
fn metric_samples() -> [NewRawMetric; 6] {
|
||||
fn metric_samples() -> [NewRawMetric; 5] {
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
|
||||
@@ -629,6 +629,12 @@ paths:
|
||||
existing_initdb_timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
template_tenant_id:
|
||||
type: string
|
||||
format: hex
|
||||
template_timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
import_pgdata:
|
||||
$ref: "#/components/schemas/TimelineCreateRequestImportPgdata"
|
||||
responses:
|
||||
|
||||
@@ -607,6 +607,14 @@ async fn timeline_create_handler(
|
||||
}
|
||||
},
|
||||
}),
|
||||
TimelineCreateRequestMode::Template {
|
||||
template_tenant_id,
|
||||
template_timeline_id,
|
||||
} => tenant::CreateTimelineParams::Template(tenant::CreateTimelineParamsTemplate {
|
||||
new_timeline_id,
|
||||
template_tenant_id,
|
||||
template_timeline_id,
|
||||
}),
|
||||
};
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
|
||||
@@ -3199,7 +3207,7 @@ async fn list_aux_files(
|
||||
.await?;
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
state.conf,
|
||||
state.conf.get_vectored_concurrent_io,
|
||||
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
|
||||
);
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use itertools::Itertools;
|
||||
use jsonwebtoken::TokenData;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{
|
||||
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
|
||||
};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
@@ -331,10 +331,10 @@ async fn page_service_conn_main(
|
||||
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
|
||||
// and create the per-query context in process_query ourselves.
|
||||
let mut conn_handler = PageServerHandler::new(
|
||||
conf,
|
||||
tenant_manager,
|
||||
auth,
|
||||
pipelining_config,
|
||||
conf.get_vectored_concurrent_io,
|
||||
perf_span_fields,
|
||||
connection_ctx,
|
||||
cancel.clone(),
|
||||
@@ -371,7 +371,6 @@ async fn page_service_conn_main(
|
||||
}
|
||||
|
||||
struct PageServerHandler {
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
claims: Option<Claims>,
|
||||
|
||||
@@ -389,6 +388,7 @@ struct PageServerHandler {
|
||||
timeline_handles: Option<TimelineHandles>,
|
||||
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
|
||||
gate_guard: GateGuard,
|
||||
}
|
||||
@@ -844,17 +844,16 @@ impl BatchedFeMessage {
|
||||
impl PageServerHandler {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
perf_span_fields: ConnectionPerfSpanFields,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
gate_guard: GateGuard,
|
||||
) -> Self {
|
||||
PageServerHandler {
|
||||
conf,
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
@@ -862,6 +861,7 @@ impl PageServerHandler {
|
||||
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
|
||||
cancel,
|
||||
pipelining_config,
|
||||
get_vectored_concurrent_io,
|
||||
gate_guard,
|
||||
}
|
||||
}
|
||||
@@ -1623,7 +1623,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.get_vectored_concurrent_io,
|
||||
match self.gate_guard.try_clone() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
|
||||
@@ -586,7 +586,7 @@ impl Timeline {
|
||||
// scan directory listing (new), merge with the old results
|
||||
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
@@ -645,7 +645,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
@@ -885,7 +885,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
|
||||
@@ -864,6 +864,7 @@ impl Debug for SetStoppingError {
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum CreateTimelineParams {
|
||||
Bootstrap(CreateTimelineParamsBootstrap),
|
||||
Template(CreateTimelineParamsTemplate),
|
||||
Branch(CreateTimelineParamsBranch),
|
||||
ImportPgdata(CreateTimelineParamsImportPgdata),
|
||||
}
|
||||
@@ -875,6 +876,13 @@ pub(crate) struct CreateTimelineParamsBootstrap {
|
||||
pub(crate) pg_version: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CreateTimelineParamsTemplate {
|
||||
pub(crate) new_timeline_id: TimelineId,
|
||||
pub(crate) template_tenant_id: TenantShardId,
|
||||
pub(crate) template_timeline_id: TimelineId,
|
||||
}
|
||||
|
||||
/// NB: See comment on [`CreateTimelineIdempotency::Branch`] for why there's no `pg_version` here.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CreateTimelineParamsBranch {
|
||||
@@ -921,6 +929,7 @@ pub(crate) enum CreateTimelineIdempotency {
|
||||
ancestor_start_lsn: Lsn,
|
||||
},
|
||||
ImportPgdata(CreatingTimelineIdempotencyImportPgdata),
|
||||
Template(TenantShardId, TimelineId),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -2665,6 +2674,9 @@ impl TenantShard {
|
||||
CreateTimelineParams::ImportPgdata(params) => {
|
||||
self.create_timeline_import_pgdata(params, ctx).await?
|
||||
}
|
||||
CreateTimelineParams::Template(params) => {
|
||||
self.create_timeline_from_template(params, ctx).await?
|
||||
}
|
||||
};
|
||||
|
||||
// At this point we have dropped our guard on [`Self::timelines_creating`], and
|
||||
@@ -2729,6 +2741,109 @@ impl TenantShard {
|
||||
Ok(activated_timeline)
|
||||
}
|
||||
|
||||
async fn create_timeline_from_template(
|
||||
self: &Arc<Self>,
|
||||
params: CreateTimelineParamsTemplate,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CreateTimelineResult, CreateTimelineError> {
|
||||
let CreateTimelineParamsTemplate {
|
||||
new_timeline_id,
|
||||
template_tenant_id,
|
||||
template_timeline_id,
|
||||
} = params;
|
||||
|
||||
// 0. create a guard to prevent parallel creation attempts.
|
||||
let _timeline_create_guard = match self
|
||||
.start_creating_timeline(
|
||||
new_timeline_id,
|
||||
CreateTimelineIdempotency::Template(template_tenant_id, template_timeline_id),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
StartCreatingTimelineResult::CreateGuard(guard) => guard,
|
||||
StartCreatingTimelineResult::Idempotent(timeline) => {
|
||||
return Ok(CreateTimelineResult::Idempotent(timeline));
|
||||
}
|
||||
};
|
||||
|
||||
// 1. download the index part of the template timeline
|
||||
// TODO can we hardcode the generation here or should we pass it as parameter?
|
||||
let template_generation = Generation::new(1);
|
||||
let (template_index_part, template_generation, _) =
|
||||
remote_timeline_client::download_template_index_part(
|
||||
&self.remote_storage,
|
||||
&template_tenant_id,
|
||||
&template_timeline_id,
|
||||
template_generation,
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| CreateTimelineError::Other(e.into()))?;
|
||||
|
||||
tracing::info!(
|
||||
"downloaded template index_part.json with generation {}",
|
||||
template_generation.get_suffix()
|
||||
);
|
||||
|
||||
// 2. create the timeline, initializing the index part of the new timeline with the layers from the template
|
||||
let template_metadata = &template_index_part.metadata;
|
||||
|
||||
let new_metadata = TimelineMetadata::new(
|
||||
template_metadata.disk_consistent_lsn(),
|
||||
None,
|
||||
None,
|
||||
Lsn(0),
|
||||
template_metadata.latest_gc_cutoff_lsn(),
|
||||
template_metadata.initdb_lsn(),
|
||||
template_metadata.pg_version(),
|
||||
);
|
||||
let mut index_part = IndexPart::empty(new_metadata.clone());
|
||||
index_part.layer_metadata = template_index_part
|
||||
.layer_metadata
|
||||
.iter()
|
||||
.map(|(layer_name, metadata)| {
|
||||
// TODO support local sharing of layers
|
||||
let mut metadata = metadata.clone();
|
||||
metadata.template_ttid = Some((template_tenant_id, template_timeline_id));
|
||||
(layer_name.clone(), metadata)
|
||||
})
|
||||
.collect();
|
||||
let resources = self.build_timeline_resources(new_timeline_id);
|
||||
let _res = self
|
||||
.load_remote_timeline(
|
||||
new_timeline_id,
|
||||
index_part,
|
||||
new_metadata.clone(),
|
||||
None,
|
||||
resources,
|
||||
LoadTimelineCause::Attach,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let timeline = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let Some(timeline) = timelines.get(&new_timeline_id) else {
|
||||
warn!("timeline not available directly after attach");
|
||||
panic!();
|
||||
};
|
||||
let offloaded_timelines = self.timelines_offloaded.lock().unwrap();
|
||||
self.initialize_gc_info(&timelines, &offloaded_timelines, Some(new_timeline_id));
|
||||
|
||||
Arc::clone(timeline)
|
||||
};
|
||||
|
||||
// Callers are responsible to wait for uploads to complete and for activating the timeline.
|
||||
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_full_metadata_update(&new_metadata)
|
||||
.context("imported timeline initial metadata upload")?;
|
||||
|
||||
// 4. finish
|
||||
Ok(CreateTimelineResult::Created(timeline))
|
||||
}
|
||||
|
||||
/// The returned [`Arc<Timeline>`] is NOT in the [`TenantShard::timelines`] map until the import
|
||||
/// completes in the background. A DIFFERENT [`Arc<Timeline>`] will be inserted into the
|
||||
/// [`TenantShard::timelines`] map when the import completes.
|
||||
@@ -5683,7 +5798,6 @@ pub(crate) mod harness {
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use utils::id::TenantId;
|
||||
use utils::logging;
|
||||
|
||||
use super::*;
|
||||
@@ -8596,8 +8710,10 @@ mod tests {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<Bytes>, GetVectoredError> {
|
||||
let io_concurrency =
|
||||
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
tline.conf.get_vectored_concurrent_io,
|
||||
tline.gate.enter().unwrap(),
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
|
||||
let mut res = tline
|
||||
|
||||
@@ -189,8 +189,9 @@ use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
pub(crate) use download::{
|
||||
download_index_part, download_initdb_tar_zst, download_tenant_manifest, is_temp_download_file,
|
||||
list_remote_tenant_shards, list_remote_timelines,
|
||||
download_index_part, download_initdb_tar_zst, download_template_index_part,
|
||||
download_tenant_manifest, is_temp_download_file, list_remote_tenant_shards,
|
||||
list_remote_timelines,
|
||||
};
|
||||
use index::GcCompactionState;
|
||||
pub(crate) use index::LayerFileMetadata;
|
||||
@@ -1768,7 +1769,7 @@ impl RemoteTimelineClient {
|
||||
adopted_as: &Layer,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let source_remote_path = remote_layer_path(
|
||||
let source_remote_path = remote_layer_path_maybe_template(
|
||||
&self.tenant_shard_id.tenant_id,
|
||||
&adopted
|
||||
.get_timeline_id()
|
||||
@@ -1776,6 +1777,7 @@ impl RemoteTimelineClient {
|
||||
adopted.metadata().shard,
|
||||
&adopted.layer_desc().layer_name(),
|
||||
adopted.metadata().generation,
|
||||
adopted.metadata().template_ttid,
|
||||
);
|
||||
|
||||
let target_remote_path = remote_layer_path(
|
||||
@@ -2684,6 +2686,31 @@ pub fn remote_layer_path(
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_layer_path_maybe_template(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
layer_file_name: &LayerName,
|
||||
generation: Generation,
|
||||
template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
) -> RemotePath {
|
||||
if let Some((template_tenant_shard_id, template_timeline_id)) = template_ttid {
|
||||
let template_tenant_id = template_tenant_shard_id.tenant_id;
|
||||
let template_shard_id = template_tenant_shard_id.to_index();
|
||||
// Generation-aware key format
|
||||
let path = format!(
|
||||
"templates/{template_tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{template_timeline_id}/{1}{2}",
|
||||
template_shard_id.get_suffix(),
|
||||
layer_file_name,
|
||||
generation.get_suffix()
|
||||
);
|
||||
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
} else {
|
||||
remote_layer_path(tenant_id, timeline_id, shard, layer_file_name, generation)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if a and b have the same layer path within a tenant/timeline. This is essentially
|
||||
/// remote_layer_path(a) == remote_layer_path(b) without the string allocations.
|
||||
///
|
||||
@@ -2729,6 +2756,19 @@ pub fn remote_index_path(
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_template_index_path(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"templates/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
IndexPart::FILE_NAME,
|
||||
generation.get_suffix()
|
||||
))
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
|
||||
|
||||
@@ -29,7 +29,7 @@ use super::manifest::TenantManifest;
|
||||
use super::{
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
|
||||
parse_remote_tenant_manifest_path, remote_index_path, remote_initdb_archive_path,
|
||||
remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
|
||||
remote_initdb_preserved_archive_path, remote_template_index_path, remote_tenant_manifest_path,
|
||||
remote_tenant_manifest_prefix, remote_tenant_path,
|
||||
};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
@@ -39,7 +39,9 @@ use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
|
||||
};
|
||||
use crate::tenant::Generation;
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::remote_timeline_client::{
|
||||
remote_layer_path_maybe_template, remote_timelines_path,
|
||||
};
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::virtual_file;
|
||||
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
@@ -68,12 +70,13 @@ pub async fn download_layer_file<'a>(
|
||||
|
||||
let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
|
||||
|
||||
let remote_path = remote_layer_path(
|
||||
let remote_path = remote_layer_path_maybe_template(
|
||||
&tenant_shard_id.tenant_id,
|
||||
&timeline_id,
|
||||
layer_metadata.shard,
|
||||
layer_file_name,
|
||||
layer_metadata.generation,
|
||||
layer_metadata.template_ttid,
|
||||
);
|
||||
|
||||
let (bytes_amount, temp_file) = download_retry(
|
||||
@@ -393,6 +396,32 @@ async fn do_download_index_part(
|
||||
Ok((index_part, index_generation, index_part_mtime))
|
||||
}
|
||||
|
||||
async fn do_download_template_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: Option<&TimelineId>,
|
||||
index_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
|
||||
let timeline_id =
|
||||
timeline_id.expect("A timeline ID is always provided when downloading an index");
|
||||
let remote_path = remote_template_index_path(tenant_shard_id, timeline_id, index_generation);
|
||||
|
||||
let download_opts = DownloadOpts {
|
||||
kind: DownloadKind::Small,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let (index_part_bytes, index_part_mtime) =
|
||||
do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
|
||||
|
||||
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
|
||||
.with_context(|| format!("deserialize index part file at {remote_path:?}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok((index_part, index_generation, index_part_mtime))
|
||||
}
|
||||
|
||||
/// Metadata objects are "generationed", meaning that they include a generation suffix. This
|
||||
/// function downloads the object with the highest generation <= `my_generation`.
|
||||
///
|
||||
@@ -559,6 +588,35 @@ pub(crate) async fn download_index_part(
|
||||
.await
|
||||
}
|
||||
|
||||
/// index_part.json objects are suffixed with a generation number, so we cannot
|
||||
/// directly GET the latest index part without doing some probing.
|
||||
///
|
||||
/// In this function we probe for the most recent index in a generation <= our current generation.
|
||||
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
|
||||
pub(crate) async fn download_template_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
my_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let index_prefix = remote_template_index_path(tenant_shard_id, timeline_id, Generation::none());
|
||||
download_generation_object(
|
||||
storage,
|
||||
tenant_shard_id,
|
||||
Some(timeline_id),
|
||||
my_generation,
|
||||
"index_part",
|
||||
index_prefix,
|
||||
do_download_template_index_part,
|
||||
parse_remote_index_path,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn download_tenant_manifest(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
|
||||
@@ -12,6 +12,7 @@ use pageserver_api::shard::ShardIndex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use super::is_same_remote_layer_path;
|
||||
use crate::tenant::Generation;
|
||||
@@ -233,6 +234,10 @@ pub struct LayerFileMetadata {
|
||||
#[serde(default = "ShardIndex::unsharded")]
|
||||
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
|
||||
pub shard: ShardIndex,
|
||||
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
}
|
||||
|
||||
impl LayerFileMetadata {
|
||||
@@ -241,6 +246,7 @@ impl LayerFileMetadata {
|
||||
file_size,
|
||||
generation,
|
||||
shard,
|
||||
template_ttid: None,
|
||||
}
|
||||
}
|
||||
/// Helper to get both generation and file size in a tuple
|
||||
|
||||
@@ -31,6 +31,7 @@ pub use inmemory_layer::InMemoryLayer;
|
||||
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
|
||||
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
|
||||
pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
|
||||
use pageserver_api::config::GetVectoredConcurrentIo;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
@@ -43,7 +44,6 @@ use self::inmemory_layer::InMemoryLayerFileId;
|
||||
use super::PageReconstructError;
|
||||
use super::layer_map::InMemoryLayerDesc;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
@@ -318,11 +318,10 @@ impl IoConcurrency {
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_from_conf(
|
||||
conf: &'static PageServerConf,
|
||||
conf: GetVectoredConcurrentIo,
|
||||
gate_guard: GateGuard,
|
||||
) -> IoConcurrency {
|
||||
use pageserver_api::config::GetVectoredConcurrentIo;
|
||||
let selected = match conf.get_vectored_concurrent_io {
|
||||
let selected = match conf {
|
||||
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
|
||||
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
|
||||
};
|
||||
|
||||
@@ -185,6 +185,7 @@ impl Layer {
|
||||
None,
|
||||
metadata.generation,
|
||||
metadata.shard,
|
||||
metadata.template_ttid,
|
||||
)));
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
|
||||
@@ -225,6 +226,7 @@ impl Layer {
|
||||
Some(inner),
|
||||
metadata.generation,
|
||||
metadata.shard,
|
||||
metadata.template_ttid,
|
||||
)
|
||||
}));
|
||||
|
||||
@@ -273,6 +275,7 @@ impl Layer {
|
||||
Some(inner),
|
||||
timeline.generation,
|
||||
timeline.get_shard_index(),
|
||||
None,
|
||||
)
|
||||
}));
|
||||
|
||||
@@ -710,6 +713,8 @@ struct LayerInner {
|
||||
/// a shard split since the layer was originally written.
|
||||
shard: ShardIndex,
|
||||
|
||||
template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
|
||||
/// When the Layer was last evicted but has not been downloaded since.
|
||||
///
|
||||
/// This is used for skipping evicted layers from the previous heatmap (see
|
||||
@@ -853,6 +858,7 @@ impl LayerInner {
|
||||
downloaded: Option<Arc<DownloadedLayer>>,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
) -> Self {
|
||||
let (inner, version, init_status) = if let Some(inner) = downloaded {
|
||||
let version = inner.version;
|
||||
@@ -888,6 +894,7 @@ impl LayerInner {
|
||||
consecutive_failures: AtomicUsize::new(0),
|
||||
generation,
|
||||
shard,
|
||||
template_ttid,
|
||||
last_evicted_at: std::sync::Mutex::default(),
|
||||
#[cfg(test)]
|
||||
failpoints: Default::default(),
|
||||
@@ -1623,7 +1630,9 @@ impl LayerInner {
|
||||
}
|
||||
|
||||
fn metadata(&self) -> LayerFileMetadata {
|
||||
LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
|
||||
let mut metadata = LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard);
|
||||
metadata.template_ttid = self.template_ttid;
|
||||
metadata
|
||||
}
|
||||
|
||||
/// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
|
||||
@@ -1771,13 +1780,19 @@ impl DownloadedLayer {
|
||||
"these are the same, just avoiding the upgrade"
|
||||
);
|
||||
|
||||
let (ex_tenant_id, ex_timeline_id) =
|
||||
if let Some((tenant_id, timeline_id)) = owner.template_ttid {
|
||||
(tenant_id.tenant_id, timeline_id)
|
||||
} else {
|
||||
(owner.desc.tenant_shard_id.tenant_id, owner.desc.timeline_id)
|
||||
};
|
||||
let res = if owner.desc.is_delta {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
|
||||
.attached_child();
|
||||
let summary = Some(delta_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
ex_tenant_id,
|
||||
ex_timeline_id,
|
||||
owner.desc.key_range.clone(),
|
||||
owner.desc.lsn_range.clone(),
|
||||
));
|
||||
@@ -1795,8 +1810,8 @@ impl DownloadedLayer {
|
||||
.attached_child();
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
ex_tenant_id,
|
||||
ex_timeline_id,
|
||||
owner.desc.key_range.clone(),
|
||||
lsn,
|
||||
));
|
||||
|
||||
@@ -2545,13 +2545,6 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
pub(crate) fn get_pitr_interval(&self) -> Duration {
|
||||
let tenant_conf = &self.tenant_conf.load().tenant_conf;
|
||||
tenant_conf
|
||||
.pitr_interval
|
||||
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
|
||||
}
|
||||
|
||||
fn get_compaction_period(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
@@ -3537,7 +3530,7 @@ impl Timeline {
|
||||
};
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self_ref.conf,
|
||||
self_ref.conf.get_vectored_concurrent_io,
|
||||
self_ref
|
||||
.gate
|
||||
.enter()
|
||||
@@ -5566,7 +5559,7 @@ impl Timeline {
|
||||
});
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| CreateImageLayersError::Cancelled)?,
|
||||
|
||||
@@ -188,7 +188,7 @@ pub(crate) async fn generate_tombstone_image_layer(
|
||||
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
|
||||
);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
detached.conf,
|
||||
detached.conf.get_vectored_concurrent_io,
|
||||
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
|
||||
@@ -508,7 +508,6 @@ PER_METRIC_VERIFIERS = {
|
||||
"remote_storage_size": CannotVerifyAnything,
|
||||
"written_size": WrittenDataVerifier,
|
||||
"written_data_bytes_delta": WrittenDataDeltaVerifier,
|
||||
"pitr_cutoff": CannotVerifyAnything,
|
||||
"timeline_logical_size": CannotVerifyAnything,
|
||||
"synthetic_storage_size": SyntheticSizeVerifier,
|
||||
}
|
||||
|
||||
190
test_runner/regress/test_templates.py
Normal file
190
test_runner/regress/test_templates.py
Normal file
@@ -0,0 +1,190 @@
|
||||
import base64
|
||||
import json
|
||||
import shutil
|
||||
import time
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from threading import Event
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.errors
|
||||
import pytest
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.fast_import import FastImport
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
PgProtocol,
|
||||
VanillaPostgres,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import (
|
||||
ImportPgdataIdemptencyKey,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import LocalFsStorage, MockS3Server, RemoteStorageKind
|
||||
from fixtures.utils import (
|
||||
run_only_on_default_postgres,
|
||||
shared_buffers_for_max_cu,
|
||||
skip_in_debug_build,
|
||||
wait_until,
|
||||
)
|
||||
from fixtures.workload import Workload
|
||||
from mypy_boto3_kms import KMSClient
|
||||
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
|
||||
from mypy_boto3_s3 import S3Client
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
num_rows = 1000
|
||||
|
||||
|
||||
def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
|
||||
"""
|
||||
Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
|
||||
"""
|
||||
assert not vanilla_pg.is_running()
|
||||
|
||||
path.mkdir()
|
||||
# what cplane writes before scheduling fast_import
|
||||
specpath = path / "spec.json"
|
||||
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
|
||||
# what fast_import writes
|
||||
vanilla_pg.pgdatadir.rename(path / "pgdata")
|
||||
statusdir = path / "status"
|
||||
statusdir.mkdir()
|
||||
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
|
||||
(statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
|
||||
|
||||
|
||||
def test_template_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
shard_count = 1
|
||||
stripe_size = 1024
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
|
||||
log.info("create template data")
|
||||
|
||||
template_tenant_id = TenantId.generate()
|
||||
template_timeline_id = TimelineId.generate()
|
||||
template_tenant_shard_id = TenantShardId(template_tenant_id, 0, 1)
|
||||
|
||||
env.create_tenant(shard_count=1, tenant_id=template_tenant_id, timeline_id=template_timeline_id)
|
||||
|
||||
def validate_data_equivalence(ep):
|
||||
# TODO: would be nicer to just compare pgdump
|
||||
|
||||
# Enable IO concurrency for batching on large sequential scan, to avoid making
|
||||
# this test unnecessarily onerous on CPU. Especially on debug mode, it's still
|
||||
# pretty onerous though, so increase statement_timeout to avoid timeouts.
|
||||
assert ep.safe_psql_many(
|
||||
[
|
||||
"set effective_io_concurrency=32;",
|
||||
"SET statement_timeout='300s';",
|
||||
"select count(*), sum(data::bigint)::bigint from t",
|
||||
]
|
||||
) == [[], [], [(expect_nrows, expect_sum)]]
|
||||
|
||||
# Fill the template with some data
|
||||
with env.endpoints.create_start("main", tenant_id=template_tenant_id) as endpoint:
|
||||
# fillfactor so we don't need to produce that much data
|
||||
# 900 byte per row is > 10% => 1 row per page
|
||||
endpoint.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
|
||||
|
||||
nrows = 0
|
||||
|
||||
target_relblock_size = 1024 * 8192
|
||||
|
||||
while True:
|
||||
relblock_size = endpoint.safe_psql_scalar("select pg_relation_size('t')")
|
||||
log.info(
|
||||
f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
|
||||
)
|
||||
if relblock_size >= target_relblock_size:
|
||||
break
|
||||
addrows = int((target_relblock_size - relblock_size) // 8192)
|
||||
assert addrows >= 1, "forward progress"
|
||||
endpoint.safe_psql(
|
||||
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
|
||||
)
|
||||
nrows += addrows
|
||||
expect_nrows = nrows
|
||||
expect_sum = (
|
||||
(nrows) * (nrows + 1) // 2
|
||||
) # https://stackoverflow.com/questions/43901484/sum-of-the-integers-from-1-to-n
|
||||
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
template_tenant_shard_id, template_timeline_id
|
||||
)
|
||||
wait_for_last_flush_lsn(env, endpoint, template_tenant_id, template_timeline_id)
|
||||
|
||||
validate_data_equivalence(endpoint)
|
||||
|
||||
# Copy the template to the templates dir and delete the original project
|
||||
from_dir = env.pageserver_remote_storage.tenant_path(template_tenant_shard_id)
|
||||
to_dir = env.pageserver_remote_storage.root / "templates" / str(template_tenant_id)
|
||||
shutil.copytree(from_dir, to_dir)
|
||||
|
||||
# Do the template creation
|
||||
new_tenant_id = TenantId.generate()
|
||||
env.storage_controller.tenant_create(new_tenant_id)
|
||||
|
||||
new_timeline_id = TimelineId.generate()
|
||||
log.info("starting timeline creation")
|
||||
start = time.monotonic()
|
||||
|
||||
branch_name = "on_template"
|
||||
env.storage_controller.timeline_create(
|
||||
new_tenant_id,
|
||||
{
|
||||
"new_timeline_id": str(new_timeline_id),
|
||||
"template_tenant_id": str(template_tenant_id),
|
||||
"template_timeline_id": str(template_timeline_id),
|
||||
},
|
||||
)
|
||||
env.neon_cli.mappings_map_branch(branch_name, new_tenant_id, new_timeline_id)
|
||||
|
||||
# Get some timeline details for later.
|
||||
locations = env.storage_controller.locate(new_tenant_id)
|
||||
[shard_zero] = [
|
||||
loc for loc in locations if TenantShardId.parse(loc["shard_id"]).shard_number == 0
|
||||
]
|
||||
shard_zero_ps = env.get_pageserver(shard_zero["node_id"])
|
||||
shard_zero_http = shard_zero_ps.http_client()
|
||||
shard_zero_timeline_info = shard_zero_http.timeline_detail(
|
||||
shard_zero["shard_id"], new_timeline_id
|
||||
)
|
||||
initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"])
|
||||
min_readable_lsn = Lsn(shard_zero_timeline_info["min_readable_lsn"])
|
||||
last_record_lsn = Lsn(shard_zero_timeline_info["last_record_lsn"])
|
||||
disk_consistent_lsn = Lsn(shard_zero_timeline_info["disk_consistent_lsn"])
|
||||
_remote_consistent_lsn = Lsn(shard_zero_timeline_info["remote_consistent_lsn"])
|
||||
remote_consistent_lsn_visible = Lsn(shard_zero_timeline_info["remote_consistent_lsn_visible"])
|
||||
# assert remote_consistent_lsn_visible == remote_consistent_lsn TODO: this fails initially and after restart, presumably because `UploadQueue::clean.1` is still `None`
|
||||
# assert remote_consistent_lsn_visible == disk_consistent_lsn
|
||||
# assert initdb_lsn == min_readable_lsn
|
||||
# assert disk_consistent_lsn == initdb_lsn + 8
|
||||
assert last_record_lsn == disk_consistent_lsn
|
||||
# TODO: assert these values are the same everywhere
|
||||
|
||||
# Last step: validation
|
||||
|
||||
with env.endpoints.create_start(
|
||||
branch_name=branch_name,
|
||||
endpoint_id="ro",
|
||||
tenant_id=new_tenant_id,
|
||||
lsn=last_record_lsn,
|
||||
) as ro_endpoint:
|
||||
validate_data_equivalence(ro_endpoint)
|
||||
|
||||
# ensure the template survives restarts
|
||||
ro_endpoint.stop()
|
||||
env.pageserver.stop(immediate=True)
|
||||
env.pageserver.start()
|
||||
ro_endpoint.start()
|
||||
validate_data_equivalence(ro_endpoint)
|
||||
Reference in New Issue
Block a user