pageserver: wire deletion queue through to tenant

This commit is contained in:
John Spray
2023-09-04 14:35:57 +01:00
parent eb464d5322
commit 38b41e5c34
6 changed files with 80 additions and 52 deletions

View File

@@ -432,6 +432,7 @@ fn start_pageserver(
TenantSharedResources {
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
deletion_queue_client,
},
order,
shutdown_pageserver.clone(),
@@ -530,6 +531,7 @@ fn start_pageserver(
remote_storage.clone(),
broker_client.clone(),
disk_usage_eviction_state,
deletion_queue.new_client(),
)
.context("Failed to initialize router state")?,
);

View File

@@ -24,6 +24,7 @@ use super::models::{
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
@@ -34,7 +35,7 @@ use crate::tenant::mgr::{
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::Timeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use utils::{
@@ -61,6 +62,7 @@ pub struct State {
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
}
impl State {
@@ -70,6 +72,7 @@ impl State {
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
.iter()
@@ -82,8 +85,17 @@ impl State {
remote_storage,
broker_client,
disk_usage_eviction_state,
deletion_queue_client,
})
}
fn tenant_resources(&self) -> TenantSharedResources {
TenantSharedResources {
broker_client: self.broker_client.clone(),
remote_storage: self.remote_storage.clone(),
deletion_queue_client: self.deletion_queue_client.clone(),
}
}
}
#[inline(always)]
@@ -492,24 +504,23 @@ async fn tenant_attach_handler(
let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
if let Some(remote_storage) = &state.remote_storage {
mgr::attach_tenant(
state.conf,
tenant_id,
generation,
tenant_conf,
state.broker_client.clone(),
remote_storage.clone(),
&ctx,
)
.instrument(info_span!("tenant_attach", %tenant_id))
.await?;
} else {
if state.remote_storage.is_none() {
return Err(ApiError::BadRequest(anyhow!(
"attach_tenant is not possible because pageserver was configured without remote storage"
)));
}
mgr::attach_tenant(
state.conf,
tenant_id,
generation,
tenant_conf,
state.tenant_resources(),
&ctx,
)
.instrument(info_span!("tenant_attach", %tenant_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
}
@@ -570,6 +581,7 @@ async fn tenant_load_handler(
generation,
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
&ctx,
)
.instrument(info_span!("load", %tenant_id))
@@ -911,8 +923,7 @@ async fn tenant_create_handler(
tenant_conf,
target_tenant_id,
generation,
state.broker_client.clone(),
state.remote_storage.clone(),
state.tenant_resources(),
&ctx,
)
.instrument(info_span!("tenant_create", tenant_id = %target_tenant_id))

View File

@@ -57,6 +57,7 @@ use self::timeline::EvictionTaskTenantState;
use self::timeline::TimelineResources;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::metrics::TENANT_ACTIVATION;
@@ -157,6 +158,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: Option<GenericRemoteStorage>,
pub deletion_queue_client: DeletionQueueClient,
}
///
@@ -197,6 +199,9 @@ pub struct Tenant {
// provides access to timeline data sitting in the remote storage
pub(crate) remote_storage: Option<GenericRemoteStorage>,
// Access to global deletion queue for when this tenant wants to schedule a deletion
deletion_queue_client: DeletionQueueClient,
/// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
@@ -523,15 +528,20 @@ impl Tenant {
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
broker_client: storage_broker::BrokerClientChannel,
resources: TenantSharedResources,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
// TODO dedup with spawn_load
let tenant_conf =
Self::load_tenant_config(conf, &tenant_id).context("load tenant config")?;
let TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client,
} = resources;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let tenant = Arc::new(Tenant::new(
TenantState::Attaching,
@@ -540,7 +550,8 @@ impl Tenant {
wal_redo_manager,
tenant_id,
generation,
Some(remote_storage.clone()),
remote_storage.clone(),
deletion_queue_client,
));
// Do all the hard work in the background
@@ -571,7 +582,7 @@ impl Tenant {
let pending_deletion = {
match DeleteTenantFlow::should_resume_deletion(
conf,
Some(&remote_storage),
remote_storage.as_ref(),
&tenant_clone,
)
.await
@@ -726,6 +737,7 @@ impl Tenant {
remote_metadata,
TimelineResources {
remote_client: Some(remote_client),
deletion_queue_client: self.deletion_queue_client.clone(),
},
ctx,
)
@@ -750,6 +762,7 @@ impl Tenant {
timeline_id,
&index_part.metadata,
Some(remote_timeline_client),
self.deletion_queue_client.clone(),
None,
)
.await
@@ -851,6 +864,7 @@ impl Tenant {
tenant_id,
Generation::broken(),
None,
DeletionQueueClient::broken(),
))
}
@@ -895,6 +909,7 @@ impl Tenant {
tenant_id,
generation,
remote_storage.clone(),
resources.deletion_queue_client.clone(),
);
let tenant = Arc::new(tenant);
@@ -1302,6 +1317,7 @@ impl Tenant {
timeline_id,
&local_metadata,
Some(remote_client),
self.deletion_queue_client.clone(),
init_order,
)
.await
@@ -1351,6 +1367,7 @@ impl Tenant {
timeline_id,
&local_metadata,
None,
self.deletion_queue_client.clone(),
init_order,
)
.await
@@ -2250,6 +2267,7 @@ impl Tenant {
tenant_id: TenantId,
generation: Generation,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
) -> Tenant {
let (state, mut rx) = watch::channel(state);
@@ -2317,6 +2335,7 @@ impl Tenant {
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
deletion_queue_client,
state,
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
@@ -2866,7 +2885,10 @@ impl Tenant {
None
};
TimelineResources { remote_client }
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
}
}
/// Creates intermediate timeline structure and its files.
@@ -3463,6 +3485,7 @@ pub mod harness {
self.tenant_id,
self.generation,
Some(self.remote_storage.clone()),
DeletionQueueClient::broken(),
));
tenant
.load(None, ctx)

View File

@@ -294,29 +294,21 @@ pub(crate) fn schedule_local_tenant_processing(
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
if let Some(remote_storage) = resources.remote_storage {
match Tenant::spawn_attach(
conf,
tenant_id,
generation,
resources.broker_client,
tenants,
remote_storage,
ctx,
) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
}
}
} else {
if resources.remote_storage.is_none() {
warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
Tenant::create_broken_tenant(
conf,
tenant_id,
"attaching mark file present but no remote storage configured".to_string(),
)
} else {
match Tenant::spawn_attach(conf, tenant_id, generation, resources, tenants, ctx) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
}
}
}
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
@@ -447,8 +439,7 @@ pub async fn create_tenant(
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
generation: Generation,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
resources: TenantSharedResources,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
tenant_map_insert(tenant_id, || async {
@@ -459,13 +450,9 @@ pub async fn create_tenant(
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
let tenant_resources = TenantSharedResources {
broker_client,
remote_storage,
};
let created_tenant =
schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
generation, tenant_resources, None, &TENANTS, ctx)?;
generation, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -629,6 +616,7 @@ pub async fn load_tenant(
generation: Generation,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || async {
@@ -642,6 +630,7 @@ pub async fn load_tenant(
let resources = TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client
};
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, generation, resources, None, &TENANTS, ctx)
.with_context(|| {
@@ -709,8 +698,7 @@ pub async fn attach_tenant(
tenant_id: TenantId,
generation: Generation,
tenant_conf: TenantConfOpt,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
resources: TenantSharedResources,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || async {
@@ -725,10 +713,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let resources = TenantSharedResources {
broker_client,
remote_storage: Some(remote_storage),
};
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, generation, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233

View File

@@ -38,6 +38,7 @@ use std::time::{Duration, Instant, SystemTime};
use crate::context::{
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
};
use crate::deletion_queue::DeletionQueueClient;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
@@ -143,6 +144,7 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: Option<RemoteTimelineClient>,
pub deletion_queue_client: DeletionQueueClient,
}
pub struct Timeline {

View File

@@ -14,6 +14,7 @@ use utils::{
use crate::{
config::PageServerConf,
deletion_queue::DeletionQueueClient,
task_mgr::{self, TaskKind},
tenant::{
metadata::TimelineMetadata,
@@ -407,6 +408,7 @@ impl DeleteTimelineFlow {
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
remote_client: Option<RemoteTimelineClient>,
deletion_queue_client: DeletionQueueClient,
init_order: Option<&InitializationOrder>,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
@@ -416,7 +418,10 @@ impl DeleteTimelineFlow {
timeline_id,
local_metadata,
None, // Ancestor is not needed for deletion.
TimelineResources { remote_client },
TimelineResources {
remote_client,
deletion_queue_client,
},
init_order,
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.