mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 14:32:57 +00:00
pageserver: refactor immediate_gc into TenantManager (#9183)
## Problem Legacy functions that were called as `mgr::` and relied on the static TENANTS, see #5796 ## Summary of changes - Move the last stray function (immediate_gc) into TenantManager Closes: https://github.com/neondatabase/neon/issues/5796
This commit is contained in:
@@ -56,6 +56,7 @@ use utils::http::endpoint::request_span;
|
||||
use utils::http::request::must_parse_query_param;
|
||||
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
@@ -80,7 +81,6 @@ use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::Timeline;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
|
||||
use crate::{config::PageServerConf, tenant::mgr};
|
||||
use crate::{disk_usage_eviction_task, tenant};
|
||||
use pageserver_api::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
|
||||
@@ -1719,8 +1719,13 @@ async fn timeline_gc_handler(
|
||||
|
||||
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
|
||||
let gc_result = state
|
||||
.tenant_manager
|
||||
.immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, gc_result)
|
||||
}
|
||||
|
||||
@@ -2197,6 +2197,82 @@ impl TenantManager {
|
||||
|
||||
Ok((wanted_bytes, shard_count as u32))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
|
||||
pub(crate) async fn immediate_gc(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
gc_req: TimelineGcRequest,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<GcResult, ApiError> {
|
||||
let tenant = {
|
||||
let guard = self.tenants.read().unwrap();
|
||||
guard
|
||||
.get(&tenant_shard_id)
|
||||
.cloned()
|
||||
.with_context(|| format!("tenant {tenant_shard_id}"))
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?
|
||||
};
|
||||
|
||||
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
||||
// Use tenant's pitr setting
|
||||
let pitr = tenant.get_pitr_interval();
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
let ctx: RequestContext =
|
||||
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
|
||||
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
|
||||
|
||||
fail::fail_point!("immediate_gc_task_pre");
|
||||
|
||||
#[allow(unused_mut)]
|
||||
let mut result = tenant
|
||||
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
|
||||
.await;
|
||||
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
|
||||
// better once the types support it.
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
// we need to synchronize with drop completion for python tests without polling for
|
||||
// log messages
|
||||
if let Ok(result) = result.as_mut() {
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
for layer in std::mem::take(&mut result.doomed_layers) {
|
||||
js.spawn(layer.wait_drop());
|
||||
}
|
||||
tracing::info!(
|
||||
total = js.len(),
|
||||
"starting to wait for the gc'd layers to be dropped"
|
||||
);
|
||||
while let Some(res) = js.join_next().await {
|
||||
res.expect("wait_drop should not panic");
|
||||
}
|
||||
}
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, false).ok();
|
||||
let rtc = timeline.as_ref().map(|x| &x.remote_client);
|
||||
|
||||
if let Some(rtc) = rtc {
|
||||
// layer drops schedule actions on remote timeline client to actually do the
|
||||
// deletions; don't care about the shutdown error, just exit fast
|
||||
drop(rtc.wait_completion().await);
|
||||
}
|
||||
}
|
||||
|
||||
result.map_err(|e| match e {
|
||||
GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
|
||||
GcError::TimelineNotFound => {
|
||||
ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
|
||||
}
|
||||
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -2341,7 +2417,7 @@ enum TenantSlotDropError {
|
||||
/// Errors that can happen any time we are walking the tenant map to try and acquire
|
||||
/// the TenantSlot for a particular tenant.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TenantMapError {
|
||||
pub(crate) enum TenantMapError {
|
||||
// Tried to read while initializing
|
||||
#[error("tenant map is still initializing")]
|
||||
StillInitializing,
|
||||
@@ -2371,7 +2447,7 @@ pub enum TenantMapError {
|
||||
/// The `old_value` may be dropped before the SlotGuard is dropped, by calling
|
||||
/// `drop_old_value`. It is an error to call this without shutting down
|
||||
/// the conents of `old_value`.
|
||||
pub struct SlotGuard {
|
||||
pub(crate) struct SlotGuard {
|
||||
tenant_shard_id: TenantShardId,
|
||||
old_value: Option<TenantSlot>,
|
||||
upserted: bool,
|
||||
@@ -2764,81 +2840,6 @@ use {
|
||||
utils::http::error::ApiError,
|
||||
};
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
|
||||
pub(crate) async fn immediate_gc(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
gc_req: TimelineGcRequest,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<GcResult, ApiError> {
|
||||
let tenant = {
|
||||
let guard = TENANTS.read().unwrap();
|
||||
guard
|
||||
.get(&tenant_shard_id)
|
||||
.cloned()
|
||||
.with_context(|| format!("tenant {tenant_shard_id}"))
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?
|
||||
};
|
||||
|
||||
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
||||
// Use tenant's pitr setting
|
||||
let pitr = tenant.get_pitr_interval();
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
let ctx: RequestContext =
|
||||
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
|
||||
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
|
||||
|
||||
fail::fail_point!("immediate_gc_task_pre");
|
||||
|
||||
#[allow(unused_mut)]
|
||||
let mut result = tenant
|
||||
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
|
||||
.await;
|
||||
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
|
||||
// better once the types support it.
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
// we need to synchronize with drop completion for python tests without polling for
|
||||
// log messages
|
||||
if let Ok(result) = result.as_mut() {
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
for layer in std::mem::take(&mut result.doomed_layers) {
|
||||
js.spawn(layer.wait_drop());
|
||||
}
|
||||
tracing::info!(
|
||||
total = js.len(),
|
||||
"starting to wait for the gc'd layers to be dropped"
|
||||
);
|
||||
while let Some(res) = js.join_next().await {
|
||||
res.expect("wait_drop should not panic");
|
||||
}
|
||||
}
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, false).ok();
|
||||
let rtc = timeline.as_ref().map(|x| &x.remote_client);
|
||||
|
||||
if let Some(rtc) = rtc {
|
||||
// layer drops schedule actions on remote timeline client to actually do the
|
||||
// deletions; don't care about the shutdown error, just exit fast
|
||||
drop(rtc.wait_completion().await);
|
||||
}
|
||||
}
|
||||
|
||||
result.map_err(|e| match e {
|
||||
GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
|
||||
GcError::TimelineNotFound => {
|
||||
ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
|
||||
}
|
||||
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
Reference in New Issue
Block a user