From 402d66778e782fddf8def3648adca49b1bb0bc31 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 18 Jul 2024 17:58:24 +0000 Subject: [PATCH] make reparenting operations idempotent --- pageserver/src/tenant/metadata.rs | 13 +++- .../src/tenant/remote_timeline_client.rs | 77 ++++++++++++------- .../tenant/remote_timeline_client/index.rs | 22 ++++-- 3 files changed, 74 insertions(+), 38 deletions(-) diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index 6ba1bdef9b..7b30ce388f 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -285,21 +285,30 @@ impl TimelineMetadata { } /// When reparenting, the `ancestor_lsn` does not change. - pub fn reparent(&mut self, timeline: &TimelineId) { + /// + /// Returns true if anything was changed. + pub fn reparent(&mut self, timeline: &TimelineId) -> bool { assert!(self.body.ancestor_timeline.is_some()); // no assertion for redoing this: it's fine, we may have to repeat this multiple times over + let prev = self.body.ancestor_timeline; self.body.ancestor_timeline = Some(*timeline); + prev.as_ref() != Some(timeline) } - pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) { + /// Returns true if anything was changed + pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) -> bool { + let mut changed = false; if let Some(ancestor) = self.body.ancestor_timeline { assert_eq!(ancestor, branchpoint.0); + changed = true; } if self.body.ancestor_lsn != Lsn(0) { assert_eq!(self.body.ancestor_lsn, branchpoint.1); + changed = true; } self.body.ancestor_timeline = None; self.body.ancestor_lsn = Lsn(0); + changed } pub fn latest_gc_cutoff_lsn(&self) -> Lsn { diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index bc9ff14192..f06a5ff0ab 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -697,17 +697,31 @@ impl RemoteTimelineClient { )); }; - upload_queue.dirty.metadata.reparent(new_parent); - upload_queue.dirty.lineage.record_previous_ancestor(&prev); + let uploaded = &upload_queue.clean.0.metadata; - self.schedule_index_upload(upload_queue)?; + if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() { + // nothing to do + None + } else { + let mut modified = false; - self.schedule_barrier0(upload_queue) + modified |= upload_queue.dirty.metadata.reparent(new_parent); + modified |= upload_queue.dirty.lineage.record_previous_ancestor(&prev); + + if modified { + self.schedule_index_upload(upload_queue)?; + } else { + // the modifications are already being uploaded + } + + Some(self.schedule_barrier0(upload_queue)) + } }; - Self::wait_completion0(receiver) - .await - .context("wait completion") + if let Some(receiver) = receiver { + Self::wait_completion0(receiver).await?; + } + Ok(()) } /// Schedules uploading a new version of `index_part.json` with the given layers added, @@ -723,26 +737,33 @@ impl RemoteTimelineClient { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - upload_queue.dirty.metadata.detach_from_ancestor(&adopted); - upload_queue.dirty.lineage.record_detaching(&adopted); + if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) { + None + } else { + let mut modified = false; + modified |= upload_queue.dirty.metadata.detach_from_ancestor(&adopted); + modified |= upload_queue.dirty.lineage.record_detaching(&adopted); - for layer in layers { - upload_queue - .dirty - .layer_metadata - .insert(layer.layer_desc().layer_name(), layer.metadata()); + for layer in layers { + let prev = upload_queue + .dirty + .layer_metadata + .insert(layer.layer_desc().layer_name(), layer.metadata()); + modified |= prev.is_none(); + } + + if modified { + self.schedule_index_upload(upload_queue)?; + } + + Some(self.schedule_barrier0(upload_queue)) } - - self.schedule_index_upload(upload_queue)?; - - let barrier = self.schedule_barrier0(upload_queue); - self.launch_queued_tasks(upload_queue); - barrier }; - Self::wait_completion0(barrier) - .await - .context("wait completion") + if let Some(barrier) = barrier { + Self::wait_completion0(barrier).await?; + } + Ok(()) } pub(crate) async fn schedule_started_detach_ancestor_mark_and_wait( @@ -771,10 +792,9 @@ impl RemoteTimelineClient { }; if let Some(barrier) = maybe_barrier { - Self::wait_completion0(barrier).await - } else { - Ok(()) + Self::wait_completion0(barrier).await?; } + Ok(()) } pub(crate) async fn schedule_completed_detach_ancestor_mark_and_wait( @@ -801,10 +821,9 @@ impl RemoteTimelineClient { }; if let Some(barrier) = maybe_barrier { - Self::wait_completion0(barrier).await - } else { - Ok(()) + Self::wait_completion0(barrier).await?; } + Ok(()) } /// Launch an upload operation in the background; the file is added to be included in next diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index 7c408f3c04..b63fffba37 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -210,10 +210,10 @@ fn is_false(b: &bool) -> bool { impl Lineage { const REMEMBER_AT_MOST: usize = 100; - pub(crate) fn record_previous_ancestor(&mut self, old_ancestor: &TimelineId) { + pub(crate) fn record_previous_ancestor(&mut self, old_ancestor: &TimelineId) -> bool { if self.reparenting_history.last() == Some(old_ancestor) { // do not re-record it - return; + return false; } let drop_oldest = self.reparenting_history.len() + 1 >= Self::REMEMBER_AT_MOST; @@ -223,13 +223,21 @@ impl Lineage { self.reparenting_history.remove(0); } self.reparenting_history.push(*old_ancestor); + true } - pub(crate) fn record_detaching(&mut self, branchpoint: &(TimelineId, Lsn)) { - assert!(self.original_ancestor.is_none()); - - self.original_ancestor = - Some((branchpoint.0, branchpoint.1, chrono::Utc::now().naive_utc())); + /// Returns true if anything changed. + pub(crate) fn record_detaching(&mut self, branchpoint: &(TimelineId, Lsn)) -> bool { + if let Some((id, lsn, _)) = self.original_ancestor { + assert_eq!(id, branchpoint.0); + assert_eq!(lsn, branchpoint.1); + false + } else { + assert!(self.original_ancestor.is_none()); + self.original_ancestor = + Some((branchpoint.0, branchpoint.1, chrono::Utc::now().naive_utc())); + true + } } /// The queried lsn is most likely the basebackup lsn, and this answers question "is it allowed