mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
make reparenting operations idempotent
This commit is contained in:
@@ -285,21 +285,30 @@ impl TimelineMetadata {
|
||||
}
|
||||
|
||||
/// When reparenting, the `ancestor_lsn` does not change.
|
||||
pub fn reparent(&mut self, timeline: &TimelineId) {
|
||||
///
|
||||
/// Returns true if anything was changed.
|
||||
pub fn reparent(&mut self, timeline: &TimelineId) -> bool {
|
||||
assert!(self.body.ancestor_timeline.is_some());
|
||||
// no assertion for redoing this: it's fine, we may have to repeat this multiple times over
|
||||
let prev = self.body.ancestor_timeline;
|
||||
self.body.ancestor_timeline = Some(*timeline);
|
||||
prev.as_ref() != Some(timeline)
|
||||
}
|
||||
|
||||
pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) {
|
||||
/// Returns true if anything was changed
|
||||
pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) -> bool {
|
||||
let mut changed = false;
|
||||
if let Some(ancestor) = self.body.ancestor_timeline {
|
||||
assert_eq!(ancestor, branchpoint.0);
|
||||
changed = true;
|
||||
}
|
||||
if self.body.ancestor_lsn != Lsn(0) {
|
||||
assert_eq!(self.body.ancestor_lsn, branchpoint.1);
|
||||
changed = true;
|
||||
}
|
||||
self.body.ancestor_timeline = None;
|
||||
self.body.ancestor_lsn = Lsn(0);
|
||||
changed
|
||||
}
|
||||
|
||||
pub fn latest_gc_cutoff_lsn(&self) -> Lsn {
|
||||
|
||||
@@ -697,17 +697,31 @@ impl RemoteTimelineClient {
|
||||
));
|
||||
};
|
||||
|
||||
upload_queue.dirty.metadata.reparent(new_parent);
|
||||
upload_queue.dirty.lineage.record_previous_ancestor(&prev);
|
||||
let uploaded = &upload_queue.clean.0.metadata;
|
||||
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() {
|
||||
// nothing to do
|
||||
None
|
||||
} else {
|
||||
let mut modified = false;
|
||||
|
||||
self.schedule_barrier0(upload_queue)
|
||||
modified |= upload_queue.dirty.metadata.reparent(new_parent);
|
||||
modified |= upload_queue.dirty.lineage.record_previous_ancestor(&prev);
|
||||
|
||||
if modified {
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
} else {
|
||||
// the modifications are already being uploaded
|
||||
}
|
||||
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
};
|
||||
|
||||
Self::wait_completion0(receiver)
|
||||
.await
|
||||
.context("wait completion")
|
||||
if let Some(receiver) = receiver {
|
||||
Self::wait_completion0(receiver).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Schedules uploading a new version of `index_part.json` with the given layers added,
|
||||
@@ -723,26 +737,33 @@ impl RemoteTimelineClient {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
|
||||
upload_queue.dirty.lineage.record_detaching(&adopted);
|
||||
if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) {
|
||||
None
|
||||
} else {
|
||||
let mut modified = false;
|
||||
modified |= upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
|
||||
modified |= upload_queue.dirty.lineage.record_detaching(&adopted);
|
||||
|
||||
for layer in layers {
|
||||
upload_queue
|
||||
.dirty
|
||||
.layer_metadata
|
||||
.insert(layer.layer_desc().layer_name(), layer.metadata());
|
||||
for layer in layers {
|
||||
let prev = upload_queue
|
||||
.dirty
|
||||
.layer_metadata
|
||||
.insert(layer.layer_desc().layer_name(), layer.metadata());
|
||||
modified |= prev.is_none();
|
||||
}
|
||||
|
||||
if modified {
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
}
|
||||
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
|
||||
let barrier = self.schedule_barrier0(upload_queue);
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
barrier
|
||||
};
|
||||
|
||||
Self::wait_completion0(barrier)
|
||||
.await
|
||||
.context("wait completion")
|
||||
if let Some(barrier) = barrier {
|
||||
Self::wait_completion0(barrier).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn schedule_started_detach_ancestor_mark_and_wait(
|
||||
@@ -771,10 +792,9 @@ impl RemoteTimelineClient {
|
||||
};
|
||||
|
||||
if let Some(barrier) = maybe_barrier {
|
||||
Self::wait_completion0(barrier).await
|
||||
} else {
|
||||
Ok(())
|
||||
Self::wait_completion0(barrier).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn schedule_completed_detach_ancestor_mark_and_wait(
|
||||
@@ -801,10 +821,9 @@ impl RemoteTimelineClient {
|
||||
};
|
||||
|
||||
if let Some(barrier) = maybe_barrier {
|
||||
Self::wait_completion0(barrier).await
|
||||
} else {
|
||||
Ok(())
|
||||
Self::wait_completion0(barrier).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Launch an upload operation in the background; the file is added to be included in next
|
||||
|
||||
@@ -210,10 +210,10 @@ fn is_false(b: &bool) -> bool {
|
||||
impl Lineage {
|
||||
const REMEMBER_AT_MOST: usize = 100;
|
||||
|
||||
pub(crate) fn record_previous_ancestor(&mut self, old_ancestor: &TimelineId) {
|
||||
pub(crate) fn record_previous_ancestor(&mut self, old_ancestor: &TimelineId) -> bool {
|
||||
if self.reparenting_history.last() == Some(old_ancestor) {
|
||||
// do not re-record it
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
let drop_oldest = self.reparenting_history.len() + 1 >= Self::REMEMBER_AT_MOST;
|
||||
@@ -223,13 +223,21 @@ impl Lineage {
|
||||
self.reparenting_history.remove(0);
|
||||
}
|
||||
self.reparenting_history.push(*old_ancestor);
|
||||
true
|
||||
}
|
||||
|
||||
pub(crate) fn record_detaching(&mut self, branchpoint: &(TimelineId, Lsn)) {
|
||||
assert!(self.original_ancestor.is_none());
|
||||
|
||||
self.original_ancestor =
|
||||
Some((branchpoint.0, branchpoint.1, chrono::Utc::now().naive_utc()));
|
||||
/// Returns true if anything changed.
|
||||
pub(crate) fn record_detaching(&mut self, branchpoint: &(TimelineId, Lsn)) -> bool {
|
||||
if let Some((id, lsn, _)) = self.original_ancestor {
|
||||
assert_eq!(id, branchpoint.0);
|
||||
assert_eq!(lsn, branchpoint.1);
|
||||
false
|
||||
} else {
|
||||
assert!(self.original_ancestor.is_none());
|
||||
self.original_ancestor =
|
||||
Some((branchpoint.0, branchpoint.1, chrono::Utc::now().naive_utc()));
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// The queried lsn is most likely the basebackup lsn, and this answers question "is it allowed
|
||||
|
||||
Reference in New Issue
Block a user