refactor: make update_gc_info and transitive callers async

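This prepares for the next commit, which adds a retry_get to
find_lsn_for_timestamp.

As part of making these functions async, Tenant::gc_cs becomes a
tokio::sync::Mutex, and the std-mutex guard on `timelines` is confined to
a block scope instead of being released with an explicit drop().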
This commit is contained in:
Christian Schwarz
2022-12-09 11:20:04 -05:00
committed by Christian Schwarz
parent 1da03141a7
commit 31543c4acc
3 changed files with 88 additions and 53 deletions

View File

@@ -129,7 +129,7 @@ pub struct Tenant {
// may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
// timeout...
gc_cs: Mutex<()>,
gc_cs: tokio::sync::Mutex<()>,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
// provides access to timeline data sitting in the remote storage
@@ -1158,7 +1158,8 @@ impl Tenant {
ancestor_timeline.wait_lsn(*lsn).await?;
}
self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)
.await?
}
None => self.bootstrap_timeline(new_timeline_id, pg_version).await?,
};
@@ -1683,7 +1684,7 @@ impl Tenant {
conf,
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
state,
@@ -1834,7 +1835,9 @@ impl Tenant {
let mut totals: GcResult = Default::default();
let now = Instant::now();
let gc_timelines = self.refresh_gc_info_internal(target_timeline_id, horizon, pitr)?;
let gc_timelines = self
.refresh_gc_info_internal(target_timeline_id, horizon, pitr)
.await?;
utils::failpoint_sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
@@ -1869,7 +1872,7 @@ impl Tenant {
/// [`Tenant::get_gc_horizon`].
///
/// This is usually executed as part of periodic gc, but can now be triggered more often.
pub fn refresh_gc_info(&self) -> anyhow::Result<Vec<Arc<Timeline>>> {
pub async fn refresh_gc_info(&self) -> anyhow::Result<Vec<Arc<Timeline>>> {
// since this method can now be called at different rates than the configured gc loop, it
// might be that these configuration values get applied faster than what it was previously,
// since these were only read from the gc task.
@@ -1880,54 +1883,60 @@ impl Tenant {
let target_timeline_id = None;
self.refresh_gc_info_internal(target_timeline_id, horizon, pitr)
.await
}
fn refresh_gc_info_internal(
async fn refresh_gc_info_internal(
&self,
target_timeline_id: Option<TimelineId>,
horizon: u64,
pitr: Duration,
) -> anyhow::Result<Vec<Arc<Timeline>>> {
// grab mutex to prevent new timelines from being created here.
let gc_cs = self.gc_cs.lock().unwrap();
let timelines = self.timelines.lock().unwrap();
let gc_cs = self.gc_cs.lock().await;
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
let mut all_branchpoints: BTreeSet<(TimelineId, Lsn)> = BTreeSet::new();
let timeline_ids = {
if let Some(target_timeline_id) = target_timeline_id.as_ref() {
if timelines.get(target_timeline_id).is_none() {
bail!("gc target timeline does not exist")
}
};
let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
let timelines = self.timelines.lock().unwrap();
let mut all_branchpoints = BTreeSet::new();
let timeline_ids = {
if let Some(target_timeline_id) = target_timeline_id.as_ref() {
if timelines.get(target_timeline_id).is_none() {
bail!("gc target timeline does not exist")
}
};
timelines
.iter()
.map(|(timeline_id, timeline_entry)| {
if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() {
// If target_timeline is specified, we only need to know branchpoints of its children
if let Some(timeline_id) = target_timeline_id {
if ancestor_timeline_id == &timeline_id {
timelines
.iter()
.map(|(timeline_id, timeline_entry)| {
if let Some(ancestor_timeline_id) =
&timeline_entry.get_ancestor_timeline_id()
{
// If target_timeline is specified, we only need to know branchpoints of its children
if let Some(timeline_id) = target_timeline_id {
if ancestor_timeline_id == &timeline_id {
all_branchpoints.insert((
*ancestor_timeline_id,
timeline_entry.get_ancestor_lsn(),
));
}
}
// Collect branchpoints for all timelines
else {
all_branchpoints.insert((
*ancestor_timeline_id,
timeline_entry.get_ancestor_lsn(),
));
}
}
// Collect branchpoints for all timelines
else {
all_branchpoints
.insert((*ancestor_timeline_id, timeline_entry.get_ancestor_lsn()));
}
}
*timeline_id
})
.collect::<Vec<_>>()
*timeline_id
})
.collect::<Vec<_>>()
};
(all_branchpoints, timeline_ids)
};
drop(timelines);
// Ok, we now know all the branch points.
// Update the GC information for each timeline.
@@ -1953,7 +1962,7 @@ impl Tenant {
))
.map(|&x| x.1)
.collect();
timeline.update_gc_info(branchpoints, cutoff, pitr)?;
timeline.update_gc_info(branchpoints, cutoff, pitr).await?;
gc_timelines.push(timeline);
}
@@ -1963,7 +1972,7 @@ impl Tenant {
}
/// Branch an existing timeline
fn branch_timeline(
async fn branch_timeline(
&self,
src: TimelineId,
dst: TimelineId,
@@ -1972,10 +1981,11 @@ impl Tenant {
// We need to hold this lock to prevent GC from starting at the same time. GC scans the directory to learn
// about timelines, so otherwise a race condition is possible, where we create new timeline and GC
// concurrently removes data that is needed by the new timeline.
let _gc_cs = self.gc_cs.lock().unwrap();
let timelines = self.timelines.lock().unwrap();
let timeline_uninit_mark = self.create_timeline_uninit_mark(dst, &timelines)?;
drop(timelines);
let _gc_cs = self.gc_cs.lock().await;
let timeline_uninit_mark = {
let timelines = self.timelines.lock().unwrap();
self.create_timeline_uninit_mark(dst, &timelines)?
};
// In order for the branch creation task to not wait for GC/compaction,
// we need to make sure that the starting LSN of the child branch is not out of scope midway by
@@ -2837,7 +2847,9 @@ mod tests {
//assert_current_logical_size(&tline, Lsn(0x40));
// Branch the history, modify relation differently on the new timeline
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?;
tenant
.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
@@ -2925,7 +2937,10 @@ mod tests {
.await?;
// try to branch at lsn 25, should fail because we already garbage collected the data
match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
match tenant
.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25)))
.await
{
Ok(_) => panic!("branching should have failed"),
Err(err) => {
assert!(err.to_string().contains("invalid branch start lsn"));
@@ -2950,7 +2965,10 @@ mod tests {
.create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION)?
.initialize()?;
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
match tenant
.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25)))
.await
{
Ok(_) => panic!("branching should have failed"),
Err(err) => {
assert!(&err.to_string().contains("invalid branch start lsn"));
@@ -2998,7 +3016,9 @@ mod tests {
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
tenant
.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
@@ -3020,7 +3040,9 @@ mod tests {
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
tenant
.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
@@ -3074,7 +3096,9 @@ mod tests {
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
tenant
.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3225,7 +3249,9 @@ mod tests {
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline
.update_gc_info(Vec::new(), cutoff, Duration::ZERO)
.await?;
tline.freeze_and_flush().await?;
tline.compact().await?;
tline.gc().await?;
@@ -3296,7 +3322,9 @@ mod tests {
// Perform a cycle of flush, compact, and GC
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline
.update_gc_info(Vec::new(), cutoff, Duration::ZERO)
.await?;
tline.freeze_and_flush().await?;
tline.compact().await?;
tline.gc().await?;
@@ -3345,7 +3373,9 @@ mod tests {
let mut tline_id = TIMELINE_ID;
for _ in 0..50 {
let new_tline_id = TimelineId::generate();
tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
tenant
.branch_timeline(tline_id, new_tline_id, Some(lsn))
.await?;
tline = tenant
.get_timeline(new_tline_id, true)
.expect("Should have the branched timeline");
@@ -3378,7 +3408,9 @@ mod tests {
// Perform a cycle of flush, compact, and GC
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline
.update_gc_info(Vec::new(), cutoff, Duration::ZERO)
.await?;
tline.freeze_and_flush().await?;
tline.compact().await?;
tline.gc().await?;
@@ -3409,7 +3441,9 @@ mod tests {
#[allow(clippy::needless_range_loop)]
for idx in 0..NUM_TLINES {
let new_tline_id = TimelineId::generate();
tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
tenant
.branch_timeline(tline_id, new_tline_id, Some(lsn))
.await?;
tline = tenant
.get_timeline(new_tline_id, true)
.expect("Should have the branched timeline");

View File

@@ -70,6 +70,7 @@ pub(super) async fn gather_inputs(
let timelines = tenant
.refresh_gc_info()
.await
.context("Failed to refresh gc_info before gathering inputs")?;
if timelines.is_empty() {

View File

@@ -160,7 +160,7 @@ pub struct Timeline {
// List of child timelines and their branch points. This is needed to avoid
// garbage collecting data that is still needed by the child timelines.
pub gc_info: RwLock<GcInfo>,
pub gc_info: std::sync::RwLock<GcInfo>,
// It may change across major versions so for simplicity
// keep it after running initdb for a timeline.
@@ -794,7 +794,7 @@ impl Timeline {
write_lock: Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: RwLock::new(GcInfo {
gc_info: std::sync::RwLock::new(GcInfo {
retain_lsns: Vec::new(),
horizon_cutoff: Lsn(0),
pitr_cutoff: Lsn(0),
@@ -2499,7 +2499,7 @@ impl Timeline {
///
/// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
/// whether a record is needed for PITR.
pub(super) fn update_gc_info(
pub(super) async fn update_gc_info(
&self,
retain_lsns: Vec<Lsn>,
cutoff_horizon: Lsn,