mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Compare commits
95 Commits
test-relsi
...
joonas/nth
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dcfd92b6db | ||
|
|
f58636ffdd | ||
|
|
f3ac5bcbe1 | ||
|
|
eb3711b881 | ||
|
|
c864166b32 | ||
|
|
ce9b5ae7bf | ||
|
|
cd2cbe0691 | ||
|
|
7f241bd379 | ||
|
|
ff52901028 | ||
|
|
bb377a3544 | ||
|
|
5ece7af497 | ||
|
|
2be3027fa5 | ||
|
|
14a0517c7f | ||
|
|
dcff25c293 | ||
|
|
f80c37b733 | ||
|
|
b9d0b26cea | ||
|
|
c2c28f211b | ||
|
|
1ebcb1c45b | ||
|
|
66d750ec20 | ||
|
|
ba3a6645e7 | ||
|
|
8885a8c482 | ||
|
|
c8880b69fb | ||
|
|
274b2a611b | ||
|
|
a7153bf9b2 | ||
|
|
8a4236a441 | ||
|
|
7ec927e43b | ||
|
|
22470ef444 | ||
|
|
8248cbb45b | ||
|
|
4dd805b68a | ||
|
|
f582675452 | ||
|
|
48069f68bb | ||
|
|
8f52139913 | ||
|
|
fc4d80bbf2 | ||
|
|
dc83a5a978 | ||
|
|
f4fb08d869 | ||
|
|
75b326faf4 | ||
|
|
c23cd5c149 | ||
|
|
f4cd9fe40b | ||
|
|
43af9484c0 | ||
|
|
842bd4c2db | ||
|
|
ada9a46dca | ||
|
|
742fcac7b9 | ||
|
|
55aeeb5765 | ||
|
|
89426570d3 | ||
|
|
7f767ca18e | ||
|
|
1348dbf0f1 | ||
|
|
a179283f86 | ||
|
|
deb86c1ea1 | ||
|
|
dfdf40916f | ||
|
|
c6d8015fe9 | ||
|
|
b2233d557b | ||
|
|
ce2552ba67 | ||
|
|
f4d773bb89 | ||
|
|
6f28263428 | ||
|
|
1e380ea5af | ||
|
|
8258385301 | ||
|
|
6a8f00dea0 | ||
|
|
44cdb9fb58 | ||
|
|
cdfaf0700f | ||
|
|
881e1ad056 | ||
|
|
bb3d70e24d | ||
|
|
c6c560e4c8 | ||
|
|
8dd332aed5 | ||
|
|
5c03a17eb8 | ||
|
|
402d66778e | ||
|
|
39e2bc932f | ||
|
|
5fc034fa7f | ||
|
|
f9b12def0b | ||
|
|
5d0071447c | ||
|
|
d9eba3f8c3 | ||
|
|
409e2eff9e | ||
|
|
e6e3b9a716 | ||
|
|
7f31a3f671 | ||
|
|
9971ae3d24 | ||
|
|
48a2a20de3 | ||
|
|
29ef8f15ce | ||
|
|
5e45dd3f86 | ||
|
|
5fced442d7 | ||
|
|
4222610233 | ||
|
|
92deb0dfd7 | ||
|
|
46ca6f17c5 | ||
|
|
14869abb77 | ||
|
|
5330fd9366 | ||
|
|
6c5b3b7812 | ||
|
|
849fe0f191 | ||
|
|
f564b66f21 | ||
|
|
2e58ccee78 | ||
|
|
f398ab0264 | ||
|
|
f23ee2ccdb | ||
|
|
0ad31bb7fb | ||
|
|
86f26d0918 | ||
|
|
4a562dff2e | ||
|
|
f9185b42a9 | ||
|
|
d4f30daa81 | ||
|
|
97ab53e826 |
@@ -1,6 +1,8 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use utils::id::TimelineId;
|
||||
|
||||
#[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct AncestorDetached {
|
||||
pub reparented_timelines: Vec<TimelineId>,
|
||||
pub reparented_timelines: HashSet<TimelineId>,
|
||||
}
|
||||
|
||||
@@ -8,10 +8,33 @@ pub struct Completion {
|
||||
_token: TaskTrackerToken,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Completion {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Completion")
|
||||
.field("siblings", &self._token.task_tracker().len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Completion {
|
||||
/// Returns true if this completion is associated with the given barrier.
|
||||
pub fn blocks(&self, barrier: &Barrier) -> bool {
|
||||
TaskTracker::ptr_eq(self._token.task_tracker(), &barrier.0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Barrier will wait until all clones of [`Completion`] have been dropped.
|
||||
#[derive(Clone)]
|
||||
pub struct Barrier(TaskTracker);
|
||||
|
||||
impl std::fmt::Debug for Barrier {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Barrier")
|
||||
.field("remaining", &self.0.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Barrier {
|
||||
fn default() -> Self {
|
||||
let (_, rx) = channel();
|
||||
|
||||
@@ -1811,7 +1811,7 @@ async fn timeline_detach_ancestor_handler(
|
||||
// drop(tenant);
|
||||
|
||||
let resp = match progress {
|
||||
detach_ancestor::Progress::Prepared(_guard, prepared) => {
|
||||
detach_ancestor::Progress::Prepared(attempt, prepared) => {
|
||||
// it would be great to tag the guard on to the tenant activation future
|
||||
let reparented_timelines = state
|
||||
.tenant_manager
|
||||
@@ -1819,10 +1819,10 @@ async fn timeline_detach_ancestor_handler(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
prepared,
|
||||
attempt,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("timeline detach ancestor completion")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
AncestorDetached {
|
||||
|
||||
@@ -35,6 +35,7 @@ use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::time::SystemTime;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use timeline::detach_ancestor;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinSet;
|
||||
@@ -300,8 +301,11 @@ pub struct Tenant {
|
||||
pub(crate) timeline_get_throttle:
|
||||
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
|
||||
|
||||
/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
|
||||
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,
|
||||
/// An ongoing timeline detach must be checked during attempts to GC a timeline.
|
||||
///
|
||||
/// After starting the timeline detach ancestor, blocking GC until it completes allows retrying
|
||||
/// the ancestor detach, until we can be certain that all reparentings have been done.
|
||||
ongoing_timeline_detach: timeline::detach_ancestor::SharedState,
|
||||
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
}
|
||||
@@ -675,6 +679,7 @@ impl Tenant {
|
||||
shard_identity: ShardIdentity,
|
||||
init_order: Option<InitializationOrder>,
|
||||
mode: SpawnMode,
|
||||
existing_detach_attempt: Option<&detach_ancestor::Attempt>,
|
||||
ctx: &RequestContext,
|
||||
) -> Arc<Tenant> {
|
||||
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
|
||||
@@ -704,6 +709,12 @@ impl Tenant {
|
||||
l0_flush_global_state,
|
||||
));
|
||||
|
||||
if let Some(attempt) = existing_detach_attempt {
|
||||
tenant
|
||||
.ongoing_timeline_detach
|
||||
.continue_existing_attempt(attempt);
|
||||
}
|
||||
|
||||
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
|
||||
// we shut down while attaching.
|
||||
let attach_gate_guard = tenant
|
||||
@@ -756,9 +767,9 @@ impl Tenant {
|
||||
// The Stopping case is for when we have passed control on to DeleteTenantFlow:
|
||||
// if it errors, we will call make_broken when tenant is already in Stopping.
|
||||
assert!(
|
||||
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
|
||||
"the attach task owns the tenant state until activation is complete"
|
||||
);
|
||||
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
|
||||
"the attach task owns the tenant state until activation is complete"
|
||||
);
|
||||
|
||||
*state = TenantState::broken_from_reason(err.to_string());
|
||||
});
|
||||
@@ -983,6 +994,8 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
let mut shared_state_builder = timeline::detach_ancestor::SharedStateBuilder::default();
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
// and build a layer map that contains an entry for each remote and local
|
||||
// layer file.
|
||||
@@ -992,6 +1005,8 @@ impl Tenant {
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
shared_state_builder.record_loading_timeline(&timeline_id, &index_part);
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
@@ -1036,6 +1051,8 @@ impl Tenant {
|
||||
// IndexPart is the source of truth.
|
||||
self.clean_up_timelines(&existent_timelines)?;
|
||||
|
||||
shared_state_builder.build(&self.ongoing_timeline_detach);
|
||||
|
||||
fail::fail_point!("attach-before-activate", |_| {
|
||||
anyhow::bail!("attach-before-activate");
|
||||
});
|
||||
@@ -1608,6 +1625,11 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
if self.ongoing_timeline_detach.attempt_blocks_gc() {
|
||||
info!("Skipping GC while there is an ongoing detach_ancestor attempt");
|
||||
return Ok(GcResult::default());
|
||||
}
|
||||
|
||||
self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
|
||||
.await
|
||||
}
|
||||
@@ -2615,7 +2637,7 @@ impl Tenant {
|
||||
&crate::metrics::tenant_throttling::TIMELINE_GET,
|
||||
)),
|
||||
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
|
||||
ongoing_timeline_detach: std::sync::Mutex::default(),
|
||||
ongoing_timeline_detach: Default::default(),
|
||||
l0_flush_global_state,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -285,21 +285,30 @@ impl TimelineMetadata {
|
||||
}
|
||||
|
||||
/// When reparenting, the `ancestor_lsn` does not change.
|
||||
pub fn reparent(&mut self, timeline: &TimelineId) {
|
||||
///
|
||||
/// Returns true if anything was changed.
|
||||
pub fn reparent(&mut self, timeline: &TimelineId) -> bool {
|
||||
assert!(self.body.ancestor_timeline.is_some());
|
||||
// no assertion for redoing this: it's fine, we may have to repeat this multiple times over
|
||||
let prev = self.body.ancestor_timeline;
|
||||
self.body.ancestor_timeline = Some(*timeline);
|
||||
prev.as_ref() != Some(timeline)
|
||||
}
|
||||
|
||||
pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) {
|
||||
/// Returns true if anything was changed
|
||||
pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) -> bool {
|
||||
let mut changed = false;
|
||||
if let Some(ancestor) = self.body.ancestor_timeline {
|
||||
assert_eq!(ancestor, branchpoint.0);
|
||||
changed = true;
|
||||
}
|
||||
if self.body.ancestor_lsn != Lsn(0) {
|
||||
assert_eq!(self.body.ancestor_lsn, branchpoint.1);
|
||||
changed = true;
|
||||
}
|
||||
self.body.ancestor_timeline = None;
|
||||
self.body.ancestor_lsn = Lsn(0);
|
||||
changed
|
||||
}
|
||||
|
||||
pub fn latest_gc_cutoff_lsn(&self) -> Lsn {
|
||||
|
||||
@@ -13,7 +13,7 @@ use pageserver_api::upcall_api::ReAttachResponseTenant;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use std::borrow::Cow;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -54,7 +54,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::remote_timeline_client::remote_tenant_path;
|
||||
use super::secondary::SecondaryTenant;
|
||||
use super::timeline::detach_ancestor::PreparedTimelineDetach;
|
||||
use super::timeline::detach_ancestor::{self, PreparedTimelineDetach};
|
||||
use super::TenantSharedResources;
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
@@ -676,6 +676,7 @@ pub async fn init_tenant_mgr(
|
||||
shard_identity,
|
||||
Some(init_order.clone()),
|
||||
SpawnMode::Lazy,
|
||||
None,
|
||||
&ctx,
|
||||
)),
|
||||
LocationMode::Secondary(secondary_conf) => {
|
||||
@@ -724,6 +725,7 @@ fn tenant_spawn(
|
||||
shard_identity: ShardIdentity,
|
||||
init_order: Option<InitializationOrder>,
|
||||
mode: SpawnMode,
|
||||
existing_detach_attempt: Option<&detach_ancestor::Attempt>,
|
||||
ctx: &RequestContext,
|
||||
) -> Arc<Tenant> {
|
||||
// All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
|
||||
@@ -744,6 +746,7 @@ fn tenant_spawn(
|
||||
shard_identity,
|
||||
init_order,
|
||||
mode,
|
||||
existing_detach_attempt,
|
||||
ctx,
|
||||
)
|
||||
}
|
||||
@@ -1191,6 +1194,7 @@ impl TenantManager {
|
||||
shard_identity,
|
||||
None,
|
||||
spawn_mode,
|
||||
None,
|
||||
ctx,
|
||||
);
|
||||
|
||||
@@ -1312,6 +1316,7 @@ impl TenantManager {
|
||||
shard_identity,
|
||||
None,
|
||||
SpawnMode::Eager,
|
||||
None,
|
||||
ctx,
|
||||
);
|
||||
|
||||
@@ -1968,8 +1973,10 @@ impl TenantManager {
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
prepared: PreparedTimelineDetach,
|
||||
mut attempt: detach_ancestor::Attempt,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<TimelineId>, anyhow::Error> {
|
||||
) -> Result<HashSet<TimelineId>, anyhow::Error> {
|
||||
// FIXME: this is unnecessary, slotguard already has these semantics
|
||||
struct RevertOnDropSlot(Option<SlotGuard>);
|
||||
|
||||
impl Drop for RevertOnDropSlot {
|
||||
@@ -2017,43 +2024,66 @@ impl TenantManager {
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
|
||||
let reparented = timeline
|
||||
.complete_detaching_timeline_ancestor(&tenant, prepared, ctx)
|
||||
let resp = timeline
|
||||
.detach_from_ancestor_and_reparent(&tenant, prepared, ctx)
|
||||
.await?;
|
||||
|
||||
let mut slot_guard = slot_guard.into_inner();
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, ShutdownMode::Hard).await {
|
||||
Ok(()) => {
|
||||
slot_guard.drop_old_value()?;
|
||||
}
|
||||
Err(_barrier) => {
|
||||
slot_guard.revert();
|
||||
// this really should not happen, at all, unless shutdown was already going?
|
||||
anyhow::bail!("Cannot restart Tenant, already shutting down");
|
||||
let tenant = if resp.reset_tenant_required() {
|
||||
attempt.before_shutdown();
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, ShutdownMode::Hard).await {
|
||||
Ok(()) => {
|
||||
slot_guard.drop_old_value()?;
|
||||
}
|
||||
Err(_barrier) => {
|
||||
slot_guard.revert();
|
||||
// this really should not happen, at all, unless shutdown was already going?
|
||||
anyhow::bail!("Cannot restart Tenant, already shutting down");
|
||||
}
|
||||
}
|
||||
|
||||
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
|
||||
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
|
||||
|
||||
let shard_identity = config.shard;
|
||||
let tenant = tenant_spawn(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&tenant_path,
|
||||
self.resources.clone(),
|
||||
AttachedTenantConf::try_from(config)?,
|
||||
shard_identity,
|
||||
None,
|
||||
SpawnMode::Eager,
|
||||
Some(&attempt),
|
||||
ctx,
|
||||
);
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant.clone()))?;
|
||||
tenant
|
||||
} else {
|
||||
tracing::info!("skipping tenant_reset as no changes made required it");
|
||||
tenant
|
||||
};
|
||||
|
||||
if let Some(reparented) = resp.completed() {
|
||||
// finally ask the restarted tenant to complete the detach
|
||||
tenant
|
||||
.ongoing_timeline_detach
|
||||
.complete(attempt, &tenant)
|
||||
.await?;
|
||||
Ok(reparented)
|
||||
} else {
|
||||
// at least the latest versions have now been downloaded and refreshed; be ready to
|
||||
// retry another time.
|
||||
tenant.ongoing_timeline_detach.cancel(attempt);
|
||||
Err(anyhow::anyhow!(
|
||||
"failed to reparent all candidate timelines, please retry"
|
||||
))
|
||||
}
|
||||
|
||||
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
|
||||
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
|
||||
|
||||
let shard_identity = config.shard;
|
||||
let tenant = tenant_spawn(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&tenant_path,
|
||||
self.resources.clone(),
|
||||
AttachedTenantConf::try_from(config)?,
|
||||
shard_identity,
|
||||
None,
|
||||
SpawnMode::Eager,
|
||||
ctx,
|
||||
);
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant))?;
|
||||
|
||||
Ok(reparented)
|
||||
}
|
||||
|
||||
/// A page service client sends a TenantId, and to look up the correct Tenant we must
|
||||
|
||||
@@ -683,12 +683,13 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reparent this timeline to a new parent.
|
||||
///
|
||||
/// A retryable step of timeline ancestor detach.
|
||||
pub(crate) async fn schedule_reparenting_and_wait(
|
||||
self: &Arc<Self>,
|
||||
new_parent: &TimelineId,
|
||||
) -> anyhow::Result<()> {
|
||||
// FIXME: because of how Timeline::schedule_uploads works when called from layer flushing
|
||||
// and reads the in-memory part we cannot do the detaching like this
|
||||
let receiver = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
@@ -699,17 +700,29 @@ impl RemoteTimelineClient {
|
||||
));
|
||||
};
|
||||
|
||||
upload_queue.dirty.metadata.reparent(new_parent);
|
||||
upload_queue.dirty.lineage.record_previous_ancestor(&prev);
|
||||
let uploaded = &upload_queue.clean.0.metadata;
|
||||
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() {
|
||||
// nothing to do
|
||||
None
|
||||
} else {
|
||||
let mut modified = false;
|
||||
|
||||
self.schedule_barrier0(upload_queue)
|
||||
modified |= upload_queue.dirty.metadata.reparent(new_parent);
|
||||
modified |= upload_queue.dirty.lineage.record_previous_ancestor(&prev);
|
||||
|
||||
if modified {
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
}
|
||||
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
};
|
||||
|
||||
Self::wait_completion0(receiver)
|
||||
.await
|
||||
.context("wait completion")
|
||||
if let Some(receiver) = receiver {
|
||||
Self::wait_completion0(receiver).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Schedules uploading a new version of `index_part.json` with the given layers added,
|
||||
@@ -725,26 +738,121 @@ impl RemoteTimelineClient {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
|
||||
upload_queue.dirty.lineage.record_detaching(&adopted);
|
||||
if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) {
|
||||
None
|
||||
} else {
|
||||
let mut modified = false;
|
||||
modified |= upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
|
||||
modified |= upload_queue.dirty.lineage.record_detaching(&adopted);
|
||||
|
||||
for layer in layers {
|
||||
upload_queue
|
||||
.dirty
|
||||
.layer_metadata
|
||||
.insert(layer.layer_desc().layer_name(), layer.metadata());
|
||||
for layer in layers {
|
||||
let prev = upload_queue
|
||||
.dirty
|
||||
.layer_metadata
|
||||
.insert(layer.layer_desc().layer_name(), layer.metadata());
|
||||
modified |= prev.is_none();
|
||||
}
|
||||
|
||||
if modified {
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
}
|
||||
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
|
||||
let barrier = self.schedule_barrier0(upload_queue);
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
barrier
|
||||
};
|
||||
|
||||
Self::wait_completion0(barrier)
|
||||
.await
|
||||
.context("wait completion")
|
||||
if let Some(barrier) = barrier {
|
||||
Self::wait_completion0(barrier).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Marks timeline detach ancestor started for this timeline if it has not been marked as
|
||||
/// started.
|
||||
///
|
||||
/// A retryable step o ftimeline detach ancestor.
|
||||
///
|
||||
/// Does not overwrite or even error if the set of reparentable timelines differes. Those can
|
||||
/// be inspected later.
|
||||
///
|
||||
/// Waits until the completion of the upload.
|
||||
pub(crate) async fn schedule_started_detach_ancestor_mark_and_wait(
|
||||
self: &Arc<Self>,
|
||||
) -> anyhow::Result<()> {
|
||||
let maybe_barrier = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
fn wanted(x: Option<&index::GcBlocking>) -> bool {
|
||||
x.is_some_and(|b| b.blocked_by_detach_ancestor())
|
||||
}
|
||||
|
||||
let current = upload_queue.dirty.gc_blocking.as_ref();
|
||||
let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
|
||||
|
||||
match (current, uploaded) {
|
||||
(x, y) if wanted(x) && wanted(y) => None,
|
||||
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
|
||||
_ => {
|
||||
// at this point, the metadata must always show that there is a parent
|
||||
if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
|
||||
panic!("cannot start detach ancestor if there is nothing to detach from");
|
||||
}
|
||||
upload_queue.dirty.gc_blocking = current
|
||||
.map(|x| x.with_detach_ancestor())
|
||||
.or_else(|| Some(index::GcBlocking::started_now_for_detach_ancestor()));
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(barrier) = maybe_barrier {
|
||||
Self::wait_completion0(barrier).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Marks timeline detach ancestor completed for this timeline if it has not been marked as
|
||||
/// such already.
|
||||
///
|
||||
/// ## Panics
|
||||
///
|
||||
/// If the timeline has not been detached from ancestor already.
|
||||
pub(crate) async fn schedule_completed_detach_ancestor_mark_and_wait(
|
||||
self: &Arc<Self>,
|
||||
) -> anyhow::Result<()> {
|
||||
let maybe_barrier = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
assert!(upload_queue.clean.0.lineage.is_detached_from_ancestor());
|
||||
|
||||
fn wanted(x: Option<&index::GcBlocking>) -> bool {
|
||||
x.is_none() || x.is_some_and(|b| !b.blocked_by_detach_ancestor())
|
||||
}
|
||||
|
||||
let current = upload_queue.dirty.gc_blocking.as_ref();
|
||||
let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
|
||||
|
||||
match (current, uploaded) {
|
||||
(x, y) if wanted(x) && wanted(y) => None,
|
||||
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
|
||||
_ => {
|
||||
upload_queue.dirty.gc_blocking = current
|
||||
.expect("has to be Some because of wanted()")
|
||||
.without_detach_ancestor();
|
||||
assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(barrier) = maybe_barrier {
|
||||
Self::wait_completion0(barrier).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Launch an upload operation in the background; the file is added to be included in next
|
||||
|
||||
@@ -56,6 +56,9 @@ pub struct IndexPart {
|
||||
#[serde(default)]
|
||||
pub(crate) lineage: Lineage,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) gc_blocking: Option<GcBlocking>,
|
||||
|
||||
/// Describes the kind of aux files stored in the timeline.
|
||||
///
|
||||
/// The value is modified during file ingestion when the latest wanted value communicated via tenant config is applied if it is acceptable.
|
||||
@@ -80,10 +83,11 @@ impl IndexPart {
|
||||
/// - 5: lineage was added
|
||||
/// - 6: last_aux_file_policy is added.
|
||||
/// - 7: metadata_bytes is no longer written, but still read
|
||||
const LATEST_VERSION: usize = 7;
|
||||
/// - 8: +gc_blocking
|
||||
const LATEST_VERSION: usize = 8;
|
||||
|
||||
// Versions we may see when reading from a bucket.
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7];
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8];
|
||||
|
||||
pub const FILE_NAME: &'static str = "index_part.json";
|
||||
|
||||
@@ -95,6 +99,7 @@ impl IndexPart {
|
||||
metadata,
|
||||
deleted_at: None,
|
||||
lineage: Default::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
}
|
||||
}
|
||||
@@ -205,26 +210,45 @@ fn is_false(b: &bool) -> bool {
|
||||
impl Lineage {
|
||||
const REMEMBER_AT_MOST: usize = 100;
|
||||
|
||||
pub(crate) fn record_previous_ancestor(&mut self, old_ancestor: &TimelineId) {
|
||||
pub(crate) fn record_previous_ancestor(&mut self, old_ancestor: &TimelineId) -> bool {
|
||||
if self.reparenting_history.last() == Some(old_ancestor) {
|
||||
// do not re-record it
|
||||
return;
|
||||
}
|
||||
false
|
||||
} else {
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
let existing = self
|
||||
.reparenting_history
|
||||
.iter()
|
||||
.position(|x| x == old_ancestor);
|
||||
assert_eq!(
|
||||
existing, None,
|
||||
"we cannot reparent onto and off and onto the same timeline twice"
|
||||
);
|
||||
}
|
||||
let drop_oldest = self.reparenting_history.len() + 1 >= Self::REMEMBER_AT_MOST;
|
||||
|
||||
let drop_oldest = self.reparenting_history.len() + 1 >= Self::REMEMBER_AT_MOST;
|
||||
|
||||
self.reparenting_history_truncated |= drop_oldest;
|
||||
if drop_oldest {
|
||||
self.reparenting_history.remove(0);
|
||||
self.reparenting_history_truncated |= drop_oldest;
|
||||
if drop_oldest {
|
||||
self.reparenting_history.remove(0);
|
||||
}
|
||||
self.reparenting_history.push(*old_ancestor);
|
||||
true
|
||||
}
|
||||
self.reparenting_history.push(*old_ancestor);
|
||||
}
|
||||
|
||||
pub(crate) fn record_detaching(&mut self, branchpoint: &(TimelineId, Lsn)) {
|
||||
assert!(self.original_ancestor.is_none());
|
||||
|
||||
self.original_ancestor =
|
||||
Some((branchpoint.0, branchpoint.1, chrono::Utc::now().naive_utc()));
|
||||
/// Returns true if anything changed.
|
||||
pub(crate) fn record_detaching(&mut self, branchpoint: &(TimelineId, Lsn)) -> bool {
|
||||
if let Some((id, lsn, _)) = self.original_ancestor {
|
||||
assert_eq!(id, branchpoint.0);
|
||||
assert_eq!(lsn, branchpoint.1);
|
||||
false
|
||||
} else {
|
||||
assert!(self.original_ancestor.is_none());
|
||||
self.original_ancestor =
|
||||
Some((branchpoint.0, branchpoint.1, chrono::Utc::now().naive_utc()));
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// The queried lsn is most likely the basebackup lsn, and this answers question "is it allowed
|
||||
@@ -236,15 +260,53 @@ impl Lineage {
|
||||
.is_some_and(|(_, ancestor_lsn, _)| ancestor_lsn == lsn)
|
||||
}
|
||||
|
||||
pub(crate) fn is_detached_from_original_ancestor(&self) -> bool {
|
||||
/// Returns true if the timeline originally had an ancestor, and no longer has one.
|
||||
pub(crate) fn is_detached_from_ancestor(&self) -> bool {
|
||||
self.original_ancestor.is_some()
|
||||
}
|
||||
|
||||
/// Returns original ancestor timeline id and lsn that this timeline has been detached from.
|
||||
pub(crate) fn detached_previous_ancestor(&self) -> Option<(TimelineId, Lsn)> {
|
||||
self.original_ancestor.map(|(id, lsn, _)| (id, lsn))
|
||||
}
|
||||
|
||||
pub(crate) fn is_reparented(&self) -> bool {
|
||||
!self.reparenting_history.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// Right now, the only reason to block gc persistently is detach_ancestor. To use gc blocking more
|
||||
/// broadly, a reason set field needs to be added, and the shared state load time building be
|
||||
/// complicated to avoid detach_ancestor clearing out a manually configured gc blocking.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub(crate) struct GcBlocking {
|
||||
pub(crate) started_at: NaiveDateTime,
|
||||
}
|
||||
|
||||
impl GcBlocking {
|
||||
pub(super) fn started_now_for_detach_ancestor() -> Self {
|
||||
GcBlocking {
|
||||
started_at: chrono::Utc::now().naive_utc(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if detach_ancestor is one of the reasons why the gc is blocked.
|
||||
pub(crate) fn blocked_by_detach_ancestor(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
/// Returns a version of self with the reason of detach_ancestor.
|
||||
pub(super) fn with_detach_ancestor(&self) -> Self {
|
||||
self.clone()
|
||||
}
|
||||
|
||||
/// Returns a version of self without the reason of detach_ancestor. Assumption is that if
|
||||
/// there are no more reasons, we can unblock the gc.
|
||||
pub(super) fn without_detach_ancestor(&self) -> Option<Self> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -285,6 +347,7 @@ mod tests {
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Lineage::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -327,6 +390,7 @@ mod tests {
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Lineage::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -370,6 +434,7 @@ mod tests {
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
lineage: Lineage::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -416,6 +481,7 @@ mod tests {
|
||||
.unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Lineage::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -457,6 +523,7 @@ mod tests {
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
lineage: Lineage::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -501,6 +568,7 @@ mod tests {
|
||||
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
|
||||
original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))),
|
||||
},
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -550,6 +618,7 @@ mod tests {
|
||||
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
|
||||
original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))),
|
||||
},
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: Some(AuxFilePolicy::V2),
|
||||
};
|
||||
|
||||
@@ -604,6 +673,66 @@ mod tests {
|
||||
).with_recalculated_checksum().unwrap(),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
lineage: Default::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: Default::default(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn v8_indexpart_is_parsed() {
|
||||
let example = r#"{
|
||||
"version": 8,
|
||||
"layer_metadata":{
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
|
||||
},
|
||||
"disk_consistent_lsn":"0/16960E8",
|
||||
"metadata": {
|
||||
"disk_consistent_lsn": "0/16960E8",
|
||||
"prev_record_lsn": "0/1696070",
|
||||
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
|
||||
"ancestor_lsn": "0/0",
|
||||
"latest_gc_cutoff_lsn": "0/1696070",
|
||||
"initdb_lsn": "0/1696070",
|
||||
"pg_version": 14
|
||||
},
|
||||
"gc_blocking": {
|
||||
"started_at": "2024-07-19T09:00:00.123"
|
||||
}
|
||||
}"#;
|
||||
|
||||
let expected = IndexPart {
|
||||
version: 8,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::new(
|
||||
Lsn::from_str("0/16960E8").unwrap(),
|
||||
Some(Lsn::from_str("0/1696070").unwrap()),
|
||||
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
|
||||
Lsn::INVALID,
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
14,
|
||||
).with_recalculated_checksum().unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Default::default(),
|
||||
gc_blocking: Some(GcBlocking {
|
||||
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
|
||||
}),
|
||||
last_aux_file_policy: Default::default(),
|
||||
};
|
||||
|
||||
|
||||
@@ -129,9 +129,11 @@ pub fn start_background_loops(
|
||||
let background_jobs_can_start = background_jobs_can_start.cloned();
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let can_start = completion::Barrier::maybe_wait(background_jobs_can_start);
|
||||
let can_start = tenant.ongoing_timeline_detach.gc_sleeping_while(can_start);
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()) },
|
||||
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
|
||||
_ = can_start => {}
|
||||
};
|
||||
gc_loop(tenant, cancel)
|
||||
.instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
@@ -361,14 +363,13 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
if first {
|
||||
first = false;
|
||||
|
||||
if delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
let delays = async {
|
||||
delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?;
|
||||
random_init_delay(period, &cancel).await?;
|
||||
Ok::<_, Cancelled>(())
|
||||
};
|
||||
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
if tenant.ongoing_timeline_detach.gc_sleeping_while(delays).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -404,8 +405,8 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
|
||||
error!(
|
||||
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
|
||||
);
|
||||
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
|
||||
);
|
||||
wait_duration
|
||||
}
|
||||
}
|
||||
@@ -414,7 +415,9 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
let cancelled = cancel.cancelled();
|
||||
let cancelled = tenant.ongoing_timeline_detach.gc_sleeping_while(cancelled);
|
||||
if tokio::time::timeout(sleep_duration, cancelled)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
|
||||
@@ -3707,10 +3707,6 @@ impl Timeline {
|
||||
Ok(ancestor.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn get_ancestor_timeline(&self) -> Option<Arc<Timeline>> {
|
||||
self.ancestor_timeline.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
|
||||
&self.shard_identity
|
||||
}
|
||||
@@ -4750,18 +4746,21 @@ impl Timeline {
|
||||
detach_ancestor::prepare(self, tenant, options, ctx).await
|
||||
}
|
||||
|
||||
/// Completes the ancestor detach. This method is to be called while holding the
|
||||
/// TenantManager's tenant slot, so during this method we cannot be deleted nor can any
|
||||
/// timeline be deleted. After this method returns successfully, tenant must be reloaded.
|
||||
/// Second step of detach from ancestor; detaches the `self` from it's current ancestor and
|
||||
/// reparents any reparentable children of previous ancestor.
|
||||
///
|
||||
/// Pageserver receiving a SIGKILL during this operation is not supported (yet).
|
||||
pub(crate) async fn complete_detaching_timeline_ancestor(
|
||||
/// This method is to be called while holding the TenantManager's tenant slot, so during this
|
||||
/// method we cannot be deleted nor can any timeline be deleted. After this method returns
|
||||
/// successfully, tenant must be reloaded.
|
||||
///
|
||||
/// Final step will be to complete after optionally resetting the tenant.
|
||||
pub(crate) async fn detach_from_ancestor_and_reparent(
|
||||
self: &Arc<Timeline>,
|
||||
tenant: &crate::tenant::Tenant,
|
||||
prepared: detach_ancestor::PreparedTimelineDetach,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<TimelineId>, anyhow::Error> {
|
||||
detach_ancestor::complete(self, tenant, prepared, ctx).await
|
||||
) -> Result<detach_ancestor::DetachingAndReparenting, anyhow::Error> {
|
||||
detach_ancestor::detach_and_reparent(self, tenant, prepared, ctx).await
|
||||
}
|
||||
|
||||
/// Switch aux file policy and schedule upload to the index part.
|
||||
|
||||
@@ -221,6 +221,8 @@ impl DeleteTimelineFlow {
|
||||
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
timeline.shutdown(super::ShutdownMode::Hard).await;
|
||||
|
||||
tenant.ongoing_timeline_detach.on_delete(&timeline);
|
||||
|
||||
fail::fail_point!("timeline-delete-before-index-deleted-at", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: timeline-delete-before-index-deleted-at"
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -2842,6 +2842,7 @@ impl Service {
|
||||
);
|
||||
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
|
||||
client
|
||||
.timeline_detach_ancestor(tenant_shard_id, timeline_id)
|
||||
.await
|
||||
@@ -2858,7 +2859,8 @@ impl Service {
|
||||
Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
|
||||
ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
|
||||
}
|
||||
// rest can be mapped
|
||||
// rest can be mapped as usual
|
||||
// FIXME: this converts some 500 to 409 which is not per openapi
|
||||
other => passthrough_api_error(&node, other),
|
||||
}
|
||||
})
|
||||
@@ -2866,7 +2868,6 @@ impl Service {
|
||||
}
|
||||
|
||||
// no shard needs to go first/last; the operation should be idempotent
|
||||
// TODO: it would be great to ensure that all shards return the same error
|
||||
let mut results = self
|
||||
.tenant_for_shards(targets, |tenant_shard_id, node| {
|
||||
futures::FutureExt::boxed(detach_one(
|
||||
@@ -2885,6 +2886,7 @@ impl Service {
|
||||
.filter(|(_, res)| res != &any.1)
|
||||
.collect::<Vec<_>>();
|
||||
if !mismatching.is_empty() {
|
||||
// this can be hit by races which should not happen because operation lock on cplane
|
||||
let matching = results.len() - mismatching.len();
|
||||
tracing::error!(
|
||||
matching,
|
||||
|
||||
@@ -837,7 +837,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
timeline_id: TimelineId,
|
||||
batch_size: int | None = None,
|
||||
**kwargs,
|
||||
) -> List[TimelineId]:
|
||||
) -> Set[TimelineId]:
|
||||
params = {}
|
||||
if batch_size is not None:
|
||||
params["batch_size"] = batch_size
|
||||
@@ -848,7 +848,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
)
|
||||
self.verbose_error(res)
|
||||
json = res.json()
|
||||
return list(map(TimelineId, json["reparented_timelines"]))
|
||||
return set(map(TimelineId, json["reparented_timelines"]))
|
||||
|
||||
def evict_layer(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
|
||||
|
||||
@@ -5,7 +5,7 @@ import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from queue import Empty, Queue
|
||||
from threading import Barrier
|
||||
from typing import List, Tuple
|
||||
from typing import List, Set, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.common_types import Lsn, TimelineId
|
||||
@@ -165,7 +165,7 @@ def test_ancestor_detach_branched_from(
|
||||
)
|
||||
|
||||
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
|
||||
assert all_reparented == []
|
||||
assert all_reparented == set()
|
||||
|
||||
if restart_after:
|
||||
env.pageserver.stop()
|
||||
@@ -534,7 +534,7 @@ def test_compaction_induced_by_detaches_in_history(
|
||||
|
||||
for _, timeline_id in skip_main:
|
||||
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
|
||||
assert reparented == [], "we have no earlier branches at any level"
|
||||
assert reparented == set(), "we have no earlier branches at any level"
|
||||
|
||||
post_detach_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))
|
||||
assert len(post_detach_l0s) == 5, "should had inherited 4 L0s, have 5 in total"
|
||||
@@ -774,7 +774,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
|
||||
else:
|
||||
break
|
||||
|
||||
assert reparented == [], "too many retries (None) or unexpected reparentings"
|
||||
assert reparented == set(), "too many retries (None) or unexpected reparentings"
|
||||
|
||||
for shard_info in shards:
|
||||
node_id = int(shard_info["node_id"])
|
||||
@@ -807,22 +807,24 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
|
||||
|
||||
What remains not tested by this:
|
||||
- shutdown winning over complete
|
||||
|
||||
Shutdown winning over complete needs gc blocking and reparenting any left-overs on retry.
|
||||
"""
|
||||
|
||||
if sharded and mode == "delete_tenant":
|
||||
# the shared/exclusive lock for tenant is blocking this:
|
||||
# timeline detach ancestor takes shared, delete tenant takes exclusive
|
||||
pytest.skip(
|
||||
"tenant deletion while timeline ancestor detach is underway is not supported yet"
|
||||
)
|
||||
pytest.skip("tenant deletion while timeline ancestor detach is underway cannot happen")
|
||||
|
||||
shard_count = 2 if sharded else 1
|
||||
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count if sharded else None)
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=shard_count if sharded else None,
|
||||
initial_tenant_conf={
|
||||
"gc_period": "1s",
|
||||
"lsn_lease_length": "0s",
|
||||
},
|
||||
)
|
||||
|
||||
for ps in env.pageservers:
|
||||
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
|
||||
@@ -831,7 +833,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
|
||||
|
||||
detached_timeline = env.neon_cli.create_branch("detached soon", "main")
|
||||
|
||||
failpoint = "timeline-detach-ancestor::before_starting_after_locking_pausable"
|
||||
pausepoint = "timeline-detach-ancestor::before_starting_after_locking_pausable"
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
shards = env.storage_controller.locate(env.initial_tenant)
|
||||
@@ -843,13 +845,20 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
|
||||
|
||||
victim = pageservers[int(shards[-1]["node_id"])]
|
||||
victim_http = victim.http_client()
|
||||
victim_http.configure_failpoints((failpoint, "pause"))
|
||||
victim_http.configure_failpoints((pausepoint, "pause"))
|
||||
|
||||
def detach_ancestor():
|
||||
target.detach_ancestor(env.initial_tenant, detached_timeline)
|
||||
|
||||
def at_failpoint() -> Tuple[str, LogCursor]:
|
||||
return victim.assert_log_contains(f"at failpoint {failpoint}")
|
||||
def at_failpoint() -> LogCursor:
|
||||
msg, offset = victim.assert_log_contains(f"at failpoint {pausepoint}")
|
||||
log.info(f"found {msg}")
|
||||
msg, offset = victim.assert_log_contains(
|
||||
".* gc_loop.*: Skipping GC while there is an ongoing detach_ancestor attempt",
|
||||
offset,
|
||||
)
|
||||
log.info(f"found {msg}")
|
||||
return offset
|
||||
|
||||
def start_delete():
|
||||
if mode == "delete_timeline":
|
||||
@@ -882,23 +891,44 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
|
||||
with ThreadPoolExecutor(max_workers=2) as pool:
|
||||
try:
|
||||
fut = pool.submit(detach_ancestor)
|
||||
_, offset = wait_until(10, 1.0, at_failpoint)
|
||||
offset = wait_until(10, 1.0, at_failpoint)
|
||||
|
||||
delete = pool.submit(start_delete)
|
||||
|
||||
wait_until(10, 1.0, lambda: at_waiting_on_gate_close(offset))
|
||||
offset = wait_until(10, 1.0, lambda: at_waiting_on_gate_close(offset))
|
||||
|
||||
victim_http.configure_failpoints((failpoint, "off"))
|
||||
victim_http.configure_failpoints((pausepoint, "off"))
|
||||
|
||||
delete.result()
|
||||
|
||||
assert wait_until(10, 1.0, is_deleted), f"unimplemented mode {mode}"
|
||||
|
||||
# TODO: match the error
|
||||
with pytest.raises(PageserverApiException) as exc:
|
||||
fut.result()
|
||||
log.info(f"TODO: match this error: {exc.value}")
|
||||
assert exc.value.status_code == 503
|
||||
finally:
|
||||
victim_http.configure_failpoints((failpoint, "off"))
|
||||
victim_http.configure_failpoints((pausepoint, "off"))
|
||||
|
||||
if mode != "delete_timeline":
|
||||
return
|
||||
|
||||
# make sure the gc is unblocked
|
||||
time.sleep(2)
|
||||
victim.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
|
||||
|
||||
if not sharded:
|
||||
# we have the other node only while sharded
|
||||
return
|
||||
|
||||
other = pageservers[int(shards[0]["node_id"])]
|
||||
log.info(f"other is {other.id}")
|
||||
_, offset = other.assert_log_contains(
|
||||
".*INFO request\\{method=PUT path=/v1/tenant/\\S+/timeline/\\S+/detach_ancestor .*\\}: Request handled, status: 200 OK",
|
||||
)
|
||||
# this might be a lot earlier than the victims line, but that is okay.
|
||||
_, offset = other.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("mode", ["delete_reparentable_timeline"])
|
||||
@@ -915,7 +945,9 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
|
||||
|
||||
assert (
|
||||
mode == "delete_reparentable_timeline"
|
||||
), "only one now, but we could have the create just as well, need gc blocking"
|
||||
), "only one now, but creating reparentable timelines cannot be supported even with gc blocking"
|
||||
# perhaps it could be supported by always doing this for the shard0 first, and after that for others.
|
||||
# when we run shard0 to completion, we can use it's timelines to restrict which can be reparented.
|
||||
|
||||
shard_count = 2
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
@@ -1048,10 +1080,267 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
|
||||
victim_http.configure_failpoints((pausepoint, "off"))
|
||||
|
||||
|
||||
def test_retried_detach_ancestor_after_failed_reparenting(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Using a failpoint, force the completion step of timeline ancestor detach to
|
||||
fail after reparenting a single timeline.
|
||||
|
||||
Retrying should try reparenting until all reparentings are done, all the
|
||||
time blocking gc even across restarts (first round).
|
||||
|
||||
A completion failpoint is used to inhibit completion on second to last
|
||||
round.
|
||||
|
||||
On last round, the completion uses a path where no reparentings can happen
|
||||
because original ancestor is deleted, and there is a completion to unblock
|
||||
gc without restart.
|
||||
"""
|
||||
|
||||
# to get the remote storage metrics
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"gc_period": "1s",
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
)
|
||||
|
||||
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".* reparenting failed: failpoint: timeline-detach-ancestor::allow_one_reparented",
|
||||
".* Error processing HTTP request: InternalServerError\\(failed to reparent all candidate timelines, please retry",
|
||||
".* Error processing HTTP request: InternalServerError\\(failpoint: timeline-detach-ancestor::complete_before_uploading",
|
||||
]
|
||||
)
|
||||
|
||||
http = env.pageserver.http_client()
|
||||
|
||||
def remote_storage_copy_requests():
|
||||
return http.get_metric_value(
|
||||
"remote_storage_s3_request_seconds_count",
|
||||
{"request_type": "copy_object", "result": "ok"},
|
||||
)
|
||||
|
||||
def reparenting_progress(timelines: List[TimelineId]) -> Tuple[int, Set[TimelineId]]:
|
||||
reparented = 0
|
||||
not_reparented = set()
|
||||
for timeline in timelines:
|
||||
detail = http.timeline_detail(env.initial_tenant, timeline)
|
||||
ancestor = TimelineId(detail["ancestor_timeline_id"])
|
||||
if ancestor == detached:
|
||||
reparented += 1
|
||||
else:
|
||||
not_reparented.add(timeline)
|
||||
return (reparented, not_reparented)
|
||||
|
||||
# main ------A-----B-----C-----D-----E> lsn
|
||||
timelines = []
|
||||
with env.endpoints.create_start("main") as ep:
|
||||
for counter in range(5):
|
||||
ep.safe_psql(
|
||||
f"create table foo_{counter} as select i::bigint from generate_series(1, 10000) t(i)"
|
||||
)
|
||||
branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
|
||||
http.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
|
||||
branch = env.neon_cli.create_branch(
|
||||
f"branch_{counter}", "main", ancestor_start_lsn=branch_lsn
|
||||
)
|
||||
timelines.append(branch)
|
||||
|
||||
flush_ep_to_pageserver(env, ep, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
# detach "E" which has most reparentable timelines under it
|
||||
detached = timelines.pop()
|
||||
assert len(timelines) == 4
|
||||
|
||||
http = http.without_status_retrying()
|
||||
|
||||
http.configure_failpoints(("timeline-detach-ancestor::allow_one_reparented", "return"))
|
||||
|
||||
not_reparented: Set[TimelineId] = set()
|
||||
# tracked offset in the pageserver log which is at least at the most recent activation
|
||||
offset = None
|
||||
|
||||
def try_detach():
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match=".*failed to reparent all candidate timelines, please retry",
|
||||
) as exc:
|
||||
http.detach_ancestor(env.initial_tenant, detached)
|
||||
assert exc.value.status_code == 500
|
||||
|
||||
# first round -- do more checking to make sure the gc gets paused
|
||||
try_detach()
|
||||
|
||||
assert (
|
||||
http.timeline_detail(env.initial_tenant, detached)["ancestor_timeline_id"] is None
|
||||
), "first round should had detached 'detached'"
|
||||
|
||||
reparented, not_reparented = reparenting_progress(timelines)
|
||||
assert reparented == 1
|
||||
|
||||
time.sleep(2)
|
||||
_, offset = env.pageserver.assert_log_contains(
|
||||
".*INFO request\\{method=PUT path=/v1/tenant/[0-9a-f]{32}/timeline/[0-9a-f]{32}/detach_ancestor .*\\}: Handling request",
|
||||
offset,
|
||||
)
|
||||
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
|
||||
_, offset = env.pageserver.assert_log_contains(
|
||||
".* gc_loop.*: Skipping GC while there is an ongoing detach_ancestor attempt",
|
||||
offset,
|
||||
)
|
||||
metric = remote_storage_copy_requests()
|
||||
assert metric != 0
|
||||
# make sure the gc blocking is persistent over a restart
|
||||
env.pageserver.restart()
|
||||
env.pageserver.quiesce_tenants()
|
||||
time.sleep(2)
|
||||
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
|
||||
assert env.pageserver.log_contains(".* gc_loop.*: [0-9] timelines need GC", offset) is None
|
||||
_, offset = env.pageserver.assert_log_contains(
|
||||
".* gc_loop.*: Skipping GC while there is an ongoing detach_ancestor attempt",
|
||||
offset,
|
||||
)
|
||||
# restore failpoint for the next reparented
|
||||
http.configure_failpoints(("timeline-detach-ancestor::allow_one_reparented", "return"))
|
||||
|
||||
reparented_before = reparented
|
||||
|
||||
# do two more rounds
|
||||
for _ in range(2):
|
||||
try_detach()
|
||||
|
||||
assert (
|
||||
http.timeline_detail(env.initial_tenant, detached)["ancestor_timeline_id"] is None
|
||||
), "first round should had detached 'detached'"
|
||||
|
||||
reparented, not_reparented = reparenting_progress(timelines)
|
||||
assert reparented == reparented_before + 1
|
||||
reparented_before = reparented
|
||||
|
||||
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
|
||||
metric = remote_storage_copy_requests()
|
||||
assert metric == 0, "copies happen in the first round"
|
||||
|
||||
assert offset is not None
|
||||
assert len(not_reparented) == 1
|
||||
|
||||
http.configure_failpoints(("timeline-detach-ancestor::complete_before_uploading", "return"))
|
||||
|
||||
# almost final round, the failpoint is hit no longer as there is only one reparented and one always gets to succeed.
|
||||
# the tenant is restarted once more, but we fail during completing.
|
||||
with pytest.raises(
|
||||
PageserverApiException, match=".* timeline-detach-ancestor::complete_before_uploading"
|
||||
) as exc:
|
||||
http.detach_ancestor(env.initial_tenant, detached)
|
||||
assert exc.value.status_code == 500
|
||||
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
|
||||
|
||||
# delete the previous ancestor to take a different path to completion. all
|
||||
# other tests take the "detach? reparent complete", but this only hits
|
||||
# "complete".
|
||||
http.timeline_delete(env.initial_tenant, env.initial_timeline)
|
||||
wait_timeline_detail_404(http, env.initial_tenant, env.initial_timeline, 20)
|
||||
|
||||
http.configure_failpoints(("timeline-detach-ancestor::complete_before_uploading", "off"))
|
||||
|
||||
reparented_resp = http.detach_ancestor(env.initial_tenant, detached)
|
||||
assert reparented_resp == set(timelines)
|
||||
# no need to quiesce_tenants anymore, because completion does that
|
||||
|
||||
reparented, not_reparented = reparenting_progress(timelines)
|
||||
assert reparented == len(timelines)
|
||||
|
||||
time.sleep(2)
|
||||
assert (
|
||||
env.pageserver.log_contains(".*: attach finished, activating", offset) is None
|
||||
), "there should be no restart with the final detach_ancestor as it only completed"
|
||||
|
||||
# gc is unblocked
|
||||
env.pageserver.assert_log_contains(".* gc_loop.*: 5 timelines need GC", offset)
|
||||
|
||||
metric = remote_storage_copy_requests()
|
||||
assert metric == 0
|
||||
|
||||
|
||||
def test_timeline_is_deleted_before_timeline_detach_ancestor_completes(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Make sure that a timeline deleted after restart will unpause gc blocking.
|
||||
"""
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"gc_period": "1s",
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
)
|
||||
|
||||
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
|
||||
|
||||
http = env.pageserver.http_client()
|
||||
|
||||
detached = env.neon_cli.create_branch("detached")
|
||||
|
||||
failpoint = "timeline-detach-ancestor::after_activating_before_finding-pausable"
|
||||
|
||||
http.configure_failpoints((failpoint, "pause"))
|
||||
|
||||
def detach_and_get_stuck():
|
||||
return http.detach_ancestor(env.initial_tenant, detached)
|
||||
|
||||
def request_processing_noted_in_log():
|
||||
_, offset = env.pageserver.assert_log_contains(
|
||||
".*INFO request\\{method=PUT path=/v1/tenant/[0-9a-f]{32}/timeline/[0-9a-f]{32}/detach_ancestor .*\\}: Handling request",
|
||||
)
|
||||
return offset
|
||||
|
||||
def delete_detached():
|
||||
return http.timeline_delete(env.initial_tenant, detached)
|
||||
|
||||
try:
|
||||
with ThreadPoolExecutor(max_workers=1) as pool:
|
||||
detach = pool.submit(detach_and_get_stuck)
|
||||
|
||||
offset = wait_until(10, 1.0, request_processing_noted_in_log)
|
||||
|
||||
# make this named fn tor more clear failure test output logging
|
||||
def pausepoint_hit_with_gc_paused() -> LogCursor:
|
||||
env.pageserver.assert_log_contains(f"at failpoint {failpoint}")
|
||||
_, at = env.pageserver.assert_log_contains(
|
||||
".* gc_loop.*: Skipping GC while there is an ongoing detach_ancestor attempt",
|
||||
offset,
|
||||
)
|
||||
return at
|
||||
|
||||
offset = wait_until(10, 1.0, pausepoint_hit_with_gc_paused)
|
||||
|
||||
delete_detached()
|
||||
|
||||
wait_timeline_detail_404(http, env.initial_tenant, detached, 10, 1.0)
|
||||
|
||||
http.configure_failpoints((failpoint, "off"))
|
||||
|
||||
with pytest.raises(PageserverApiException) as exc:
|
||||
detach.result()
|
||||
|
||||
# FIXME: this should be 404 but because there is another Anyhow conversion it is 500
|
||||
assert exc.value.status_code == 500
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*Error processing HTTP request: InternalServerError\\(detached timeline was not found after restart"
|
||||
)
|
||||
finally:
|
||||
http.configure_failpoints((failpoint, "off"))
|
||||
|
||||
# make sure gc has been unblocked
|
||||
time.sleep(2)
|
||||
|
||||
env.pageserver.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
|
||||
|
||||
|
||||
# TODO:
|
||||
# - after starting the operation, pageserver is shutdown, restarted
|
||||
# - after starting the operation, bottom-most timeline is deleted, pageserver is restarted, gc is inhibited
|
||||
# - deletion of reparented while reparenting should fail once, then succeed (?)
|
||||
# - branch near existing L1 boundary, image layers?
|
||||
# - investigate: why are layers started at uneven lsn? not just after branching, but in general.
|
||||
#
|
||||
|
||||
Reference in New Issue
Block a user