pageserver: wire deletion queue through to Tenant

This commit is contained in:
John Spray
2023-08-09 14:08:12 +01:00
parent 6f9ae6bb5f
commit 3af693749d
6 changed files with 73 additions and 8 deletions

View File

@@ -389,6 +389,7 @@ fn start_pageserver(
conf,
broker_client.clone(),
remote_storage.clone(),
deletion_queue.clone(),
order,
))?;
@@ -484,6 +485,7 @@ fn start_pageserver(
http_auth,
broker_client.clone(),
remote_storage,
deletion_queue.clone(),
disk_usage_eviction_state,
)?
.build()

View File

@@ -13,6 +13,7 @@ use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
/// not compete with the same S3 clients/connections used for higher priority uploads.
///
/// DeletionQueue is the frontend that the rest of the pageserver interacts with.
#[derive(Clone)]
pub struct DeletionQueue {
/// Sending half of the tokio mpsc channel that carries `QueueMessage`s.
/// Deriving `Clone` on the struct clones this sender.
tx: tokio::sync::mpsc::Sender<QueueMessage>,
}
@@ -157,4 +158,11 @@ impl DeletionQueue {
},
)
}
/// Test-only constructor: a queue with no consumer behind it.
///
/// Only the sending half of a freshly created channel (capacity 16384) is
/// kept; the receiving half is discarded before this function returns, so
/// nothing ever processes messages submitted through the returned queue.
// NOTE(review): with the receiver gone, tokio reports the channel as closed to
// senders. How the client side surfaces that is not visible here; confirm it
// matches the "do nothing" intent.
#[cfg(test)]
pub fn new_mock() -> Self {
    // The unnamed receiver is dropped at the end of this statement.
    let tx = tokio::sync::mpsc::channel(16384).0;
    Self { tx }
}
}

View File

@@ -23,6 +23,7 @@ use super::models::{
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueue;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
@@ -56,6 +57,7 @@ struct State {
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: DeletionQueue,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
}
@@ -65,6 +67,7 @@ impl State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: DeletionQueue,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<Self> {
@@ -78,6 +81,7 @@ impl State {
allowlist_routes,
remote_storage,
broker_client,
deletion_queue,
disk_usage_eviction_state,
})
}
@@ -490,6 +494,7 @@ async fn tenant_attach_handler(
tenant_conf,
state.broker_client.clone(),
remote_storage.clone(),
&state.deletion_queue,
&ctx,
)
.instrument(info_span!("tenant_attach", %tenant_id))
@@ -553,6 +558,7 @@ async fn tenant_load_handler(
tenant_id,
state.broker_client.clone(),
state.remote_storage.clone(),
&state.deletion_queue,
&ctx,
)
.instrument(info_span!("load", %tenant_id))
@@ -878,6 +884,7 @@ async fn tenant_create_handler(
target_tenant_id,
state.broker_client.clone(),
state.remote_storage.clone(),
&state.deletion_queue,
&ctx,
)
.instrument(info_span!("tenant_create", tenant_id = %target_tenant_id))
@@ -1334,6 +1341,7 @@ pub fn make_router(
auth: Option<Arc<JwtAuth>>,
broker_client: BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: DeletionQueue,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
let spec = include_bytes!("openapi_spec.yml");
@@ -1363,6 +1371,7 @@ pub fn make_router(
conf,
auth,
remote_storage,
deletion_queue,
broker_client,
disk_usage_eviction_state,
)

View File

@@ -57,6 +57,8 @@ use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueue;
use crate::deletion_queue::DeletionQueueClient;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::metrics::TENANT_ACTIVATION;
@@ -185,6 +187,9 @@ pub struct Tenant {
// provides access to timeline data sitting in the remote storage
remote_storage: Option<GenericRemoteStorage>,
// Access to global deletion queue for when this tenant wants to schedule a deletion
deletion_queue_client: Option<DeletionQueueClient>,
/// Cached logical sizes updated on each [`Tenant::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
@@ -503,6 +508,7 @@ impl Tenant {
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
deletion_queue: &DeletionQueue,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
// TODO dedup with spawn_load
@@ -517,6 +523,7 @@ impl Tenant {
wal_redo_manager,
tenant_id,
Some(remote_storage),
Some(deletion_queue.new_client()),
));
// Do all the hard work in the background
@@ -756,6 +763,7 @@ impl Tenant {
wal_redo_manager,
tenant_id,
None,
None,
))
}
@@ -774,6 +782,7 @@ impl Tenant {
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: &DeletionQueue,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
@@ -796,6 +805,7 @@ impl Tenant {
wal_redo_manager,
tenant_id,
remote_storage.clone(),
Some(deletion_queue.new_client()),
);
let tenant = Arc::new(tenant);
@@ -2191,7 +2201,16 @@ impl Tenant {
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenant_id: TenantId,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: Option<DeletionQueueClient>,
) -> Tenant {
#[cfg(not(test))]
match state {
TenantState::Broken { .. } => {}
_ => {
// Non-broken tenants must be constructed with a deletion queue
assert!(deletion_queue_client.is_some());
}
}
let (state, mut rx) = watch::channel(state);
tokio::spawn(async move {
@@ -2257,6 +2276,7 @@ impl Tenant {
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
deletion_queue_client,
state,
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
@@ -3389,7 +3409,7 @@ pub mod harness {
pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
(
self.try_load(&ctx, None)
self.try_load(&ctx, None, None)
.await
.expect("failed to load test tenant"),
ctx,
@@ -3400,6 +3420,7 @@ pub mod harness {
&self,
ctx: &RequestContext,
remote_storage: Option<remote_storage::GenericRemoteStorage>,
deletion_queue: Option<&DeletionQueue>,
) -> anyhow::Result<Arc<Tenant>> {
let walredo_mgr = Arc::new(TestRedoManager);
@@ -3410,6 +3431,7 @@ pub mod harness {
walredo_mgr,
self.tenant_id,
remote_storage,
deletion_queue.map(|q| q.new_client()),
));
tenant
.load(None, ctx)
@@ -3949,7 +3971,7 @@ mod tests {
std::fs::write(metadata_path, metadata_bytes)?;
let err = harness
.try_load(&ctx, None)
.try_load(&ctx, None, None)
.await
.err()
.expect("should fail");

View File

@@ -18,6 +18,7 @@ use utils::crashsafe;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueue;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::delete::DeleteTenantFlow;
@@ -68,6 +69,7 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: DeletionQueue,
init_order: InitializationOrder,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
@@ -127,6 +129,7 @@ pub async fn init_tenant_mgr(
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
&deletion_queue,
Some(init_order.clone()),
&TENANTS,
&ctx,
@@ -164,6 +167,7 @@ pub(crate) fn schedule_local_tenant_processing(
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: &DeletionQueue,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
@@ -201,7 +205,14 @@ pub(crate) fn schedule_local_tenant_processing(
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
if let Some(remote_storage) = remote_storage {
match Tenant::spawn_attach(conf, tenant_id, broker_client, remote_storage, ctx) {
match Tenant::spawn_attach(
conf,
tenant_id,
broker_client,
remote_storage,
deletion_queue,
ctx,
) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
@@ -224,6 +235,7 @@ pub(crate) fn schedule_local_tenant_processing(
tenant_id,
broker_client,
remote_storage,
deletion_queue,
init_order,
tenants,
ctx,
@@ -352,6 +364,7 @@ pub async fn create_tenant(
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: &DeletionQueue,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
tenant_map_insert(tenant_id, || {
@@ -363,7 +376,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, &TENANTS, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, deletion_queue, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -512,6 +525,7 @@ pub async fn load_tenant(
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: &DeletionQueue,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || {
@@ -522,7 +536,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, &TENANTS, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, deletion_queue, None, &TENANTS, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -589,6 +603,7 @@ pub async fn attach_tenant(
tenant_conf: TenantConfOpt,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
deletion_queue: &DeletionQueue,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
// Temporary solution, proper one would be to resume deletion, but that needs more plumbing around Tenant::load/Tenant::attach
@@ -609,7 +624,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, &TENANTS, ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), deletion_queue, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233

View File

@@ -4882,6 +4882,7 @@ mod tests {
use utils::{id::TimelineId, lsn::Lsn};
use crate::deletion_queue::DeletionQueue;
use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer};
use super::{EvictionError, Timeline};
@@ -4904,9 +4905,13 @@ mod tests {
};
GenericRemoteStorage::from_config(&config).unwrap()
};
let deletion_queue = DeletionQueue::new_mock();
let ctx = any_context();
let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap();
let tenant = harness
.try_load(&ctx, Some(remote_storage), Some(&deletion_queue))
.await
.unwrap();
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
@@ -4969,9 +4974,13 @@ mod tests {
};
GenericRemoteStorage::from_config(&config).unwrap()
};
let deletion_queue = DeletionQueue::new_mock();
let ctx = any_context();
let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap();
let tenant = harness
.try_load(&ctx, Some(remote_storage), Some(&deletion_queue))
.await
.unwrap();
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await