From 0a7735a65676737bb97440511ccd742bfdce68dd Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Sun, 1 May 2022 19:07:17 +0300 Subject: [PATCH] Rework remote storage sync queue, general refactoring --- .../src/remote_storage/storage_sync/delete.rs | 223 ++++++ pageserver/src/storage_sync.rs | 725 ++++++++++++------ pageserver/src/storage_sync/delete.rs | 227 ++++++ pageserver/src/storage_sync/download.rs | 30 +- pageserver/src/storage_sync/upload.rs | 47 +- 5 files changed, 974 insertions(+), 278 deletions(-) create mode 100644 pageserver/src/remote_storage/storage_sync/delete.rs diff --git a/pageserver/src/remote_storage/storage_sync/delete.rs b/pageserver/src/remote_storage/storage_sync/delete.rs new file mode 100644 index 0000000000..00e7c85e35 --- /dev/null +++ b/pageserver/src/remote_storage/storage_sync/delete.rs @@ -0,0 +1,223 @@ +//! Timeline synchronization logic to delete a bulk of timeline's remote files from the remote storage. + +use anyhow::Context; +use futures::stream::{FuturesUnordered, StreamExt}; +use tracing::{debug, error, info}; +use utils::zid::ZTenantTimelineId; + +use crate::remote_storage::{ + storage_sync::{SyncQueue, SyncTask}, + RemoteStorage, +}; + +use super::{LayersDeletion, SyncData}; + +/// Attempts to remove the timeline layers from the remote storage. +/// If the task had not adjusted the metadata before, the deletion will fail. 
+pub(super) async fn delete_timeline_layers<'a, P, S>( + storage: &'a S, + sync_queue: &SyncQueue, + sync_id: ZTenantTimelineId, + mut delete_data: SyncData, +) -> bool +where + P: std::fmt::Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +{ + if !delete_data.data.deletion_registered { + error!("Cannot delete timeline layers before the deletion metadata is not registered, reenqueueing"); + delete_data.retries += 1; + sync_queue.push(sync_id, SyncTask::Delete(delete_data)); + return false; + } + + if delete_data.data.layers_to_delete.is_empty() { + info!("No layers to delete, skipping"); + return true; + } + + let layers_to_delete = delete_data + .data + .layers_to_delete + .drain() + .collect::>(); + debug!("Layers to delete: {layers_to_delete:?}"); + info!("Deleting {} timeline layers", layers_to_delete.len()); + + let mut delete_tasks = layers_to_delete + .into_iter() + .map(|local_layer_path| async { + let storage_path = match storage.storage_path(&local_layer_path).with_context(|| { + format!( + "Failed to get the layer storage path for local path '{}'", + local_layer_path.display() + ) + }) { + Ok(path) => path, + Err(e) => return Err((e, local_layer_path)), + }; + + match storage.delete(&storage_path).await.with_context(|| { + format!( + "Failed to delete remote layer from storage at '{:?}'", + storage_path + ) + }) { + Ok(()) => Ok(local_layer_path), + Err(e) => Err((e, local_layer_path)), + } + }) + .collect::>(); + + let mut errored = false; + while let Some(deletion_result) = delete_tasks.next().await { + match deletion_result { + Ok(local_layer_path) => { + debug!( + "Successfully deleted layer {} for timeline {sync_id}", + local_layer_path.display() + ); + delete_data.data.deleted_layers.insert(local_layer_path); + } + Err((e, local_layer_path)) => { + errored = true; + error!( + "Failed to delete layer {} for timeline {sync_id}: {e:?}", + local_layer_path.display() + ); + 
delete_data.data.layers_to_delete.insert(local_layer_path); + } + } + } + + if errored { + debug!("Reenqueuing failed delete task for timeline {sync_id}"); + delete_data.retries += 1; + sync_queue.push(sync_id, SyncTask::Delete(delete_data)); + } + errored +} + +#[cfg(test)] +mod tests { + use std::{collections::HashSet, num::NonZeroUsize}; + + use itertools::Itertools; + use tempfile::tempdir; + use tokio::fs; + use utils::lsn::Lsn; + + use crate::{ + remote_storage::{ + storage_sync::test_utils::{create_local_timeline, dummy_metadata}, + LocalFs, + }, + repository::repo_harness::{RepoHarness, TIMELINE_ID}, + }; + + use super::*; + + #[tokio::test] + async fn delete_timeline_negative() -> anyhow::Result<()> { + let harness = RepoHarness::create("delete_timeline_negative")?; + let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap()); + let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); + let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?; + + let deleted = delete_timeline_layers( + &storage, + &sync_queue, + sync_id, + SyncData { + retries: 1, + data: LayersDeletion { + deleted_layers: HashSet::new(), + layers_to_delete: HashSet::new(), + deletion_registered: false, + }, + }, + ) + .await; + + assert!( + !deleted, + "Should not start the deletion for task with delete metadata unregistered" + ); + + Ok(()) + } + + #[tokio::test] + async fn delete_timeline() -> anyhow::Result<()> { + let harness = RepoHarness::create("delete_timeline")?; + let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap()); + + let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); + let layer_files = ["a", "b", "c", "d"]; + let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?; + let current_retries = 3; + let metadata = dummy_metadata(Lsn(0x30)); + let local_timeline_path = harness.timeline_path(&TIMELINE_ID); + let timeline_upload = + create_local_timeline(&harness, 
TIMELINE_ID, &layer_files, metadata.clone()).await?; + for local_path in timeline_upload.layers_to_upload { + let remote_path = storage.storage_path(&local_path)?; + let remote_parent_dir = remote_path.parent().unwrap(); + if !remote_parent_dir.exists() { + fs::create_dir_all(&remote_parent_dir).await?; + } + fs::copy(&local_path, &remote_path).await?; + } + assert_eq!( + storage + .list() + .await? + .into_iter() + .map(|remote_path| storage.local_path(&remote_path).unwrap()) + .filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) }) + .sorted() + .collect::>(), + layer_files + .iter() + .map(|layer_str| layer_str.to_string()) + .sorted() + .collect::>(), + "Expect to have all layer files remotely before deletion" + ); + + let deleted = delete_timeline_layers( + &storage, + &sync_queue, + sync_id, + SyncData { + retries: current_retries, + data: LayersDeletion { + deleted_layers: HashSet::new(), + layers_to_delete: HashSet::from([ + local_timeline_path.join("a"), + local_timeline_path.join("c"), + local_timeline_path.join("something_different"), + ]), + deletion_registered: true, + }, + }, + ) + .await; + assert!(deleted, "Should be able to delete timeline files"); + + assert_eq!( + storage + .list() + .await? 
+ .into_iter() + .map(|remote_path| storage.local_path(&remote_path).unwrap()) + .filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) }) + .sorted() + .collect::>(), + vec!["b".to_string(), "d".to_string()], + "Expect to have only non-deleted files remotely" + ); + + Ok(()) + } +} diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index 52e0df3784..b8c6f7fdab 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -147,23 +147,27 @@ pub mod index; mod upload; use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{hash_map, HashMap, HashSet, VecDeque}, ffi::OsStr, fmt::Debug, num::{NonZeroU32, NonZeroUsize}, ops::ControlFlow, path::{Path, PathBuf}, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; -use anyhow::{bail, Context}; +use anyhow::{anyhow, bail, Context}; use futures::stream::{FuturesUnordered, StreamExt}; use lazy_static::lazy_static; +use once_cell::sync::OnceCell; use remote_storage::{GenericRemoteStorage, RemoteStorage}; use tokio::{ fs, runtime::Runtime, - sync::mpsc::{self, UnboundedReceiver}, + sync::mpsc::{self, error::TryRecvError, UnboundedReceiver, UnboundedSender}, time::{Duration, Instant}, }; use tracing::*; @@ -221,6 +225,8 @@ lazy_static! { .expect("failed to register pageserver image sync time histogram vec"); } +static SYNC_QUEUE: OnceCell = OnceCell::new(); + /// A timeline status to share with pageserver's sync counterpart, /// after comparing local and remote timeline state. #[derive(Clone, Copy, Debug)] @@ -449,144 +455,131 @@ fn collect_timeline_files( /// Wraps mpsc channel bits around into a queue interface. /// mpsc approach was picked to allow blocking the sync loop if no tasks are present, to avoid meaningless spinning. 
-mod sync_queue { - use std::{ - collections::{HashMap, HashSet}, - num::NonZeroUsize, - ops::ControlFlow, - sync::atomic::{AtomicUsize, Ordering}, - }; +struct SyncQueue { + len: AtomicUsize, + max_timelines_per_batch: NonZeroUsize, + sender: UnboundedSender<(ZTenantTimelineId, SyncTask)>, +} - use anyhow::anyhow; - use once_cell::sync::OnceCell; - use tokio::sync::mpsc::{error::TryRecvError, UnboundedReceiver, UnboundedSender}; - use tracing::{debug, warn}; - - use super::{SyncTask, SyncTaskBatch}; - use utils::zid::ZTenantTimelineId; - - static SENDER: OnceCell> = OnceCell::new(); - static LENGTH: AtomicUsize = AtomicUsize::new(0); - - /// Initializes the queue with the given sender channel that is used to put the tasks into later. - /// Errors if called more than once. - pub fn init(sender: UnboundedSender<(ZTenantTimelineId, SyncTask)>) -> anyhow::Result<()> { - SENDER - .set(sender) - .map_err(|_sender| anyhow!("sync queue was already initialized"))?; - Ok(()) +impl SyncQueue { + fn new( + max_timelines_per_batch: NonZeroUsize, + ) -> (Self, UnboundedReceiver<(ZTenantTimelineId, SyncTask)>) { + let (sender, receiver) = mpsc::unbounded_channel(); + ( + Self { + len: AtomicUsize::new(0), + max_timelines_per_batch, + sender, + }, + receiver, + ) } - /// Adds a new task to the queue, if the queue was initialized, returning `true` on success. - /// On any error, or if the queue was not initialized, the task gets dropped (not scheduled) and `false` is returned. 
- pub fn push(sync_id: ZTenantTimelineId, new_task: SyncTask) -> bool { - if let Some(sender) = SENDER.get() { - match sender.send((sync_id, new_task)) { - Err(e) => { - warn!("Failed to enqueue a sync task: the receiver is dropped: {e}"); - false - } - Ok(()) => { - LENGTH.fetch_add(1, Ordering::Relaxed); - true - } + fn push(&self, sync_id: ZTenantTimelineId, new_task: SyncTask) { + match self.sender.send((sync_id, new_task)) { + Ok(()) => { + self.len.fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + error!("failed to push sync task to queue: {e}"); } - } else { - warn!("Failed to enqueue a sync task: the sender is not initialized"); - false } } - /// Polls a new task from the queue, using its receiver counterpart. - /// Does not block if the queue is empty, returning [`None`] instead. - /// Needed to correctly track the queue length. - async fn next_task( - receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, - ) -> Option<(ZTenantTimelineId, SyncTask)> { - let task = receiver.recv().await; - if task.is_some() { - LENGTH.fetch_sub(1, Ordering::Relaxed); - } - task - } - - /// Fetches a task batch, not bigger than the given limit. - /// Not blocking, can return fewer tasks if the queue does not contain enough. - /// Batch tasks are split by timelines, with all related tasks merged into one (download/upload) - /// or two (download and upload, if both were found in the queue during batch construction). - pub(super) async fn next_task_batch( - receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, - max_timelines_to_sync: NonZeroUsize, - ) -> ControlFlow<(), HashMap> { + /// Fetches a task batch, getting every existing entry from the queue, grouping by timelines and merging the tasks for every timeline. + /// A timeline has to take care not to delete certain layers from the remote storage before the corresponding uploads happen. 
+ /// Otherwise, due to "immutable" nature of the layers, the order of their deletion/uploading/downloading does not matter. + /// Hence, we merge the layers together into single task per timeline and run those concurrently (with the deletion happening only after successful uploading). + async fn next_task_batch( + &self, + // The queue is based on two ends of a channel and has to be accessible statically without blocking for submissions from the sync code. + // Its receiver needs &mut, so we cannot place it in the same container with the other end and get both static and non-blocking access. + // Hence toss this around to use it from the sync loop directly as &mut. + sync_queue_receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, + ) -> HashMap { // request the first task in blocking fashion to do less meaningless work - let (first_sync_id, first_task) = if let Some(first_task) = next_task(receiver).await { + let (first_sync_id, first_task) = if let Some(first_task) = sync_queue_receiver.recv().await + { + self.len.fetch_sub(1, Ordering::Relaxed); first_task } else { - debug!("Queue sender part was dropped, aborting"); - return ControlFlow::Break(()); + info!("Queue sender part was dropped, aborting"); + return HashMap::new(); }; + let mut timelines_left_to_batch = self.max_timelines_per_batch.get() - 1; + let mut tasks_to_process = self.len(); - let max_timelines_to_sync = max_timelines_to_sync.get(); - let mut batched_timelines = HashSet::with_capacity(max_timelines_to_sync); - batched_timelines.insert(first_sync_id.timeline_id); + let mut batches = HashMap::with_capacity(tasks_to_process); + batches.insert(first_sync_id, SyncTaskBatch::new(first_task)); - let mut tasks = HashMap::new(); - tasks.insert(first_sync_id, SyncTaskBatch::new(first_task)); + let mut tasks_to_reenqueue = Vec::with_capacity(tasks_to_process); - loop { - if batched_timelines.len() >= max_timelines_to_sync { - debug!( - "Filled a full task batch with {} timeline sync 
operations", - batched_timelines.len() - ); - break; - } - - match receiver.try_recv() { + // Pull the queue channel until we get all tasks that were there at the beginning of the batch construction. + // Yet do not put all timelines in the batch, but only the first ones that fit the timeline limit. + // Still merge the rest of the pulled tasks and reenqueue those for later. + while tasks_to_process > 0 { + match sync_queue_receiver.try_recv() { Ok((sync_id, new_task)) => { - LENGTH.fetch_sub(1, Ordering::Relaxed); - tasks.entry(sync_id).or_default().add(new_task); - batched_timelines.insert(sync_id.timeline_id); + self.len.fetch_sub(1, Ordering::Relaxed); + tasks_to_process -= 1; + + match batches.entry(sync_id) { + hash_map::Entry::Occupied(mut v) => v.get_mut().add(new_task), + hash_map::Entry::Vacant(v) => { + timelines_left_to_batch = timelines_left_to_batch.saturating_sub(1); + if timelines_left_to_batch == 0 { + tasks_to_reenqueue.push((sync_id, new_task)); + } else { + v.insert(SyncTaskBatch::new(new_task)); + } + } + } } Err(TryRecvError::Disconnected) => { debug!("Sender disconnected, batch collection aborted"); break; } Err(TryRecvError::Empty) => { - debug!( - "No more data in the sync queue, task batch is not full, length: {}, max allowed size: {max_timelines_to_sync}", - batched_timelines.len() - ); + debug!("No more data in the sync queue, task batch is not full"); break; } } } - ControlFlow::Continue(tasks) + debug!( + "Batched {} timelines, reenqueuing {}", + batches.len(), + tasks_to_reenqueue.len() + ); + for (id, task) in tasks_to_reenqueue { + self.push(id, task); + } + + batches } - /// Length of the queue, assuming that all receiver counterparts were only called using the queue api. - pub fn len() -> usize { - LENGTH.load(Ordering::Relaxed) + fn len(&self) -> usize { + self.len.load(Ordering::Relaxed) } } /// A task to run in the async download/upload loop. 
/// Limited by the number of retries, after certain threshold the failing task gets evicted and the timeline disabled. -#[derive(Debug)] -pub enum SyncTask { +#[derive(Debug, Clone)] +enum SyncTask { /// A checkpoint outcome with possible local file updates that need actualization in the remote storage. /// Not necessary more fresh than the one already uploaded. - Download(SyncData), + Download(SyncData), /// A certain amount of image files to download. - Upload(SyncData), + Upload(SyncData), /// Delete remote files. - Delete(SyncData), + Delete(SyncData), } /// Stores the data to synd and its retries, to evict the tasks failing to frequently. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct SyncData { +struct SyncData { retries: u32, data: T, } @@ -598,24 +591,24 @@ impl SyncData { } impl SyncTask { - fn download(download_task: TimelineDownload) -> Self { + fn download(download_task: LayersDownload) -> Self { Self::Download(SyncData::new(0, download_task)) } - fn upload(upload_task: TimelineUpload) -> Self { + fn upload(upload_task: LayersUpload) -> Self { Self::Upload(SyncData::new(0, upload_task)) } - fn delete(delete_task: TimelineDelete) -> Self { + fn delete(delete_task: LayersDeletion) -> Self { Self::Delete(SyncData::new(0, delete_task)) } } -#[derive(Debug, Default)] +#[derive(Debug, Default, PartialEq, Eq)] struct SyncTaskBatch { - upload: Option>, - download: Option>, - delete: Option>, + upload: Option>, + download: Option>, + delete: Option>, } impl SyncTaskBatch { @@ -666,6 +659,31 @@ impl SyncTaskBatch { SyncTask::Delete(new_delete) => match &mut self.delete { Some(batch_delete) => { batch_delete.retries = batch_delete.retries.min(new_delete.retries); + // Need to reregister deletions, but it's ok to register already deleted files once again, they will be skipped. 
+ batch_delete.data.deletion_registered = batch_delete + .data + .deletion_registered + .min(new_delete.data.deletion_registered); + + // Do not download and upload the layers getting removed in the same batch + if let Some(batch_download) = &mut self.download { + batch_download + .data + .layers_to_skip + .extend(new_delete.data.layers_to_delete.iter().cloned()); + batch_download + .data + .layers_to_skip + .extend(new_delete.data.deleted_layers.iter().cloned()); + } + if let Some(batch_upload) = &mut self.upload { + let not_deleted = |layer: &PathBuf| { + !new_delete.data.layers_to_delete.contains(layer) + && !new_delete.data.deleted_layers.contains(layer) + }; + batch_upload.data.layers_to_upload.retain(not_deleted); + batch_upload.data.uploaded_layers.retain(not_deleted); + } batch_delete .data @@ -685,7 +703,7 @@ impl SyncTaskBatch { /// Local timeline files for upload, appeared after the new checkpoint. /// Current checkpoint design assumes new files are added only, no deletions or amendment happens. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct TimelineUpload { +struct LayersUpload { /// Layer file path in the pageserver workdir, that were added for the corresponding checkpoint. layers_to_upload: HashSet, /// Already uploaded layers. Used to store the data about the uploads between task retries @@ -700,14 +718,19 @@ pub struct TimelineUpload { /// without using the remote index or any other ways to list the remote timleine files. /// Skips the files that are already downloaded. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct TimelineDownload { +struct LayersDownload { layers_to_skip: HashSet, } #[derive(Debug, Clone, PartialEq, Eq)] -pub struct TimelineDelete { +struct LayersDeletion { layers_to_delete: HashSet, deleted_layers: HashSet, + /// Pageserver uses [`IndexPart`] as a source of truth for listing the files per timeline. + /// This object gets serialized and placed into the remote storage. 
+ /// So if we manage to update pageserver's [`RemoteIndex`] and update the index part on the remote storage, + /// the corresponding files on S3 won't exist for pageserver albeit being physically present on that remote storage still. + /// Then all that's left is to remove the files from the remote storage, without concerns about consistency. deletion_registered: bool, } @@ -721,45 +744,55 @@ pub fn schedule_layer_upload( layers_to_upload: HashSet, metadata: Option, ) { - debug!("Scheduling layer upload for tenant {tenant_id}, timeline {timeline_id}, to upload: {layers_to_upload:?}"); - if !sync_queue::push( + let sync_queue = match SYNC_QUEUE.get() { + Some(queue) => queue, + None => { + warn!("Could not send an upload task for tenant {tenant_id}, timeline {timeline_id}"); + return; + } + }; + sync_queue.push( ZTenantTimelineId { tenant_id, timeline_id, }, - SyncTask::upload(TimelineUpload { + SyncTask::upload(LayersUpload { layers_to_upload, uploaded_layers: HashSet::new(), metadata, }), - ) { - warn!("Could not send an upload task for tenant {tenant_id}, timeline {timeline_id}") - } else { - debug!("Upload task for tenant {tenant_id}, timeline {timeline_id} sent") - } + ); + debug!("Upload task for tenant {tenant_id}, timeline {timeline_id} sent") } +/// Adds the new files to delete as a deletion task to the queue. +/// On task failure, it gets retried again from the start a number of times. +/// +/// Ensure that the loop is started otherwise the task is never processed. 
pub fn schedule_layer_delete( tenant_id: ZTenantId, timeline_id: ZTimelineId, layers_to_delete: HashSet, ) { - debug!("Scheduling layer deletion for tenant {tenant_id}, timeline {timeline_id}, to delete: {layers_to_delete:?}"); - if !sync_queue::push( + let sync_queue = match SYNC_QUEUE.get() { + Some(queue) => queue, + None => { + warn!("Could not send deletion task for tenant {tenant_id}, timeline {timeline_id}"); + return; + } + }; + sync_queue.push( ZTenantTimelineId { tenant_id, timeline_id, }, - SyncTask::delete(TimelineDelete { + SyncTask::delete(LayersDeletion { layers_to_delete, deleted_layers: HashSet::new(), deletion_registered: false, }), - ) { - warn!("Could not send deletion task for tenant {tenant_id}, timeline {timeline_id}") - } else { - debug!("Deletion task for tenant {tenant_id}, timeline {timeline_id} sent") - } + ); + debug!("Deletion task for tenant {tenant_id}, timeline {timeline_id} sent") } /// Requests the download of the entire timeline for a given tenant. @@ -771,15 +804,23 @@ pub fn schedule_layer_delete( /// Ensure that the loop is started otherwise the task is never processed. pub fn schedule_layer_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) { debug!("Scheduling layer download for tenant {tenant_id}, timeline {timeline_id}"); - sync_queue::push( + let sync_queue = match SYNC_QUEUE.get() { + Some(queue) => queue, + None => { + warn!("Could not send download task for tenant {tenant_id}, timeline {timeline_id}"); + return; + } + }; + sync_queue.push( ZTenantTimelineId { tenant_id, timeline_id, }, - SyncTask::download(TimelineDownload { + SyncTask::download(LayersDownload { layers_to_skip: HashSet::new(), }), ); + debug!("Download task for tenant {tenant_id}, timeline {timeline_id} sent") } /// Uses a remote storage given to start the storage sync loop. 
@@ -795,8 +836,14 @@ where P: Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, { - let (sender, receiver) = mpsc::unbounded_channel(); - sync_queue::init(sender)?; + let (sync_queue, sync_queue_receiver) = SyncQueue::new(max_concurrent_timelines_sync); + SYNC_QUEUE + .set(sync_queue) + .map_err(|_queue| anyhow!("Could not initialize sync queue"))?; + let sync_queue = match SYNC_QUEUE.get() { + Some(queue) => queue, + None => bail!("Could not get sync queue during the sync loop step, aborting"), + }; let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -813,6 +860,7 @@ where let local_timeline_init_statuses = schedule_first_sync_tasks( &mut runtime.block_on(remote_index.write()), + sync_queue, local_timeline_files, ); @@ -827,10 +875,12 @@ where storage_sync_loop( runtime, conf, - receiver, - Arc::new(storage), - loop_index, - max_concurrent_timelines_sync, + ( + Arc::new(storage), + loop_index, + sync_queue, + sync_queue_receiver, + ), max_sync_errors, ); Ok(()) @@ -843,14 +893,15 @@ where }) } -#[allow(clippy::too_many_arguments)] fn storage_sync_loop( runtime: Runtime, conf: &'static PageServerConf, - mut receiver: UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, - storage: Arc, - index: RemoteIndex, - max_concurrent_timelines_sync: NonZeroUsize, + (storage, index, sync_queue, mut sync_queue_receiver): ( + Arc, + RemoteIndex, + &SyncQueue, + UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, + ), max_sync_errors: NonZeroU32, ) where P: Debug + Send + Sync + 'static, @@ -859,15 +910,12 @@ fn storage_sync_loop( info!("Starting remote storage sync loop"); loop { let loop_index = index.clone(); - let storage = Arc::clone(&storage); + let loop_storage = Arc::clone(&storage); let loop_step = runtime.block_on(async { tokio::select! 
{ step = loop_step( conf, - &mut receiver, - storage, - loop_index, - max_concurrent_timelines_sync, + (loop_storage, loop_index, sync_queue, &mut sync_queue_receiver), max_sync_errors, ) .instrument(info_span!("storage_sync_loop_step")) => step, @@ -898,23 +946,21 @@ fn storage_sync_loop( async fn loop_step( conf: &'static PageServerConf, - receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, - storage: Arc, - index: RemoteIndex, - max_concurrent_timelines_sync: NonZeroUsize, + (storage, index, sync_queue, sync_queue_receiver): ( + Arc, + RemoteIndex, + &SyncQueue, + &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, + ), max_sync_errors: NonZeroU32, ) -> ControlFlow<(), HashMap>> where P: Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, { - let batched_tasks = - match sync_queue::next_task_batch(receiver, max_concurrent_timelines_sync).await { - ControlFlow::Continue(batch) => batch, - ControlFlow::Break(()) => return ControlFlow::Break(()), - }; + let batched_tasks = sync_queue.next_task_batch(sync_queue_receiver).await; - let remaining_queue_length = sync_queue::len(); + let remaining_queue_length = sync_queue.len(); REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64); if remaining_queue_length > 0 || !batched_tasks.is_empty() { info!("Processing tasks for {} timelines in batch, more tasks left to process: {remaining_queue_length}", batched_tasks.len()); @@ -929,10 +975,15 @@ where let storage = Arc::clone(&storage); let index = index.clone(); async move { - let state_update = - process_sync_task_batch(conf, storage, index, max_sync_errors, sync_id, batch) - .instrument(info_span!("process_sync_task_batch", sync_id = %sync_id)) - .await; + let state_update = process_sync_task_batch( + conf, + (storage, index, sync_queue), + max_sync_errors, + sync_id, + batch, + ) + .instrument(info_span!("process_sync_task_batch", sync_id = %sync_id)) + .await; (sync_id, state_update) } }) @@ -941,7 +992,7 @@ where let mut 
new_timeline_states: HashMap< ZTenantId, HashMap, - > = HashMap::with_capacity(max_concurrent_timelines_sync.get()); + > = HashMap::new(); while let Some((sync_id, state_update)) = sync_results.next().await { debug!("Finished storage sync task for sync id {sync_id}"); if let Some(state_update) = state_update { @@ -957,8 +1008,7 @@ where async fn process_sync_task_batch( conf: &'static PageServerConf, - storage: Arc, - index: RemoteIndex, + (storage, index, sync_queue): (Arc, RemoteIndex, &SyncQueue), max_sync_errors: NonZeroU32, sync_id: ZTenantTimelineId, batch: SyncTaskBatch, @@ -972,6 +1022,13 @@ where let upload_data = batch.upload.clone(); let download_data = batch.download.clone(); + // Run both upload and download tasks concurrently (not in parallel): + // download and upload tasks do not conflict or spoil the pageserver state even if they are executed in parallel. + // "Spoiling" here means a potentially inconsistent layer set that misses some of the layers, declared present + // in local (implicitly, via Lsn values and related memory state) or remote (explicitly via remote layer file paths) metadata. + // When operating in a system without tasks failing over the error threshold, + // current batching and task processing systems aim to update the layer set and metadata files (remote and local), + // without "losing" such layer files. 
let (upload_result, status_update) = tokio::join!( async { if let Some(upload_data) = upload_data { @@ -982,7 +1039,7 @@ where ControlFlow::Continue(new_upload_data) => { upload_timeline_data( conf, - (storage.as_ref(), &index), + (storage.as_ref(), &index, sync_queue), current_remote_timeline.as_ref(), sync_id, new_upload_data, @@ -1022,14 +1079,14 @@ where ControlFlow::Continue(new_download_data) => { return download_timeline_data( conf, - (storage.as_ref(), &index), + (storage.as_ref(), &index, sync_queue), current_remote_timeline.as_ref(), sync_id, new_download_data, sync_start, "download", ) - .await + .await; } ControlFlow::Break(_) => { index @@ -1046,35 +1103,40 @@ where ); if let Some(delete_data) = batch.delete { - match validate_task_retries(delete_data, max_sync_errors) - .instrument(info_span!("retries_validation")) - .await - { - ControlFlow::Continue(new_delete_data) => { - delete_timeline_data( - conf, - (storage.as_ref(), &index), - sync_id, - new_delete_data, - sync_start, - "delete", - ) - .instrument(info_span!("delete_timeline_data")) - .await; - } - ControlFlow::Break(failed_delete_data) => { - if let Err(e) = update_remote_data( - conf, - storage.as_ref(), - &index, - sync_id, - RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers), - ) + if upload_result.is_some() { + match validate_task_retries(delete_data, max_sync_errors) + .instrument(info_span!("retries_validation")) .await - { - error!("Failed to update remote timeline {sync_id}: {e:?}"); + { + ControlFlow::Continue(new_delete_data) => { + delete_timeline_data( + conf, + (storage.as_ref(), &index, sync_queue), + sync_id, + new_delete_data, + sync_start, + "delete", + ) + .instrument(info_span!("delete_timeline_data")) + .await; + } + ControlFlow::Break(failed_delete_data) => { + if let Err(e) = update_remote_data( + conf, + storage.as_ref(), + &index, + sync_id, + RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers), + ) + .await + { + error!("Failed to update 
remote timeline {sync_id}: {e:?}"); + } } } + } else { + sync_queue.push(sync_id, SyncTask::Delete(delete_data)); + warn!("Skipping delete task due to failed upload tasks, reenqueuing"); } } @@ -1083,10 +1145,10 @@ where async fn download_timeline_data( conf: &'static PageServerConf, - (storage, index): (&S, &RemoteIndex), + (storage, index, sync_queue): (&S, &RemoteIndex, &SyncQueue), current_remote_timeline: Option<&RemoteTimeline>, sync_id: ZTenantTimelineId, - new_download_data: SyncData, + new_download_data: SyncData, sync_start: Instant, task_name: &str, ) -> Option @@ -1097,6 +1159,7 @@ where match download_timeline_layers( conf, storage, + sync_queue, current_remote_timeline, sync_id, new_download_data, @@ -1126,7 +1189,7 @@ where Err(e) => { error!("Failed to update local timeline metadata: {e:?}"); download_data.retries += 1; - sync_queue::push(sync_id, SyncTask::Download(download_data)); + sync_queue.push(sync_id, SyncTask::Download(download_data)); register_sync_status(sync_start, task_name, Some(false)); } } @@ -1199,14 +1262,14 @@ async fn update_local_metadata( async fn delete_timeline_data( conf: &'static PageServerConf, - (storage, index): (&S, &RemoteIndex), + (storage, index, sync_queue): (&S, &RemoteIndex, &SyncQueue), sync_id: ZTenantTimelineId, - mut new_delete_data: SyncData, + mut new_delete_data: SyncData, sync_start: Instant, task_name: &str, ) where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { let timeline_delete = &mut new_delete_data.data; @@ -1222,14 +1285,14 @@ async fn delete_timeline_data( { error!("Failed to update remote timeline {sync_id}: {e:?}"); new_delete_data.retries += 1; - sync_queue::push(sync_id, SyncTask::Delete(new_delete_data)); + sync_queue.push(sync_id, SyncTask::Delete(new_delete_data)); register_sync_status(sync_start, task_name, Some(false)); return; } } timeline_delete.deletion_registered = true; - let sync_status = 
delete_timeline_layers(storage, sync_id, new_delete_data).await; + let sync_status = delete_timeline_layers(storage, sync_queue, sync_id, new_delete_data).await; register_sync_status(sync_start, task_name, Some(sync_status)); } @@ -1244,48 +1307,31 @@ async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result( conf: &'static PageServerConf, - (storage, index): (&S, &RemoteIndex), + (storage, index, sync_queue): (&S, &RemoteIndex, &SyncQueue), current_remote_timeline: Option<&RemoteTimeline>, sync_id: ZTenantTimelineId, - new_upload_data: SyncData, + new_upload_data: SyncData, sync_start: Instant, task_name: &str, ) where P: Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, { - let mut uploaded_data = - match upload_timeline_layers(storage, current_remote_timeline, sync_id, new_upload_data) - .await - { - UploadedTimeline::FailedAndRescheduled => { - register_sync_status(sync_start, task_name, Some(false)); - return; - } - UploadedTimeline::Successful(upload_data) => upload_data, - UploadedTimeline::SuccessfulAfterLocalFsUpdate(mut outdated_upload_data) => { - if outdated_upload_data.data.metadata.is_some() { - let local_metadata_path = - metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id); - let local_metadata = match read_metadata_file(&local_metadata_path).await { - Ok(metadata) => metadata, - Err(e) => { - error!( - "Failed to load local metadata from path '{}': {e:?}", - local_metadata_path.display() - ); - outdated_upload_data.retries += 1; - sync_queue::push(sync_id, SyncTask::Upload(outdated_upload_data)); - register_sync_status(sync_start, task_name, Some(false)); - return; - } - }; - - outdated_upload_data.data.metadata = Some(local_metadata); - } - outdated_upload_data - } - }; + let mut uploaded_data = match upload_timeline_layers( + storage, + sync_queue, + current_remote_timeline, + sync_id, + new_upload_data, + ) + .await + { + UploadedTimeline::FailedAndRescheduled => { + register_sync_status(sync_start, 
task_name, Some(false)); + return; + } + UploadedTimeline::Successful(upload_data) => upload_data, + }; match update_remote_data( conf, @@ -1305,7 +1351,7 @@ async fn upload_timeline_data( Err(e) => { error!("Failed to update remote timeline {sync_id}: {e:?}"); uploaded_data.retries += 1; - sync_queue::push(sync_id, SyncTask::Upload(uploaded_data)); + sync_queue.push(sync_id, SyncTask::Upload(uploaded_data)); register_sync_status(sync_start, task_name, Some(false)); } } @@ -1313,7 +1359,7 @@ async fn upload_timeline_data( enum RemoteDataUpdate<'a> { Upload { - uploaded_data: TimelineUpload, + uploaded_data: LayersUpload, upload_failed: bool, }, Delete(&'a HashSet), @@ -1455,6 +1501,7 @@ where fn schedule_first_sync_tasks( index: &mut RemoteTimelineIndex, + sync_queue: &SyncQueue, local_timeline_files: HashMap)>, ) -> LocalTimelineInitStatuses { let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new(); @@ -1491,7 +1538,7 @@ fn schedule_first_sync_tasks( // is it safe to upload this checkpoint? could it be half broken? 
new_sync_tasks.push_back(( sync_id, - SyncTask::upload(TimelineUpload { + SyncTask::upload(LayersUpload { layers_to_upload: local_files, uploaded_layers: HashSet::new(), metadata: Some(local_metadata), @@ -1509,7 +1556,7 @@ fn schedule_first_sync_tasks( } new_sync_tasks.into_iter().for_each(|(sync_id, task)| { - sync_queue::push(sync_id, task); + sync_queue.push(sync_id, task); }); local_timeline_init_statuses } @@ -1535,7 +1582,7 @@ fn compare_local_and_remote_timeline( let (initial_timeline_status, awaits_download) = if number_of_layers_to_download > 0 { new_sync_tasks.push_back(( sync_id, - SyncTask::download(TimelineDownload { + SyncTask::download(LayersDownload { layers_to_skip: local_files.clone(), }), )); @@ -1553,7 +1600,7 @@ fn compare_local_and_remote_timeline( if !layers_to_upload.is_empty() { new_sync_tasks.push_back(( sync_id, - SyncTask::upload(TimelineUpload { + SyncTask::upload(LayersUpload { layers_to_upload, uploaded_layers: HashSet::new(), metadata: Some(local_metadata), @@ -1584,12 +1631,12 @@ mod test_utils { use super::*; - pub async fn create_local_timeline( + pub(super) async fn create_local_timeline( harness: &RepoHarness<'_>, timeline_id: ZTimelineId, filenames: &[&str], metadata: TimelineMetadata, - ) -> anyhow::Result { + ) -> anyhow::Result { let timeline_path = harness.timeline_path(&timeline_id); fs::create_dir_all(&timeline_path).await?; @@ -1606,28 +1653,212 @@ mod test_utils { ) .await?; - Ok(TimelineUpload { + Ok(LayersUpload { layers_to_upload, uploaded_layers: HashSet::new(), metadata: Some(metadata), }) } - pub fn dummy_contents(name: &str) -> String { + pub(super) fn dummy_contents(name: &str) -> String { format!("contents for {name}") } - pub fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata { + pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata { TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0), Lsn(0), Lsn(0)) } } #[cfg(test)] mod tests { + use 
super::test_utils::dummy_metadata; + use crate::repository::repo_harness::TIMELINE_ID; + use hex_literal::hex; + use utils::lsn::Lsn; + use super::*; - #[test] - fn batching_tests() { - todo!("TODO kb") + const TEST_SYNC_ID: ZTenantTimelineId = ZTenantTimelineId { + tenant_id: ZTenantId::from_array(hex!("11223344556677881122334455667788")), + timeline_id: TIMELINE_ID, + }; + + #[tokio::test] + async fn separate_task_ids_batch() { + let (sync_queue, mut sync_queue_receiver) = SyncQueue::new(NonZeroUsize::new(100).unwrap()); + assert_eq!(sync_queue.len(), 0); + + let sync_id_2 = ZTenantTimelineId { + tenant_id: ZTenantId::from_array(hex!("22223344556677881122334455667788")), + timeline_id: TIMELINE_ID, + }; + let sync_id_3 = ZTenantTimelineId { + tenant_id: ZTenantId::from_array(hex!("33223344556677881122334455667788")), + timeline_id: TIMELINE_ID, + }; + assert!(sync_id_2 != TEST_SYNC_ID); + assert!(sync_id_2 != sync_id_3); + assert!(sync_id_3 != TEST_SYNC_ID); + + let download_task = SyncTask::download(LayersDownload { + layers_to_skip: HashSet::from([PathBuf::from("sk")]), + }); + let upload_task = SyncTask::upload(LayersUpload { + layers_to_upload: HashSet::from([PathBuf::from("up")]), + uploaded_layers: HashSet::from([PathBuf::from("upl")]), + metadata: Some(dummy_metadata(Lsn(2))), + }); + let delete_task = SyncTask::delete(LayersDeletion { + layers_to_delete: HashSet::from([PathBuf::from("de")]), + deleted_layers: HashSet::from([PathBuf::from("del")]), + deletion_registered: false, + }); + + sync_queue.push(TEST_SYNC_ID, download_task.clone()); + sync_queue.push(sync_id_2, upload_task.clone()); + sync_queue.push(sync_id_3, delete_task.clone()); + + let submitted_tasks_count = sync_queue.len(); + assert_eq!(submitted_tasks_count, 3); + let mut batch = sync_queue.next_task_batch(&mut sync_queue_receiver).await; + assert_eq!( + batch.len(), + submitted_tasks_count, + "Batch should consist of all tasks submitted" + ); + + assert_eq!( + 
Some(SyncTaskBatch::new(download_task)), + batch.remove(&TEST_SYNC_ID) + ); + assert_eq!( + Some(SyncTaskBatch::new(upload_task)), + batch.remove(&sync_id_2) + ); + assert_eq!( + Some(SyncTaskBatch::new(delete_task)), + batch.remove(&sync_id_3) + ); + + assert!(batch.is_empty(), "Should check all batch tasks"); + assert_eq!(sync_queue.len(), 0); + } + + #[tokio::test] + async fn same_task_id_separate_tasks_batch() { + let (sync_queue, mut sync_queue_receiver) = SyncQueue::new(NonZeroUsize::new(100).unwrap()); + assert_eq!(sync_queue.len(), 0); + + let download = LayersDownload { + layers_to_skip: HashSet::from([PathBuf::from("sk")]), + }; + let upload = LayersUpload { + layers_to_upload: HashSet::from([PathBuf::from("up")]), + uploaded_layers: HashSet::from([PathBuf::from("upl")]), + metadata: Some(dummy_metadata(Lsn(2))), + }; + let delete = LayersDeletion { + layers_to_delete: HashSet::from([PathBuf::from("de")]), + deleted_layers: HashSet::from([PathBuf::from("del")]), + deletion_registered: false, + }; + + sync_queue.push(TEST_SYNC_ID, SyncTask::download(download.clone())); + sync_queue.push(TEST_SYNC_ID, SyncTask::upload(upload.clone())); + sync_queue.push(TEST_SYNC_ID, SyncTask::delete(delete.clone())); + + let submitted_tasks_count = sync_queue.len(); + assert_eq!(submitted_tasks_count, 3); + let mut batch = sync_queue.next_task_batch(&mut sync_queue_receiver).await; + assert_eq!( + batch.len(), + 1, + "Queue should have one batch merged from 3 sync tasks of the same user" + ); + + assert_eq!( + Some(SyncTaskBatch { + upload: Some(SyncData { + retries: 0, + data: upload + }), + download: Some(SyncData { + retries: 0, + data: download + }), + delete: Some(SyncData { + retries: 0, + data: delete + }), + }), + batch.remove(&TEST_SYNC_ID), + "Should have one batch containing all tasks unchanged" + ); + + assert!(batch.is_empty(), "Should check all batch tasks"); + assert_eq!(sync_queue.len(), 0); + } + + #[tokio::test] + async fn same_task_id_same_tasks_batch() 
{ + let (sync_queue, mut sync_queue_receiver) = SyncQueue::new(NonZeroUsize::new(1).unwrap()); + let download_1 = LayersDownload { + layers_to_skip: HashSet::from([PathBuf::from("sk1")]), + }; + let download_2 = LayersDownload { + layers_to_skip: HashSet::from([PathBuf::from("sk2")]), + }; + let download_3 = LayersDownload { + layers_to_skip: HashSet::from([PathBuf::from("sk3")]), + }; + let download_4 = LayersDownload { + layers_to_skip: HashSet::from([PathBuf::from("sk4")]), + }; + + let sync_id_2 = ZTenantTimelineId { + tenant_id: ZTenantId::from_array(hex!("22223344556677881122334455667788")), + timeline_id: TIMELINE_ID, + }; + assert!(sync_id_2 != TEST_SYNC_ID); + + sync_queue.push(TEST_SYNC_ID, SyncTask::download(download_1.clone())); + sync_queue.push(TEST_SYNC_ID, SyncTask::download(download_2.clone())); + sync_queue.push(sync_id_2, SyncTask::download(download_3.clone())); + sync_queue.push(TEST_SYNC_ID, SyncTask::download(download_4.clone())); + assert_eq!(sync_queue.len(), 4); + + let mut smallest_batch = sync_queue.next_task_batch(&mut sync_queue_receiver).await; + assert_eq!( + smallest_batch.len(), + 1, + "Queue should have one batch merged from the all sync tasks, but not the other user's task" + ); + assert_eq!( + Some(SyncTaskBatch { + download: Some(SyncData { + retries: 0, + data: LayersDownload { + layers_to_skip: { + let mut set = HashSet::new(); + set.extend(download_1.layers_to_skip.into_iter()); + set.extend(download_2.layers_to_skip.into_iter()); + set.extend(download_4.layers_to_skip.into_iter()); + set + }, + } + }), + upload: None, + delete: None, + }), + smallest_batch.remove(&TEST_SYNC_ID), + "Should have one batch containing all tasks merged for the tenant first appeared in the batch" + ); + + assert!(smallest_batch.is_empty(), "Should check all batch tasks"); + assert_eq!( + sync_queue.len(), + 1, + "Should have one task left out of the batch" + ); } } diff --git a/pageserver/src/storage_sync/delete.rs 
b/pageserver/src/storage_sync/delete.rs index 8b13789179..047ad6c2be 100644 --- a/pageserver/src/storage_sync/delete.rs +++ b/pageserver/src/storage_sync/delete.rs @@ -1 +1,228 @@ +//! Timeline synchronization logic to delete a bulk of timeline's remote files from the remote storage. +use anyhow::Context; +use futures::stream::{FuturesUnordered, StreamExt}; +use tracing::{debug, error, info}; + +use crate::storage_sync::{SyncQueue, SyncTask}; +use remote_storage::RemoteStorage; +use utils::zid::ZTenantTimelineId; + +use super::{LayersDeletion, SyncData}; + +/// Attempts to remove the timeline layers from the remote storage. +/// If the task had not adjusted the metadata before, the deletion will fail. +pub(super) async fn delete_timeline_layers<'a, P, S>( + storage: &'a S, + sync_queue: &SyncQueue, + sync_id: ZTenantTimelineId, + mut delete_data: SyncData, +) -> bool +where + P: std::fmt::Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +{ + if !delete_data.data.deletion_registered { + error!("Cannot delete timeline layers before the deletion metadata is not registered, reenqueueing"); + delete_data.retries += 1; + sync_queue.push(sync_id, SyncTask::Delete(delete_data)); + return false; + } + + if delete_data.data.layers_to_delete.is_empty() { + info!("No layers to delete, skipping"); + return true; + } + + let layers_to_delete = delete_data + .data + .layers_to_delete + .drain() + .collect::>(); + debug!("Layers to delete: {layers_to_delete:?}"); + info!("Deleting {} timeline layers", layers_to_delete.len()); + + let mut delete_tasks = layers_to_delete + .into_iter() + .map(|local_layer_path| async { + let storage_path = + match storage + .remote_object_id(&local_layer_path) + .with_context(|| { + format!( + "Failed to get the layer storage path for local path '{}'", + local_layer_path.display() + ) + }) { + Ok(path) => path, + Err(e) => return Err((e, local_layer_path)), + }; + + match 
storage.delete(&storage_path).await.with_context(|| { + format!( + "Failed to delete remote layer from storage at '{:?}'", + storage_path + ) + }) { + Ok(()) => Ok(local_layer_path), + Err(e) => Err((e, local_layer_path)), + } + }) + .collect::>(); + + let mut errored = false; + while let Some(deletion_result) = delete_tasks.next().await { + match deletion_result { + Ok(local_layer_path) => { + debug!( + "Successfully deleted layer {} for timeline {sync_id}", + local_layer_path.display() + ); + delete_data.data.deleted_layers.insert(local_layer_path); + } + Err((e, local_layer_path)) => { + errored = true; + error!( + "Failed to delete layer {} for timeline {sync_id}: {e:?}", + local_layer_path.display() + ); + delete_data.data.layers_to_delete.insert(local_layer_path); + } + } + } + + if errored { + debug!("Reenqueuing failed delete task for timeline {sync_id}"); + delete_data.retries += 1; + sync_queue.push(sync_id, SyncTask::Delete(delete_data)); + } + errored +} + +#[cfg(test)] +mod tests { + use std::{collections::HashSet, num::NonZeroUsize}; + + use itertools::Itertools; + use tempfile::tempdir; + use tokio::fs; + use utils::lsn::Lsn; + + use crate::{ + repository::repo_harness::{RepoHarness, TIMELINE_ID}, + storage_sync::test_utils::{create_local_timeline, dummy_metadata}, + }; + use remote_storage::LocalFs; + + use super::*; + + #[tokio::test] + async fn delete_timeline_negative() -> anyhow::Result<()> { + let harness = RepoHarness::create("delete_timeline_negative")?; + let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap()); + let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); + let storage = LocalFs::new( + tempdir()?.path().to_path_buf(), + harness.conf.workdir.clone(), + )?; + + let deleted = delete_timeline_layers( + &storage, + &sync_queue, + sync_id, + SyncData { + retries: 1, + data: LayersDeletion { + deleted_layers: HashSet::new(), + layers_to_delete: HashSet::new(), + deletion_registered: false, + }, + }, + ) + 
.await; + + assert!( + !deleted, + "Should not start the deletion for task with delete metadata unregistered" + ); + + Ok(()) + } + + #[tokio::test] + async fn delete_timeline() -> anyhow::Result<()> { + let harness = RepoHarness::create("delete_timeline")?; + let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap()); + + let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); + let layer_files = ["a", "b", "c", "d"]; + let storage = LocalFs::new( + tempdir()?.path().to_path_buf(), + harness.conf.workdir.clone(), + )?; + let current_retries = 3; + let metadata = dummy_metadata(Lsn(0x30)); + let local_timeline_path = harness.timeline_path(&TIMELINE_ID); + let timeline_upload = + create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?; + for local_path in timeline_upload.layers_to_upload { + let remote_path = storage.remote_object_id(&local_path)?; + let remote_parent_dir = remote_path.parent().unwrap(); + if !remote_parent_dir.exists() { + fs::create_dir_all(&remote_parent_dir).await?; + } + fs::copy(&local_path, &remote_path).await?; + } + assert_eq!( + storage + .list() + .await? 
+ .into_iter() + .map(|remote_path| storage.local_path(&remote_path).unwrap()) + .filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) }) + .sorted() + .collect::>(), + layer_files + .iter() + .map(|layer_str| layer_str.to_string()) + .sorted() + .collect::>(), + "Expect to have all layer files remotely before deletion" + ); + + let deleted = delete_timeline_layers( + &storage, + &sync_queue, + sync_id, + SyncData { + retries: current_retries, + data: LayersDeletion { + deleted_layers: HashSet::new(), + layers_to_delete: HashSet::from([ + local_timeline_path.join("a"), + local_timeline_path.join("c"), + local_timeline_path.join("something_different"), + ]), + deletion_registered: true, + }, + }, + ) + .await; + assert!(deleted, "Should be able to delete timeline files"); + + assert_eq!( + storage + .list() + .await? + .into_iter() + .map(|remote_path| storage.local_path(&remote_path).unwrap()) + .filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) }) + .sorted() + .collect::>(), + vec!["b".to_string(), "d".to_string()], + "Expect to have only non-deleted files remotely" + ); + + Ok(()) + } +} diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs index 3cd6de57c7..98a0a0e2fc 100644 --- a/pageserver/src/storage_sync/download.rs +++ b/pageserver/src/storage_sync/download.rs @@ -12,15 +12,13 @@ use tokio::{ use tracing::{debug, error, info, warn}; use crate::{ - config::PageServerConf, - layered_repository::metadata::metadata_path, - storage_sync::{sync_queue, SyncTask}, + config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask, }; use utils::zid::ZTenantTimelineId; use super::{ index::{IndexPart, RemoteTimeline}, - SyncData, TimelineDownload, + LayersDownload, SyncData, SyncQueue, }; pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download"; @@ -76,7 +74,7 @@ pub(super) enum DownloadedTimeline { FailedAndRescheduled, /// Remote timeline data is 
found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known. /// Initial download successful. - Successful(SyncData), + Successful(SyncData), } /// Attempts to download all given timeline's layers. @@ -87,9 +85,10 @@ pub(super) enum DownloadedTimeline { pub(super) async fn download_timeline_layers<'a, P, S>( conf: &'static PageServerConf, storage: &'a S, + sync_queue: &'a SyncQueue, remote_timeline: Option<&'a RemoteTimeline>, sync_id: ZTenantTimelineId, - mut download_data: SyncData, + mut download_data: SyncData, ) -> DownloadedTimeline where P: Debug + Send + Sync + 'static, @@ -251,7 +250,7 @@ where if errors_happened { debug!("Reenqueuing failed download task for timeline {sync_id}"); download_data.retries += 1; - sync_queue::push(sync_id, SyncTask::Download(download_data)); + sync_queue.push(sync_id, SyncTask::Download(download_data)); DownloadedTimeline::FailedAndRescheduled } else { info!("Successfully downloaded all layers"); @@ -265,7 +264,10 @@ async fn fsync_path(path: impl AsRef) -> Result<(), io::Error> { #[cfg(test)] mod tests { - use std::collections::{BTreeSet, HashSet}; + use std::{ + collections::{BTreeSet, HashSet}, + num::NonZeroUsize, + }; use remote_storage::{LocalFs, RemoteStorage}; use tempfile::tempdir; @@ -284,6 +286,8 @@ mod tests { #[tokio::test] async fn download_timeline() -> anyhow::Result<()> { let harness = RepoHarness::create("download_timeline")?; + let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap()); + let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let layer_files = ["a", "b", "layer_to_skip", "layer_to_keep_locally"]; let storage = LocalFs::new( @@ -324,11 +328,12 @@ mod tests { let download_data = match download_timeline_layers( harness.conf, &storage, + &sync_queue, Some(&remote_timeline), sync_id, SyncData::new( current_retries, - TimelineDownload { + LayersDownload { layers_to_skip: HashSet::from([local_timeline_path.join("layer_to_skip")]), }, ), @@ -380,17 
+385,19 @@ mod tests { #[tokio::test] async fn download_timeline_negatives() -> anyhow::Result<()> { let harness = RepoHarness::create("download_timeline_negatives")?; + let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?; let empty_remote_timeline_download = download_timeline_layers( harness.conf, &storage, + &sync_queue, None, sync_id, SyncData::new( 0, - TimelineDownload { + LayersDownload { layers_to_skip: HashSet::new(), }, ), @@ -409,11 +416,12 @@ mod tests { let already_downloading_remote_timeline_download = download_timeline_layers( harness.conf, &storage, + &sync_queue, Some(¬_expecting_download_remote_timeline), sync_id, SyncData::new( 0, - TimelineDownload { + LayersDownload { layers_to_skip: HashSet::new(), }, ), diff --git a/pageserver/src/storage_sync/upload.rs b/pageserver/src/storage_sync/upload.rs index 1e2594ac70..f9d606f2b8 100644 --- a/pageserver/src/storage_sync/upload.rs +++ b/pageserver/src/storage_sync/upload.rs @@ -8,16 +8,14 @@ use remote_storage::RemoteStorage; use tokio::fs; use tracing::{debug, error, info, warn}; -use crate::{ - config::PageServerConf, - layered_repository::metadata::metadata_path, - storage_sync::{sync_queue, SyncTask}, -}; use utils::zid::ZTenantTimelineId; use super::{ index::{IndexPart, RemoteTimeline}, - SyncData, TimelineUpload, + LayersUpload, SyncData, SyncQueue, +}; +use crate::{ + config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask, }; /// Serializes and uploads the given index part data to the remote storage. @@ -68,11 +66,7 @@ pub(super) enum UploadedTimeline { /// Upload failed due to some error, the upload task is rescheduled for another retry. FailedAndRescheduled, /// No issues happened during the upload, all task files were put into the remote storage. 
- Successful(SyncData), - /// No failures happened during the upload, but some files were removed locally before the upload task completed - /// (could happen due to retries, for instance, if GC happens in the interim). - /// Such files are considered "not needed" and ignored, but the task's metadata should be discarded and the new one loaded from the local file. - SuccessfulAfterLocalFsUpdate(SyncData), + Successful(SyncData), } /// Attempts to upload given layer files. @@ -81,9 +75,10 @@ pub(super) enum UploadedTimeline { /// On an error, bumps the retries count and reschedules the entire task. pub(super) async fn upload_timeline_layers<'a, P, S>( storage: &'a S, + sync_queue: &SyncQueue, remote_timeline: Option<&'a RemoteTimeline>, sync_id: ZTenantTimelineId, - mut upload_data: SyncData, + mut upload_data: SyncData, ) -> UploadedTimeline where P: Debug + Send + Sync + 'static, @@ -168,7 +163,6 @@ where .collect::>(); let mut errors_happened = false; - let mut local_fs_updated = false; while let Some(upload_result) = upload_tasks.next().await { match upload_result { Ok(uploaded_path) => { @@ -185,7 +179,16 @@ where errors_happened = true; error!("Failed to upload a layer for timeline {sync_id}: {e:?}"); } else { - local_fs_updated = true; + // We have run the upload sync task, but the file we wanted to upload is gone. + // This is "fine" due to the asynchronous nature of the sync loop: it only reacts to events and might need to + // retry the upload tasks, if S3 or network is down: but during this time, pageserver might still operate and + // run compaction/gc threads, removing redundant files from disk. + // It's not good to pause GC/compaction because of those and we would rather skip such uploads. + // + // Yet absence of such files might also mean that the timeline metadata file was updated (GC moves the Lsn forward, for instance). 
+ // We don't try to read a more recent version, since it could contain `disk_consistent_lsn` that does not have its upload finished yet. + // This will create "missing" layers and make data inconsistent. + // Instead, we only update the metadata when it was submitted in an upload task as a checkpoint result. upload.layers_to_upload.remove(&source_path); warn!( "Missing locally a layer file {} scheduled for upload, skipping", @@ -200,11 +203,8 @@ where if errors_happened { debug!("Reenqueuing failed upload task for timeline {sync_id}"); upload_data.retries += 1; - sync_queue::push(sync_id, SyncTask::Upload(upload_data)); + sync_queue.push(sync_id, SyncTask::Upload(upload_data)); UploadedTimeline::FailedAndRescheduled - } else if local_fs_updated { - info!("Successfully uploaded all layers, some local layers were removed during the upload"); - UploadedTimeline::SuccessfulAfterLocalFsUpdate(upload_data) } else { info!("Successfully uploaded all layers"); UploadedTimeline::Successful(upload_data) @@ -218,7 +218,10 @@ enum UploadError { #[cfg(test)] mod tests { - use std::collections::{BTreeSet, HashSet}; + use std::{ + collections::{BTreeSet, HashSet}, + num::NonZeroUsize, + }; use remote_storage::LocalFs; use tempfile::tempdir; @@ -237,6 +240,7 @@ mod tests { #[tokio::test] async fn regular_layer_upload() -> anyhow::Result<()> { let harness = RepoHarness::create("regular_layer_upload")?; + let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let layer_files = ["a", "b"]; @@ -258,6 +262,7 @@ mod tests { let upload_result = upload_timeline_layers( &storage, + &sync_queue, None, sync_id, SyncData::new(current_retries, timeline_upload.clone()), @@ -322,6 +327,7 @@ mod tests { #[tokio::test] async fn layer_upload_after_local_fs_update() -> anyhow::Result<()> { let harness = RepoHarness::create("layer_upload_after_local_fs_update")?; + let (sync_queue, _) = 
SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let layer_files = ["a1", "b1"]; @@ -347,6 +353,7 @@ mod tests { let upload_result = upload_timeline_layers( &storage, + &sync_queue, None, sync_id, SyncData::new(current_retries, timeline_upload.clone()), @@ -354,7 +361,7 @@ mod tests { .await; let upload_data = match upload_result { - UploadedTimeline::SuccessfulAfterLocalFsUpdate(upload_data) => upload_data, + UploadedTimeline::Successful(upload_data) => upload_data, wrong_result => panic!( "Expected a successful after local fs upload for timeline, but got: {wrong_result:?}" ),