diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 124427ee8a..bb697e9346 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -33,6 +33,7 @@ struct FlushOp { tx: tokio::sync::oneshot::Sender<()>, } +#[derive(Clone)] pub struct DeletionQueueClient { tx: tokio::sync::mpsc::Sender, } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index cf159160c0..f42892dd80 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -397,6 +397,7 @@ impl Tenant { &self, timeline_id: TimelineId, remote_client: Option, + deletion_queue_client: Option, remote_startup_data: Option, local_metadata: Option, ancestor: Option>, @@ -418,6 +419,7 @@ impl Tenant { up_to_date_metadata, ancestor.clone(), remote_client, + deletion_queue_client, init_order, CreateTimelineCause::Load, )?; @@ -589,6 +591,11 @@ impl Tenant { .as_ref() .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?; + let deletion_queue_client = self + .deletion_queue_client + .as_ref() + .ok_or_else(|| anyhow::anyhow!("cannot attach without deletion queue enabled"))?; + let remote_timeline_ids = remote_timeline_client::list_remote_timelines( remote_storage, self.conf, @@ -659,14 +666,21 @@ impl Tenant { .expect("just put it in above"); // TODO again handle early failure - self.load_remote_timeline(timeline_id, index_part, remote_metadata, remote_client, ctx) - .await - .with_context(|| { - format!( - "failed to load remote timeline {} for tenant {}", - timeline_id, self.tenant_id - ) - })?; + self.load_remote_timeline( + timeline_id, + index_part, + remote_metadata, + remote_client, + deletion_queue_client.clone(), + ctx, + ) + .await + .with_context(|| { + format!( + "failed to load remote timeline {} for tenant {}", + timeline_id, self.tenant_id + ) + })?; } std::fs::remove_file(&marker_file) @@ -703,6 +717,7 @@ impl Tenant { index_part: IndexPart, remote_metadata: TimelineMetadata, remote_client: RemoteTimelineClient, + deletion_queue_client: DeletionQueueClient, ctx: &RequestContext, ) -> anyhow::Result<()> { span::debug_assert_current_span_has_tenant_id(); @@ -733,6 +748,7 @@ impl Tenant { self.timeline_init_and_sync( timeline_id, Some(remote_client), + Some(deletion_queue_client), Some(RemoteStartupData { index_part, remote_metadata, @@ -1219,6 +1235,7 @@ impl Tenant { timeline_id, &local_metadata, Some(remote_client), + self.deletion_queue_client.clone(), init_order, ) .await @@ -1271,6 +1288,7 @@ impl Tenant { timeline_id, &local_metadata, None, + None, init_order, ) .await @@ -1295,6 +1313,7 @@ impl Tenant { self.timeline_init_and_sync( timeline_id, remote_client, + self.deletion_queue_client.clone(), remote_startup_data, Some(local_metadata), ancestor, @@ -2156,6 +2175,7 @@ impl Tenant { new_metadata: &TimelineMetadata, ancestor: Option>, remote_client: Option, + deletion_queue: Option, init_order: Option<&InitializationOrder>, cause: CreateTimelineCause, ) -> anyhow::Result> { @@ -2185,6 +2205,7 @@ impl Tenant { self.tenant_id, Arc::clone(&self.walredo_mgr), remote_client, + deletion_queue, pg_version, initial_logical_size_can_start.cloned(), initial_logical_size_attempt.cloned().flatten(), @@ -2875,6 +2896,7 @@ impl Tenant { new_metadata, ancestor, remote_client, + self.deletion_queue_client.clone(), None, CreateTimelineCause::Load, ) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index dd16116792..c9fb2d7809 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -38,6 +38,7 @@ use std::time::{Duration, Instant, SystemTime}; use crate::context::{ AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder, }; +use crate::deletion_queue::DeletionQueueClient; use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata}; use crate::tenant::storage_layer::delta_layer::DeltaEntry; use crate::tenant::storage_layer::{ @@ -192,6 +193,9 @@ pub struct Timeline { /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details. pub remote_client: Option>, + /// Deletion queue: a global queue, separate to the remote storage queue's + deletion_queue_client: Option>, + // What page versions do we hold in the repository? If we get a // request > last_record_lsn, we need to wait until we receive all // the WAL up to the request. The SeqWait provides functions for @@ -1375,6 +1379,7 @@ impl Timeline { tenant_id: TenantId, walredo_mgr: Arc, remote_client: Option, + deletion_queue_client: Option, pg_version: u32, initial_logical_size_can_start: Option, initial_logical_size_attempt: Option, @@ -1410,6 +1415,7 @@ impl Timeline { walreceiver: Mutex::new(None), remote_client: remote_client.map(Arc::new), + deletion_queue_client: deletion_queue_client.map(Arc::new), // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'. last_record_lsn: SeqWait::new(RecordLsn { diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index d3d9c8a082..9b6be55e0e 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -14,6 +14,7 @@ use utils::{ use crate::{ config::PageServerConf, + deletion_queue::DeletionQueueClient, task_mgr::{self, TaskKind}, tenant::{ metadata::TimelineMetadata, @@ -407,6 +408,7 @@ impl DeleteTimelineFlow { timeline_id: TimelineId, local_metadata: &TimelineMetadata, remote_client: Option, + deletion_queue_client: Option, init_order: Option<&InitializationOrder>, ) -> anyhow::Result<()> { // Note: here we even skip populating layer map. Timeline is essentially uninitialized. @@ -417,6 +419,7 @@ impl DeleteTimelineFlow { local_metadata, None, // Ancestor is not needed for deletion. remote_client, + deletion_queue_client, init_order, // Important. We dont pass ancestor above because it can be missing. // Thus we need to skip the validation here.