From 31543c4acc330060712036ae22651995c2b29a28 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 9 Dec 2022 11:20:04 -0500
Subject: [PATCH] refactor: make update_gc_info and transitive callers async

This is so that in the next commit, we can add a retry_get to find_lsn_for_timestamp.
---
 pageserver/src/tenant.rs          | 134 +++++++++++++++++++-----------
 pageserver/src/tenant/size.rs     |   1 +
 pageserver/src/tenant/timeline.rs |   6 +-
 3 files changed, 88 insertions(+), 53 deletions(-)

diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index ce05d8f085..799a34fb3b 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -129,7 +129,7 @@ pub struct Tenant {
     // may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
     // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
     // timeout...
-    gc_cs: Mutex<()>,
+    gc_cs: tokio::sync::Mutex<()>,
 
     walredo_mgr: Arc,
 
     // provides access to timeline data sitting in the remote storage
@@ -1158,7 +1158,8 @@ impl Tenant {
                     ancestor_timeline.wait_lsn(*lsn).await?;
                 }
 
-                self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
+                self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)
+                    .await?
             }
             None => self.bootstrap_timeline(new_timeline_id, pg_version).await?,
         };
@@ -1683,7 +1684,7 @@ impl Tenant {
             conf,
             tenant_conf: Arc::new(RwLock::new(tenant_conf)),
             timelines: Mutex::new(HashMap::new()),
-            gc_cs: Mutex::new(()),
+            gc_cs: tokio::sync::Mutex::new(()),
             walredo_mgr,
             remote_storage,
             state,
@@ -1834,7 +1835,9 @@ impl Tenant {
         let mut totals: GcResult = Default::default();
         let now = Instant::now();
 
-        let gc_timelines = self.refresh_gc_info_internal(target_timeline_id, horizon, pitr)?;
+        let gc_timelines = self
+            .refresh_gc_info_internal(target_timeline_id, horizon, pitr)
+            .await?;
 
         utils::failpoint_sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
 
@@ -1869,7 +1872,7 @@ impl Tenant {
     /// [`Tenant::get_gc_horizon`].
     ///
     /// This is usually executed as part of periodic gc, but can now be triggered more often.
-    pub fn refresh_gc_info(&self) -> anyhow::Result<Vec<Arc<Timeline>>> {
+    pub async fn refresh_gc_info(&self) -> anyhow::Result<Vec<Arc<Timeline>>> {
         // since this method can now be called at different rates than the configured gc loop, it
         // might be that these configuration values get applied faster than what it was previously,
         // since these were only read from the gc task.
@@ -1880,54 +1883,60 @@
         let target_timeline_id = None;
 
         self.refresh_gc_info_internal(target_timeline_id, horizon, pitr)
+            .await
     }
 
-    fn refresh_gc_info_internal(
+    async fn refresh_gc_info_internal(
         &self,
         target_timeline_id: Option<TimelineId>,
         horizon: u64,
         pitr: Duration,
     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
         // grab mutex to prevent new timelines from being created here.
-        let gc_cs = self.gc_cs.lock().unwrap();
-
-        let timelines = self.timelines.lock().unwrap();
+        let gc_cs = self.gc_cs.lock().await;
 
         // Scan all timelines. For each timeline, remember the timeline ID and
         // the branch point where it was created.
-        let mut all_branchpoints: BTreeSet<(TimelineId, Lsn)> = BTreeSet::new();
-        let timeline_ids = {
-            if let Some(target_timeline_id) = target_timeline_id.as_ref() {
-                if timelines.get(target_timeline_id).is_none() {
-                    bail!("gc target timeline does not exist")
-                }
-            };
+        let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
+            let timelines = self.timelines.lock().unwrap();
+            let mut all_branchpoints = BTreeSet::new();
+            let timeline_ids = {
+                if let Some(target_timeline_id) = target_timeline_id.as_ref() {
+                    if timelines.get(target_timeline_id).is_none() {
+                        bail!("gc target timeline does not exist")
+                    }
+                };
 
-            timelines
-                .iter()
-                .map(|(timeline_id, timeline_entry)| {
-                    if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() {
-                        // If target_timeline is specified, we only need to know branchpoints of its children
-                        if let Some(timeline_id) = target_timeline_id {
-                            if ancestor_timeline_id == &timeline_id {
+                timelines
+                    .iter()
+                    .map(|(timeline_id, timeline_entry)| {
+                        if let Some(ancestor_timeline_id) =
+                            &timeline_entry.get_ancestor_timeline_id()
+                        {
+                            // If target_timeline is specified, we only need to know branchpoints of its children
+                            if let Some(timeline_id) = target_timeline_id {
+                                if ancestor_timeline_id == &timeline_id {
+                                    all_branchpoints.insert((
+                                        *ancestor_timeline_id,
+                                        timeline_entry.get_ancestor_lsn(),
+                                    ));
+                                }
+                            }
+                            // Collect branchpoints for all timelines
+                            else {
                                 all_branchpoints.insert((
                                     *ancestor_timeline_id,
                                     timeline_entry.get_ancestor_lsn(),
                                 ));
                             }
                         }
-                        // Collect branchpoints for all timelines
-                        else {
-                            all_branchpoints
-                                .insert((*ancestor_timeline_id, timeline_entry.get_ancestor_lsn()));
-                        }
-                    }
-                    *timeline_id
-                })
-                .collect::<Vec<_>>()
+                        *timeline_id
+                    })
+                    .collect::<Vec<_>>()
+            };
+            (all_branchpoints, timeline_ids)
         };
-        drop(timelines);
 
         // Ok, we now know all the branch points.
         // Update the GC information for each timeline.
@@ -1953,7 +1962,7 @@
                 ))
                 .map(|&x| x.1)
                 .collect();
-            timeline.update_gc_info(branchpoints, cutoff, pitr)?;
+            timeline.update_gc_info(branchpoints, cutoff, pitr).await?;
             gc_timelines.push(timeline);
         }
@@ -1963,7 +1972,7 @@
     }
 
     /// Branch an existing timeline
-    fn branch_timeline(
+    async fn branch_timeline(
         &self,
         src: TimelineId,
         dst: TimelineId,
@@ -1972,10 +1981,11 @@
         // We need to hold this lock to prevent GC from starting at the same time. GC scans the directory to learn
         // about timelines, so otherwise a race condition is possible, where we create new timeline and GC
         // concurrently removes data that is needed by the new timeline.
-        let _gc_cs = self.gc_cs.lock().unwrap();
-        let timelines = self.timelines.lock().unwrap();
-        let timeline_uninit_mark = self.create_timeline_uninit_mark(dst, &timelines)?;
-        drop(timelines);
+        let _gc_cs = self.gc_cs.lock().await;
+        let timeline_uninit_mark = {
+            let timelines = self.timelines.lock().unwrap();
+            self.create_timeline_uninit_mark(dst, &timelines)?
+        };
 
         // In order for the branch creation task to not wait for GC/compaction,
         // we need to make sure that the starting LSN of the child branch is not out of scope midway by
@@ -2837,7 +2847,9 @@ mod tests {
         //assert_current_logical_size(&tline, Lsn(0x40));
 
         // Branch the history, modify relation differently on the new timeline
-        tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?;
+        tenant
+            .branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))
+            .await?;
         let newtline = tenant
             .get_timeline(NEW_TIMELINE_ID, true)
             .expect("Should have a local timeline");
@@ -2925,7 +2937,10 @@ mod tests {
             .await?;
 
         // try to branch at lsn 25, should fail because we already garbage collected the data
-        match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
+        match tenant
+            .branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25)))
+            .await
+        {
             Ok(_) => panic!("branching should have failed"),
             Err(err) => {
                 assert!(err.to_string().contains("invalid branch start lsn"));
@@ -2950,7 +2965,10 @@ mod tests {
             .create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION)?
             .initialize()?;
         // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
-        match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
+        match tenant
+            .branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25)))
+            .await
+        {
             Ok(_) => panic!("branching should have failed"),
             Err(err) => {
                 assert!(&err.to_string().contains("invalid branch start lsn"));
@@ -2998,7 +3016,9 @@ mod tests {
             .initialize()?;
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
-        tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
+        tenant
+            .branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))
+            .await?;
         let newtline = tenant
             .get_timeline(NEW_TIMELINE_ID, true)
             .expect("Should have a local timeline");
@@ -3020,7 +3040,9 @@ mod tests {
             .initialize()?;
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
-        tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
+        tenant
+            .branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))
+            .await?;
         let newtline = tenant
             .get_timeline(NEW_TIMELINE_ID, true)
             .expect("Should have a local timeline");
@@ -3074,7 +3096,9 @@ mod tests {
 
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
-        tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
+        tenant
+            .branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))
+            .await?;
 
         let newtline = tenant
             .get_timeline(NEW_TIMELINE_ID, true)
@@ -3225,7 +3249,9 @@ mod tests {
 
         let cutoff = tline.get_last_record_lsn();
 
-        tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
+        tline
+            .update_gc_info(Vec::new(), cutoff, Duration::ZERO)
+            .await?;
         tline.freeze_and_flush().await?;
         tline.compact().await?;
         tline.gc().await?;
@@ -3296,7 +3322,9 @@ mod tests {
         // Perform a cycle of flush, compact, and GC
         let cutoff = tline.get_last_record_lsn();
-        tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
+        tline
+            .update_gc_info(Vec::new(), cutoff, Duration::ZERO)
+            .await?;
         tline.freeze_and_flush().await?;
         tline.compact().await?;
         tline.gc().await?;
@@ -3345,7 +3373,9 @@ mod tests {
         let mut tline_id = TIMELINE_ID;
         for _ in 0..50 {
             let new_tline_id = TimelineId::generate();
-            tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
+            tenant
+                .branch_timeline(tline_id, new_tline_id, Some(lsn))
+                .await?;
             tline = tenant
                 .get_timeline(new_tline_id, true)
                 .expect("Should have the branched timeline");
@@ -3378,7 +3408,9 @@ mod tests {
         // Perform a cycle of flush, compact, and GC
         let cutoff = tline.get_last_record_lsn();
-        tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
+        tline
+            .update_gc_info(Vec::new(), cutoff, Duration::ZERO)
+            .await?;
         tline.freeze_and_flush().await?;
         tline.compact().await?;
         tline.gc().await?;
@@ -3409,7 +3441,9 @@ mod tests {
         #[allow(clippy::needless_range_loop)]
         for idx in 0..NUM_TLINES {
             let new_tline_id = TimelineId::generate();
-            tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
+            tenant
+                .branch_timeline(tline_id, new_tline_id, Some(lsn))
+                .await?;
             tline = tenant
                 .get_timeline(new_tline_id, true)
                 .expect("Should have the branched timeline");
diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs
index 597461ce29..5ce0837562 100644
--- a/pageserver/src/tenant/size.rs
+++ b/pageserver/src/tenant/size.rs
@@ -70,6 +70,7 @@ pub(super) async fn gather_inputs(
 
     let timelines = tenant
         .refresh_gc_info()
+        .await
         .context("Failed to refresh gc_info before gathering inputs")?;
 
     if timelines.is_empty() {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 59d3486644..61d619a17b 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -160,7 +160,7 @@ pub struct Timeline {
     // List of child timelines and their branch points. This is needed to avoid
     // garbage collecting data that is still needed by the child timelines.
-    pub gc_info: RwLock<GcInfo>,
+    pub gc_info: std::sync::RwLock<GcInfo>,
 
     // It may change across major versions so for simplicity
     // keep it after running initdb for a timeline.
@@ -794,7 +794,7 @@ impl Timeline {
             write_lock: Mutex::new(()),
             layer_removal_cs: Default::default(),
 
-            gc_info: RwLock::new(GcInfo {
+            gc_info: std::sync::RwLock::new(GcInfo {
                 retain_lsns: Vec::new(),
                 horizon_cutoff: Lsn(0),
                 pitr_cutoff: Lsn(0),
@@ -2499,7 +2499,7 @@ impl Timeline {
     ///
     /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
     /// whether a record is needed for PITR.
-    pub(super) fn update_gc_info(
+    pub(super) async fn update_gc_info(
         &self,
         retain_lsns: Vec<Lsn>,
         cutoff_horizon: Lsn,
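
Aside, not part of the patch: a minimal, self-contained sketch of why `gc_cs` moves from `std::sync::Mutex` to `tokio::sync::Mutex`. In the now-async `refresh_gc_info_internal` (and `branch_timeline`), the guard is held across `.await` points such as `update_gc_info(...).await`; a `std::sync::MutexGuard` held across an await makes the future non-Send and blocks the worker thread while waiting, whereas the tokio guard may be held across awaits and `lock()` yields instead of blocking. The names `Gc` and `refresh` below are hypothetical, chosen only for the illustration.

    use std::time::Duration;

    // Stand-in for the critical section the patch protects with `gc_cs`.
    struct Gc {
        gc_cs: tokio::sync::Mutex<()>,
    }

    impl Gc {
        async fn refresh(&self) {
            // The guard is held across the await below; this works with
            // tokio::sync::Mutex, while a std::sync::MutexGuard here would
            // make the future non-Send and block the executor thread.
            let _guard = self.gc_cs.lock().await;
            tokio::time::sleep(Duration::from_millis(10)).await; // stand-in for async work
        }
    }

    #[tokio::main]
    async fn main() {
        let gc = Gc {
            gc_cs: tokio::sync::Mutex::new(()),
        };
        // tokio::spawn requires a Send future, which this one is.
        tokio::spawn(async move { gc.refresh().await }).await.unwrap();
    }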