mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
Compare commits
25 Commits
proxy-http
...
joonas/nth
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
409e2eff9e | ||
|
|
e6e3b9a716 | ||
|
|
7f31a3f671 | ||
|
|
9971ae3d24 | ||
|
|
48a2a20de3 | ||
|
|
29ef8f15ce | ||
|
|
5e45dd3f86 | ||
|
|
5fced442d7 | ||
|
|
4222610233 | ||
|
|
92deb0dfd7 | ||
|
|
46ca6f17c5 | ||
|
|
14869abb77 | ||
|
|
5330fd9366 | ||
|
|
6c5b3b7812 | ||
|
|
849fe0f191 | ||
|
|
f564b66f21 | ||
|
|
2e58ccee78 | ||
|
|
f398ab0264 | ||
|
|
f23ee2ccdb | ||
|
|
0ad31bb7fb | ||
|
|
86f26d0918 | ||
|
|
4a562dff2e | ||
|
|
f9185b42a9 | ||
|
|
d4f30daa81 | ||
|
|
97ab53e826 |
@@ -8,10 +8,33 @@ pub struct Completion {
|
||||
_token: TaskTrackerToken,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Completion {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Completion")
|
||||
.field("siblings", &self._token.task_tracker().len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Completion {
|
||||
/// Returns true if this completion is associated with the given barrier.
|
||||
pub fn blocks(&self, barrier: &Barrier) -> bool {
|
||||
TaskTracker::ptr_eq(self._token.task_tracker(), &barrier.0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Barrier will wait until all clones of [`Completion`] have been dropped.
|
||||
#[derive(Clone)]
|
||||
pub struct Barrier(TaskTracker);
|
||||
|
||||
impl std::fmt::Debug for Barrier {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Barrier")
|
||||
.field("remaining", &self.0.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Barrier {
|
||||
fn default() -> Self {
|
||||
let (_, rx) = channel();
|
||||
|
||||
@@ -1811,7 +1811,7 @@ async fn timeline_detach_ancestor_handler(
|
||||
// drop(tenant);
|
||||
|
||||
let resp = match progress {
|
||||
detach_ancestor::Progress::Prepared(_guard, prepared) => {
|
||||
detach_ancestor::Progress::Prepared(attempt, prepared) => {
|
||||
// it would be great to tag the guard on to the tenant activation future
|
||||
let reparented_timelines = state
|
||||
.tenant_manager
|
||||
@@ -1819,6 +1819,7 @@ async fn timeline_detach_ancestor_handler(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
prepared,
|
||||
attempt,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1832,6 +1833,7 @@ async fn timeline_detach_ancestor_handler(
|
||||
detach_ancestor::Progress::Done(resp) => resp,
|
||||
};
|
||||
|
||||
// FIXME: if the ordering is really needed and not a hashset, move it here?
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
.instrument(span)
|
||||
|
||||
@@ -35,6 +35,7 @@ use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::time::SystemTime;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use timeline::detach_ancestor;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinSet;
|
||||
@@ -300,8 +301,11 @@ pub struct Tenant {
|
||||
pub(crate) timeline_get_throttle:
|
||||
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
|
||||
|
||||
/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
|
||||
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,
|
||||
/// An ongoing timeline detach must be checked during attempts to GC a timeline.
|
||||
///
|
||||
/// After starting the timeline detach ancestor, blocking GC until it completes allows retrying
|
||||
/// the ancestor detach, until we can be certain that all reparentings have been done.
|
||||
ongoing_timeline_detach: timeline::detach_ancestor::SharedState,
|
||||
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
}
|
||||
@@ -675,6 +679,7 @@ impl Tenant {
|
||||
shard_identity: ShardIdentity,
|
||||
init_order: Option<InitializationOrder>,
|
||||
mode: SpawnMode,
|
||||
existing_detach_attempt: Option<&detach_ancestor::Attempt>,
|
||||
ctx: &RequestContext,
|
||||
) -> Arc<Tenant> {
|
||||
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
|
||||
@@ -704,6 +709,12 @@ impl Tenant {
|
||||
l0_flush_global_state,
|
||||
));
|
||||
|
||||
if let Some(attempt) = existing_detach_attempt {
|
||||
tenant
|
||||
.ongoing_timeline_detach
|
||||
.continue_existing_attempt(attempt);
|
||||
}
|
||||
|
||||
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
|
||||
// we shut down while attaching.
|
||||
let attach_gate_guard = tenant
|
||||
@@ -756,9 +767,9 @@ impl Tenant {
|
||||
// The Stopping case is for when we have passed control on to DeleteTenantFlow:
|
||||
// if it errors, we will call make_broken when tenant is already in Stopping.
|
||||
assert!(
|
||||
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
|
||||
"the attach task owns the tenant state until activation is complete"
|
||||
);
|
||||
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
|
||||
"the attach task owns the tenant state until activation is complete"
|
||||
);
|
||||
|
||||
*state = TenantState::broken_from_reason(err.to_string());
|
||||
});
|
||||
@@ -983,6 +994,8 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
let mut shared_state_builder = timeline::detach_ancestor::SharedStateBuilder::default();
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
// and build a layer map that contains an entry for each remote and local
|
||||
// layer file.
|
||||
@@ -992,6 +1005,8 @@ impl Tenant {
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
shared_state_builder.record_loading_timeline(&timeline_id, &index_part);
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
@@ -1036,6 +1051,8 @@ impl Tenant {
|
||||
// IndexPart is the source of truth.
|
||||
self.clean_up_timelines(&existent_timelines)?;
|
||||
|
||||
shared_state_builder.build(&self.ongoing_timeline_detach);
|
||||
|
||||
fail::fail_point!("attach-before-activate", |_| {
|
||||
anyhow::bail!("attach-before-activate");
|
||||
});
|
||||
@@ -1608,6 +1625,11 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
if self.ongoing_timeline_detach.attempt_blocks_gc() {
|
||||
info!("Skipping GC while there is an ongoing detach_ancestor attempt");
|
||||
return Ok(GcResult::default());
|
||||
}
|
||||
|
||||
self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
|
||||
.await
|
||||
}
|
||||
@@ -2615,7 +2637,7 @@ impl Tenant {
|
||||
&crate::metrics::tenant_throttling::TIMELINE_GET,
|
||||
)),
|
||||
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
|
||||
ongoing_timeline_detach: std::sync::Mutex::default(),
|
||||
ongoing_timeline_detach: Default::default(),
|
||||
l0_flush_global_state,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::remote_timeline_client::remote_tenant_path;
|
||||
use super::secondary::SecondaryTenant;
|
||||
use super::timeline::detach_ancestor::PreparedTimelineDetach;
|
||||
use super::timeline::detach_ancestor::{self, PreparedTimelineDetach};
|
||||
use super::TenantSharedResources;
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
@@ -676,6 +676,7 @@ pub async fn init_tenant_mgr(
|
||||
shard_identity,
|
||||
Some(init_order.clone()),
|
||||
SpawnMode::Lazy,
|
||||
None,
|
||||
&ctx,
|
||||
)),
|
||||
LocationMode::Secondary(secondary_conf) => {
|
||||
@@ -724,6 +725,7 @@ fn tenant_spawn(
|
||||
shard_identity: ShardIdentity,
|
||||
init_order: Option<InitializationOrder>,
|
||||
mode: SpawnMode,
|
||||
existing_detach_attempt: Option<&detach_ancestor::Attempt>,
|
||||
ctx: &RequestContext,
|
||||
) -> Arc<Tenant> {
|
||||
// All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
|
||||
@@ -744,6 +746,7 @@ fn tenant_spawn(
|
||||
shard_identity,
|
||||
init_order,
|
||||
mode,
|
||||
existing_detach_attempt,
|
||||
ctx,
|
||||
)
|
||||
}
|
||||
@@ -1191,6 +1194,7 @@ impl TenantManager {
|
||||
shard_identity,
|
||||
None,
|
||||
spawn_mode,
|
||||
None,
|
||||
ctx,
|
||||
);
|
||||
|
||||
@@ -1312,6 +1316,7 @@ impl TenantManager {
|
||||
shard_identity,
|
||||
None,
|
||||
SpawnMode::Eager,
|
||||
None,
|
||||
ctx,
|
||||
);
|
||||
|
||||
@@ -1968,8 +1973,10 @@ impl TenantManager {
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
prepared: PreparedTimelineDetach,
|
||||
mut attempt: detach_ancestor::Attempt,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<TimelineId>, anyhow::Error> {
|
||||
// FIXME: this is unnecessary, slotguard already has these semantics
|
||||
struct RevertOnDropSlot(Option<SlotGuard>);
|
||||
|
||||
impl Drop for RevertOnDropSlot {
|
||||
@@ -2017,12 +2024,14 @@ impl TenantManager {
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
|
||||
let reparented = timeline
|
||||
let resp = timeline
|
||||
.complete_detaching_timeline_ancestor(&tenant, prepared, ctx)
|
||||
.await?;
|
||||
|
||||
let mut slot_guard = slot_guard.into_inner();
|
||||
|
||||
attempt.before_shutdown();
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, ShutdownMode::Hard).await {
|
||||
Ok(()) => {
|
||||
@@ -2048,12 +2057,18 @@ impl TenantManager {
|
||||
shard_identity,
|
||||
None,
|
||||
SpawnMode::Eager,
|
||||
Some(&attempt),
|
||||
ctx,
|
||||
);
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant))?;
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant.clone()))?;
|
||||
|
||||
Ok(reparented)
|
||||
tenant
|
||||
.ongoing_timeline_detach
|
||||
.complete(attempt, &tenant)
|
||||
.await?;
|
||||
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
/// A page service client sends a TenantId, and to look up the correct Tenant we must
|
||||
|
||||
@@ -687,8 +687,6 @@ impl RemoteTimelineClient {
|
||||
self: &Arc<Self>,
|
||||
new_parent: &TimelineId,
|
||||
) -> anyhow::Result<()> {
|
||||
// FIXME: because of how Timeline::schedule_uploads works when called from layer flushing
|
||||
// and reads the in-memory part we cannot do the detaching like this
|
||||
let receiver = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
@@ -236,7 +236,8 @@ impl Lineage {
|
||||
.is_some_and(|(_, ancestor_lsn, _)| ancestor_lsn == lsn)
|
||||
}
|
||||
|
||||
pub(crate) fn is_detached_from_original_ancestor(&self) -> bool {
|
||||
/// Returns true if the timeline originally had an ancestor, and no longer has one.
|
||||
pub(crate) fn is_detached_from_ancestor(&self) -> bool {
|
||||
self.original_ancestor.is_some()
|
||||
}
|
||||
|
||||
|
||||
@@ -129,9 +129,11 @@ pub fn start_background_loops(
|
||||
let background_jobs_can_start = background_jobs_can_start.cloned();
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let can_start = completion::Barrier::maybe_wait(background_jobs_can_start);
|
||||
let can_start = tenant.ongoing_timeline_detach.gc_sleeping_while(can_start);
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()) },
|
||||
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
|
||||
_ = can_start => {}
|
||||
};
|
||||
gc_loop(tenant, cancel)
|
||||
.instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
@@ -361,14 +363,13 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
if first {
|
||||
first = false;
|
||||
|
||||
if delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
let delays = async {
|
||||
delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?;
|
||||
random_init_delay(period, &cancel).await?;
|
||||
Ok::<_, Cancelled>(())
|
||||
};
|
||||
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
if tenant.ongoing_timeline_detach.gc_sleeping_while(delays).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -404,8 +405,8 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
|
||||
error!(
|
||||
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
|
||||
);
|
||||
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
|
||||
);
|
||||
wait_duration
|
||||
}
|
||||
}
|
||||
@@ -414,7 +415,9 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
let cancelled = cancel.cancelled();
|
||||
let cancelled = tenant.ongoing_timeline_detach.gc_sleeping_while(cancelled);
|
||||
if tokio::time::timeout(sleep_duration, cancelled)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
|
||||
@@ -92,7 +92,7 @@ impl From<FlushLayerError> for Error {
|
||||
}
|
||||
|
||||
pub(crate) enum Progress {
|
||||
Prepared(completion::Completion, PreparedTimelineDetach),
|
||||
Prepared(Attempt, PreparedTimelineDetach),
|
||||
Done(AncestorDetached),
|
||||
}
|
||||
|
||||
@@ -116,6 +116,130 @@ impl Default for Options {
|
||||
}
|
||||
}
|
||||
|
||||
/// SharedState manages the pausing of background tasks (GC) for the duration of timeline detach
|
||||
/// ancestor.
|
||||
///
|
||||
/// Currently this is tracked at tenant level, but it could be moved to be on the roots
|
||||
/// of each timeline tree.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct SharedState {
|
||||
inner: std::sync::Mutex<Option<(TimelineId, completion::Barrier)>>,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
/// Notify an uninitialized shared state that an attempt to detach timeline ancestor continues
|
||||
/// from previous instance.
|
||||
pub(crate) fn continue_existing_attempt(&self, _attempt: &Attempt) {}
|
||||
|
||||
/// Only GC must be paused while a detach ancestor is ongoing. Compaction can happen, to aid
|
||||
/// with any ongoing ingestion. Compaction even after restart is ok because layers will not be
|
||||
/// removed until the detach has been persistently completed.
|
||||
pub(crate) fn attempt_blocks_gc(&self) -> bool {
|
||||
// if we have any started and not finished ancestor detaches, we must remain paused
|
||||
// and also let any trying to start operation know that we've paused.
|
||||
|
||||
// Two cases:
|
||||
// - there is an actual attempt started
|
||||
// - we have learned from indexparts that an attempt will be retried in near future
|
||||
self.inner.lock().unwrap().is_some()
|
||||
}
|
||||
|
||||
/// Sleep for the duration, while letting any ongoing ancestor_detach attempt know that gc has
|
||||
/// been paused.
|
||||
///
|
||||
/// Cancellation safe.
|
||||
pub(crate) async fn gc_sleeping_while<T, F: std::future::Future<Output = T>>(
|
||||
&self,
|
||||
fut: F,
|
||||
) -> T {
|
||||
// this needs to wrap the sleeping so that we can quickly let ancestor_detach continue
|
||||
fut.await
|
||||
}
|
||||
|
||||
/// Acquire the exclusive lock for a new detach ancestor attempt and ensure that GC task has
|
||||
/// been persistently paused via [`crate::tenant::IndexPart`], awaiting for completion.
|
||||
///
|
||||
/// Cancellation safe.
|
||||
async fn start_new_attempt(&self, detached: &Arc<Timeline>) -> Result<Attempt, Error> {
|
||||
if detached.cancel.is_cancelled() {
|
||||
return Err(Error::ShuttingDown);
|
||||
}
|
||||
|
||||
let (guard, barrier) = completion::channel();
|
||||
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
if let Some((tl, other)) = guard.as_ref() {
|
||||
if !other.is_ready() {
|
||||
return Err(Error::OtherTimelineDetachOngoing(*tl));
|
||||
}
|
||||
}
|
||||
*guard = Some((detached.timeline_id, barrier));
|
||||
}
|
||||
|
||||
// FIXME: modify the index part to have a "detach-ancestor: inprogress { started_at }"
|
||||
// unsure if it should be awaited to upload yet...
|
||||
|
||||
// finally
|
||||
let gate_entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
|
||||
|
||||
Ok(Attempt {
|
||||
_guard: guard,
|
||||
gate_entered: Some(gate_entered),
|
||||
})
|
||||
}
|
||||
|
||||
/// Completes a previously started detach ancestor attempt. To be called *after* the operation
|
||||
/// including the tenant has been restarted. The completion is persistent, and no reparentings
|
||||
/// can be done afterwards.
|
||||
///
|
||||
/// Cancellation safe.
|
||||
pub(crate) async fn complete(
|
||||
&self,
|
||||
_attempt: Attempt,
|
||||
_tenant: &Arc<Tenant>,
|
||||
) -> Result<(), Error> {
|
||||
// do we need the tenant to actually activate...? yes we do, but that can happen via
|
||||
// background task starting, because we will similarly want to confirm that the gc has
|
||||
// paused, before we unpause it?
|
||||
//
|
||||
// assert that such and such state has been collected
|
||||
// find the timeline the attempt represents
|
||||
// using the timelines remote client, upload an index part with completion information
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Token which represents a persistent, exclusive, awaitable single attempt.
|
||||
pub(crate) struct Attempt {
|
||||
_guard: completion::Completion,
|
||||
gate_entered: Option<utils::sync::gate::GateGuard>,
|
||||
}
|
||||
|
||||
impl Attempt {
|
||||
pub(crate) fn before_shutdown(&mut self) {
|
||||
let taken = self.gate_entered.take();
|
||||
assert!(taken.is_some());
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct SharedStateBuilder {}
|
||||
|
||||
impl SharedStateBuilder {
|
||||
/// While loading, visit a timelines persistent [`crate::tenant::IndexPart`] and record if it is being
|
||||
/// detached.
|
||||
pub(crate) fn record_loading_timeline(
|
||||
&mut self,
|
||||
_timeline_id: &TimelineId,
|
||||
_index_part: &crate::tenant::IndexPart,
|
||||
) {
|
||||
}
|
||||
|
||||
/// Merge the loaded not yet deleting in-progress to the existing datastructure.
|
||||
pub(crate) fn build(self, _target: &SharedState) {}
|
||||
}
|
||||
|
||||
/// See [`Timeline::prepare_to_detach_from_ancestor`]
|
||||
pub(super) async fn prepare(
|
||||
detached: &Arc<Timeline>,
|
||||
@@ -136,7 +260,7 @@ pub(super) async fn prepare(
|
||||
// we are safe to inspect the latest uploaded, because we can only witness this after
|
||||
// restart is complete and ancestor is no more.
|
||||
let latest = accessor.latest_uploaded_index_part();
|
||||
if !latest.lineage.is_detached_from_original_ancestor() {
|
||||
if !latest.lineage.is_detached_from_ancestor() {
|
||||
return Err(NoAncestor);
|
||||
}
|
||||
}
|
||||
@@ -150,6 +274,11 @@ pub(super) async fn prepare(
|
||||
.values()
|
||||
.filter(|tl| matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached)))
|
||||
.map(|tl| (tl.ancestor_lsn, tl.clone()))
|
||||
// FIXME: instead of collecting, partition based on if they are still reparentable
|
||||
// if gc has been blocked, we can still reparent these timelines
|
||||
//
|
||||
// Collect to avoid lock taking order problem with Tenant::timelines and
|
||||
// Timeline::remote_client
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut any_shutdown = false;
|
||||
@@ -174,12 +303,19 @@ pub(super) async fn prepare(
|
||||
return Err(Error::ShuttingDown);
|
||||
}
|
||||
|
||||
// FIXME: reparent the reparentable (return Progress::Prepared if there were any) -- we
|
||||
// will need to acquire the lock as well..? so it would make sense that at load time we
|
||||
// would detect the in-progress ness and "soft-acquire it for us".
|
||||
// FIXME: otherwise release gc blocking if it still is there, and wait for upload
|
||||
|
||||
let mut reparented = all_direct_children;
|
||||
// why this instead of hashset? there is a reason, but I've forgotten it many times.
|
||||
//
|
||||
// maybe if this was a hashset we would not be able to distinguish some race condition.
|
||||
reparented.sort_unstable_by_key(|(lsn, tl)| (*lsn, tl.timeline_id));
|
||||
|
||||
// FIXME: add the non-reparented in to the response -- these would be the reparentable, but
|
||||
// no longer reparentable because they appeared *after* gc blocking was released.
|
||||
return Ok(Progress::Done(AncestorDetached {
|
||||
reparented_timelines: reparented
|
||||
.into_iter()
|
||||
@@ -200,22 +336,10 @@ pub(super) async fn prepare(
|
||||
return Err(TooManyAncestors);
|
||||
}
|
||||
|
||||
// before we acquire the gate, we must mark the ancestor as having a detach operation
|
||||
// ongoing which will block other concurrent detach operations so we don't get to ackward
|
||||
// situations where there would be two branches trying to reparent earlier branches.
|
||||
let (guard, barrier) = completion::channel();
|
||||
|
||||
{
|
||||
let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
|
||||
if let Some((tl, other)) = guard.as_ref() {
|
||||
if !other.is_ready() {
|
||||
return Err(OtherTimelineDetachOngoing(*tl));
|
||||
}
|
||||
}
|
||||
*guard = Some((detached.timeline_id, barrier));
|
||||
}
|
||||
|
||||
let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
|
||||
let attempt = tenant
|
||||
.ongoing_timeline_detach
|
||||
.start_new_attempt(detached)
|
||||
.await?;
|
||||
|
||||
utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking_pausable");
|
||||
|
||||
@@ -304,20 +428,26 @@ pub(super) async fn prepare(
|
||||
let timeline = detached.clone();
|
||||
let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
|
||||
|
||||
tasks.spawn(async move {
|
||||
let _permit = limiter.acquire().await;
|
||||
let copied =
|
||||
upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
|
||||
.await?;
|
||||
Ok(copied)
|
||||
});
|
||||
let span = tracing::info_span!("upload_rewritten_layer", %layer);
|
||||
tasks.spawn(
|
||||
async move {
|
||||
let _permit = limiter.acquire().await;
|
||||
let copied =
|
||||
upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
|
||||
.await?;
|
||||
if let Some(copied) = copied.as_ref() {
|
||||
tracing::info!(%copied, "rewrote and uploaded");
|
||||
}
|
||||
Ok(copied)
|
||||
}
|
||||
.instrument(span),
|
||||
);
|
||||
}
|
||||
|
||||
while let Some(res) = tasks.join_next().await {
|
||||
match res {
|
||||
Ok(Ok(Some(copied))) => {
|
||||
wrote_any = true;
|
||||
tracing::info!(layer=%copied, "rewrote and uploaded");
|
||||
new_layers.push(copied);
|
||||
}
|
||||
Ok(Ok(None)) => {}
|
||||
@@ -378,7 +508,7 @@ pub(super) async fn prepare(
|
||||
|
||||
let prepared = PreparedTimelineDetach { layers: new_layers };
|
||||
|
||||
Ok(Progress::Prepared(guard, prepared))
|
||||
Ok(Progress::Prepared(attempt, prepared))
|
||||
}
|
||||
|
||||
fn partition_work(
|
||||
@@ -562,6 +692,9 @@ pub(super) async fn complete(
|
||||
)
|
||||
.await?;
|
||||
|
||||
// FIXME: assert that the persistent record of inprogress detach exists
|
||||
// FIXME: assert that gc is still blocked
|
||||
|
||||
let mut tasks = tokio::task::JoinSet::new();
|
||||
|
||||
// because we are now keeping the slot in progress, it is unlikely that there will be any
|
||||
@@ -647,6 +780,7 @@ pub(super) async fn complete(
|
||||
}
|
||||
|
||||
if reparenting_candidates != reparented.len() {
|
||||
// FIXME: we must return 503 kind of response
|
||||
tracing::info!("failed to reparent some candidates");
|
||||
}
|
||||
|
||||
@@ -657,5 +791,8 @@ pub(super) async fn complete(
|
||||
.map(|(_, timeline_id)| timeline_id)
|
||||
.collect();
|
||||
|
||||
// FIXME: here everything has gone peachy, the tenant will be restarted next.
|
||||
// after restart and before returning the response, the gc blocking must be undone
|
||||
|
||||
Ok(reparented)
|
||||
}
|
||||
|
||||
@@ -2858,7 +2858,8 @@ impl Service {
|
||||
Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
|
||||
ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
|
||||
}
|
||||
// rest can be mapped
|
||||
// rest can be mapped as usual
|
||||
// FIXME: this converts some 500 to 409 which is not per openapi
|
||||
other => passthrough_api_error(&node, other),
|
||||
}
|
||||
})
|
||||
|
||||
@@ -814,9 +814,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
|
||||
if sharded and mode == "delete_tenant":
|
||||
# the shared/exclusive lock for tenant is blocking this:
|
||||
# timeline detach ancestor takes shared, delete tenant takes exclusive
|
||||
pytest.skip(
|
||||
"tenant deletion while timeline ancestor detach is underway is not supported yet"
|
||||
)
|
||||
pytest.skip("tenant deletion while timeline ancestor detach is underway cannot happen")
|
||||
|
||||
shard_count = 2 if sharded else 1
|
||||
|
||||
|
||||
Reference in New Issue
Block a user