feat: Timeline detach ancestor (#7456)

## Problem

Timelines cannot be deleted if they have children. In many production
cases, a branch or a timeline has been created off the main branch for
various reasons to the effect of having now a "new main" branch. This
feature will make it possible to detach a timeline from its ancestor by
inheriting all of the data before the branchpoint to the detached
timeline and by also reparenting all of the ancestor's earlier branches
to the detached timeline.

## Summary of changes

- Earlier added copy_lsn_prefix functionality is used
- RemoteTimelineClient learns to adopt layers by copying them from
another timeline
- LayerManager adds support for adding adopted layers
-
`timeline::Timeline::{prepare_to_detach,complete_detaching}_from_ancestor`
and `timeline::detach_ancestor` are added
- HTTP PUT handler

Cc: #6994

Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
Joonas Koivunen
2024-05-07 13:47:57 +03:00
committed by GitHub
parent af849a1f61
commit 3c9b484c4d
17 changed files with 1411 additions and 29 deletions

View File

@@ -1,3 +1,4 @@
pub mod detach_ancestor;
pub mod partitioning;
pub mod utilization;

View File

@@ -0,0 +1,6 @@
use utils::id::TimelineId;
#[derive(Default, serde::Serialize)]
pub struct AncestorDetached {
pub reparented_timelines: Vec<TimelineId>,
}

View File

@@ -1827,6 +1827,75 @@ async fn timeline_download_remote_layers_handler_get(
json_response(StatusCode::OK, info)
}
async fn timeline_detach_ancestor_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
use crate::tenant::timeline::detach_ancestor::Options;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let span = tracing::info_span!("detach_ancestor", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
async move {
let mut options = Options::default();
let rewrite_concurrency =
parse_query_param::<_, std::num::NonZeroUsize>(&request, "rewrite_concurrency")?;
let copy_concurrency =
parse_query_param::<_, std::num::NonZeroUsize>(&request, "copy_concurrency")?;
[
(&mut options.rewrite_concurrency, rewrite_concurrency),
(&mut options.copy_concurrency, copy_concurrency),
]
.into_iter()
.filter_map(|(target, val)| val.map(|val| (target, val)))
.for_each(|(target, val)| *target = val);
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let ctx = RequestContext::new(TaskKind::DetachAncestor, DownloadBehavior::Download);
let ctx = &ctx;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| ApiError::NotFound(e.into()))?;
let (_guard, prepared) = timeline
.prepare_to_detach_from_ancestor(&tenant, options, ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let res = state
.tenant_manager
.complete_detaching_timeline_ancestor(tenant_shard_id, timeline_id, prepared, ctx)
.await;
match res {
Ok(reparented_timelines) => {
let resp = pageserver_api::models::detach_ancestor::AncestorDetached {
reparented_timelines,
};
json_response(StatusCode::OK, resp)
}
Err(e) => Err(ApiError::InternalServerError(
e.context("timeline detach completion"),
)),
}
}
.instrument(span)
.await
}
async fn deletion_queue_flush(
r: Request<Body>,
cancel: CancellationToken,
@@ -2515,6 +2584,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
|r| api_handler(r, timeline_download_remote_layers_handler_get),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/detach_ancestor",
|r| api_handler(r, timeline_detach_ancestor_handler),
)
.delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
api_handler(r, timeline_delete_handler)
})

View File

@@ -33,7 +33,6 @@ impl Value {
}
}
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub(crate) enum InvalidInput {
TooShortValue,
@@ -42,10 +41,8 @@ pub(crate) enum InvalidInput {
/// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, lets
/// use this type for querying if a slice looks some particular way.
#[cfg(test)]
pub(crate) struct ValueBytes;
#[cfg(test)]
impl ValueBytes {
pub(crate) fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
if raw.len() < 12 {

View File

@@ -370,6 +370,8 @@ pub enum TaskKind {
#[cfg(test)]
UnitTest,
DetachAncestor,
}
#[derive(Default)]

View File

@@ -322,6 +322,9 @@ pub struct Tenant {
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
pub(crate) timeline_get_throttle:
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,
}
impl std::fmt::Debug for Tenant {
@@ -2557,6 +2560,7 @@ impl Tenant {
&crate::metrics::tenant_throttling::TIMELINE_GET,
)),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),
}
}

View File

@@ -207,6 +207,24 @@ impl TimelineMetadata {
self.body.ancestor_lsn
}
/// When reparenting, the `ancestor_lsn` does not change.
pub fn reparent(&mut self, timeline: &TimelineId) {
assert!(self.body.ancestor_timeline.is_some());
// no assertion for redoing this: it's fine, we may have to repeat this multiple times over
self.body.ancestor_timeline = Some(*timeline);
}
pub fn detach_from_ancestor(&mut self, timeline: &TimelineId, ancestor_lsn: &Lsn) {
if let Some(ancestor) = self.body.ancestor_timeline {
assert_eq!(ancestor, *timeline);
}
if self.body.ancestor_lsn != Lsn(0) {
assert_eq!(self.body.ancestor_lsn, *ancestor_lsn);
}
self.body.ancestor_timeline = None;
self.body.ancestor_lsn = Lsn(0);
}
pub fn latest_gc_cutoff_lsn(&self) -> Lsn {
self.body.latest_gc_cutoff_lsn
}

View File

@@ -56,6 +56,7 @@ use utils::id::{TenantId, TimelineId};
use super::delete::DeleteTenantError;
use super::secondary::SecondaryTenant;
use super::timeline::detach_ancestor::PreparedTimelineDetach;
use super::TenantSharedResources;
/// For a tenant that appears in TenantsMap, it may either be
@@ -2007,6 +2008,101 @@ impl TenantManager {
})
.collect())
}
/// Completes an earlier prepared timeline detach ancestor.
pub(crate) async fn complete_detaching_timeline_ancestor(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
prepared: PreparedTimelineDetach,
ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
struct RevertOnDropSlot(Option<SlotGuard>);
impl Drop for RevertOnDropSlot {
fn drop(&mut self) {
if let Some(taken) = self.0.take() {
taken.revert();
}
}
}
impl RevertOnDropSlot {
fn into_inner(mut self) -> SlotGuard {
self.0.take().unwrap()
}
}
impl std::ops::Deref for RevertOnDropSlot {
type Target = SlotGuard;
fn deref(&self) -> &Self::Target {
self.0.as_ref().unwrap()
}
}
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let slot_guard = RevertOnDropSlot(Some(slot_guard));
let tenant = {
let Some(old_slot) = slot_guard.get_old_value() else {
anyhow::bail!(
"Tenant not found when trying to complete detaching timeline ancestor"
);
};
let Some(tenant) = old_slot.get_attached() else {
anyhow::bail!("Tenant is not in attached state");
};
if !tenant.is_active() {
anyhow::bail!("Tenant is not active");
}
tenant.clone()
};
let timeline = tenant.get_timeline(timeline_id, true)?;
let reparented = timeline
.complete_detaching_timeline_ancestor(&tenant, prepared, ctx)
.await?;
let mut slot_guard = slot_guard.into_inner();
let (_guard, progress) = utils::completion::channel();
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {
slot_guard.drop_old_value()?;
}
Err(_barrier) => {
slot_guard.revert();
// this really should not happen, at all, unless shutdown was already going?
anyhow::bail!("Cannot restart Tenant, already shutting down");
}
}
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
let shard_identity = config.shard;
let tenant = tenant_spawn(
self.conf,
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
self.tenants,
SpawnMode::Eager,
ctx,
)?;
slot_guard.upsert(TenantSlot::Attached(tenant))?;
Ok(reparented)
}
}
#[derive(Debug, thiserror::Error)]

View File

@@ -645,9 +645,61 @@ impl RemoteTimelineClient {
self.launch_queued_tasks(upload_queue);
}
pub(crate) async fn schedule_reparenting_and_wait(
self: &Arc<Self>,
new_parent: &TimelineId,
) -> anyhow::Result<()> {
// FIXME: because of how Timeline::schedule_uploads works when called from layer flushing
// and reads the in-memory part we cannot do the detaching like this
let receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.latest_metadata.reparent(new_parent);
self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
self.schedule_barrier0(upload_queue)
};
Self::wait_completion0(receiver).await
}
/// Schedules uploading a new version of `index_part.json` with the given layers added,
/// detaching from ancestor and waits for it to complete.
///
/// Launch an upload operation in the background.
///
/// This is used with `Timeline::detach_ancestor` functionality.
pub(crate) async fn schedule_adding_existing_layers_to_index_detach_and_wait(
self: &Arc<Self>,
layers: &[Layer],
adopted: (TimelineId, Lsn),
) -> anyhow::Result<()> {
let barrier = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue
.latest_metadata
.detach_from_ancestor(&adopted.0, &adopted.1);
for layer in layers {
upload_queue
.latest_files
.insert(layer.layer_desc().filename(), layer.metadata());
}
self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
let barrier = self.schedule_barrier0(upload_queue);
self.launch_queued_tasks(upload_queue);
barrier
};
Self::wait_completion0(barrier).await
}
/// Launch an upload operation in the background; the file is added to be included in next
/// `index_part.json` upload.
pub(crate) fn schedule_layer_file_upload(
self: &Arc<Self>,
layer: ResidentLayer,
@@ -673,9 +725,11 @@ impl RemoteTimelineClient {
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
info!(
"scheduled layer file upload {layer} gen={:?} shard={:?}",
metadata.generation, metadata.shard
gen=?metadata.generation,
shard=?metadata.shard,
"scheduled layer file upload {layer}",
);
let op = UploadOp::UploadLayer(layer, metadata);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
@@ -882,12 +936,18 @@ impl RemoteTimelineClient {
/// Wait for all previously scheduled uploads/deletions to complete
pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
let mut receiver = {
let receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier0(upload_queue)
};
Self::wait_completion0(receiver).await
}
async fn wait_completion0(
mut receiver: tokio::sync::watch::Receiver<()>,
) -> anyhow::Result<()> {
if receiver.changed().await.is_err() {
anyhow::bail!("wait_completion aborted because upload queue was stopped");
}
@@ -1085,6 +1145,72 @@ impl RemoteTimelineClient {
Ok(())
}
/// Uploads the given layer **without** adding it to be part of a future `index_part.json` upload.
///
/// This is not normally needed.
pub(crate) async fn upload_layer_file(
self: &Arc<Self>,
uploaded: &ResidentLayer,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
backoff::retry(
|| async {
let m = uploaded.metadata();
upload::upload_timeline_layer(
self.conf,
&self.storage_impl,
uploaded.local_path(),
&uploaded.metadata(),
m.generation,
cancel,
)
.await
},
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"upload a layer without adding it to latest files",
cancel,
)
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("upload a layer without adding it to latest files")
}
/// Copies the `adopted` remote existing layer to the remote path of `adopted_as`. The layer is
/// not added to be part of a future `index_part.json` upload.
pub(crate) async fn copy_timeline_layer(
self: &Arc<Self>,
adopted: &Layer,
adopted_as: &Layer,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
backoff::retry(
|| async {
upload::copy_timeline_layer(
self.conf,
&self.storage_impl,
adopted.local_path(),
&adopted.metadata(),
adopted_as.local_path(),
&adopted_as.metadata(),
cancel,
)
.await
},
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"copy timeline layer",
cancel,
)
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("remote copy timeline layer")
}
async fn flush_deletion_queue(&self) -> Result<(), DeletionQueueError> {
match tokio::time::timeout(
DELETION_QUEUE_FLUSH_TIMEOUT,
@@ -1256,7 +1382,7 @@ impl RemoteTimelineClient {
while let Some(next_op) = upload_queue.queued_operations.front() {
// Can we run this task now?
let can_run_now = match next_op {
UploadOp::UploadLayer(_, _) => {
UploadOp::UploadLayer(..) => {
// Can always be scheduled.
true
}
@@ -1822,7 +1948,7 @@ pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
///
/// Errors if the path provided does not start from pageserver's workdir.
pub fn remote_path(
pub(crate) fn remote_path(
conf: &PageServerConf,
local_path: &Utf8Path,
generation: Generation,

View File

@@ -120,6 +120,30 @@ pub(super) async fn upload_timeline_layer<'a>(
.with_context(|| format!("upload layer from local path '{source_path}'"))
}
pub(super) async fn copy_timeline_layer(
conf: &'static PageServerConf,
storage: &GenericRemoteStorage,
source_path: &Utf8Path,
source_metadata: &LayerFileMetadata,
target_path: &Utf8Path,
target_metadata: &LayerFileMetadata,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
fail_point!("before-copy-layer", |_| {
bail!("failpoint before-copy-layer")
});
pausable_failpoint!("before-copy-layer-pausable");
let source_path = remote_path(conf, source_path, source_metadata.generation)?;
let target_path = remote_path(conf, target_path, target_metadata.generation)?;
storage
.copy_object(&source_path, &target_path, cancel)
.await
.with_context(|| format!("copy layer {source_path} to {target_path}"))
}
/// Uploads the given `initdb` data to the remote storage.
pub(crate) async fn upload_initdb_dir(
storage: &GenericRemoteStorage,

View File

@@ -1139,15 +1139,15 @@ impl DeltaLayerInner {
Ok(all_keys)
}
/// Using the given writer, write out a truncated version, where LSNs higher than the
/// truncate_at are missing.
#[cfg(test)]
/// Using the given writer, write out a version which has the earlier Lsns than `until`.
///
/// Return the amount of key value records pushed to the writer.
pub(super) async fn copy_prefix(
&self,
writer: &mut DeltaLayerWriter,
truncate_at: Lsn,
until: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> anyhow::Result<usize> {
use crate::tenant::vectored_blob_io::{
BlobMeta, VectoredReadBuilder, VectoredReadExtended,
};
@@ -1211,6 +1211,8 @@ impl DeltaLayerInner {
// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
let mut records = 0;
while let Some(item) = stream.try_next().await? {
tracing::debug!(?item, "popped");
let offset = item
@@ -1229,7 +1231,7 @@ impl DeltaLayerInner {
prev = Option::from(item);
let actionable = actionable.filter(|x| x.0.lsn < truncate_at);
let actionable = actionable.filter(|x| x.0.lsn < until);
let builder = if let Some((meta, offsets)) = actionable {
// extend or create a new builder
@@ -1297,7 +1299,7 @@ impl DeltaLayerInner {
let will_init = crate::repository::ValueBytes::will_init(data)
.inspect_err(|_e| {
#[cfg(feature = "testing")]
tracing::error!(data=?utils::Hex(data), err=?_e, "failed to parse will_init out of serialized value");
tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
})
.unwrap_or(false);
@@ -1314,7 +1316,10 @@ impl DeltaLayerInner {
)
.await;
per_blob_copy = tmp;
res?;
records += 1;
}
buffer = Some(res.buf);
@@ -1326,7 +1331,7 @@ impl DeltaLayerInner {
"with the sentinel above loop should had handled all"
);
Ok(())
Ok(records)
}
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
@@ -1399,7 +1404,6 @@ impl DeltaLayerInner {
Ok(())
}
#[cfg(test)]
fn stream_index_forwards<'a, R>(
&'a self,
reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,

View File

@@ -1797,25 +1797,23 @@ impl ResidentLayer {
}
}
/// FIXME: truncate is bad name because we are not truncating anything, but copying the
/// filtered parts.
#[cfg(test)]
pub(super) async fn copy_delta_prefix(
/// Returns the amount of keys and values written to the writer.
pub(crate) async fn copy_delta_prefix(
&self,
writer: &mut super::delta_layer::DeltaLayerWriter,
truncate_at: Lsn,
until: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> anyhow::Result<usize> {
use LayerKind::*;
let owner = &self.owner.0;
match self.downloaded.get(owner, ctx).await? {
Delta(ref d) => d
.copy_prefix(writer, truncate_at, ctx)
.copy_prefix(writer, until, ctx)
.await
.with_context(|| format!("truncate {self}")),
Image(_) => anyhow::bail!(format!("cannot truncate image layer {self}")),
.with_context(|| format!("copy_delta_prefix until {until} of {self}")),
Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
}
}

View File

@@ -1,5 +1,6 @@
mod compaction;
pub mod delete;
pub(crate) mod detach_ancestor;
mod eviction_task;
mod init;
pub mod layer_manager;
@@ -1494,6 +1495,12 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
self.freeze_and_flush0().await
}
// This exists to provide a non-span creating version of `freeze_and_flush` we can call without
// polluting the span hierarchy.
pub(crate) async fn freeze_and_flush0(&self) -> anyhow::Result<()> {
let to_lsn = self.freeze_inmem_layer(false).await;
self.flush_frozen_layers_and_wait(to_lsn).await
}
@@ -3510,7 +3517,7 @@ impl Timeline {
Ok(ancestor)
}
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
pub(crate) fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
format!(
"Ancestor is missing. Timeline id: {} Ancestor id {:?}",
@@ -4326,6 +4333,49 @@ impl Timeline {
_ = self.cancel.cancelled() => {}
)
}
/// Detach this timeline from its ancestor by copying all of ancestors layers as this
/// Timelines layers up to the ancestor_lsn.
///
/// Requires a timeline that:
/// - has an ancestor to detach from
/// - the ancestor does not have an ancestor -- follows from the original RFC limitations, not
/// a technical requirement
/// - has prev_lsn in remote storage (temporary restriction)
///
/// After the operation has been started, it cannot be canceled. Upon restart it needs to be
/// polled again until completion.
///
/// During the operation all timelines sharing the data with this timeline will be reparented
/// from our ancestor to be branches of this timeline.
pub(crate) async fn prepare_to_detach_from_ancestor(
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
options: detach_ancestor::Options,
ctx: &RequestContext,
) -> Result<
(
completion::Completion,
detach_ancestor::PreparedTimelineDetach,
),
detach_ancestor::Error,
> {
detach_ancestor::prepare(self, tenant, options, ctx).await
}
/// Completes the ancestor detach. This method is to be called while holding the
/// TenantManager's tenant slot, so during this method we cannot be deleted nor can any
/// timeline be deleted. After this method returns successfully, tenant must be reloaded.
///
/// Pageserver receiving a SIGKILL during this operation is not supported (yet).
pub(crate) async fn complete_detaching_timeline_ancestor(
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
prepared: detach_ancestor::PreparedTimelineDetach,
ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
detach_ancestor::complete(self, tenant, prepared, ctx).await
}
}
/// Top-level failure to compact.
@@ -4610,6 +4660,8 @@ impl Timeline {
retain_lsns: Vec<Lsn>,
new_gc_cutoff: Lsn,
) -> anyhow::Result<GcResult> {
// FIXME: if there is an ongoing detach_from_ancestor, we should just skip gc
let now = SystemTime::now();
let mut result: GcResult = GcResult::default();

View File

@@ -422,6 +422,10 @@ impl DeleteTimelineFlow {
pub(crate) fn is_finished(&self) -> bool {
matches!(self, Self::Finished)
}
pub(crate) fn is_not_started(&self) -> bool {
matches!(self, Self::NotStarted)
}
}
struct DeletionGuard(OwnedMutexGuard<DeleteTimelineFlow>);

View File

@@ -0,0 +1,550 @@
use std::sync::Arc;
use super::{layer_manager::LayerManager, Timeline};
use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
tenant::{
storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
Tenant,
},
virtual_file::{MaybeFatalIo, VirtualFile},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{completion, generation::Generation, id::TimelineId, lsn::Lsn};
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[error("no ancestors")]
NoAncestor,
#[error("too many ancestors")]
TooManyAncestors,
#[error("shutting down, please retry later")]
ShuttingDown,
#[error("detached timeline must receive writes before the operation")]
DetachedTimelineNeedsWrites,
#[error("flushing failed")]
FlushAncestor(#[source] anyhow::Error),
#[error("layer download failed")]
RewrittenDeltaDownloadFailed(#[source] anyhow::Error),
#[error("copying LSN prefix locally failed")]
CopyDeltaPrefix(#[source] anyhow::Error),
#[error("upload rewritten layer")]
UploadRewritten(#[source] anyhow::Error),
#[error("ancestor is already being detached by: {}", .0)]
OtherTimelineDetachOngoing(TimelineId),
#[error("remote copying layer failed")]
CopyFailed(#[source] anyhow::Error),
#[error("unexpected error")]
Unexpected(#[source] anyhow::Error),
}
pub(crate) struct PreparedTimelineDetach {
layers: Vec<Layer>,
}
/// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
#[derive(Debug)]
pub(crate) struct Options {
pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
pub(crate) copy_concurrency: std::num::NonZeroUsize,
}
impl Default for Options {
fn default() -> Self {
Self {
rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
copy_concurrency: std::num::NonZeroUsize::new(10).unwrap(),
}
}
}
/// See [`Timeline::prepare_to_detach_from_ancestor`]
pub(super) async fn prepare(
detached: &Arc<Timeline>,
tenant: &Tenant,
options: Options,
ctx: &RequestContext,
) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
use Error::*;
if detached.remote_client.as_ref().is_none() {
unimplemented!("no new code for running without remote storage");
}
let Some((ancestor, ancestor_lsn)) = detached
.ancestor_timeline
.as_ref()
.map(|tl| (tl.clone(), detached.ancestor_lsn))
else {
return Err(NoAncestor);
};
if !ancestor_lsn.is_valid() {
return Err(NoAncestor);
}
if ancestor.ancestor_timeline.is_some() {
// non-technical requirement; we could flatten N ancestors just as easily but we chose
// not to
return Err(TooManyAncestors);
}
if detached.get_prev_record_lsn() == Lsn::INVALID
|| detached.disk_consistent_lsn.load() == ancestor_lsn
{
// this is to avoid a problem that after detaching we would be unable to start up the
// compute because of "PREV_LSN: invalid".
return Err(DetachedTimelineNeedsWrites);
}
// before we acquire the gate, we must mark the ancestor as having a detach operation
// ongoing which will block other concurrent detach operations so we don't get to ackward
// situations where there would be two branches trying to reparent earlier branches.
let (guard, barrier) = completion::channel();
{
let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
if let Some((tl, other)) = guard.as_ref() {
if !other.is_ready() {
return Err(OtherTimelineDetachOngoing(*tl));
}
}
*guard = Some((detached.timeline_id, barrier));
}
let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
let span =
tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
async {
let started_at = std::time::Instant::now();
let freeze_and_flush = ancestor.freeze_and_flush0();
let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
let res =
tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
.await;
let res = match res {
Ok(res) => res,
Err(_elapsed) => {
tracing::info!("freezing and flushing ancestor is still ongoing");
freeze_and_flush.await
}
};
res.map_err(FlushAncestor)?;
// we do not need to wait for uploads to complete but we do need `struct Layer`,
// copying delta prefix is unsupported currently for `InMemoryLayer`.
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
"froze and flushed the ancestor"
);
Ok(())
}
.instrument(span)
.await?;
}
let end_lsn = ancestor_lsn + 1;
let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
// we do not need to start from our layers, because they can only be layers that come
// *after* ancestor_lsn
let layers = tokio::select! {
guard = ancestor.layers.read() => guard,
_ = detached.cancel.cancelled() => {
return Err(ShuttingDown);
}
_ = ancestor.cancel.cancelled() => {
return Err(ShuttingDown);
}
};
// between retries, these can change if compaction or gc ran in between. this will mean
// we have to redo work.
partition_work(ancestor_lsn, &layers)
};
// TODO: layers are already sorted by something: use that to determine how much of remote
// copies are already done.
tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
// TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
let mut new_layers: Vec<Layer> =
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
{
tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
let mut tasks = tokio::task::JoinSet::new();
let mut wrote_any = false;
let limiter = Arc::new(tokio::sync::Semaphore::new(
options.rewrite_concurrency.get(),
));
for layer in straddling_branchpoint {
let limiter = limiter.clone();
let timeline = detached.clone();
let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
tasks.spawn(async move {
let _permit = limiter.acquire().await;
let copied =
upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
.await?;
Ok(copied)
});
}
while let Some(res) = tasks.join_next().await {
match res {
Ok(Ok(Some(copied))) => {
wrote_any = true;
tracing::info!(layer=%copied, "rewrote and uploaded");
new_layers.push(copied);
}
Ok(Ok(None)) => {}
Ok(Err(e)) => return Err(e),
Err(je) => return Err(Unexpected(je.into())),
}
}
// FIXME: the fsync should be mandatory, after both rewrites and copies
if wrote_any {
let timeline_dir = VirtualFile::open(
&detached
.conf
.timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
}
}
let mut tasks = tokio::task::JoinSet::new();
let limiter = Arc::new(tokio::sync::Semaphore::new(options.copy_concurrency.get()));
for adopted in rest_of_historic {
let limiter = limiter.clone();
let timeline = detached.clone();
tasks.spawn(
async move {
let _permit = limiter.acquire().await;
let owned =
remote_copy(&adopted, &timeline, timeline.generation, &timeline.cancel).await?;
tracing::info!(layer=%owned, "remote copied");
Ok(owned)
}
.in_current_span(),
);
}
while let Some(res) = tasks.join_next().await {
match res {
Ok(Ok(owned)) => {
new_layers.push(owned);
}
Ok(Err(failed)) => {
return Err(failed);
}
Err(je) => return Err(Unexpected(je.into())),
}
}
// TODO: fsync directory again if we hardlinked something
let prepared = PreparedTimelineDetach { layers: new_layers };
Ok((guard, prepared))
}
fn partition_work(
ancestor_lsn: Lsn,
source_layermap: &LayerManager,
) -> (usize, Vec<Layer>, Vec<Layer>) {
let mut straddling_branchpoint = vec![];
let mut rest_of_historic = vec![];
let mut later_by_lsn = 0;
for desc in source_layermap.layer_map().iter_historic_layers() {
// off by one chances here:
// - start is inclusive
// - end is exclusive
if desc.lsn_range.start > ancestor_lsn {
later_by_lsn += 1;
continue;
}
let target = if desc.lsn_range.start <= ancestor_lsn
&& desc.lsn_range.end > ancestor_lsn
&& desc.is_delta
{
// TODO: image layer at Lsn optimization
&mut straddling_branchpoint
} else {
&mut rest_of_historic
};
target.push(source_layermap.get_from_desc(&desc));
}
(later_by_lsn, straddling_branchpoint, rest_of_historic)
}
async fn upload_rewritten_layer(
end_lsn: Lsn,
layer: &Layer,
target: &Arc<Timeline>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<Option<Layer>, Error> {
use Error::UploadRewritten;
let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
let Some(copied) = copied else {
return Ok(None);
};
// FIXME: better shuttingdown error
target
.remote_client
.as_ref()
.unwrap()
.upload_layer_file(&copied, cancel)
.await
.map_err(UploadRewritten)?;
Ok(Some(copied.into()))
}
async fn copy_lsn_prefix(
end_lsn: Lsn,
layer: &Layer,
target_timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed};
tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
let mut writer = DeltaLayerWriter::new(
target_timeline.conf,
target_timeline.timeline_id,
target_timeline.tenant_shard_id,
layer.layer_desc().key_range.start,
layer.layer_desc().lsn_range.start..end_lsn,
)
.await
.map_err(CopyDeltaPrefix)?;
let resident = layer
.download_and_keep_resident()
.await
// likely shutdown
.map_err(RewrittenDeltaDownloadFailed)?;
let records = resident
.copy_delta_prefix(&mut writer, end_lsn, ctx)
.await
.map_err(CopyDeltaPrefix)?;
drop(resident);
tracing::debug!(%layer, records, "copied records");
if records == 0 {
drop(writer);
// TODO: we might want to store an empty marker in remote storage for this
// layer so that we will not needlessly walk `layer` on repeated attempts.
Ok(None)
} else {
// reuse the key instead of adding more holes between layers by using the real
// highest key in the layer.
let reused_highest_key = layer.layer_desc().key_range.end;
let copied = writer
.finish(reused_highest_key, target_timeline, ctx)
.await
.map_err(CopyDeltaPrefix)?;
tracing::debug!(%layer, %copied, "new layer produced");
Ok(Some(copied))
}
}
/// Creates a new Layer instance for the adopted layer, and ensures it is found from the remote
/// storage on successful return without the adopted layer being added to `index_part.json`.
async fn remote_copy(
adopted: &Layer,
adoptee: &Arc<Timeline>,
generation: Generation,
cancel: &CancellationToken,
) -> Result<Layer, Error> {
use Error::CopyFailed;
// depending if Layer::keep_resident we could hardlink
let mut metadata = adopted.metadata();
debug_assert!(metadata.generation <= generation);
metadata.generation = generation;
let owned = crate::tenant::storage_layer::Layer::for_evicted(
adoptee.conf,
adoptee,
adopted.layer_desc().filename(),
metadata,
);
// FIXME: better shuttingdown error
adoptee
.remote_client
.as_ref()
.unwrap()
.copy_timeline_layer(adopted, &owned, cancel)
.await
.map(move |()| owned)
.map_err(CopyFailed)
}
/// See [`Timeline::complete_detaching_timeline_ancestor`].
pub(super) async fn complete(
detached: &Arc<Timeline>,
tenant: &Tenant,
prepared: PreparedTimelineDetach,
_ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
let rtc = detached
.remote_client
.as_ref()
.expect("has to have a remote timeline client for timeline ancestor detach");
let PreparedTimelineDetach { layers } = prepared;
let ancestor = detached
.get_ancestor_timeline()
.expect("must still have a ancestor");
let ancestor_lsn = detached.get_ancestor_lsn();
// publish the prepared layers before we reparent any of the timelines, so that on restart
// reparented timelines find layers. also do the actual detaching.
//
// if we crash after this operation, we will at least come up having detached a timeline, but
// we cannot go back and reparent the timelines which would had been reparented in normal
// execution.
//
// this is not perfect, but it avoids us a retry happening after a compaction or gc on restart
// which could give us a completely wrong layer combination.
rtc.schedule_adding_existing_layers_to_index_detach_and_wait(
&layers,
(ancestor.timeline_id, ancestor_lsn),
)
.await?;
let mut tasks = tokio::task::JoinSet::new();
// because we are now keeping the slot in progress, it is unlikely that there will be any
// timeline deletions during this time. if we raced one, then we'll just ignore it.
tenant
.timelines
.lock()
.unwrap()
.values()
.filter_map(|tl| {
if Arc::ptr_eq(tl, detached) {
return None;
}
if !tl.is_active() {
return None;
}
let tl_ancestor = tl.ancestor_timeline.as_ref()?;
let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
let is_deleting = tl
.delete_progress
.try_lock()
.map(|flow| !flow.is_not_started())
.unwrap_or(true);
if is_same && is_earlier && !is_deleting {
Some(tl.clone())
} else {
None
}
})
.for_each(|timeline| {
// important in this scope: we are holding the Tenant::timelines lock
let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
let new_parent = detached.timeline_id;
tasks.spawn(
async move {
let res = timeline
.remote_client
.as_ref()
.expect("reparented has to have remote client because detached has one")
.schedule_reparenting_and_wait(&new_parent)
.await;
match res {
Ok(()) => Some(timeline),
Err(e) => {
// with the use of tenant slot, we no longer expect these.
tracing::warn!("reparenting failed: {e:#}");
None
}
}
}
.instrument(span),
);
});
let reparenting_candidates = tasks.len();
let mut reparented = Vec::with_capacity(tasks.len());
while let Some(res) = tasks.join_next().await {
match res {
Ok(Some(timeline)) => {
tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
reparented.push(timeline.timeline_id);
}
Ok(None) => {
// lets just ignore this for now. one or all reparented timelines could had
// started deletion, and that is fine.
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
// ignore; it's better to continue with a single reparenting failing (or even
// all of them) in order to get to the goal state.
//
// these timelines will never be reparentable, but they can be always detached as
// separate tree roots.
}
Err(je) => tracing::error!("unexpected join error: {je:?}"),
}
}
if reparenting_candidates != reparented.len() {
tracing::info!("failed to reparent some candidates");
}
Ok(reparented)
}

View File

@@ -819,6 +819,23 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
continue
self.download_layer(tenant_id, timeline_id, layer.layer_file_name)
def detach_ancestor(
self,
tenant_id: Union[TenantId, TenantShardId],
timeline_id: TimelineId,
batch_size: int | None = None,
) -> Set[TimelineId]:
params = {}
if batch_size is not None:
params["batch_size"] = batch_size
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/detach_ancestor",
params=params,
)
self.verbose_error(res)
json = res.json()
return set(map(TimelineId, json["reparented_timelines"]))
def evict_layer(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
):

View File

@@ -0,0 +1,410 @@
import enum
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue
from threading import Barrier
from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import HistoricLayerInfo
from fixtures.pageserver.utils import wait_timeline_detail_404
from fixtures.types import Lsn, TimelineId
def by_end_lsn(info: HistoricLayerInfo) -> Lsn:
assert info.lsn_end is not None
return Lsn(info.lsn_end)
def layer_name(info: HistoricLayerInfo) -> str:
return info.layer_file_name
@enum.unique
class Branchpoint(str, enum.Enum):
"""
Have branches at these Lsns possibly relative to L0 layer boundary.
"""
EARLIER = "earlier"
AT_L0 = "at"
AFTER_L0 = "after"
LAST_RECORD_LSN = "head"
def __str__(self) -> str:
return self.value
@staticmethod
def all() -> List["Branchpoint"]:
return [
Branchpoint.EARLIER,
Branchpoint.AT_L0,
Branchpoint.AFTER_L0,
Branchpoint.LAST_RECORD_LSN,
]
@pytest.mark.parametrize("branchpoint", Branchpoint.all())
@pytest.mark.parametrize("restart_after", [True, False])
def test_ancestor_detach_branched_from(
neon_env_builder: NeonEnvBuilder, branchpoint: Branchpoint, restart_after: bool
):
"""
Creates a branch relative to L0 lsn boundary according to Branchpoint. Later the timeline is detached.
"""
# TODO: parametrize; currently unimplemented over at pageserver
write_to_branch_first = True
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[
".*initial size calculation failed: downloading failed, possibly for shutdown"
".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
]
)
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE TABLE foo (i BIGINT);")
after_first_tx = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(0, 8191) g(i);")
# create a single layer for us to remote copy
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(8192, 16383) g(i);")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
deltas = client.layer_map_info(env.initial_tenant, env.initial_timeline).delta_layers()
# there is also the in-mem layer, but ignore it for now
assert len(deltas) == 2, "expecting there to be two deltas: initdb and checkpointed"
later_delta = max(deltas, key=by_end_lsn)
assert later_delta.lsn_end is not None
# -1 as the lsn_end is exclusive.
last_lsn = Lsn(later_delta.lsn_end).lsn_int - 1
if branchpoint == Branchpoint.EARLIER:
branch_at = after_first_tx
rows = 0
truncated_layers = 1
elif branchpoint == Branchpoint.AT_L0:
branch_at = Lsn(last_lsn)
rows = 8192
truncated_layers = 0
elif branchpoint == Branchpoint.AFTER_L0:
branch_at = Lsn(last_lsn + 8)
rows = 8192
# as there is no 8 byte walrecord, nothing should get copied from the straddling layer
truncated_layers = 0
else:
# this case also covers the implicit flush of ancestor as the inmemory hasn't been flushed yet
assert branchpoint == Branchpoint.LAST_RECORD_LSN
branch_at = None
rows = 16384
truncated_layers = 0
name = "new main"
timeline_id = env.neon_cli.create_branch(
name, "main", env.initial_tenant, ancestor_start_lsn=branch_at
)
recorded = Lsn(client.timeline_detail(env.initial_tenant, timeline_id)["ancestor_lsn"])
if branch_at is None:
# fix it up if we need it later (currently unused)
branch_at = recorded
else:
assert branch_at == recorded, "the test should not use unaligned lsns"
if write_to_branch_first:
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
# make sure the ep is writable
# with BEFORE_L0, AFTER_L0 there will be a gap in Lsns caused by accurate end_lsn on straddling layers
ep.safe_psql("CREATE TABLE audit AS SELECT 1 as starts;")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, timeline_id)
# branch must have a flush for "PREV_LSN: none"
client.timeline_checkpoint(env.initial_tenant, timeline_id)
branch_layers = set(
map(layer_name, client.layer_map_info(env.initial_tenant, timeline_id).historic_layers)
)
else:
branch_layers = set()
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert all_reparented == set()
if restart_after:
env.pageserver.stop()
env.pageserver.start()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == 16384
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
old_main_info = client.layer_map_info(env.initial_tenant, env.initial_timeline)
old_main = set(map(layer_name, old_main_info.historic_layers))
new_main_info = client.layer_map_info(env.initial_tenant, timeline_id)
new_main = set(map(layer_name, new_main_info.historic_layers))
new_main_copied_or_truncated = new_main - branch_layers
new_main_truncated = new_main_copied_or_truncated - old_main
assert len(new_main_truncated) == truncated_layers
# could additionally check that the symmetric difference has layers starting at the same lsn
# but if nothing was copied, then there is no nice rule.
# there could be a hole in LSNs between copied from the "old main" and the first branch layer.
client.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline, 10, 1.0)
@pytest.mark.parametrize("restart_after", [True, False])
def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder, restart_after: bool):
"""
The case from RFC:
+-> another branch with same ancestor_lsn as new main
|
old main -------|---------X--------->
| | |
| | +-> after
| |
| +-> new main
|
+-> reparented
Ends up as:
old main --------------------------->
|
+-> after
+-> another branch with same ancestor_lsn as new main
|
new main -------|---------|->
|
+-> reparented
We confirm the end result by being able to delete "old main" after deleting "after".
"""
# TODO: support not yet implemented for these
write_to_branch_first = True
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[
".*initial size calculation failed: downloading failed, possibly for shutdown",
# after restart this is likely to happen if there is other load on the runner
".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
]
)
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE TABLE foo (i BIGINT);")
ep.safe_psql("CREATE TABLE audit AS SELECT 1 as starts;")
branchpoint_pipe = wait_for_last_flush_lsn(
env, ep, env.initial_tenant, env.initial_timeline
)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(0, 8191) g(i);")
branchpoint_x = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(8192, 16383) g(i);")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
# as this only gets reparented, we don't need to write to it like new main
reparented = env.neon_cli.create_branch(
"reparented", "main", env.initial_tenant, ancestor_start_lsn=branchpoint_pipe
)
same_branchpoint = env.neon_cli.create_branch(
"same_branchpoint", "main", env.initial_tenant, ancestor_start_lsn=branchpoint_x
)
timeline_id = env.neon_cli.create_branch(
"new main", "main", env.initial_tenant, ancestor_start_lsn=branchpoint_x
)
after = env.neon_cli.create_branch("after", "main", env.initial_tenant, ancestor_start_lsn=None)
if write_to_branch_first:
with env.endpoints.create_start("new main", tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == 8192
with ep.cursor() as cur:
cur.execute("UPDATE audit SET starts = starts + 1")
assert cur.rowcount == 1
wait_for_last_flush_lsn(env, ep, env.initial_tenant, timeline_id)
client.timeline_checkpoint(env.initial_tenant, timeline_id)
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert all_reparented == {reparented, same_branchpoint}
if restart_after:
env.pageserver.stop()
env.pageserver.start()
env.pageserver.quiesce_tenants()
# checking the ancestor after is much faster than waiting for the endpoint not start
expected_result = [
("main", env.initial_timeline, None, 16384, 1),
("after", after, env.initial_timeline, 16384, 1),
("new main", timeline_id, None, 8192, 2),
("same_branchpoint", same_branchpoint, timeline_id, 8192, 1),
("reparented", reparented, timeline_id, 0, 1),
]
for _, timeline_id, expected_ancestor, _, _ in expected_result:
details = client.timeline_detail(env.initial_tenant, timeline_id)
ancestor_timeline_id = details["ancestor_timeline_id"]
if expected_ancestor is None:
assert ancestor_timeline_id is None
else:
assert TimelineId(ancestor_timeline_id) == expected_ancestor
for name, _, _, rows, starts in expected_result:
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
assert ep.safe_psql(f"SELECT count(*) FROM audit WHERE starts = {starts}")[0][0] == 1
# delete the timelines to confirm detach actually worked
client.timeline_delete(env.initial_tenant, after)
wait_timeline_detail_404(client, env.initial_tenant, after, 10, 1.0)
client.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline, 10, 1.0)
@pytest.mark.parametrize("restart_after", [True, False])
def test_detached_receives_flushes_while_being_detached(
neon_env_builder: NeonEnvBuilder, restart_after: bool
):
"""
Makes sure that the timeline is able to receive writes through-out the detach process.
"""
write_to_branch_first = True
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
# row counts have been manually verified to cause reconnections and getpage
# requests when restart_after=False with pg16
def insert_rows(n: int, ep) -> int:
ep.safe_psql(
f"INSERT INTO foo SELECT i::bigint, 'more info!! this is a long string' || i FROM generate_series(0, {n - 1}) g(i);"
)
return n
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
ep.safe_psql("CREATE TABLE foo (i BIGINT, aux TEXT NOT NULL);")
rows = insert_rows(256, ep)
branchpoint = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
timeline_id = env.neon_cli.create_branch(
"new main", "main", tenant_id=env.initial_tenant, ancestor_start_lsn=branchpoint
)
log.info("starting the new main endpoint")
ep = env.endpoints.create_start("new main", tenant_id=env.initial_tenant)
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
if write_to_branch_first:
rows += insert_rows(256, ep)
wait_for_last_flush_lsn(env, ep, env.initial_tenant, timeline_id)
client.timeline_checkpoint(env.initial_tenant, timeline_id)
log.info("completed {write_to_branch_first=}")
def small_txs(ep, queue: Queue[str], barrier):
extra_rows = 0
with ep.connect() as conn:
while True:
try:
queue.get_nowait()
break
except Empty:
pass
if barrier is not None:
barrier.wait()
barrier = None
cursor = conn.cursor()
cursor.execute(
"INSERT INTO foo(i, aux) VALUES (1, 'more info!! this is a long string' || 1);"
)
extra_rows += 1
return extra_rows
with ThreadPoolExecutor(max_workers=1) as exec:
queue: Queue[str] = Queue()
barrier = Barrier(2)
completion = exec.submit(small_txs, ep, queue, barrier)
barrier.wait()
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert len(reparented) == 0
if restart_after:
# ep and row production is kept alive on purpose
env.pageserver.stop()
env.pageserver.start()
env.pageserver.quiesce_tenants()
queue.put("done")
extra_rows = completion.result()
assert extra_rows > 0, "some rows should had been written"
rows += extra_rows
assert client.timeline_detail(env.initial_tenant, timeline_id)["ancestor_timeline_id"] is None
assert ep.safe_psql("SELECT clear_buffer_cache();")
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
assert ep.safe_psql("SELECT SUM(LENGTH(aux)) FROM foo")[0][0] != 0
ep.stop()
# finally restart the endpoint and make sure we still have the same answer
with env.endpoints.create_start("new main", tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
env.pageserver.allowed_errors.append(
"initial size calculation failed: downloading failed, possibly for shutdown"
)
# TODO:
# - after starting the operation, tenant is deleted
# - after starting the operation, pageserver is shutdown, restarted
# - after starting the operation, bottom-most timeline is deleted, pageserver is restarted, gc is inhibited
# - deletion of reparented while reparenting should fail once, then succeed (?)
# - branch near existing L1 boundary, image layers?
# - investigate: why are layers started at uneven lsn? not just after branching, but in general.