diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 1e2b73b929..399d6c9ffc 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -350,10 +350,14 @@ fn start_pageserver( let remote_storage = create_remote_storage_client(conf)?; // Set up deletion queue - let (deletion_queue, mut deletion_frontend, mut deletion_backend) = + let (deletion_queue, deletion_frontend, deletion_backend) = DeletionQueue::new(remote_storage.clone(), conf); - BACKGROUND_RUNTIME.spawn(async move { deletion_frontend.background().await }); - BACKGROUND_RUNTIME.spawn(async move { deletion_backend.background().await }); + if let Some(mut deletion_frontend) = deletion_frontend { + BACKGROUND_RUNTIME.spawn(async move { deletion_frontend.background().await }); + } + if let Some(mut deletion_backend) = deletion_backend { + BACKGROUND_RUNTIME.spawn(async move { deletion_backend.background().await }); + } // Up to this point no significant I/O has been done: this should have been fast. Record // duration prior to starting I/O intensive phase of startup. diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 93abcb4bc7..4ffc321856 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -12,12 +12,12 @@ use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName}; // TODO: small value is just for testing, make this bigger const DELETION_LIST_TARGET_SIZE: usize = 16; -// Ordinarily, we only flush periodically, to bound the window during +// Ordinarily, we only flush to DeletionList periodically, to bound the window during // which we might leak objects from not flushing a DeletionList after // the objects are already unlinked from timeline metadata. 
const FLUSH_DEFAULT_DEADLINE: Duration = Duration::from_millis(10000); -// If someone is waiting for a flush, only delay a little to accumulate +// If someone is waiting for a flush to DeletionList, only delay a little to accumulate // more objects before doing the flush. const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100); @@ -28,6 +28,8 @@ const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100); // TODO: implement admin API hook to flush deletion queue, for use in integration tests // that would like to assert deleted objects are gone +// TODO: configurable for how long to wait before executing deletions + /// We aggregate object deletions from many tenants in one place, for several reasons: /// - Coalesce deletions into fewer DeleteObjects calls /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes @@ -167,7 +169,7 @@ impl DeletionQueueClient { } pub struct BackendQueueWorker { - remote_storage: Option<GenericRemoteStorage>, + remote_storage: GenericRemoteStorage, conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>, @@ -185,16 +187,7 @@ pub struct BackendQueueWorker { impl BackendQueueWorker { async fn maybe_execute(&mut self) { - // TODO: refactor so that worker is just not constructed if there is no remote - let remote_storage = match &self.remote_storage { - Some(rs) => rs, - None => { - info!("No remote storage configured, deletion queue will not run"); - return; - } - }; - - match remote_storage.delete_objects(&self.accumulator).await { + match self.remote_storage.delete_objects(&self.accumulator).await { Ok(()) => { self.accumulator.clear(); self.executed_lists.append(&mut self.pending_lists); @@ -272,15 +265,7 @@ impl BackendQueueWorker { }) .collect(); - // TODO: refactor so that worker is just not constructed if there is no remote - let remote_storage = match &mut self.remote_storage { - Some(rs) => rs, - None => { - info!("No remote storage configured, deletion queue will not run"); - return; - } - }; - match 
remote_storage.delete_objects(&executed_keys).await { + match self.remote_storage.delete_objects(&executed_keys).await { Ok(()) => { // Retain any lists that couldn't be deleted in that request self.executed_lists = @@ -312,7 +297,7 @@ enum BackendQueueMessage { } pub struct FrontendQueueWorker { - remote_storage: Option<GenericRemoteStorage>, + remote_storage: GenericRemoteStorage, conf: &'static PageServerConf, // Incoming frontend requests to delete some keys @@ -342,14 +327,11 @@ impl FrontendQueueWorker { let key = RemotePath::new(&self.conf.remote_deletion_list_path(self.pending.sequence)) .expect("Failed to compose deletion list path"); - // We don't run this worker unless there is remote storage available. - let remote_storage = self.remote_storage.as_ref().unwrap(); - let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list"); let size = bytes.len(); let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); - match remote_storage.upload(source, size, &key, None).await { + match self.remote_storage.upload(source, size, &key, None).await { Ok(_) => { for f in self.pending_flushes.drain(..) { f.fire(); @@ -394,6 +376,7 @@ impl FrontendQueueWorker { } }, _ = tokio::time::sleep(flush_delay) => { + self.deadline = Instant::now() + FLUSH_DEFAULT_DEADLINE; if !self.pending.objects.is_empty() { debug!("Flushing for deadline"); self.flush().await; @@ -463,35 +446,49 @@ impl DeletionQueue { } } + /// Caller may use the returned object to construct clients with new_client. + /// Caller should tokio::spawn the background() members of the two worker objects returned: + /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice. + /// + /// If remote_storage is None, then the returned workers will also be None. 
pub fn new( remote_storage: Option<GenericRemoteStorage>, conf: &'static PageServerConf, - ) -> (Self, FrontendQueueWorker, BackendQueueWorker) { + ) -> ( + Self, + Option<FrontendQueueWorker>, + Option<BackendQueueWorker>, + ) { let (tx, rx) = tokio::sync::mpsc::channel(16384); + let remote_storage = match remote_storage { + None => return (Self { tx }, None, None), + Some(r) => r, + }; + let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16384); ( Self { tx }, - FrontendQueueWorker { + Some(FrontendQueueWorker { // TODO: on startup, recover sequence number by listing persistent list objects, // *or* if we implement generation numbers, we may start from 0 every time pending: DeletionList::new(0xdeadbeef), - remote_storage: remote_storage.as_ref().map(|s| s.clone()), + remote_storage: remote_storage.clone(), conf, rx, tx: backend_tx, deadline: Instant::now() + FLUSH_DEFAULT_DEADLINE, pending_flushes: Vec::new(), - }, - BackendQueueWorker { + }), + Some(BackendQueueWorker { remote_storage, conf, rx: backend_rx, accumulator: Vec::new(), pending_lists: Vec::new(), executed_lists: Vec::new(), - }, + }), ) } }