From 37f49722917cda5497d99d6b770b4367bd864124 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 4 Sep 2023 16:15:18 +0100 Subject: [PATCH] pageserver: cut over to using deletion queue --- pageserver/src/tenant.rs | 11 +- .../src/tenant/remote_timeline_client.rs | 156 +++++++----------- .../tenant/remote_timeline_client/delete.rs | 34 ---- pageserver/src/tenant/timeline.rs | 6 +- pageserver/src/tenant/upload_queue.rs | 21 +-- 5 files changed, 81 insertions(+), 147 deletions(-) delete mode 100644 pageserver/src/tenant/remote_timeline_client/delete.rs diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0fe57118c8..c04d9395e5 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -671,6 +671,7 @@ impl Tenant { for timeline_id in remote_timeline_ids { let client = RemoteTimelineClient::new( remote_storage.clone(), + self.deletion_queue_client.clone(), self.conf, self.tenant_id, timeline_id, @@ -2259,6 +2260,9 @@ impl Tenant { Ok(timeline) } + // Allow too_many_arguments because a constructor's argument list naturally grows with the + // number of attributes in the struct: breaking these out into a builder wouldn't be helpful. + #[allow(clippy::too_many_arguments)] fn new( state: TenantState, conf: &'static PageServerConf, @@ -2875,6 +2879,7 @@ impl Tenant { let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() { let remote_client = RemoteTimelineClient::new( remote_storage.clone(), + self.deletion_queue_client.clone(), self.conf, self.tenant_id, timeline_id, @@ -3344,6 +3349,7 @@ pub mod harness { use utils::logging; use utils::lsn::Lsn; + use crate::deletion_queue::mock::MockDeletionQueue; use crate::{ config::PageServerConf, repository::Key, @@ -3405,6 +3411,7 @@ pub mod harness { pub generation: Generation, pub remote_storage: GenericRemoteStorage, pub remote_fs_dir: PathBuf, + pub deletion_queue: MockDeletionQueue, } static LOG_HANDLE: OnceCell<()> = OnceCell::new(); @@ -3453,6 +3460,7 @@ pub mod harness { storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), }; let remote_storage = GenericRemoteStorage::from_config(&config).unwrap(); + let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone())); Ok(Self { conf, @@ -3461,6 +3469,7 @@ pub mod harness { generation: Generation::new(0xdeadbeef), remote_storage, remote_fs_dir, + deletion_queue, }) } @@ -3485,7 +3494,7 @@ pub mod harness { self.tenant_id, self.generation, Some(self.remote_storage.clone()), - DeletionQueueClient::broken(), + self.deletion_queue.new_client(), )); tenant .load(None, ctx) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 72ece7f6a7..0f435308af 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -200,7 +200,6 @@ //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map -mod delete; mod download; pub mod index; mod upload; @@ -226,6 +225,7 @@ use tracing::{debug, error, info, instrument, warn}; use tracing::{info_span, Instrument}; use utils::lsn::Lsn; +use crate::deletion_queue::DeletionQueueClient; use crate::metrics::{ MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics, RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES, @@ -324,6 +324,8 @@ pub struct RemoteTimelineClient { metrics: Arc, storage_impl: GenericRemoteStorage, + + deletion_queue_client: DeletionQueueClient, } impl RemoteTimelineClient { @@ -335,6 +337,7 @@ impl RemoteTimelineClient { /// pub fn new( remote_storage: GenericRemoteStorage, + deletion_queue_client: DeletionQueueClient, conf: &'static PageServerConf, tenant_id: TenantId, timeline_id: TimelineId, @@ -352,6 +355,7 @@ impl RemoteTimelineClient { timeline_id, generation, storage_impl: remote_storage, + deletion_queue_client, upload_queue: Mutex::new(UploadQueue::Uninitialized), metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)), } @@ -643,7 +647,7 @@ impl RemoteTimelineClient { /// successfully. pub fn schedule_layer_file_deletion( self: &Arc, - names: &[LayerFileName], + names: Vec, ) -> anyhow::Result<()> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; @@ -663,10 +667,10 @@ impl RemoteTimelineClient { // Decorate our list of names with each name's generation, dropping // makes that are unexpectedly missing from our metadata. let with_generations: Vec<_> = names - .iter() + .into_iter() .filter_map(|name| { // Remove from latest_files, learning the file's remote generation in the process - let meta = upload_queue.latest_files.remove(name); + let meta = upload_queue.latest_files.remove(&name); if let Some(meta) = meta { upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1; @@ -689,17 +693,12 @@ impl RemoteTimelineClient { } // schedule the actual deletions - for (name, generation) in with_generations { - let op = UploadOp::Delete(Delete { - file_kind: RemoteOpFileKind::Layer, - layer_file_name: name.clone(), - scheduled_from_timeline_delete: false, - generation, - }); - self.calls_unfinished_metric_begin(&op); - upload_queue.queued_operations.push_back(op); - info!("scheduled layer file deletion {name}"); - } + info!("scheduling {} layer file deletions", with_generations.len()); + let op = UploadOp::Delete(Delete { + layers: with_generations, + }); + self.calls_unfinished_metric_begin(&op); + upload_queue.queued_operations.push_back(op); // Launch the tasks immediately, if possible self.launch_queued_tasks(upload_queue); @@ -833,9 +832,7 @@ impl RemoteTimelineClient { pub(crate) async fn delete_all(self: &Arc) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_and_timeline_id(); - let (mut receiver, deletions_queued) = { - let mut deletions_queued = 0; - + let layers: Vec = { let mut locked = self.upload_queue.lock().unwrap(); let stopped = locked.stopped_mut()?; @@ -847,42 +844,30 @@ impl RemoteTimelineClient { stopped .upload_queue_for_deletion - .queued_operations - .reserve(stopped.upload_queue_for_deletion.latest_files.len()); - - // schedule the actual deletions - for (name, meta) in &stopped.upload_queue_for_deletion.latest_files { - let op = UploadOp::Delete(Delete { - file_kind: RemoteOpFileKind::Layer, - layer_file_name: name.clone(), - scheduled_from_timeline_delete: true, - generation: meta.generation, - }); - - self.calls_unfinished_metric_begin(&op); - stopped - .upload_queue_for_deletion - .queued_operations - .push_back(op); - - info!("scheduled layer file deletion {name}"); - deletions_queued += 1; - } - - self.launch_queued_tasks(&mut stopped.upload_queue_for_deletion); - - ( - self.schedule_barrier(&mut stopped.upload_queue_for_deletion), - deletions_queued, - ) + .latest_files + .drain() + .map(|(file_name, meta)| { + remote_layer_path( + &self.tenant_id, + &self.timeline_id, + &file_name, + meta.generation, + ) + }) + .collect() }; - receiver.changed().await.context("upload queue shut down")?; + let layer_deletion_count = layers.len(); + self.deletion_queue_client.push_immediate(layers).await?; // Do not delete index part yet, it is needed for possible retry. If we remove it first // and retry will arrive to different pageserver there wont be any traces of it on remote storage let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id); + // Execute all pending deletions, so that when we prroceed to do a list_prefixes below, we aren't + // taking the burden of listing all the layers that we already know we should delete. + self.deletion_queue_client.flush_immediate().await?; + let remaining = backoff::retry( || async { self.storage_impl @@ -910,17 +895,9 @@ impl RemoteTimelineClient { }) .collect(); + let not_referenced_count = remaining.len(); if !remaining.is_empty() { - backoff::retry( - || async { self.storage_impl.delete_objects(&remaining).await }, - |_e| false, - FAILED_UPLOAD_WARN_THRESHOLD, - FAILED_REMOTE_OP_RETRIES, - "delete_objects", - backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")), - ) - .await - .context("delete_objects")?; + self.deletion_queue_client.push_immediate(remaining).await?; } fail::fail_point!("timeline-delete-before-index-delete", |_| { @@ -931,18 +908,14 @@ impl RemoteTimelineClient { let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME)); - debug!("deleting index part"); + debug!("enqueuing index part deletion"); + self.deletion_queue_client + .push_immediate([index_file_path].to_vec()) + .await?; - backoff::retry( - || async { self.storage_impl.delete(&index_file_path).await }, - |_e| false, - FAILED_UPLOAD_WARN_THRESHOLD, - FAILED_REMOTE_OP_RETRIES, - "delete_index", - backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled")), - ) - .await - .context("delete_index")?; + // Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait + // for a flush to a persistent deletion list so that we may be sure deletion will occur. + self.deletion_queue_client.flush_immediate().await?; fail::fail_point!("timeline-delete-after-index-delete", |_| { Err(anyhow::anyhow!( @@ -950,7 +923,7 @@ impl RemoteTimelineClient { ))? }); - info!(prefix=%timeline_storage_path, referenced=deletions_queued, not_referenced=%remaining.len(), "done deleting in timeline prefix, including index_part.json"); + info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json"); Ok(()) } @@ -1140,21 +1113,16 @@ impl RemoteTimelineClient { } res } - UploadOp::Delete(delete) => { - let path = &self - .conf - .timeline_path(&self.tenant_id, &self.timeline_id) - .join(delete.layer_file_name.file_name()); - delete::delete_layer(self.conf, &self.storage_impl, path, delete.generation) - .measure_remote_op( - self.tenant_id, - self.timeline_id, - delete.file_kind, - RemoteOpKind::Delete, - Arc::clone(&self.metrics), - ) - .await - } + UploadOp::Delete(delete) => self + .deletion_queue_client + .push_layers( + self.tenant_id, + self.timeline_id, + self.generation, + delete.layers.clone(), + ) + .await + .map_err(|e| anyhow::anyhow!(e)), UploadOp::Barrier(_) => { // unreachable. Barrier operations are handled synchronously in // launch_queued_tasks @@ -1214,14 +1182,8 @@ impl RemoteTimelineClient { let mut upload_queue_guard = self.upload_queue.lock().unwrap(); let upload_queue = match upload_queue_guard.deref_mut() { UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"), - UploadQueue::Stopped(stopped) => { - // Special care is needed for deletions, if it was an earlier deletion (not scheduled from deletion) - // then stop() took care of it so we just return. - // For deletions that come from delete_all we still want to maintain metrics, launch following tasks, etc. - match &task.op { - UploadOp::Delete(delete) if delete.scheduled_from_timeline_delete => Some(&mut stopped.upload_queue_for_deletion), - _ => None - } + UploadQueue::Stopped(_stopped) => { + None }, UploadQueue::Initialized(qi) => { Some(qi) } }; @@ -1278,8 +1240,8 @@ impl RemoteTimelineClient { reason: "metadata uploads are tiny", }, ), - UploadOp::Delete(delete) => ( - delete.file_kind, + UploadOp::Delete(_delete) => ( + RemoteOpFileKind::Layer, RemoteOpKind::Delete, DontTrackSize { reason: "should we track deletes? positive or negative sign?", @@ -1556,7 +1518,9 @@ mod tests { async fn new(test_name: &str) -> anyhow::Result { // Use a current-thread runtime in the test let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}"))); + let harness = TenantHarness::create(test_name)?; + let (tenant, ctx) = harness.load().await; let timeline = tenant @@ -1580,6 +1544,7 @@ mod tests { timeline_id: TIMELINE_ID, generation, storage_impl: self.harness.remote_storage.clone(), + deletion_queue_client: self.harness.deletion_queue.new_client(), upload_queue: Mutex::new(UploadQueue::Uninitialized), metrics: Arc::new(RemoteTimelineClientMetrics::new( &self.harness.tenant_id, @@ -1749,7 +1714,7 @@ mod tests { ) .unwrap(); client - .schedule_layer_file_deletion(&[layer_file_name_1.clone()]) + .schedule_layer_file_deletion([layer_file_name_1.clone()].to_vec()) .unwrap(); { let mut guard = client.upload_queue.lock().unwrap(); @@ -1775,6 +1740,7 @@ mod tests { // Finish them client.wait_completion().await.unwrap(); + harness.deletion_queue.pump().await; assert_remote_files( &[ diff --git a/pageserver/src/tenant/remote_timeline_client/delete.rs b/pageserver/src/tenant/remote_timeline_client/delete.rs deleted file mode 100644 index 7324559223..0000000000 --- a/pageserver/src/tenant/remote_timeline_client/delete.rs +++ /dev/null @@ -1,34 +0,0 @@ -//! Helper functions to delete files from remote storage with a RemoteStorage -use anyhow::Context; -use std::path::Path; -use tracing::debug; - -use remote_storage::GenericRemoteStorage; - -use crate::{ - config::PageServerConf, - tenant::{remote_timeline_client::remote_path, Generation}, -}; - -pub(super) async fn delete_layer<'a>( - conf: &'static PageServerConf, - storage: &'a GenericRemoteStorage, - local_layer_path: &'a Path, - generation: Generation, -) -> anyhow::Result<()> { - fail::fail_point!("before-delete-layer", |_| { - anyhow::bail!("failpoint before-delete-layer") - }); - debug!("Deleting layer from remote storage: {local_layer_path:?}",); - - let path_to_delete = remote_path(conf, local_layer_path, generation)?; - - // We don't want to print an error if the delete failed if the file has - // already been deleted. Thankfully, in this situation S3 already - // does not yield an error. While OS-provided local file system APIs do yield - // errors, we avoid them in the `LocalFs` wrapper. - storage - .delete(&path_to_delete) - .await - .with_context(|| format!("delete remote layer from storage at {path_to_delete:?}")) -} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 12305e3ac3..2329dc25bc 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1822,7 +1822,7 @@ impl Timeline { for (layer, m) in needs_upload { rtc.schedule_layer_file_upload(&layer.layer_desc().filename(), &m)?; } - rtc.schedule_layer_file_deletion(&needs_cleanup)?; + rtc.schedule_layer_file_deletion(needs_cleanup)?; rtc.schedule_index_upload_for_file_changes()?; // Tenant::create_timeline will wait for these uploads to happen before returning, or // on retry. @@ -3864,7 +3864,7 @@ impl Timeline { // Also schedule the deletions in remote storage if let Some(remote_client) = &self.remote_client { - remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?; + remote_client.schedule_layer_file_deletion(layer_names_to_delete)?; } Ok(()) @@ -4199,7 +4199,7 @@ impl Timeline { } if let Some(remote_client) = &self.remote_client { - remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?; + remote_client.schedule_layer_file_deletion(layer_names_to_delete)?; } apply.flush(); diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 28822335b0..cfa5e9db27 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -1,5 +1,3 @@ -use crate::metrics::RemoteOpFileKind; - use super::storage_layer::LayerFileName; use super::Generation; use crate::tenant::metadata::TimelineMetadata; @@ -201,12 +199,11 @@ pub(crate) struct UploadTask { pub(crate) op: UploadOp, } +/// A deletion of some layers within the lifetime of a timeline. This is not used +/// for timeline deletion, which skips this queue and goes directly to DeletionQueue. #[derive(Debug)] pub(crate) struct Delete { - pub(crate) file_kind: RemoteOpFileKind, - pub(crate) layer_file_name: LayerFileName, - pub(crate) scheduled_from_timeline_delete: bool, - pub(crate) generation: Generation, + pub(crate) layers: Vec<(LayerFileName, Generation)>, } #[derive(Debug)] @@ -217,7 +214,7 @@ pub(crate) enum UploadOp { /// Upload the metadata file UploadMetadata(IndexPart, Lsn), - /// Delete a layer file + /// Delete layer files Delete(Delete), /// Barrier. When the barrier operation is reached, @@ -239,13 +236,9 @@ impl std::fmt::Display for UploadOp { UploadOp::UploadMetadata(_, lsn) => { write!(f, "UploadMetadata(lsn: {})", lsn) } - UploadOp::Delete(delete) => write!( - f, - "Delete(path: {}, scheduled_from_timeline_delete: {}, gen: {:?})", - delete.layer_file_name.file_name(), - delete.scheduled_from_timeline_delete, - delete.generation - ), + UploadOp::Delete(delete) => { + write!(f, "Delete({} layers)", delete.layers.len(),) + } UploadOp::Barrier(_) => write!(f, "Barrier"), } }