Compare commits

...

95 Commits

Author SHA1 Message Date
Joonas Koivunen
dcfd92b6db test: funroll-loop first iteration in test_retried_detach_ancestor_after_failed_reparenting 2024-07-26 14:39:32 +00:00
Joonas Koivunen
f58636ffdd test: refactor -- begin to -funroll-loops in test_retried_detach_ancestor_after_failed_reparenting 2024-07-26 14:39:32 +00:00
Joonas Koivunen
f3ac5bcbe1 test: ensure gc is unpaused with the earlier deletion test 2024-07-26 14:39:32 +00:00
Joonas Koivunen
eb3711b881 doc: why no cancel 2024-07-26 14:39:32 +00:00
Joonas Koivunen
c864166b32 test: make sure gc gets unblocked by late deletion 2024-07-26 14:39:32 +00:00
Joonas Koivunen
ce9b5ae7bf test: allow the 500 error crutch temporarily 2024-07-26 14:39:32 +00:00
Joonas Koivunen
cd2cbe0691 test: rename test_deletion_after_timeline_ancestor_detach_before_completion 2024-07-26 14:39:32 +00:00
Joonas Koivunen
7f241bd379 refactor: remove needless Error::from 2024-07-26 14:39:32 +00:00
Joonas Koivunen
ff52901028 refactor: still_ongoing assert is shared in all paths 2024-07-26 14:39:32 +00:00
Joonas Koivunen
bb377a3544 fixup: make sure detach_ancestor is blocking gc 2024-07-26 14:39:32 +00:00
Joonas Koivunen
5ece7af497 doc: remove confusing comment 2024-07-26 14:39:32 +00:00
Joonas Koivunen
2be3027fa5 doc: elaborate on weird query 2024-07-26 14:39:32 +00:00
Joonas Koivunen
14a0517c7f also assert still ongoing 2024-07-26 14:39:32 +00:00
Joonas Koivunen
dcff25c293 chore: adjust assert message 2024-07-26 14:39:32 +00:00
Joonas Koivunen
f80c37b733 chore: forgotten to update panic text with detach_and_reparent renaming 2024-07-26 14:39:32 +00:00
Joonas Koivunen
b9d0b26cea doc: remove possibly wrong comment 2024-07-26 14:39:32 +00:00
Joonas Koivunen
c2c28f211b doc: explain returning option 2024-07-26 14:39:32 +00:00
Joonas Koivunen
1ebcb1c45b doc: clean out FIXME
we cannot protect against willful misuse. I had been thinking of witness
of Attempt but ...
2024-07-26 14:39:32 +00:00
Joonas Koivunen
66d750ec20 info log on detach 2024-07-26 14:39:32 +00:00
Joonas Koivunen
ba3a6645e7 fix: info log line again, botched rebase? 2024-07-26 14:39:32 +00:00
Joonas Koivunen
8885a8c482 fixup: missed hashset change 2024-07-26 14:39:32 +00:00
Joonas Koivunen
c8880b69fb stop with the (ancestor_lsn, timeline_id) ordered reparented
I was thinking of the case where we have multiple reparented at the same
ancestor_lsn. But of course, that is not a problem if we compare the
reparented as a set...
2024-07-26 14:39:32 +00:00
Joonas Koivunen
274b2a611b test: handle the case where timeline cannot be found
at least do not double-panick.
2024-07-26 14:39:32 +00:00
Joonas Koivunen
a7153bf9b2 test: forgotten allowed errors 2024-07-26 14:39:32 +00:00
Joonas Koivunen
8a4236a441 test: remove needless s3 storage 2024-07-26 14:39:32 +00:00
Joonas Koivunen
7ec927e43b test: cleanup todos 2024-07-26 14:39:32 +00:00
Joonas Koivunen
22470ef444 test: comment 2024-07-26 14:39:32 +00:00
Joonas Koivunen
8248cbb45b test: ensure persisted gc blocking works across restart 2024-07-26 14:39:32 +00:00
Joonas Koivunen
4dd805b68a test: remove the extra deletion which was confusing
it had already been reparented, so it was not needed.
2024-07-26 14:39:32 +00:00
Joonas Koivunen
f582675452 test: refactor repetition 2024-07-26 14:39:32 +00:00
Joonas Koivunen
48069f68bb chore: forgotten pyfmt 2024-07-26 14:39:32 +00:00
Joonas Koivunen
8f52139913 additional assert in completion 2024-07-26 14:39:32 +00:00
Joonas Koivunen
fc4d80bbf2 elaborate on TODO for which a test is later added 2024-07-26 14:39:32 +00:00
Joonas Koivunen
dc83a5a978 fixup dae8c75c04 test: cannot be parametrized over return or exit 2024-07-26 14:39:32 +00:00
Joonas Koivunen
f4fb08d869 stop masking the topmost error in http handler 2024-07-26 14:39:31 +00:00
Joonas Koivunen
75b326faf4 test: complete fixmes 2024-07-26 14:39:31 +00:00
Joonas Koivunen
c23cd5c149 ongoing_detach_ancestor => gc_blocking in index_part 2024-07-26 14:39:31 +00:00
Joonas Koivunen
f4cd9fe40b refactor: misc after attempt to add lock_in_reparentable 2024-07-26 14:39:31 +00:00
Joonas Koivunen
43af9484c0 doc: schedule_reparenting_and_wait 2024-07-26 14:39:31 +00:00
Joonas Koivunen
842bd4c2db refactor: reparentable_timelines query out 2024-07-26 14:39:31 +00:00
Joonas Koivunen
ada9a46dca remove done fixme, minor reformattting 2024-07-26 14:39:31 +00:00
Joonas Koivunen
742fcac7b9 refactor: use partialeq more 2024-07-26 14:39:31 +00:00
Joonas Koivunen
55aeeb5765 allow deleting timeline unblock gc 2024-07-26 14:39:31 +00:00
Joonas Koivunen
89426570d3 relax overly strict comparisons 2024-07-26 14:39:31 +00:00
Joonas Koivunen
7f767ca18e fix: must_restart condition 2024-07-26 14:39:31 +00:00
Joonas Koivunen
1348dbf0f1 doc: comment cleanup 2024-07-26 14:39:31 +00:00
Joonas Koivunen
a179283f86 always notify gc_waiting when writing over the witness tracking 2024-07-26 14:39:31 +00:00
Joonas Koivunen
deb86c1ea1 remodel the return type 2024-07-26 14:39:31 +00:00
Joonas Koivunen
dfdf40916f rename complete_detaching_from_ancestor
it hasn't meant completing in a while now :)
2024-07-26 14:39:31 +00:00
Joonas Koivunen
c6d8015fe9 chore: clippy needless into_iter 2024-07-26 14:39:31 +00:00
Joonas Koivunen
b2233d557b test: complicate to include added paths 2024-07-26 14:39:31 +00:00
Joonas Koivunen
ce2552ba67 minor comment update for FIXME about 503 2024-07-26 14:39:31 +00:00
Joonas Koivunen
f4d773bb89 refactor: unify t::s::Semaphore 2024-07-26 14:39:31 +00:00
Joonas Koivunen
6f28263428 refactor: failpoint all but one 2024-07-26 14:39:31 +00:00
Joonas Koivunen
1e380ea5af refactor: Ancestor::Delete is not needed 2024-07-26 14:39:31 +00:00
Joonas Koivunen
8258385301 remove indentation level with exhaustive match 2024-07-26 14:39:31 +00:00
Joonas Koivunen
6a8f00dea0 fix: return reparented_direct_children in case we reparent nothing new 2024-07-26 14:39:31 +00:00
Joonas Koivunen
44cdb9fb58 refactor: reparented_direct_children query 2024-07-26 14:39:31 +00:00
Joonas Koivunen
cdfaf0700f fix: bifurcate the detach+reparent step 2024-07-26 14:39:31 +00:00
Joonas Koivunen
881e1ad056 refactor: no need to collect reparentable here 2024-07-26 14:39:31 +00:00
Joonas Koivunen
bb3d70e24d fix: properly cancel if any reparenting failed 2024-07-26 14:39:31 +00:00
Joonas Koivunen
c6c560e4c8 rewrite to include testing assertion 2024-07-26 14:39:31 +00:00
Joonas Koivunen
8dd332aed5 doc: remove unnecessary comment 2024-07-26 14:39:31 +00:00
Joonas Koivunen
5c03a17eb8 wip: some progress
now we hit the todo! in "already detached" path.
2024-07-26 14:39:31 +00:00
Joonas Koivunen
402d66778e make reparenting operations idempotent 2024-07-26 14:39:31 +00:00
Joonas Koivunen
39e2bc932f prepare to reparent while gc blocked 2024-07-26 14:39:31 +00:00
Joonas Koivunen
5fc034fa7f feat: block gc persistently until detach ancestor completes 2024-07-26 14:39:31 +00:00
Joonas Koivunen
f9b12def0b add support for WaitToActivate errors 2024-07-26 14:39:31 +00:00
Joonas Koivunen
5d0071447c partial: index_part.json support for ongoing_detach_ancestor 2024-07-26 14:39:31 +00:00
Joonas Koivunen
d9eba3f8c3 ==== PR cut here? 2024-07-26 14:39:31 +00:00
Joonas Koivunen
409e2eff9e fix: run upload_rewritten_layer in a span
there was a weird failure observed with CI tests: https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8430/10108870590/index.html#suites/a1c2be32556270764423c495fad75d47/94a4686382b96297
2024-07-26 14:39:31 +00:00
Joonas Koivunen
e6e3b9a716 doc: remove on_gc_task_start fixme 2024-07-26 08:52:55 +00:00
Joonas Koivunen
7f31a3f671 forgotten rename, maybe 2024-07-26 08:52:55 +00:00
Joonas Koivunen
9971ae3d24 rename is_detached_from_{original_,}ancestor (just the rename) 2024-07-26 08:52:55 +00:00
Joonas Koivunen
48a2a20de3 chore: derive default 2024-07-26 08:52:55 +00:00
Joonas Koivunen
29ef8f15ce chore: unused variable 2024-07-26 08:52:55 +00:00
Joonas Koivunen
5e45dd3f86 rename SharedState::notify
to continue_existing_attempt
2024-07-26 08:52:55 +00:00
Joonas Koivunen
5fced442d7 warning caused by removed body 2024-07-26 08:52:55 +00:00
Joonas Koivunen
4222610233 cleanup index part dependent 2024-07-26 08:52:55 +00:00
Joonas Koivunen
92deb0dfd7 plumbing: collect timelines index parts 2024-07-26 08:52:55 +00:00
Joonas Koivunen
46ca6f17c5 plumbing: notify shared state of existing attempt 2024-07-26 08:52:55 +00:00
Joonas Koivunen
14869abb77 complete the plumbing with non-notifying attempt_blocks_gc impl 2024-07-26 08:52:55 +00:00
Joonas Koivunen
5330fd9366 doc(fixme): shared state 2024-07-26 08:52:55 +00:00
Joonas Koivunen
6c5b3b7812 doc: more sketched api comments 2024-07-26 08:52:55 +00:00
Joonas Koivunen
849fe0f191 plumb the shared state through
the api for the gc pausing is quite awkward.
2024-07-26 08:52:55 +00:00
Joonas Koivunen
f564b66f21 shared state sketch 2024-07-26 08:52:55 +00:00
Joonas Koivunen
2e58ccee78 temp: planning 2024-07-26 08:52:55 +00:00
Joonas Koivunen
f398ab0264 completion: Debug and query for barrier connection 2024-07-26 08:52:55 +00:00
Joonas Koivunen
f23ee2ccdb doc(test): be more accurate 2024-07-26 08:52:55 +00:00
Joonas Koivunen
0ad31bb7fb doc: remove obsolete FIXME
this was cleared with partial metadata updates.
2024-07-26 08:52:55 +00:00
Joonas Koivunen
86f26d0918 chore: minor rename FIXME in IndexPart 2024-07-26 08:52:55 +00:00
Joonas Koivunen
4a562dff2e doc: more 2024-07-26 08:52:55 +00:00
Joonas Koivunen
f9185b42a9 doc: minor enhancements 2024-07-26 08:52:55 +00:00
Joonas Koivunen
d4f30daa81 chore: minor indentation problem 2024-07-26 08:52:55 +00:00
Joonas Koivunen
97ab53e826 chore: add std::fmt::Debug for Barrier 2024-07-26 08:52:55 +00:00
15 changed files with 1576 additions and 305 deletions

View File

@@ -1,6 +1,8 @@
use std::collections::HashSet;
use utils::id::TimelineId;
#[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct AncestorDetached {
pub reparented_timelines: Vec<TimelineId>,
pub reparented_timelines: HashSet<TimelineId>,
}

View File

@@ -8,10 +8,33 @@ pub struct Completion {
_token: TaskTrackerToken,
}
impl std::fmt::Debug for Completion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Completion")
.field("siblings", &self._token.task_tracker().len())
.finish()
}
}
impl Completion {
/// Returns true if this completion is associated with the given barrier.
pub fn blocks(&self, barrier: &Barrier) -> bool {
TaskTracker::ptr_eq(self._token.task_tracker(), &barrier.0)
}
}
/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
pub struct Barrier(TaskTracker);
impl std::fmt::Debug for Barrier {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Barrier")
.field("remaining", &self.0.len())
.finish()
}
}
impl Default for Barrier {
fn default() -> Self {
let (_, rx) = channel();

View File

@@ -1811,7 +1811,7 @@ async fn timeline_detach_ancestor_handler(
// drop(tenant);
let resp = match progress {
detach_ancestor::Progress::Prepared(_guard, prepared) => {
detach_ancestor::Progress::Prepared(attempt, prepared) => {
// it would be great to tag the guard on to the tenant activation future
let reparented_timelines = state
.tenant_manager
@@ -1819,10 +1819,10 @@ async fn timeline_detach_ancestor_handler(
tenant_shard_id,
timeline_id,
prepared,
attempt,
ctx,
)
.await
.context("timeline detach ancestor completion")
.map_err(ApiError::InternalServerError)?;
AncestorDetached {

View File

@@ -35,6 +35,7 @@ use std::collections::BTreeMap;
use std::fmt;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use timeline::detach_ancestor;
use tokio::io::BufReader;
use tokio::sync::watch;
use tokio::task::JoinSet;
@@ -300,8 +301,11 @@ pub struct Tenant {
pub(crate) timeline_get_throttle:
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,
/// An ongoing timeline detach must be checked during attempts to GC a timeline.
///
/// After starting the timeline detach ancestor, blocking GC until it completes allows retrying
/// the ancestor detach, until we can be certain that all reparentings have been done.
ongoing_timeline_detach: timeline::detach_ancestor::SharedState,
l0_flush_global_state: L0FlushGlobalState,
}
@@ -675,6 +679,7 @@ impl Tenant {
shard_identity: ShardIdentity,
init_order: Option<InitializationOrder>,
mode: SpawnMode,
existing_detach_attempt: Option<&detach_ancestor::Attempt>,
ctx: &RequestContext,
) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
@@ -704,6 +709,12 @@ impl Tenant {
l0_flush_global_state,
));
if let Some(attempt) = existing_detach_attempt {
tenant
.ongoing_timeline_detach
.continue_existing_attempt(attempt);
}
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
// we shut down while attaching.
let attach_gate_guard = tenant
@@ -756,9 +767,9 @@ impl Tenant {
// The Stopping case is for when we have passed control on to DeleteTenantFlow:
// if it errors, we will call make_broken when tenant is already in Stopping.
assert!(
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
"the attach task owns the tenant state until activation is complete"
);
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
"the attach task owns the tenant state until activation is complete"
);
*state = TenantState::broken_from_reason(err.to_string());
});
@@ -983,6 +994,8 @@ impl Tenant {
}
}
let mut shared_state_builder = timeline::detach_ancestor::SharedStateBuilder::default();
// For every timeline, download the metadata file, scan the local directory,
// and build a layer map that contains an entry for each remote and local
// layer file.
@@ -992,6 +1005,8 @@ impl Tenant {
.remove(&timeline_id)
.expect("just put it in above");
shared_state_builder.record_loading_timeline(&timeline_id, &index_part);
// TODO again handle early failure
self.load_remote_timeline(
timeline_id,
@@ -1036,6 +1051,8 @@ impl Tenant {
// IndexPart is the source of truth.
self.clean_up_timelines(&existent_timelines)?;
shared_state_builder.build(&self.ongoing_timeline_detach);
fail::fail_point!("attach-before-activate", |_| {
anyhow::bail!("attach-before-activate");
});
@@ -1608,6 +1625,11 @@ impl Tenant {
}
}
if self.ongoing_timeline_detach.attempt_blocks_gc() {
info!("Skipping GC while there is an ongoing detach_ancestor attempt");
return Ok(GcResult::default());
}
self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
.await
}
@@ -2615,7 +2637,7 @@ impl Tenant {
&crate::metrics::tenant_throttling::TIMELINE_GET,
)),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),
ongoing_timeline_detach: Default::default(),
l0_flush_global_state,
}
}

View File

@@ -285,21 +285,30 @@ impl TimelineMetadata {
}
/// When reparenting, the `ancestor_lsn` does not change.
pub fn reparent(&mut self, timeline: &TimelineId) {
///
/// Returns true if anything was changed.
pub fn reparent(&mut self, timeline: &TimelineId) -> bool {
assert!(self.body.ancestor_timeline.is_some());
// no assertion for redoing this: it's fine, we may have to repeat this multiple times over
let prev = self.body.ancestor_timeline;
self.body.ancestor_timeline = Some(*timeline);
prev.as_ref() != Some(timeline)
}
pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) {
/// Returns true if anything was changed
pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) -> bool {
let mut changed = false;
if let Some(ancestor) = self.body.ancestor_timeline {
assert_eq!(ancestor, branchpoint.0);
changed = true;
}
if self.body.ancestor_lsn != Lsn(0) {
assert_eq!(self.body.ancestor_lsn, branchpoint.1);
changed = true;
}
self.body.ancestor_timeline = None;
self.body.ancestor_lsn = Lsn(0);
changed
}
pub fn latest_gc_cutoff_lsn(&self) -> Lsn {

View File

@@ -13,7 +13,7 @@ use pageserver_api::upcall_api::ReAttachResponseTenant;
use rand::{distributions::Alphanumeric, Rng};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
@@ -54,7 +54,7 @@ use utils::id::{TenantId, TimelineId};
use super::remote_timeline_client::remote_tenant_path;
use super::secondary::SecondaryTenant;
use super::timeline::detach_ancestor::PreparedTimelineDetach;
use super::timeline::detach_ancestor::{self, PreparedTimelineDetach};
use super::TenantSharedResources;
/// For a tenant that appears in TenantsMap, it may either be
@@ -676,6 +676,7 @@ pub async fn init_tenant_mgr(
shard_identity,
Some(init_order.clone()),
SpawnMode::Lazy,
None,
&ctx,
)),
LocationMode::Secondary(secondary_conf) => {
@@ -724,6 +725,7 @@ fn tenant_spawn(
shard_identity: ShardIdentity,
init_order: Option<InitializationOrder>,
mode: SpawnMode,
existing_detach_attempt: Option<&detach_ancestor::Attempt>,
ctx: &RequestContext,
) -> Arc<Tenant> {
// All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
@@ -744,6 +746,7 @@ fn tenant_spawn(
shard_identity,
init_order,
mode,
existing_detach_attempt,
ctx,
)
}
@@ -1191,6 +1194,7 @@ impl TenantManager {
shard_identity,
None,
spawn_mode,
None,
ctx,
);
@@ -1312,6 +1316,7 @@ impl TenantManager {
shard_identity,
None,
SpawnMode::Eager,
None,
ctx,
);
@@ -1968,8 +1973,10 @@ impl TenantManager {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
prepared: PreparedTimelineDetach,
mut attempt: detach_ancestor::Attempt,
ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
) -> Result<HashSet<TimelineId>, anyhow::Error> {
// FIXME: this is unnecessary, slotguard already has these semantics
struct RevertOnDropSlot(Option<SlotGuard>);
impl Drop for RevertOnDropSlot {
@@ -2017,43 +2024,66 @@ impl TenantManager {
let timeline = tenant.get_timeline(timeline_id, true)?;
let reparented = timeline
.complete_detaching_timeline_ancestor(&tenant, prepared, ctx)
let resp = timeline
.detach_from_ancestor_and_reparent(&tenant, prepared, ctx)
.await?;
let mut slot_guard = slot_guard.into_inner();
let (_guard, progress) = utils::completion::channel();
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {
slot_guard.drop_old_value()?;
}
Err(_barrier) => {
slot_guard.revert();
// this really should not happen, at all, unless shutdown was already going?
anyhow::bail!("Cannot restart Tenant, already shutting down");
let tenant = if resp.reset_tenant_required() {
attempt.before_shutdown();
let (_guard, progress) = utils::completion::channel();
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {
slot_guard.drop_old_value()?;
}
Err(_barrier) => {
slot_guard.revert();
// this really should not happen, at all, unless shutdown was already going?
anyhow::bail!("Cannot restart Tenant, already shutting down");
}
}
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
let shard_identity = config.shard;
let tenant = tenant_spawn(
self.conf,
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
SpawnMode::Eager,
Some(&attempt),
ctx,
);
slot_guard.upsert(TenantSlot::Attached(tenant.clone()))?;
tenant
} else {
tracing::info!("skipping tenant_reset as no changes made required it");
tenant
};
if let Some(reparented) = resp.completed() {
// finally ask the restarted tenant to complete the detach
tenant
.ongoing_timeline_detach
.complete(attempt, &tenant)
.await?;
Ok(reparented)
} else {
// at least the latest versions have now been downloaded and refreshed; be ready to
// retry another time.
tenant.ongoing_timeline_detach.cancel(attempt);
Err(anyhow::anyhow!(
"failed to reparent all candidate timelines, please retry"
))
}
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
let shard_identity = config.shard;
let tenant = tenant_spawn(
self.conf,
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
SpawnMode::Eager,
ctx,
);
slot_guard.upsert(TenantSlot::Attached(tenant))?;
Ok(reparented)
}
/// A page service client sends a TenantId, and to look up the correct Tenant we must

View File

@@ -683,12 +683,13 @@ impl RemoteTimelineClient {
Ok(())
}
/// Reparent this timeline to a new parent.
///
/// A retryable step of timeline ancestor detach.
pub(crate) async fn schedule_reparenting_and_wait(
self: &Arc<Self>,
new_parent: &TimelineId,
) -> anyhow::Result<()> {
// FIXME: because of how Timeline::schedule_uploads works when called from layer flushing
// and reads the in-memory part we cannot do the detaching like this
let receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
@@ -699,17 +700,29 @@ impl RemoteTimelineClient {
));
};
upload_queue.dirty.metadata.reparent(new_parent);
upload_queue.dirty.lineage.record_previous_ancestor(&prev);
let uploaded = &upload_queue.clean.0.metadata;
self.schedule_index_upload(upload_queue)?;
if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() {
// nothing to do
None
} else {
let mut modified = false;
self.schedule_barrier0(upload_queue)
modified |= upload_queue.dirty.metadata.reparent(new_parent);
modified |= upload_queue.dirty.lineage.record_previous_ancestor(&prev);
if modified {
self.schedule_index_upload(upload_queue)?;
}
Some(self.schedule_barrier0(upload_queue))
}
};
Self::wait_completion0(receiver)
.await
.context("wait completion")
if let Some(receiver) = receiver {
Self::wait_completion0(receiver).await?;
}
Ok(())
}
/// Schedules uploading a new version of `index_part.json` with the given layers added,
@@ -725,26 +738,121 @@ impl RemoteTimelineClient {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
upload_queue.dirty.lineage.record_detaching(&adopted);
if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) {
None
} else {
let mut modified = false;
modified |= upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
modified |= upload_queue.dirty.lineage.record_detaching(&adopted);
for layer in layers {
upload_queue
.dirty
.layer_metadata
.insert(layer.layer_desc().layer_name(), layer.metadata());
for layer in layers {
let prev = upload_queue
.dirty
.layer_metadata
.insert(layer.layer_desc().layer_name(), layer.metadata());
modified |= prev.is_none();
}
if modified {
self.schedule_index_upload(upload_queue)?;
}
Some(self.schedule_barrier0(upload_queue))
}
self.schedule_index_upload(upload_queue)?;
let barrier = self.schedule_barrier0(upload_queue);
self.launch_queued_tasks(upload_queue);
barrier
};
Self::wait_completion0(barrier)
.await
.context("wait completion")
if let Some(barrier) = barrier {
Self::wait_completion0(barrier).await?;
}
Ok(())
}
/// Marks timeline detach ancestor started for this timeline if it has not been marked as
/// started.
///
/// A retryable step o ftimeline detach ancestor.
///
/// Does not overwrite or even error if the set of reparentable timelines differes. Those can
/// be inspected later.
///
/// Waits until the completion of the upload.
pub(crate) async fn schedule_started_detach_ancestor_mark_and_wait(
self: &Arc<Self>,
) -> anyhow::Result<()> {
let maybe_barrier = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
fn wanted(x: Option<&index::GcBlocking>) -> bool {
x.is_some_and(|b| b.blocked_by_detach_ancestor())
}
let current = upload_queue.dirty.gc_blocking.as_ref();
let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
match (current, uploaded) {
(x, y) if wanted(x) && wanted(y) => None,
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
_ => {
// at this point, the metadata must always show that there is a parent
if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
panic!("cannot start detach ancestor if there is nothing to detach from");
}
upload_queue.dirty.gc_blocking = current
.map(|x| x.with_detach_ancestor())
.or_else(|| Some(index::GcBlocking::started_now_for_detach_ancestor()));
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue))
}
}
};
if let Some(barrier) = maybe_barrier {
Self::wait_completion0(barrier).await?;
}
Ok(())
}
/// Marks timeline detach ancestor completed for this timeline if it has not been marked as
/// such already.
///
/// ## Panics
///
/// If the timeline has not been detached from ancestor already.
pub(crate) async fn schedule_completed_detach_ancestor_mark_and_wait(
self: &Arc<Self>,
) -> anyhow::Result<()> {
let maybe_barrier = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
assert!(upload_queue.clean.0.lineage.is_detached_from_ancestor());
fn wanted(x: Option<&index::GcBlocking>) -> bool {
x.is_none() || x.is_some_and(|b| !b.blocked_by_detach_ancestor())
}
let current = upload_queue.dirty.gc_blocking.as_ref();
let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
match (current, uploaded) {
(x, y) if wanted(x) && wanted(y) => None,
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
_ => {
upload_queue.dirty.gc_blocking = current
.expect("has to be Some because of wanted()")
.without_detach_ancestor();
assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue))
}
}
};
if let Some(barrier) = maybe_barrier {
Self::wait_completion0(barrier).await?;
}
Ok(())
}
/// Launch an upload operation in the background; the file is added to be included in next

View File

@@ -56,6 +56,9 @@ pub struct IndexPart {
#[serde(default)]
pub(crate) lineage: Lineage,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) gc_blocking: Option<GcBlocking>,
/// Describes the kind of aux files stored in the timeline.
///
/// The value is modified during file ingestion when the latest wanted value communicated via tenant config is applied if it is acceptable.
@@ -80,10 +83,11 @@ impl IndexPart {
/// - 5: lineage was added
/// - 6: last_aux_file_policy is added.
/// - 7: metadata_bytes is no longer written, but still read
const LATEST_VERSION: usize = 7;
/// - 8: +gc_blocking
const LATEST_VERSION: usize = 8;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7];
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -95,6 +99,7 @@ impl IndexPart {
metadata,
deleted_at: None,
lineage: Default::default(),
gc_blocking: None,
last_aux_file_policy: None,
}
}
@@ -205,26 +210,45 @@ fn is_false(b: &bool) -> bool {
impl Lineage {
const REMEMBER_AT_MOST: usize = 100;
pub(crate) fn record_previous_ancestor(&mut self, old_ancestor: &TimelineId) {
pub(crate) fn record_previous_ancestor(&mut self, old_ancestor: &TimelineId) -> bool {
if self.reparenting_history.last() == Some(old_ancestor) {
// do not re-record it
return;
}
false
} else {
#[cfg(feature = "testing")]
{
let existing = self
.reparenting_history
.iter()
.position(|x| x == old_ancestor);
assert_eq!(
existing, None,
"we cannot reparent onto and off and onto the same timeline twice"
);
}
let drop_oldest = self.reparenting_history.len() + 1 >= Self::REMEMBER_AT_MOST;
let drop_oldest = self.reparenting_history.len() + 1 >= Self::REMEMBER_AT_MOST;
self.reparenting_history_truncated |= drop_oldest;
if drop_oldest {
self.reparenting_history.remove(0);
self.reparenting_history_truncated |= drop_oldest;
if drop_oldest {
self.reparenting_history.remove(0);
}
self.reparenting_history.push(*old_ancestor);
true
}
self.reparenting_history.push(*old_ancestor);
}
pub(crate) fn record_detaching(&mut self, branchpoint: &(TimelineId, Lsn)) {
assert!(self.original_ancestor.is_none());
self.original_ancestor =
Some((branchpoint.0, branchpoint.1, chrono::Utc::now().naive_utc()));
/// Returns true if anything changed.
pub(crate) fn record_detaching(&mut self, branchpoint: &(TimelineId, Lsn)) -> bool {
if let Some((id, lsn, _)) = self.original_ancestor {
assert_eq!(id, branchpoint.0);
assert_eq!(lsn, branchpoint.1);
false
} else {
assert!(self.original_ancestor.is_none());
self.original_ancestor =
Some((branchpoint.0, branchpoint.1, chrono::Utc::now().naive_utc()));
true
}
}
/// The queried lsn is most likely the basebackup lsn, and this answers question "is it allowed
@@ -236,15 +260,53 @@ impl Lineage {
.is_some_and(|(_, ancestor_lsn, _)| ancestor_lsn == lsn)
}
pub(crate) fn is_detached_from_original_ancestor(&self) -> bool {
/// Returns true if the timeline originally had an ancestor, and no longer has one.
pub(crate) fn is_detached_from_ancestor(&self) -> bool {
self.original_ancestor.is_some()
}
/// Returns original ancestor timeline id and lsn that this timeline has been detached from.
pub(crate) fn detached_previous_ancestor(&self) -> Option<(TimelineId, Lsn)> {
self.original_ancestor.map(|(id, lsn, _)| (id, lsn))
}
pub(crate) fn is_reparented(&self) -> bool {
!self.reparenting_history.is_empty()
}
}
/// Right now, the only reason to block gc persistently is detach_ancestor. To use gc blocking more
/// broadly, a reason set field needs to be added, and the shared state load time building be
/// complicated to avoid detach_ancestor clearing out a manually configured gc blocking.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct GcBlocking {
pub(crate) started_at: NaiveDateTime,
}
impl GcBlocking {
pub(super) fn started_now_for_detach_ancestor() -> Self {
GcBlocking {
started_at: chrono::Utc::now().naive_utc(),
}
}
/// Returns true if detach_ancestor is one of the reasons why the gc is blocked.
pub(crate) fn blocked_by_detach_ancestor(&self) -> bool {
true
}
/// Returns a version of self with the reason of detach_ancestor.
pub(super) fn with_detach_ancestor(&self) -> Self {
self.clone()
}
/// Returns a version of self without the reason of detach_ancestor. Assumption is that if
/// there are no more reasons, we can unblock the gc.
pub(super) fn without_detach_ancestor(&self) -> Option<Self> {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -285,6 +347,7 @@ mod tests {
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: None,
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -327,6 +390,7 @@ mod tests {
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: None,
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -370,6 +434,7 @@ mod tests {
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -416,6 +481,7 @@ mod tests {
.unwrap(),
deleted_at: None,
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -457,6 +523,7 @@ mod tests {
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -501,6 +568,7 @@ mod tests {
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))),
},
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -550,6 +618,7 @@ mod tests {
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))),
},
gc_blocking: None,
last_aux_file_policy: Some(AuxFilePolicy::V2),
};
@@ -604,6 +673,66 @@ mod tests {
).with_recalculated_checksum().unwrap(),
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
lineage: Default::default(),
gc_blocking: None,
last_aux_file_policy: Default::default(),
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v8_indexpart_is_parsed() {
let example = r#"{
"version": 8,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"gc_blocking": {
"started_at": "2024-07-19T09:00:00.123"
}
}"#;
let expected = IndexPart {
version: 8,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
gc_blocking: Some(GcBlocking {
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
}),
last_aux_file_policy: Default::default(),
};

View File

@@ -129,9 +129,11 @@ pub fn start_background_loops(
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token();
let can_start = completion::Barrier::maybe_wait(background_jobs_can_start);
let can_start = tenant.ongoing_timeline_detach.gc_sleeping_while(can_start);
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
_ = can_start => {}
};
gc_loop(tenant, cancel)
.instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
@@ -361,14 +363,13 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
if first {
first = false;
if delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel)
.await
.is_err()
{
break;
}
let delays = async {
delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?;
random_init_delay(period, &cancel).await?;
Ok::<_, Cancelled>(())
};
if random_init_delay(period, &cancel).await.is_err() {
if tenant.ongoing_timeline_detach.gc_sleeping_while(delays).await.is_err() {
break;
}
}
@@ -404,8 +405,8 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let wait_duration = Duration::from_secs_f64(wait_duration);
error!(
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
);
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
);
wait_duration
}
}
@@ -414,7 +415,9 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
let cancelled = cancel.cancelled();
let cancelled = tenant.ongoing_timeline_detach.gc_sleeping_while(cancelled);
if tokio::time::timeout(sleep_duration, cancelled)
.await
.is_ok()
{

View File

@@ -3707,10 +3707,6 @@ impl Timeline {
Ok(ancestor.clone())
}
pub(crate) fn get_ancestor_timeline(&self) -> Option<Arc<Timeline>> {
self.ancestor_timeline.clone()
}
pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
&self.shard_identity
}
@@ -4750,18 +4746,21 @@ impl Timeline {
detach_ancestor::prepare(self, tenant, options, ctx).await
}
/// Completes the ancestor detach. This method is to be called while holding the
/// TenantManager's tenant slot, so during this method we cannot be deleted nor can any
/// timeline be deleted. After this method returns successfully, tenant must be reloaded.
/// Second step of detach from ancestor; detaches the `self` from it's current ancestor and
/// reparents any reparentable children of previous ancestor.
///
/// Pageserver receiving a SIGKILL during this operation is not supported (yet).
pub(crate) async fn complete_detaching_timeline_ancestor(
/// This method is to be called while holding the TenantManager's tenant slot, so during this
/// method we cannot be deleted nor can any timeline be deleted. After this method returns
/// successfully, tenant must be reloaded.
///
/// Final step will be to complete after optionally resetting the tenant.
pub(crate) async fn detach_from_ancestor_and_reparent(
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
prepared: detach_ancestor::PreparedTimelineDetach,
ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
detach_ancestor::complete(self, tenant, prepared, ctx).await
) -> Result<detach_ancestor::DetachingAndReparenting, anyhow::Error> {
detach_ancestor::detach_and_reparent(self, tenant, prepared, ctx).await
}
/// Switch aux file policy and schedule upload to the index part.

View File

@@ -221,6 +221,8 @@ impl DeleteTimelineFlow {
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
timeline.shutdown(super::ShutdownMode::Hard).await;
tenant.ongoing_timeline_detach.on_delete(&timeline);
fail::fail_point!("timeline-delete-before-index-deleted-at", |_| {
Err(anyhow::anyhow!(
"failpoint: timeline-delete-before-index-deleted-at"

File diff suppressed because it is too large Load Diff

View File

@@ -2842,6 +2842,7 @@ impl Service {
);
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
client
.timeline_detach_ancestor(tenant_shard_id, timeline_id)
.await
@@ -2858,7 +2859,8 @@ impl Service {
Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
}
// rest can be mapped
// rest can be mapped as usual
// FIXME: this converts some 500 to 409 which is not per openapi
other => passthrough_api_error(&node, other),
}
})
@@ -2866,7 +2868,6 @@ impl Service {
}
// no shard needs to go first/last; the operation should be idempotent
// TODO: it would be great to ensure that all shards return the same error
let mut results = self
.tenant_for_shards(targets, |tenant_shard_id, node| {
futures::FutureExt::boxed(detach_one(
@@ -2885,6 +2886,7 @@ impl Service {
.filter(|(_, res)| res != &any.1)
.collect::<Vec<_>>();
if !mismatching.is_empty() {
// this can be hit by races which should not happen because operation lock on cplane
let matching = results.len() - mismatching.len();
tracing::error!(
matching,

View File

@@ -837,7 +837,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
timeline_id: TimelineId,
batch_size: int | None = None,
**kwargs,
) -> List[TimelineId]:
) -> Set[TimelineId]:
params = {}
if batch_size is not None:
params["batch_size"] = batch_size
@@ -848,7 +848,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
)
self.verbose_error(res)
json = res.json()
return list(map(TimelineId, json["reparented_timelines"]))
return set(map(TimelineId, json["reparented_timelines"]))
def evict_layer(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str

View File

@@ -5,7 +5,7 @@ import time
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue
from threading import Barrier
from typing import List, Tuple
from typing import List, Set, Tuple
import pytest
from fixtures.common_types import Lsn, TimelineId
@@ -165,7 +165,7 @@ def test_ancestor_detach_branched_from(
)
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert all_reparented == []
assert all_reparented == set()
if restart_after:
env.pageserver.stop()
@@ -534,7 +534,7 @@ def test_compaction_induced_by_detaches_in_history(
for _, timeline_id in skip_main:
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert reparented == [], "we have no earlier branches at any level"
assert reparented == set(), "we have no earlier branches at any level"
post_detach_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))
assert len(post_detach_l0s) == 5, "should had inherited 4 L0s, have 5 in total"
@@ -774,7 +774,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
else:
break
assert reparented == [], "too many retries (None) or unexpected reparentings"
assert reparented == set(), "too many retries (None) or unexpected reparentings"
for shard_info in shards:
node_id = int(shard_info["node_id"])
@@ -807,22 +807,24 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
What remains not tested by this:
- shutdown winning over complete
Shutdown winning over complete needs gc blocking and reparenting any left-overs on retry.
"""
if sharded and mode == "delete_tenant":
# the shared/exclusive lock for tenant is blocking this:
# timeline detach ancestor takes shared, delete tenant takes exclusive
pytest.skip(
"tenant deletion while timeline ancestor detach is underway is not supported yet"
)
pytest.skip("tenant deletion while timeline ancestor detach is underway cannot happen")
shard_count = 2 if sharded else 1
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count if sharded else None)
env = neon_env_builder.init_start(
initial_tenant_shard_count=shard_count if sharded else None,
initial_tenant_conf={
"gc_period": "1s",
"lsn_lease_length": "0s",
},
)
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
@@ -831,7 +833,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
detached_timeline = env.neon_cli.create_branch("detached soon", "main")
failpoint = "timeline-detach-ancestor::before_starting_after_locking_pausable"
pausepoint = "timeline-detach-ancestor::before_starting_after_locking_pausable"
env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)
@@ -843,13 +845,20 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
victim = pageservers[int(shards[-1]["node_id"])]
victim_http = victim.http_client()
victim_http.configure_failpoints((failpoint, "pause"))
victim_http.configure_failpoints((pausepoint, "pause"))
def detach_ancestor():
target.detach_ancestor(env.initial_tenant, detached_timeline)
def at_failpoint() -> Tuple[str, LogCursor]:
return victim.assert_log_contains(f"at failpoint {failpoint}")
def at_failpoint() -> LogCursor:
msg, offset = victim.assert_log_contains(f"at failpoint {pausepoint}")
log.info(f"found {msg}")
msg, offset = victim.assert_log_contains(
".* gc_loop.*: Skipping GC while there is an ongoing detach_ancestor attempt",
offset,
)
log.info(f"found {msg}")
return offset
def start_delete():
if mode == "delete_timeline":
@@ -882,23 +891,44 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
with ThreadPoolExecutor(max_workers=2) as pool:
try:
fut = pool.submit(detach_ancestor)
_, offset = wait_until(10, 1.0, at_failpoint)
offset = wait_until(10, 1.0, at_failpoint)
delete = pool.submit(start_delete)
wait_until(10, 1.0, lambda: at_waiting_on_gate_close(offset))
offset = wait_until(10, 1.0, lambda: at_waiting_on_gate_close(offset))
victim_http.configure_failpoints((failpoint, "off"))
victim_http.configure_failpoints((pausepoint, "off"))
delete.result()
assert wait_until(10, 1.0, is_deleted), f"unimplemented mode {mode}"
# TODO: match the error
with pytest.raises(PageserverApiException) as exc:
fut.result()
log.info(f"TODO: match this error: {exc.value}")
assert exc.value.status_code == 503
finally:
victim_http.configure_failpoints((failpoint, "off"))
victim_http.configure_failpoints((pausepoint, "off"))
if mode != "delete_timeline":
return
# make sure the gc is unblocked
time.sleep(2)
victim.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
if not sharded:
# we have the other node only while sharded
return
other = pageservers[int(shards[0]["node_id"])]
log.info(f"other is {other.id}")
_, offset = other.assert_log_contains(
".*INFO request\\{method=PUT path=/v1/tenant/\\S+/timeline/\\S+/detach_ancestor .*\\}: Request handled, status: 200 OK",
)
# this might be a lot earlier than the victims line, but that is okay.
_, offset = other.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
@pytest.mark.parametrize("mode", ["delete_reparentable_timeline"])
@@ -915,7 +945,9 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
assert (
mode == "delete_reparentable_timeline"
), "only one now, but we could have the create just as well, need gc blocking"
), "only one now, but creating reparentable timelines cannot be supported even with gc blocking"
# perhaps it could be supported by always doing this for the shard0 first, and after that for others.
# when we run shard0 to completion, we can use it's timelines to restrict which can be reparented.
shard_count = 2
neon_env_builder.num_pageservers = shard_count
@@ -1048,10 +1080,267 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
victim_http.configure_failpoints((pausepoint, "off"))
def test_retried_detach_ancestor_after_failed_reparenting(neon_env_builder: NeonEnvBuilder):
"""
Using a failpoint, force the completion step of timeline ancestor detach to
fail after reparenting a single timeline.
Retrying should try reparenting until all reparentings are done, all the
time blocking gc even across restarts (first round).
A completion failpoint is used to inhibit completion on second to last
round.
On last round, the completion uses a path where no reparentings can happen
because original ancestor is deleted, and there is a completion to unblock
gc without restart.
"""
# to get the remote storage metrics
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start(
initial_tenant_conf={
"gc_period": "1s",
"lsn_lease_length": "0s",
}
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
env.pageserver.allowed_errors.extend(
[
".* reparenting failed: failpoint: timeline-detach-ancestor::allow_one_reparented",
".* Error processing HTTP request: InternalServerError\\(failed to reparent all candidate timelines, please retry",
".* Error processing HTTP request: InternalServerError\\(failpoint: timeline-detach-ancestor::complete_before_uploading",
]
)
http = env.pageserver.http_client()
def remote_storage_copy_requests():
return http.get_metric_value(
"remote_storage_s3_request_seconds_count",
{"request_type": "copy_object", "result": "ok"},
)
def reparenting_progress(timelines: List[TimelineId]) -> Tuple[int, Set[TimelineId]]:
reparented = 0
not_reparented = set()
for timeline in timelines:
detail = http.timeline_detail(env.initial_tenant, timeline)
ancestor = TimelineId(detail["ancestor_timeline_id"])
if ancestor == detached:
reparented += 1
else:
not_reparented.add(timeline)
return (reparented, not_reparented)
# main ------A-----B-----C-----D-----E> lsn
timelines = []
with env.endpoints.create_start("main") as ep:
for counter in range(5):
ep.safe_psql(
f"create table foo_{counter} as select i::bigint from generate_series(1, 10000) t(i)"
)
branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
http.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
branch = env.neon_cli.create_branch(
f"branch_{counter}", "main", ancestor_start_lsn=branch_lsn
)
timelines.append(branch)
flush_ep_to_pageserver(env, ep, env.initial_tenant, env.initial_timeline)
# detach "E" which has most reparentable timelines under it
detached = timelines.pop()
assert len(timelines) == 4
http = http.without_status_retrying()
http.configure_failpoints(("timeline-detach-ancestor::allow_one_reparented", "return"))
not_reparented: Set[TimelineId] = set()
# tracked offset in the pageserver log which is at least at the most recent activation
offset = None
def try_detach():
with pytest.raises(
PageserverApiException,
match=".*failed to reparent all candidate timelines, please retry",
) as exc:
http.detach_ancestor(env.initial_tenant, detached)
assert exc.value.status_code == 500
# first round -- do more checking to make sure the gc gets paused
try_detach()
assert (
http.timeline_detail(env.initial_tenant, detached)["ancestor_timeline_id"] is None
), "first round should had detached 'detached'"
reparented, not_reparented = reparenting_progress(timelines)
assert reparented == 1
time.sleep(2)
_, offset = env.pageserver.assert_log_contains(
".*INFO request\\{method=PUT path=/v1/tenant/[0-9a-f]{32}/timeline/[0-9a-f]{32}/detach_ancestor .*\\}: Handling request",
offset,
)
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
_, offset = env.pageserver.assert_log_contains(
".* gc_loop.*: Skipping GC while there is an ongoing detach_ancestor attempt",
offset,
)
metric = remote_storage_copy_requests()
assert metric != 0
# make sure the gc blocking is persistent over a restart
env.pageserver.restart()
env.pageserver.quiesce_tenants()
time.sleep(2)
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
assert env.pageserver.log_contains(".* gc_loop.*: [0-9] timelines need GC", offset) is None
_, offset = env.pageserver.assert_log_contains(
".* gc_loop.*: Skipping GC while there is an ongoing detach_ancestor attempt",
offset,
)
# restore failpoint for the next reparented
http.configure_failpoints(("timeline-detach-ancestor::allow_one_reparented", "return"))
reparented_before = reparented
# do two more rounds
for _ in range(2):
try_detach()
assert (
http.timeline_detail(env.initial_tenant, detached)["ancestor_timeline_id"] is None
), "first round should had detached 'detached'"
reparented, not_reparented = reparenting_progress(timelines)
assert reparented == reparented_before + 1
reparented_before = reparented
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
metric = remote_storage_copy_requests()
assert metric == 0, "copies happen in the first round"
assert offset is not None
assert len(not_reparented) == 1
http.configure_failpoints(("timeline-detach-ancestor::complete_before_uploading", "return"))
# almost final round, the failpoint is hit no longer as there is only one reparented and one always gets to succeed.
# the tenant is restarted once more, but we fail during completing.
with pytest.raises(
PageserverApiException, match=".* timeline-detach-ancestor::complete_before_uploading"
) as exc:
http.detach_ancestor(env.initial_tenant, detached)
assert exc.value.status_code == 500
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
# delete the previous ancestor to take a different path to completion. all
# other tests take the "detach? reparent complete", but this only hits
# "complete".
http.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(http, env.initial_tenant, env.initial_timeline, 20)
http.configure_failpoints(("timeline-detach-ancestor::complete_before_uploading", "off"))
reparented_resp = http.detach_ancestor(env.initial_tenant, detached)
assert reparented_resp == set(timelines)
# no need to quiesce_tenants anymore, because completion does that
reparented, not_reparented = reparenting_progress(timelines)
assert reparented == len(timelines)
time.sleep(2)
assert (
env.pageserver.log_contains(".*: attach finished, activating", offset) is None
), "there should be no restart with the final detach_ancestor as it only completed"
# gc is unblocked
env.pageserver.assert_log_contains(".* gc_loop.*: 5 timelines need GC", offset)
metric = remote_storage_copy_requests()
assert metric == 0
def test_timeline_is_deleted_before_timeline_detach_ancestor_completes(
neon_env_builder: NeonEnvBuilder,
):
"""
Make sure that a timeline deleted after restart will unpause gc blocking.
"""
env = neon_env_builder.init_start(
initial_tenant_conf={
"gc_period": "1s",
"lsn_lease_length": "0s",
}
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
http = env.pageserver.http_client()
detached = env.neon_cli.create_branch("detached")
failpoint = "timeline-detach-ancestor::after_activating_before_finding-pausable"
http.configure_failpoints((failpoint, "pause"))
def detach_and_get_stuck():
return http.detach_ancestor(env.initial_tenant, detached)
def request_processing_noted_in_log():
_, offset = env.pageserver.assert_log_contains(
".*INFO request\\{method=PUT path=/v1/tenant/[0-9a-f]{32}/timeline/[0-9a-f]{32}/detach_ancestor .*\\}: Handling request",
)
return offset
def delete_detached():
return http.timeline_delete(env.initial_tenant, detached)
try:
with ThreadPoolExecutor(max_workers=1) as pool:
detach = pool.submit(detach_and_get_stuck)
offset = wait_until(10, 1.0, request_processing_noted_in_log)
# make this named fn tor more clear failure test output logging
def pausepoint_hit_with_gc_paused() -> LogCursor:
env.pageserver.assert_log_contains(f"at failpoint {failpoint}")
_, at = env.pageserver.assert_log_contains(
".* gc_loop.*: Skipping GC while there is an ongoing detach_ancestor attempt",
offset,
)
return at
offset = wait_until(10, 1.0, pausepoint_hit_with_gc_paused)
delete_detached()
wait_timeline_detail_404(http, env.initial_tenant, detached, 10, 1.0)
http.configure_failpoints((failpoint, "off"))
with pytest.raises(PageserverApiException) as exc:
detach.result()
# FIXME: this should be 404 but because there is another Anyhow conversion it is 500
assert exc.value.status_code == 500
env.pageserver.allowed_errors.append(
".*Error processing HTTP request: InternalServerError\\(detached timeline was not found after restart"
)
finally:
http.configure_failpoints((failpoint, "off"))
# make sure gc has been unblocked
time.sleep(2)
env.pageserver.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
# TODO:
# - after starting the operation, pageserver is shutdown, restarted
# - after starting the operation, bottom-most timeline is deleted, pageserver is restarted, gc is inhibited
# - deletion of reparented while reparenting should fail once, then succeed (?)
# - branch near existing L1 boundary, image layers?
# - investigate: why are layers started at uneven lsn? not just after branching, but in general.
#