Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-15 09:22:55 +00:00

refactor: misc after attempt to add lock_in_reparentable
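
For orientation before the hunks: the index changes below replace the `From<NaiveDateTime>` impl on `OngoingDetachAncestor` with an explicit `started_now()` constructor. A standalone, simplified sketch of that shape (illustration only, not the pageserver code; the real struct also carries serde derives, and this assumes the `chrono` crate):

// Simplified stand-in for the struct changed in the index hunk further below.
use chrono::NaiveDateTime;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OngoingDetachAncestor {
    pub first_started_at: NaiveDateTime,
}

impl OngoingDetachAncestor {
    /// Records the wall-clock time at which the detach-ancestor attempt first started.
    pub fn started_now() -> Self {
        OngoingDetachAncestor {
            first_started_at: chrono::Utc::now().naive_utc(),
        }
    }
}

fn main() {
    // callers no longer convert a timestamp themselves; the constructor does it
    let mark = OngoingDetachAncestor::started_now();
    println!("detach ancestor first started at {}", mark.first_started_at);
}
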
@@ -767,6 +767,15 @@ impl RemoteTimelineClient {
         Ok(())
     }

+    /// Marks timeline detach ancestor started for this timeline if it has not been marked as
+    /// started.
+    ///
+    /// A retryable step of timeline detach ancestor.
+    ///
+    /// Does not overwrite or even error if the set of reparentable timelines differs. Those can
+    /// be inspected later.
+    ///
+    /// Waits until the completion of the upload.
     pub(crate) async fn schedule_started_detach_ancestor_mark_and_wait(
         self: &Arc<Self>,
     ) -> anyhow::Result<()> {
@@ -774,18 +783,16 @@ impl RemoteTimelineClient {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;

-        match upload_queue.dirty.ongoing_detach_ancestor {
-            Some(_) if upload_queue.clean.0.ongoing_detach_ancestor.is_some() => {
-                // we don't need to upload anything
-                None
-            }
-            Some(_) => {
-                // we need to wait until current uploads
-                Some(self.schedule_barrier0(upload_queue))
-            }
+        match upload_queue.dirty.ongoing_detach_ancestor.as_ref() {
+            Some(_) if upload_queue.clean.0.ongoing_detach_ancestor.is_some() => None,
+            Some(_) => Some(self.schedule_barrier0(upload_queue)),
             None => {
                 // at this point, the metadata must always show that there is a parent
                 if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
                     panic!("cannot start detach ancestor if there is nothing to detach from");
                 }
                 upload_queue.dirty.ongoing_detach_ancestor =
-                    Some(chrono::Utc::now().naive_utc().into());
+                    Some(index::OngoingDetachAncestor::started_now());
                 self.schedule_index_upload(upload_queue)?;
                 Some(self.schedule_barrier0(upload_queue))
             }
@@ -798,6 +805,12 @@ impl RemoteTimelineClient {
         Ok(())
     }

+    /// Marks timeline detach ancestor completed for this timeline if it has not been marked as
+    /// such already.
+    ///
+    /// ## Panics
+    ///
+    /// If the timeline has not been detached from ancestor already.
     pub(crate) async fn schedule_completed_detach_ancestor_mark_and_wait(
         self: &Arc<Self>,
     ) -> anyhow::Result<()> {

@@ -275,16 +275,16 @@ impl Lineage {
     }
 }

 // FIXME: restructure and rename this as a generic method of gc blocking
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub(crate) struct OngoingDetachAncestor {
-    first_started_at: NaiveDateTime,
+    // FIXME: include last start, number of restarts?
+    pub(crate) first_started_at: NaiveDateTime,
 }

-impl From<NaiveDateTime> for OngoingDetachAncestor {
-    fn from(value: NaiveDateTime) -> Self {
+impl OngoingDetachAncestor {
+    pub(super) fn started_now() -> Self {
         OngoingDetachAncestor {
-            first_started_at: value,
+            first_started_at: chrono::Utc::now().naive_utc(),
         }
     }
 }
@@ -682,7 +682,7 @@ mod tests {
             "pg_version": 14
         },
         "ongoing_detach_ancestor": {
-            "first_started_at": "2024-07-19T09:00:00.123"
+            "started_at": "2024-07-19T09:00:00.123"
         }
     }"#;
@@ -712,7 +712,9 @@ mod tests {
             ).with_recalculated_checksum().unwrap(),
             deleted_at: None,
             lineage: Default::default(),
-            ongoing_detach_ancestor: Some(OngoingDetachAncestor { first_started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000") }),
+            ongoing_detach_ancestor: Some(OngoingDetachAncestor {
+                started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
+            }),
             last_aux_file_policy: Default::default(),
         };

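
As a side note on the two test hunks above: the renamed field boils down to deserializing a `chrono::NaiveDateTime` from the `ongoing_detach_ancestor` JSON object. A minimal, self-contained sketch of that round trip (assumes `chrono` with its `serde` feature plus `serde_json`; it does not use the repository's `parse_naive_datetime` helper):

use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct OngoingDetachAncestor {
    started_at: NaiveDateTime,
}

fn main() {
    // same timestamp as the test fixture above, in both of its spellings
    let json = r#"{ "started_at": "2024-07-19T09:00:00.123" }"#;
    let parsed: OngoingDetachAncestor = serde_json::from_str(json).unwrap();

    let expected =
        NaiveDateTime::parse_from_str("2024-07-19T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f")
            .unwrap();
    assert_eq!(parsed, OngoingDetachAncestor { started_at: expected });

    println!("round-tripped: {}", serde_json::to_string(&parsed).unwrap());
}
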
@@ -4749,13 +4749,11 @@ impl Timeline {
     /// Second step of detach from ancestor; detaches the `self` from it's current ancestor and
     /// reparents any reparentable children of previous ancestor.
     ///
-    /// This method is to be called while
-    /// holding the TenantManager's tenant slot, so during this method we cannot be deleted nor can
-    /// any timeline be deleted. After this method returns successfully, tenant must be reloaded.
+    /// This method is to be called while holding the TenantManager's tenant slot, so during this
+    /// method we cannot be deleted nor can any timeline be deleted. After this method returns
+    /// successfully, tenant must be reloaded.
     ///
     /// Final step will be to complete after optionally resetting the tenant.
     ///
     /// Pageserver receiving a SIGKILL during this operation is not supported (yet).
     pub(crate) async fn detach_from_ancestor_and_reparent(
         self: &Arc<Timeline>,
         tenant: &crate::tenant::Tenant,

@@ -101,6 +101,21 @@ impl From<FlushLayerError> for Error {
     }
 }

+impl From<GetActiveTenantError> for Error {
+    fn from(value: GetActiveTenantError) -> Self {
+        use pageserver_api::models::TenantState;
+        use GetActiveTenantError::*;
+
+        match value {
+            Cancelled | WillNotBecomeActive(TenantState::Stopping { .. }) => Error::ShuttingDown,
+            WaitForActiveTimeout { .. } | NotFound(_) | Broken(_) | WillNotBecomeActive(_) => {
+                // NotFound seems out-of-place
+                Error::WaitToActivate(value)
+            }
+        }
+    }
+}
+
 pub(crate) enum Progress {
     Prepared(Attempt, PreparedTimelineDetach),
     Done(AncestorDetached),
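
The `impl From<GetActiveTenantError> for Error` added above is what allows a later hunk in this file to shorten `.map_err(Error::WaitToActivate)?` to a bare `?`. A small sketch of that mechanism, using hypothetical stand-in types rather than the pageserver's real error enums:

#[derive(Debug)]
enum SourceError {
    Cancelled,
    Timeout,
}

#[derive(Debug)]
enum Error {
    ShuttingDown,
    WaitToActivate(SourceError),
}

impl From<SourceError> for Error {
    fn from(value: SourceError) -> Self {
        match value {
            SourceError::Cancelled => Error::ShuttingDown,
            SourceError::Timeout => Error::WaitToActivate(value),
        }
    }
}

fn wait_to_become_active(succeed: bool) -> Result<(), SourceError> {
    if succeed { Ok(()) } else { Err(SourceError::Timeout) }
}

fn caller() -> Result<(), Error> {
    // before: wait_to_become_active(true).map_err(Error::WaitToActivate)?;
    // with the From impl in scope, `?` performs the conversion itself:
    wait_to_become_active(true)?;
    Ok(())
}

fn main() {
    caller().unwrap();
    println!("{:?}", Error::from(SourceError::Cancelled));
}
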
@@ -192,23 +207,45 @@ impl SharedState {
     }

     /// Acquire the exclusive lock for a new detach ancestor attempt and ensure that GC task has
-    /// been persistently paused via [`crate::tenant::IndexPart`], awaiting for completion.
+    /// been transiently paused.
     ///
     /// Cancellation safe.
     async fn start_new_attempt(&self, detached: &Arc<Timeline>) -> Result<Attempt, Error> {
+        let started_at = std::time::Instant::now();
+
+        let completion = self.obtain_exclusive_permit(detached)?;
+
+        let gate_entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
+
+        self.wait_until_gc_is_paused(detached).await?;
+
+        let ready_in = started_at.elapsed();
+
+        tracing::info!(elapsed_ms = ready_in.as_millis(), "gc paused, gate entered");
+
+        Ok(Attempt {
+            timeline_id: detached.timeline_id,
+            _guard: completion,
+            gate_entered: Some(gate_entered),
+        })
+    }
+
+    fn obtain_exclusive_permit(
+        &self,
+        detached: &Arc<Timeline>,
+    ) -> Result<completion::Completion, Error> {
         if detached.cancel.is_cancelled() {
             return Err(Error::ShuttingDown);
         }

-        let completion = {
-            let mut guard = self.inner.lock().unwrap();
-            let completion = guard.start_new(&detached.timeline_id)?;
-            // now that we changed the contents, notify any long-sleeping gc
-            self.gc_waiting.notify_one();
-            completion
-        };
+        let mut guard = self.inner.lock().unwrap();
+        let completion = guard.start_new(&detached.timeline_id)?;
+        // now that we changed the contents, notify any long-sleeping gc
+        self.gc_waiting.notify_one();
+        Ok(completion)
+    }

-        let started_at = std::time::Instant::now();
+    async fn wait_until_gc_is_paused(&self, detached: &Arc<Timeline>) -> Result<(), Error> {
         let mut cancelled = std::pin::pin!(detached.cancel.cancelled());

         loop {
@@ -221,35 +258,9 @@ impl SharedState {
             // because we check if *our* progress has been witnessed by gc.
             let g = self.inner.lock().unwrap();
             if g.is_gc_paused(&detached.timeline_id) {
-                break;
+                return Ok(());
             }
         }
-
-        // finally
-        let gate_entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
-        let synced_in = started_at.elapsed();
-
-        detached
-            .remote_client
-            .schedule_started_detach_ancestor_mark_and_wait()
-            .await
-            // FIXME: aaaargh
-            .map_err(|_| Error::ShuttingDown)?;
-
-        let uploaded_in = started_at.elapsed() - synced_in;
-
-        // FIXME: get rid of this logging or make it a metric or two
-        tracing::info!(
-            sync_ms = synced_in.as_millis(),
-            upload_ms = uploaded_in.as_millis(),
-            "gc paused, gate entered, and uploaded"
-        );
-
-        Ok(Attempt {
-            timeline_id: detached.timeline_id,
-            _guard: completion,
-            gate_entered: Some(gate_entered),
-        })
     }

     /// Completes a previously started detach ancestor attempt. To be called *after* the operation
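
Taken together, the two hunks above split the old `start_new_attempt` into `obtain_exclusive_permit` and `wait_until_gc_is_paused`, and move the index-part upload out of the waiting path (a later hunk re-adds that call in `prepare`). The waiting side pairs `gc_waiting.notify_one()` with a lock-and-recheck loop that also honors cancellation. A hedged, standalone sketch of that pattern, using `tokio::sync::Notify` and `tokio_util::sync::CancellationToken` as stand-ins for whatever `SharedState` actually holds:

use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;

#[derive(Default)]
struct Shared {
    gc_paused: Mutex<bool>,
    gc_waiting: Notify,
}

#[derive(Debug)]
enum Error {
    ShuttingDown,
}

async fn wait_until_gc_is_paused(shared: &Shared, cancel: &CancellationToken) -> Result<(), Error> {
    loop {
        // register interest before re-checking the flag so a notify_one sent in between is not lost
        let notified = shared.gc_waiting.notified();
        if *shared.gc_paused.lock().unwrap() {
            return Ok(());
        }
        tokio::select! {
            _ = notified => {}
            _ = cancel.cancelled() => return Err(Error::ShuttingDown),
        }
    }
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(Shared::default());
    let cancel = CancellationToken::new();

    let waiter = tokio::spawn({
        let shared = shared.clone();
        let cancel = cancel.clone();
        async move { wait_until_gc_is_paused(&shared, &cancel).await }
    });

    // simulate the gc task observing the new attempt and pausing
    *shared.gc_paused.lock().unwrap() = true;
    shared.gc_waiting.notify_one();

    waiter.await.unwrap().unwrap();
    println!("observed gc paused");
}
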
@@ -270,12 +281,9 @@ impl SharedState {
         // find the timeline the attempt represents
         // using the timelines remote client, upload an index part with completion information

-        {
-            let g = self.inner.lock().unwrap();
+        self.inner.lock().unwrap().validate(&attempt);

-            // TODO: cover the case where retry completes?
-            g.validate(&attempt);
-        }
+        // FIXME: could check more preconditions, like that the timeline has been detached?

         let mut attempt = scopeguard::guard(attempt, |attempt| {
             // our attempt will no longer be valid, so release it
@@ -284,8 +292,7 @@ impl SharedState {
|
||||
|
||||
tenant
|
||||
.wait_to_become_active(std::time::Duration::from_secs(9999))
|
||||
.await
|
||||
.map_err(Error::WaitToActivate)?;
|
||||
.await?;
|
||||
|
||||
// TODO: pause failpoint here to catch the situation where detached timeline is deleted...?
|
||||
// we are not yet holding the gate so it could advance to the point of removing from
|
||||
@@ -302,7 +309,7 @@ impl SharedState {
|
||||
unreachable!("unsure if there is an ordering, but perhaps this is possible?");
|
||||
};
|
||||
|
||||
// the gate being antered does not matter much, but lets be strict
|
||||
// the gate being entered does not matter much, but lets be strict
|
||||
if attempt.gate_entered.is_none() {
|
||||
let entered = timeline.gate.enter().map_err(|_| Error::ShuttingDown)?;
|
||||
attempt.gate_entered = Some(entered);
|
||||
@@ -620,21 +627,14 @@ pub(super) async fn prepare(
|
||||
|
||||
if still_in_progress {
|
||||
// gc is still blocked, we can still reparent and complete.
|
||||
//
|
||||
// this of course represents a challenge: how to *not* reparent branches which were not
|
||||
// there when we started? cannot, unfortunately, if not recorded to the ongoing_detach_ancestor.
|
||||
//
|
||||
// FIXME: if a new timeline had been created on ancestor which was reparentable between
|
||||
// the attempts, we could end up with it having different ancestry across shards. Fix
|
||||
// this by locking the parentable timelines before the operation starts, and storing
|
||||
// them in index_part.json.
|
||||
//
|
||||
// because the ancestor of detached is already set to none, we have published all
|
||||
// of the layers.
|
||||
// we are safe to reparent remaining, because they were locked in in the beginning.
|
||||
let attempt = tenant
|
||||
.ongoing_timeline_detach
|
||||
.start_new_attempt(detached)
|
||||
.await?;
|
||||
|
||||
// because the ancestor of detached is already set to none, we have published all
|
||||
// of the layers, so we are still "prepared."
|
||||
return Ok(Progress::Prepared(
|
||||
attempt,
|
||||
PreparedTimelineDetach { layers: Vec::new() },
|
||||
@@ -649,6 +649,8 @@ pub(super) async fn prepare(

         // IDEA? add the non-reparented in to the response -- these would be the reparentable, but
         // no longer reparentable because they appeared *after* gc blocking was released.
+        //
+        // will not be needed once we have the locking in.
         return Ok(Progress::Done(AncestorDetached {
             reparented_timelines,
         }));
@@ -671,6 +673,17 @@ pub(super) async fn prepare(
|
||||
.start_new_attempt(detached)
|
||||
.await?;
|
||||
|
||||
// FIXME: is the assumption that no one else is making these changes except us strong
|
||||
// enough...? need a witness in the RemoteTimelineClient api?
|
||||
//
|
||||
// if it wasn't persistently started already, mark the ancestor detach persistently started.
|
||||
detached
|
||||
.remote_client
|
||||
.schedule_started_detach_ancestor_mark_and_wait()
|
||||
.await
|
||||
// FIXME: aaaargh
|
||||
.map_err(|_| Error::ShuttingDown)?;
|
||||
|
||||
utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking_pausable");
|
||||
|
||||
fail::fail_point!(
|
||||
@@ -735,7 +748,8 @@ pub(super) async fn prepare(
|
||||
};
|
||||
|
||||
// TODO: layers are already sorted by something: use that to determine how much of remote
|
||||
// copies are already done.
|
||||
// copies are already done -- gc is blocked, but a compaction could had happened on ancestor,
|
||||
// which is something to keep in mind if copy skipping is implemented.
|
||||
tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
|
||||
|
||||
// TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
|
||||
@@ -844,25 +858,25 @@ fn reparented_direct_children(
|
||||
tenant: &Tenant,
|
||||
) -> Result<Vec<TimelineId>, Error> {
|
||||
let mut all_direct_children = tenant
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter_map(|tl| {
|
||||
let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached));
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter_map(|tl| {
|
||||
let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached));
|
||||
|
||||
if is_direct_child {
|
||||
Some((tl.ancestor_lsn, tl.clone()))
|
||||
} else {
|
||||
if let Some(timeline) = tl.ancestor_timeline.as_ref() {
|
||||
assert_ne!(timeline.timeline_id, detached.timeline_id);
|
||||
}
|
||||
None
|
||||
if is_direct_child {
|
||||
Some((tl.ancestor_lsn, tl.clone()))
|
||||
} else {
|
||||
if let Some(timeline) = tl.ancestor_timeline.as_ref() {
|
||||
assert_ne!(timeline.timeline_id, detached.timeline_id);
|
||||
}
|
||||
})
|
||||
// Collect to avoid lock taking order problem with Tenant::timelines and
|
||||
// Timeline::remote_client
|
||||
.collect::<Vec<_>>();
|
||||
None
|
||||
}
|
||||
})
|
||||
// Collect to avoid lock taking order problem with Tenant::timelines and
|
||||
// Timeline::remote_client
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut any_shutdown = false;
|
||||
|
||||
@@ -1097,7 +1111,7 @@ pub(super) async fn detach_and_reparent(
|
||||
Detached(Arc<Timeline>, Lsn),
|
||||
}
|
||||
|
||||
let (recorded_branchpoint, detach_is_ongoing) = {
|
||||
let (recorded_branchpoint, still_ongoing) = {
|
||||
let access = detached.remote_client.initialized_upload_queue()?;
|
||||
let latest = access.latest_uploaded_index_part();
|
||||
|
||||
@@ -1128,7 +1142,8 @@ pub(super) async fn detach_and_reparent(
|
||||
if let Some(ancestor) = existing {
|
||||
Ancestor::Detached(ancestor, ancestor_lsn)
|
||||
} else {
|
||||
let direct_children = reparented_direct_children(detached, tenant)?;
|
||||
let direct_children =
|
||||
reparented_direct_children(detached, tenant).map_err(Error::from)?;
|
||||
return Ok(DetachingAndReparenting::AlreadyDone(direct_children));
|
||||
}
|
||||
} else {
|
||||
@@ -1143,8 +1158,8 @@ pub(super) async fn detach_and_reparent(
|
||||
// if we crash after this operation, a retry will allow reparenting the remaining timelines as
|
||||
// gc is blocked.
|
||||
assert!(
|
||||
detach_is_ongoing,
|
||||
"to detach and reparent, gc must still be blocked"
|
||||
still_ongoing,
|
||||
"to detach or reparent, gc must still be blocked"
|
||||
);
|
||||
|
||||
let (ancestor, ancestor_lsn, was_detached) = match ancestor {
|
||||
|
||||
@@ -2842,6 +2842,7 @@ impl Service {
|
||||
);
|
||||
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
|
||||
client
|
||||
.timeline_detach_ancestor(tenant_shard_id, timeline_id)
|
||||
.await
|
||||