pageserver: improve API for invoking GC (#7655)

## Problem

In https://github.com/neondatabase/neon/pull/7531, I had a flaky test
because the GC API endpoint fails if the tenant happens not to be active
yet.

## Summary of changes

While adding a wait for the tenant to become active, I noticed that this
endpoint is kind of strange (it spawns a TaskManager task) and has a
comment `// TODO: spawning is redundant now, need to hold the gate`, so
this PR cleans it up to just run the GC inline while holding a gate.

The GC code is updated to avoid assuming it runs inside a task manager
task. Skipping the check of the `task_mgr` cancellation token is safe,
because our timeline shutdown always cancels `Timeline::cancel`.
This commit is contained in:
John Spray
2024-05-13 17:59:59 +01:00
committed by GitHub
parent 7f51764001
commit be0c73f8e7
4 changed files with 49 additions and 69 deletions

View File

@@ -1715,12 +1715,7 @@ async fn timeline_gc_handler(
let gc_req: TimelineGcRequest = json_request(&mut request).await?; let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let wait_task_done = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)?; let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
let gc_result = wait_task_done
.await
.context("wait for gc task")
.map_err(ApiError::InternalServerError)?
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, gc_result) json_response(StatusCode::OK, gc_result)
} }

View File

@@ -2800,7 +2800,7 @@ impl Tenant {
// See comments in [`Tenant::branch_timeline`] for more information about why branch // See comments in [`Tenant::branch_timeline`] for more information about why branch
// creation task can run concurrently with timeline's GC iteration. // creation task can run concurrently with timeline's GC iteration.
for timeline in gc_timelines { for timeline in gc_timelines {
if task_mgr::is_shutdown_requested() || cancel.is_cancelled() { if cancel.is_cancelled() {
// We were requested to shut down. Stop and return with the progress we // We were requested to shut down. Stop and return with the progress we
// made. // made.
break; break;

View File

@@ -2880,86 +2880,73 @@ use {
utils::http::error::ApiError, utils::http::error::ApiError,
}; };
pub(crate) fn immediate_gc( #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
pub(crate) async fn immediate_gc(
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
timeline_id: TimelineId, timeline_id: TimelineId,
gc_req: TimelineGcRequest, gc_req: TimelineGcRequest,
cancel: CancellationToken, cancel: CancellationToken,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> { ) -> Result<GcResult, ApiError> {
let guard = TENANTS.read().unwrap(); let tenant = {
let guard = TENANTS.read().unwrap();
let tenant = guard guard
.get(&tenant_shard_id) .get(&tenant_shard_id)
.cloned() .cloned()
.with_context(|| format!("tenant {tenant_shard_id}")) .with_context(|| format!("tenant {tenant_shard_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?; .map_err(|e| ApiError::NotFound(e.into()))?
};
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon()); let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting // Use tenant's pitr setting
let pitr = tenant.get_pitr_interval(); let pitr = tenant.get_pitr_interval();
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// Run in task_mgr to avoid race with tenant_detach operation // Run in task_mgr to avoid race with tenant_detach operation
let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download); let ctx: RequestContext =
let (task_done, wait_task_done) = tokio::sync::oneshot::channel(); ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let span = info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
// TODO: spawning is redundant now, need to hold the gate let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::GarbageCollector,
Some(tenant_shard_id),
Some(timeline_id),
&format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
false,
async move {
fail::fail_point!("immediate_gc_task_pre");
#[allow(unused_mut)] fail::fail_point!("immediate_gc_task_pre");
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[cfg(feature = "testing")] #[allow(unused_mut)]
{ let mut result = tenant
// we need to synchronize with drop completion for python tests without polling for .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
// log messages .await;
if let Ok(result) = result.as_mut() { // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
let mut js = tokio::task::JoinSet::new(); // better once the types support it.
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
}
tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
}
}
let timeline = tenant.get_timeline(timeline_id, false).ok(); #[cfg(feature = "testing")]
let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref()); {
// we need to synchronize with drop completion for python tests without polling for
if let Some(rtc) = rtc { // log messages
// layer drops schedule actions on remote timeline client to actually do the if let Ok(result) = result.as_mut() {
// deletions; don't care about the shutdown error, just exit fast let mut js = tokio::task::JoinSet::new();
drop(rtc.wait_completion().await); for layer in std::mem::take(&mut result.doomed_layers) {
} js.spawn(layer.wait_drop());
} }
tracing::info!(
match task_done.send(result) { total = js.len(),
Ok(_) => (), "starting to wait for the gc'd layers to be dropped"
Err(result) => error!("failed to send gc result: {result:?}"), );
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
} }
Ok(())
} }
.instrument(span)
);
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task let timeline = tenant.get_timeline(timeline_id, false).ok();
drop(guard); let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
Ok(wait_task_done) if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the
// deletions; don't care about the shutdown error, just exit fast
drop(rtc.wait_completion().await);
}
}
result.map_err(ApiError::InternalServerError)
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -4656,11 +4656,9 @@ impl Timeline {
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> { pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
// this is most likely the background tasks, but it might be the spawned task from // this is most likely the background tasks, but it might be the spawned task from
// immediate_gc // immediate_gc
let cancel = crate::task_mgr::shutdown_token();
let _g = tokio::select! { let _g = tokio::select! {
guard = self.gc_lock.lock() => guard, guard = self.gc_lock.lock() => guard,
_ = self.cancel.cancelled() => return Ok(GcResult::default()), _ = self.cancel.cancelled() => return Ok(GcResult::default()),
_ = cancel.cancelled() => return Ok(GcResult::default()),
}; };
let timer = self.metrics.garbage_collect_histo.start_timer(); let timer = self.metrics.garbage_collect_histo.start_timer();