mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-04 00:40:38 +00:00
Compare commits
11 Commits
release
...
arpad/temp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2a84ea625c | ||
|
|
3a098a4eb2 | ||
|
|
ea1a21293d | ||
|
|
94ef73db85 | ||
|
|
9aa0cdee83 | ||
|
|
43ab1935f3 | ||
|
|
1edf84d6c7 | ||
|
|
a64711dec8 | ||
|
|
bb019f2bbb | ||
|
|
ab751abdbd | ||
|
|
5df8284961 |
@@ -325,6 +325,7 @@ impl TimelineCreateRequest {
|
||||
match &self.mode {
|
||||
TimelineCreateRequestMode::Branch { .. } => "branch",
|
||||
TimelineCreateRequestMode::ImportPgdata { .. } => "import",
|
||||
TimelineCreateRequestMode::Template { .. } => "template",
|
||||
TimelineCreateRequestMode::Bootstrap { .. } => "bootstrap",
|
||||
}
|
||||
}
|
||||
@@ -406,6 +407,10 @@ pub enum TimelineCreateRequestMode {
|
||||
ImportPgdata {
|
||||
import_pgdata: TimelineCreateRequestModeImportPgdata,
|
||||
},
|
||||
Template {
|
||||
template_tenant_id: TenantShardId,
|
||||
template_timeline_id: TimelineId,
|
||||
},
|
||||
// NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
|
||||
// (serde picks the first matching enum variant, in declaration order).
|
||||
Bootstrap {
|
||||
|
||||
@@ -629,6 +629,12 @@ paths:
|
||||
existing_initdb_timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
template_tenant_id:
|
||||
type: string
|
||||
format: hex
|
||||
template_timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
import_pgdata:
|
||||
$ref: "#/components/schemas/TimelineCreateRequestImportPgdata"
|
||||
responses:
|
||||
|
||||
@@ -607,6 +607,14 @@ async fn timeline_create_handler(
|
||||
}
|
||||
},
|
||||
}),
|
||||
TimelineCreateRequestMode::Template {
|
||||
template_tenant_id,
|
||||
template_timeline_id,
|
||||
} => tenant::CreateTimelineParams::Template(tenant::CreateTimelineParamsTemplate {
|
||||
new_timeline_id,
|
||||
template_tenant_id,
|
||||
template_timeline_id,
|
||||
}),
|
||||
};
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
|
||||
|
||||
@@ -864,6 +864,7 @@ impl Debug for SetStoppingError {
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum CreateTimelineParams {
|
||||
Bootstrap(CreateTimelineParamsBootstrap),
|
||||
Template(CreateTimelineParamsTemplate),
|
||||
Branch(CreateTimelineParamsBranch),
|
||||
ImportPgdata(CreateTimelineParamsImportPgdata),
|
||||
}
|
||||
@@ -875,6 +876,13 @@ pub(crate) struct CreateTimelineParamsBootstrap {
|
||||
pub(crate) pg_version: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CreateTimelineParamsTemplate {
|
||||
pub(crate) new_timeline_id: TimelineId,
|
||||
pub(crate) template_tenant_id: TenantShardId,
|
||||
pub(crate) template_timeline_id: TimelineId,
|
||||
}
|
||||
|
||||
/// NB: See comment on [`CreateTimelineIdempotency::Branch`] for why there's no `pg_version` here.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CreateTimelineParamsBranch {
|
||||
@@ -921,6 +929,7 @@ pub(crate) enum CreateTimelineIdempotency {
|
||||
ancestor_start_lsn: Lsn,
|
||||
},
|
||||
ImportPgdata(CreatingTimelineIdempotencyImportPgdata),
|
||||
Template(TenantShardId, TimelineId),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -2665,6 +2674,9 @@ impl TenantShard {
|
||||
CreateTimelineParams::ImportPgdata(params) => {
|
||||
self.create_timeline_import_pgdata(params, ctx).await?
|
||||
}
|
||||
CreateTimelineParams::Template(params) => {
|
||||
self.create_timeline_from_template(params, ctx).await?
|
||||
}
|
||||
};
|
||||
|
||||
// At this point we have dropped our guard on [`Self::timelines_creating`], and
|
||||
@@ -2729,6 +2741,109 @@ impl TenantShard {
|
||||
Ok(activated_timeline)
|
||||
}
|
||||
|
||||
async fn create_timeline_from_template(
|
||||
self: &Arc<Self>,
|
||||
params: CreateTimelineParamsTemplate,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CreateTimelineResult, CreateTimelineError> {
|
||||
let CreateTimelineParamsTemplate {
|
||||
new_timeline_id,
|
||||
template_tenant_id,
|
||||
template_timeline_id,
|
||||
} = params;
|
||||
|
||||
// 0. create a guard to prevent parallel creation attempts.
|
||||
let _timeline_create_guard = match self
|
||||
.start_creating_timeline(
|
||||
new_timeline_id,
|
||||
CreateTimelineIdempotency::Template(template_tenant_id, template_timeline_id),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
StartCreatingTimelineResult::CreateGuard(guard) => guard,
|
||||
StartCreatingTimelineResult::Idempotent(timeline) => {
|
||||
return Ok(CreateTimelineResult::Idempotent(timeline));
|
||||
}
|
||||
};
|
||||
|
||||
// 1. download the index part of the template timeline
|
||||
// TODO can we hardcode the generation here or should we pass it as parameter?
|
||||
let template_generation = Generation::new(1);
|
||||
let (template_index_part, template_generation, _) =
|
||||
remote_timeline_client::download_template_index_part(
|
||||
&self.remote_storage,
|
||||
&template_tenant_id,
|
||||
&template_timeline_id,
|
||||
template_generation,
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| CreateTimelineError::Other(e.into()))?;
|
||||
|
||||
tracing::info!(
|
||||
"downloaded template index_part.json with generation {}",
|
||||
template_generation.get_suffix()
|
||||
);
|
||||
|
||||
// 2. create the timeline, initializing the index part of the new timeline with the layers from the template
|
||||
let template_metadata = &template_index_part.metadata;
|
||||
|
||||
let new_metadata = TimelineMetadata::new(
|
||||
template_metadata.disk_consistent_lsn(),
|
||||
None,
|
||||
None,
|
||||
Lsn(0),
|
||||
template_metadata.latest_gc_cutoff_lsn(),
|
||||
template_metadata.initdb_lsn(),
|
||||
template_metadata.pg_version(),
|
||||
);
|
||||
let mut index_part = IndexPart::empty(new_metadata.clone());
|
||||
index_part.layer_metadata = template_index_part
|
||||
.layer_metadata
|
||||
.iter()
|
||||
.map(|(layer_name, metadata)| {
|
||||
// TODO support local sharing of layers
|
||||
let mut metadata = metadata.clone();
|
||||
metadata.template_ttid = Some((template_tenant_id, template_timeline_id));
|
||||
(layer_name.clone(), metadata)
|
||||
})
|
||||
.collect();
|
||||
let resources = self.build_timeline_resources(new_timeline_id);
|
||||
let _res = self
|
||||
.load_remote_timeline(
|
||||
new_timeline_id,
|
||||
index_part,
|
||||
new_metadata.clone(),
|
||||
None,
|
||||
resources,
|
||||
LoadTimelineCause::Attach,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let timeline = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let Some(timeline) = timelines.get(&new_timeline_id) else {
|
||||
warn!("timeline not available directly after attach");
|
||||
panic!();
|
||||
};
|
||||
let offloaded_timelines = self.timelines_offloaded.lock().unwrap();
|
||||
self.initialize_gc_info(&timelines, &offloaded_timelines, Some(new_timeline_id));
|
||||
|
||||
Arc::clone(timeline)
|
||||
};
|
||||
|
||||
// Callers are responsible to wait for uploads to complete and for activating the timeline.
|
||||
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_full_metadata_update(&new_metadata)
|
||||
.context("imported timeline initial metadata upload")?;
|
||||
|
||||
// 4. finish
|
||||
Ok(CreateTimelineResult::Created(timeline))
|
||||
}
|
||||
|
||||
/// The returned [`Arc<Timeline>`] is NOT in the [`TenantShard::timelines`] map until the import
|
||||
/// completes in the background. A DIFFERENT [`Arc<Timeline>`] will be inserted into the
|
||||
/// [`TenantShard::timelines`] map when the import completes.
|
||||
@@ -5683,7 +5798,6 @@ pub(crate) mod harness {
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use utils::id::TenantId;
|
||||
use utils::logging;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -189,8 +189,9 @@ use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
pub(crate) use download::{
|
||||
download_index_part, download_initdb_tar_zst, download_tenant_manifest, is_temp_download_file,
|
||||
list_remote_tenant_shards, list_remote_timelines,
|
||||
download_index_part, download_initdb_tar_zst, download_template_index_part,
|
||||
download_tenant_manifest, is_temp_download_file, list_remote_tenant_shards,
|
||||
list_remote_timelines,
|
||||
};
|
||||
use index::GcCompactionState;
|
||||
pub(crate) use index::LayerFileMetadata;
|
||||
@@ -1768,7 +1769,7 @@ impl RemoteTimelineClient {
|
||||
adopted_as: &Layer,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let source_remote_path = remote_layer_path(
|
||||
let source_remote_path = remote_layer_path_maybe_template(
|
||||
&self.tenant_shard_id.tenant_id,
|
||||
&adopted
|
||||
.get_timeline_id()
|
||||
@@ -1776,6 +1777,7 @@ impl RemoteTimelineClient {
|
||||
adopted.metadata().shard,
|
||||
&adopted.layer_desc().layer_name(),
|
||||
adopted.metadata().generation,
|
||||
adopted.metadata().template_ttid,
|
||||
);
|
||||
|
||||
let target_remote_path = remote_layer_path(
|
||||
@@ -2684,6 +2686,31 @@ pub fn remote_layer_path(
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_layer_path_maybe_template(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
layer_file_name: &LayerName,
|
||||
generation: Generation,
|
||||
template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
) -> RemotePath {
|
||||
if let Some((template_tenant_shard_id, template_timeline_id)) = template_ttid {
|
||||
let template_tenant_id = template_tenant_shard_id.tenant_id;
|
||||
let template_shard_id = template_tenant_shard_id.to_index();
|
||||
// Generation-aware key format
|
||||
let path = format!(
|
||||
"templates/{template_tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{template_timeline_id}/{1}{2}",
|
||||
template_shard_id.get_suffix(),
|
||||
layer_file_name,
|
||||
generation.get_suffix()
|
||||
);
|
||||
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
} else {
|
||||
remote_layer_path(tenant_id, timeline_id, shard, layer_file_name, generation)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if a and b have the same layer path within a tenant/timeline. This is essentially
|
||||
/// remote_layer_path(a) == remote_layer_path(b) without the string allocations.
|
||||
///
|
||||
@@ -2729,6 +2756,19 @@ pub fn remote_index_path(
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_template_index_path(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"templates/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
IndexPart::FILE_NAME,
|
||||
generation.get_suffix()
|
||||
))
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
|
||||
|
||||
@@ -29,7 +29,7 @@ use super::manifest::TenantManifest;
|
||||
use super::{
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
|
||||
parse_remote_tenant_manifest_path, remote_index_path, remote_initdb_archive_path,
|
||||
remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
|
||||
remote_initdb_preserved_archive_path, remote_template_index_path, remote_tenant_manifest_path,
|
||||
remote_tenant_manifest_prefix, remote_tenant_path,
|
||||
};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
@@ -39,7 +39,9 @@ use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
|
||||
};
|
||||
use crate::tenant::Generation;
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::remote_timeline_client::{
|
||||
remote_layer_path_maybe_template, remote_timelines_path,
|
||||
};
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::virtual_file;
|
||||
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
@@ -68,12 +70,13 @@ pub async fn download_layer_file<'a>(
|
||||
|
||||
let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
|
||||
|
||||
let remote_path = remote_layer_path(
|
||||
let remote_path = remote_layer_path_maybe_template(
|
||||
&tenant_shard_id.tenant_id,
|
||||
&timeline_id,
|
||||
layer_metadata.shard,
|
||||
layer_file_name,
|
||||
layer_metadata.generation,
|
||||
layer_metadata.template_ttid,
|
||||
);
|
||||
|
||||
let (bytes_amount, temp_file) = download_retry(
|
||||
@@ -393,6 +396,32 @@ async fn do_download_index_part(
|
||||
Ok((index_part, index_generation, index_part_mtime))
|
||||
}
|
||||
|
||||
async fn do_download_template_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: Option<&TimelineId>,
|
||||
index_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
|
||||
let timeline_id =
|
||||
timeline_id.expect("A timeline ID is always provided when downloading an index");
|
||||
let remote_path = remote_template_index_path(tenant_shard_id, timeline_id, index_generation);
|
||||
|
||||
let download_opts = DownloadOpts {
|
||||
kind: DownloadKind::Small,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let (index_part_bytes, index_part_mtime) =
|
||||
do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
|
||||
|
||||
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
|
||||
.with_context(|| format!("deserialize index part file at {remote_path:?}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok((index_part, index_generation, index_part_mtime))
|
||||
}
|
||||
|
||||
/// Metadata objects are "generationed", meaning that they include a generation suffix. This
|
||||
/// function downloads the object with the highest generation <= `my_generation`.
|
||||
///
|
||||
@@ -559,6 +588,35 @@ pub(crate) async fn download_index_part(
|
||||
.await
|
||||
}
|
||||
|
||||
/// index_part.json objects are suffixed with a generation number, so we cannot
|
||||
/// directly GET the latest index part without doing some probing.
|
||||
///
|
||||
/// In this function we probe for the most recent index in a generation <= our current generation.
|
||||
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
|
||||
pub(crate) async fn download_template_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
my_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let index_prefix = remote_template_index_path(tenant_shard_id, timeline_id, Generation::none());
|
||||
download_generation_object(
|
||||
storage,
|
||||
tenant_shard_id,
|
||||
Some(timeline_id),
|
||||
my_generation,
|
||||
"index_part",
|
||||
index_prefix,
|
||||
do_download_template_index_part,
|
||||
parse_remote_index_path,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn download_tenant_manifest(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
|
||||
@@ -12,6 +12,7 @@ use pageserver_api::shard::ShardIndex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use super::is_same_remote_layer_path;
|
||||
use crate::tenant::Generation;
|
||||
@@ -233,6 +234,10 @@ pub struct LayerFileMetadata {
|
||||
#[serde(default = "ShardIndex::unsharded")]
|
||||
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
|
||||
pub shard: ShardIndex,
|
||||
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
}
|
||||
|
||||
impl LayerFileMetadata {
|
||||
@@ -241,6 +246,7 @@ impl LayerFileMetadata {
|
||||
file_size,
|
||||
generation,
|
||||
shard,
|
||||
template_ttid: None,
|
||||
}
|
||||
}
|
||||
/// Helper to get both generation and file size in a tuple
|
||||
|
||||
@@ -185,6 +185,7 @@ impl Layer {
|
||||
None,
|
||||
metadata.generation,
|
||||
metadata.shard,
|
||||
metadata.template_ttid,
|
||||
)));
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
|
||||
@@ -225,6 +226,7 @@ impl Layer {
|
||||
Some(inner),
|
||||
metadata.generation,
|
||||
metadata.shard,
|
||||
metadata.template_ttid,
|
||||
)
|
||||
}));
|
||||
|
||||
@@ -273,6 +275,7 @@ impl Layer {
|
||||
Some(inner),
|
||||
timeline.generation,
|
||||
timeline.get_shard_index(),
|
||||
None,
|
||||
)
|
||||
}));
|
||||
|
||||
@@ -710,6 +713,8 @@ struct LayerInner {
|
||||
/// a shard split since the layer was originally written.
|
||||
shard: ShardIndex,
|
||||
|
||||
template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
|
||||
/// When the Layer was last evicted but has not been downloaded since.
|
||||
///
|
||||
/// This is used for skipping evicted layers from the previous heatmap (see
|
||||
@@ -853,6 +858,7 @@ impl LayerInner {
|
||||
downloaded: Option<Arc<DownloadedLayer>>,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
) -> Self {
|
||||
let (inner, version, init_status) = if let Some(inner) = downloaded {
|
||||
let version = inner.version;
|
||||
@@ -888,6 +894,7 @@ impl LayerInner {
|
||||
consecutive_failures: AtomicUsize::new(0),
|
||||
generation,
|
||||
shard,
|
||||
template_ttid,
|
||||
last_evicted_at: std::sync::Mutex::default(),
|
||||
#[cfg(test)]
|
||||
failpoints: Default::default(),
|
||||
@@ -1623,7 +1630,9 @@ impl LayerInner {
|
||||
}
|
||||
|
||||
/// Returns the [`LayerFileMetadata`] of this layer: file size, generation and shard, plus
/// the template tenant shard / timeline the layer originates from, if any.
fn metadata(&self) -> LayerFileMetadata {
    // `LayerFileMetadata::new` leaves `template_ttid` unset, so it is filled in afterwards.
    let mut metadata = LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard);
    metadata.template_ttid = self.template_ttid;
    metadata
}
|
||||
|
||||
/// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
|
||||
@@ -1771,13 +1780,19 @@ impl DownloadedLayer {
|
||||
"these are the same, just avoiding the upgrade"
|
||||
);
|
||||
|
||||
let (ex_tenant_id, ex_timeline_id) =
|
||||
if let Some((tenant_id, timeline_id)) = owner.template_ttid {
|
||||
(tenant_id.tenant_id, timeline_id)
|
||||
} else {
|
||||
(owner.desc.tenant_shard_id.tenant_id, owner.desc.timeline_id)
|
||||
};
|
||||
let res = if owner.desc.is_delta {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
|
||||
.attached_child();
|
||||
let summary = Some(delta_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
ex_tenant_id,
|
||||
ex_timeline_id,
|
||||
owner.desc.key_range.clone(),
|
||||
owner.desc.lsn_range.clone(),
|
||||
));
|
||||
@@ -1795,8 +1810,8 @@ impl DownloadedLayer {
|
||||
.attached_child();
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
ex_tenant_id,
|
||||
ex_timeline_id,
|
||||
owner.desc.key_range.clone(),
|
||||
lsn,
|
||||
));
|
||||
|
||||
190
test_runner/regress/test_templates.py
Normal file
190
test_runner/regress/test_templates.py
Normal file
@@ -0,0 +1,190 @@
|
||||
import base64
|
||||
import json
|
||||
import shutil
|
||||
import time
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from threading import Event
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.errors
|
||||
import pytest
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.fast_import import FastImport
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
PgProtocol,
|
||||
VanillaPostgres,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import (
|
||||
ImportPgdataIdemptencyKey,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import LocalFsStorage, MockS3Server, RemoteStorageKind
|
||||
from fixtures.utils import (
|
||||
run_only_on_default_postgres,
|
||||
shared_buffers_for_max_cu,
|
||||
skip_in_debug_build,
|
||||
wait_until,
|
||||
)
|
||||
from fixtures.workload import Workload
|
||||
from mypy_boto3_kms import KMSClient
|
||||
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
|
||||
from mypy_boto3_s3 import S3Client
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
num_rows = 1000
|
||||
|
||||
|
||||
def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
    """
    Lay out a local directory the way the import S3 bucket would look for the given
    vanilla PG instance. The instance must not be running.
    """
    assert not vanilla_pg.is_running()

    path.mkdir()

    # what cplane writes before scheduling fast_import
    (path / "spec.json").write_text(
        json.dumps({"branch_id": "somebranch", "project_id": "someproject"})
    )

    # what fast_import writes: the data directory and one status file per step
    vanilla_pg.pgdatadir.rename(path / "pgdata")
    status_dir = path / "status"
    status_dir.mkdir()
    status_payloads = {
        "pgdata": {"done": True},
        "fast_import": {"command": "pgdata", "done": True},
    }
    for file_name, payload in status_payloads.items():
        (status_dir / file_name).write_text(json.dumps(payload))
|
||||
|
||||
|
||||
def test_template_smoke(neon_env_builder: NeonEnvBuilder):
    """
    Smoke test for creating a timeline from a template.

    Fills a tenant with data, copies its remote storage contents into the `templates/`
    directory, creates a timeline in a new tenant from that template, and checks that the
    data is readable there, including after a pageserver restart.
    """
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

    env = neon_env_builder.init_start()
    assert isinstance(env.pageserver_remote_storage, LocalFsStorage)

    log.info("create template data")

    template_tenant_id = TenantId.generate()
    template_timeline_id = TimelineId.generate()
    template_tenant_shard_id = TenantShardId(template_tenant_id, 0, 1)

    env.create_tenant(shard_count=1, tenant_id=template_tenant_id, timeline_id=template_timeline_id)

    def validate_data_equivalence(ep):
        # TODO: would be nicer to just compare pgdump

        # Enable IO concurrency for batching on large sequential scan, to avoid making
        # this test unnecessarily onerous on CPU. Especially on debug mode, it's still
        # pretty onerous though, so increase statement_timeout to avoid timeouts.
        #
        # expect_nrows / expect_sum are assigned below, before the first call.
        assert ep.safe_psql_many(
            [
                "set effective_io_concurrency=32;",
                "SET statement_timeout='300s';",
                "select count(*), sum(data::bigint)::bigint from t",
            ]
        ) == [[], [], [(expect_nrows, expect_sum)]]

    # Fill the template with some data
    with env.endpoints.create_start("main", tenant_id=template_tenant_id) as endpoint:
        # fillfactor so we don't need to produce that much data
        # 900 byte per row is > 10% => 1 row per page
        endpoint.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")

        nrows = 0

        target_relblock_size = 1024 * 8192

        while True:
            relblock_size = endpoint.safe_psql_scalar("select pg_relation_size('t')")
            log.info(
                f"relblock size: {relblock_size // 8192} pages (target: {target_relblock_size // 8192} pages)"
            )
            if relblock_size >= target_relblock_size:
                break
            addrows = int((target_relblock_size - relblock_size) // 8192)
            assert addrows >= 1, "forward progress"
            endpoint.safe_psql(
                f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
            )
            nrows += addrows
        expect_nrows = nrows
        expect_sum = (
            (nrows) * (nrows + 1) // 2
        )  # https://stackoverflow.com/questions/43901484/sum-of-the-integers-from-1-to-n

        env.pageserver.http_client().timeline_checkpoint(
            template_tenant_shard_id, template_timeline_id
        )
        wait_for_last_flush_lsn(env, endpoint, template_tenant_id, template_timeline_id)

        validate_data_equivalence(endpoint)

    # Copy the template to the templates dir (the original tenant is left in place)
    from_dir = env.pageserver_remote_storage.tenant_path(template_tenant_shard_id)
    to_dir = env.pageserver_remote_storage.root / "templates" / str(template_tenant_id)
    shutil.copytree(from_dir, to_dir)

    # Do the template creation
    new_tenant_id = TenantId.generate()
    env.storage_controller.tenant_create(new_tenant_id)

    new_timeline_id = TimelineId.generate()
    log.info("starting timeline creation")

    branch_name = "on_template"
    env.storage_controller.timeline_create(
        new_tenant_id,
        {
            "new_timeline_id": str(new_timeline_id),
            "template_tenant_id": str(template_tenant_id),
            "template_timeline_id": str(template_timeline_id),
        },
    )
    env.neon_cli.mappings_map_branch(branch_name, new_tenant_id, new_timeline_id)

    # Get some timeline details for later.
    locations = env.storage_controller.locate(new_tenant_id)
    [shard_zero] = [
        loc for loc in locations if TenantShardId.parse(loc["shard_id"]).shard_number == 0
    ]
    shard_zero_ps = env.get_pageserver(shard_zero["node_id"])
    shard_zero_http = shard_zero_ps.http_client()
    shard_zero_timeline_info = shard_zero_http.timeline_detail(
        shard_zero["shard_id"], new_timeline_id
    )
    # Several of these are only referenced by the disabled assertions below; they are kept
    # so that the assertions can be re-enabled as-is.
    initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"])
    min_readable_lsn = Lsn(shard_zero_timeline_info["min_readable_lsn"])
    last_record_lsn = Lsn(shard_zero_timeline_info["last_record_lsn"])
    disk_consistent_lsn = Lsn(shard_zero_timeline_info["disk_consistent_lsn"])
    _remote_consistent_lsn = Lsn(shard_zero_timeline_info["remote_consistent_lsn"])
    remote_consistent_lsn_visible = Lsn(shard_zero_timeline_info["remote_consistent_lsn_visible"])
    # assert remote_consistent_lsn_visible == remote_consistent_lsn TODO: this fails initially and after restart, presumably because `UploadQueue::clean.1` is still `None`
    # assert remote_consistent_lsn_visible == disk_consistent_lsn
    # assert initdb_lsn == min_readable_lsn
    # assert disk_consistent_lsn == initdb_lsn + 8
    assert last_record_lsn == disk_consistent_lsn
    # TODO: assert these values are the same everywhere

    # Last step: validation

    with env.endpoints.create_start(
        branch_name=branch_name,
        endpoint_id="ro",
        tenant_id=new_tenant_id,
        lsn=last_record_lsn,
    ) as ro_endpoint:
        validate_data_equivalence(ro_endpoint)

        # ensure the template survives restarts
        ro_endpoint.stop()
        env.pageserver.stop(immediate=True)
        env.pageserver.start()
        ro_endpoint.start()
        validate_data_equivalence(ro_endpoint)
|
||||
Reference in New Issue
Block a user