mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
pageserver: improve API for invoking GC (#7655)
## Problem

In https://github.com/neondatabase/neon/pull/7531, I had a flaky test because the GC API endpoint fails if the tenant happens not to be active yet.

## Summary of changes

While adding that wait for the tenant to be active, I noticed that this endpoint is kind of strange (it spawns a TaskManager task) and has a comment `// TODO: spawning is redundant now, need to hold the gate`, so this PR cleans it up to just run the GC inline while holding a gate.

The GC code is updated to avoid assuming it runs inside a task manager task. Not checking the task_mgr cancellation token is safe, because our timeline shutdown always cancels Timeline::cancel.
This commit is contained in:
@@ -1715,12 +1715,7 @@ async fn timeline_gc_handler(
|
|||||||
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
|
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
|
||||||
|
|
||||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||||
let wait_task_done = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)?;
|
let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
|
||||||
let gc_result = wait_task_done
|
|
||||||
.await
|
|
||||||
.context("wait for gc task")
|
|
||||||
.map_err(ApiError::InternalServerError)?
|
|
||||||
.map_err(ApiError::InternalServerError)?;
|
|
||||||
|
|
||||||
json_response(StatusCode::OK, gc_result)
|
json_response(StatusCode::OK, gc_result)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2800,7 +2800,7 @@ impl Tenant {
|
|||||||
// See comments in [`Tenant::branch_timeline`] for more information about why branch
|
// See comments in [`Tenant::branch_timeline`] for more information about why branch
|
||||||
// creation task can run concurrently with timeline's GC iteration.
|
// creation task can run concurrently with timeline's GC iteration.
|
||||||
for timeline in gc_timelines {
|
for timeline in gc_timelines {
|
||||||
if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
|
if cancel.is_cancelled() {
|
||||||
// We were requested to shut down. Stop and return with the progress we
|
// We were requested to shut down. Stop and return with the progress we
|
||||||
// made.
|
// made.
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -2880,86 +2880,73 @@ use {
|
|||||||
utils::http::error::ApiError,
|
utils::http::error::ApiError,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub(crate) fn immediate_gc(
|
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
|
||||||
|
pub(crate) async fn immediate_gc(
|
||||||
tenant_shard_id: TenantShardId,
|
tenant_shard_id: TenantShardId,
|
||||||
timeline_id: TimelineId,
|
timeline_id: TimelineId,
|
||||||
gc_req: TimelineGcRequest,
|
gc_req: TimelineGcRequest,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
|
) -> Result<GcResult, ApiError> {
|
||||||
let guard = TENANTS.read().unwrap();
|
let tenant = {
|
||||||
|
let guard = TENANTS.read().unwrap();
|
||||||
let tenant = guard
|
guard
|
||||||
.get(&tenant_shard_id)
|
.get(&tenant_shard_id)
|
||||||
.cloned()
|
.cloned()
|
||||||
.with_context(|| format!("tenant {tenant_shard_id}"))
|
.with_context(|| format!("tenant {tenant_shard_id}"))
|
||||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
.map_err(|e| ApiError::NotFound(e.into()))?
|
||||||
|
};
|
||||||
|
|
||||||
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
||||||
// Use tenant's pitr setting
|
// Use tenant's pitr setting
|
||||||
let pitr = tenant.get_pitr_interval();
|
let pitr = tenant.get_pitr_interval();
|
||||||
|
|
||||||
|
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||||
|
|
||||||
// Run in task_mgr to avoid race with tenant_detach operation
|
// Run in task_mgr to avoid race with tenant_detach operation
|
||||||
let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
let ctx: RequestContext =
|
||||||
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
|
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||||
let span = info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
|
|
||||||
|
|
||||||
// TODO: spawning is redundant now, need to hold the gate
|
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
|
||||||
task_mgr::spawn(
|
|
||||||
&tokio::runtime::Handle::current(),
|
|
||||||
TaskKind::GarbageCollector,
|
|
||||||
Some(tenant_shard_id),
|
|
||||||
Some(timeline_id),
|
|
||||||
&format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
|
|
||||||
false,
|
|
||||||
async move {
|
|
||||||
fail::fail_point!("immediate_gc_task_pre");
|
|
||||||
|
|
||||||
#[allow(unused_mut)]
|
fail::fail_point!("immediate_gc_task_pre");
|
||||||
let mut result = tenant
|
|
||||||
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
|
|
||||||
.await;
|
|
||||||
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
|
|
||||||
// better once the types support it.
|
|
||||||
|
|
||||||
#[cfg(feature = "testing")]
|
#[allow(unused_mut)]
|
||||||
{
|
let mut result = tenant
|
||||||
// we need to synchronize with drop completion for python tests without polling for
|
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
|
||||||
// log messages
|
.await;
|
||||||
if let Ok(result) = result.as_mut() {
|
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
|
||||||
let mut js = tokio::task::JoinSet::new();
|
// better once the types support it.
|
||||||
for layer in std::mem::take(&mut result.doomed_layers) {
|
|
||||||
js.spawn(layer.wait_drop());
|
|
||||||
}
|
|
||||||
tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
|
|
||||||
while let Some(res) = js.join_next().await {
|
|
||||||
res.expect("wait_drop should not panic");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let timeline = tenant.get_timeline(timeline_id, false).ok();
|
#[cfg(feature = "testing")]
|
||||||
let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
|
{
|
||||||
|
// we need to synchronize with drop completion for python tests without polling for
|
||||||
if let Some(rtc) = rtc {
|
// log messages
|
||||||
// layer drops schedule actions on remote timeline client to actually do the
|
if let Ok(result) = result.as_mut() {
|
||||||
// deletions; don't care about the shutdown error, just exit fast
|
let mut js = tokio::task::JoinSet::new();
|
||||||
drop(rtc.wait_completion().await);
|
for layer in std::mem::take(&mut result.doomed_layers) {
|
||||||
}
|
js.spawn(layer.wait_drop());
|
||||||
}
|
}
|
||||||
|
tracing::info!(
|
||||||
match task_done.send(result) {
|
total = js.len(),
|
||||||
Ok(_) => (),
|
"starting to wait for the gc'd layers to be dropped"
|
||||||
Err(result) => error!("failed to send gc result: {result:?}"),
|
);
|
||||||
|
while let Some(res) = js.join_next().await {
|
||||||
|
res.expect("wait_drop should not panic");
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
.instrument(span)
|
|
||||||
);
|
|
||||||
|
|
||||||
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
|
let timeline = tenant.get_timeline(timeline_id, false).ok();
|
||||||
drop(guard);
|
let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
|
||||||
|
|
||||||
Ok(wait_task_done)
|
if let Some(rtc) = rtc {
|
||||||
|
// layer drops schedule actions on remote timeline client to actually do the
|
||||||
|
// deletions; don't care about the shutdown error, just exit fast
|
||||||
|
drop(rtc.wait_completion().await);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result.map_err(ApiError::InternalServerError)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -4656,11 +4656,9 @@ impl Timeline {
|
|||||||
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
|
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
|
||||||
// this is most likely the background tasks, but it might be the spawned task from
|
// this is most likely the background tasks, but it might be the spawned task from
|
||||||
// immediate_gc
|
// immediate_gc
|
||||||
let cancel = crate::task_mgr::shutdown_token();
|
|
||||||
let _g = tokio::select! {
|
let _g = tokio::select! {
|
||||||
guard = self.gc_lock.lock() => guard,
|
guard = self.gc_lock.lock() => guard,
|
||||||
_ = self.cancel.cancelled() => return Ok(GcResult::default()),
|
_ = self.cancel.cancelled() => return Ok(GcResult::default()),
|
||||||
_ = cancel.cancelled() => return Ok(GcResult::default()),
|
|
||||||
};
|
};
|
||||||
let timer = self.metrics.garbage_collect_histo.start_timer();
|
let timer = self.metrics.garbage_collect_histo.start_timer();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user