From 670205e17a426ef8ea62a5eb048a5e5fb4dabc22 Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Thu, 11 Nov 2021 00:59:43 +0200
Subject: [PATCH] Evict excessively failing sync tasks, improve processing for
 the rest of the tasks

---
 pageserver/Cargo.toml                         |   5 +-
 pageserver/README.md                          |   4 +-
 pageserver/src/bin/pageserver.rs              |  21 +-
 pageserver/src/layered_repository.rs          |  49 +-
 pageserver/src/lib.rs                         |  10 +-
 pageserver/src/remote_storage.rs              |  15 +-
 pageserver/src/remote_storage/README.md       |   7 -
 pageserver/src/remote_storage/storage_sync.rs | 926 +++++++++---------
 pageserver/src/repository.rs                  |   3 +
 pageserver/src/tenant_mgr.rs                  |  79 +-
 10 files changed, 625 insertions(+), 494 deletions(-)

diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 2e4cbd9877..6cb26404c5 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -17,7 +17,7 @@ lazy_static = "1.4.0"
 log = "0.4.14"
 clap = "2.33.0"
 daemonize = "0.4.1"
-tokio = { version = "1.11", features = ["process", "macros", "fs", "rt", "io-util"] }
+tokio = { version = "1.11", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
 postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
 postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
 postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
@@ -32,7 +32,6 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1"
 toml = "0.5"
 scopeguard = "1.1.0"
-rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] }
 async-trait = "0.1"
 const_format = "0.2.21"
 tracing = "0.1.27"
@@ -41,6 +40,8 @@ url = "2"
 nix = "0.23"
 once_cell = "1.8.0"
 
+rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] }
+
 postgres_ffi = { path = "../postgres_ffi" }
 zenith_metrics = { path = "../zenith_metrics" }
 zenith_utils = { path = "../zenith_utils" }
diff --git a/pageserver/README.md b/pageserver/README.md
index 7a9f947e95..b812a9dba9 100644
--- a/pageserver/README.md
+++ b/pageserver/README.md
@@ -134,8 +134,8 @@ Implementation details are covered in the [backup readme](./src/remote_storage/R
 The backup service is disabled by default and can be enabled to interact with a single remote storage.
 CLI examples:
-* Local FS: `${PAGESERVER_BIN} --relish-storage-local-path="/some/local/path/"`
-* AWS S3 : `${PAGESERVER_BIN} --relish-storage-s3-bucket="some-sample-bucket" --relish-storage-region="eu-north-1" --relish-storage-access-key="SOMEKEYAAAAASADSAH*#" --relish-storage-secret-access-key="SOMEsEcReTsd292v"`
+* Local FS: `${PAGESERVER_BIN} --remote-storage-local-path="/some/local/path/"`
+* AWS S3 : `${PAGESERVER_BIN} --remote-storage-s3-bucket="some-sample-bucket" --remote-storage-region="eu-north-1" --remote-storage-access-key="SOMEKEYAAAAASADSAH*#" --remote-storage-secret-access-key="SOMEsEcReTsd292v"`
 
 For Amazon AWS S3, the key id and the secret access key can be found in `~/.aws/credentials` (if awscli was ever configured to work with the desired bucket) or on the AWS Settings page for a given user.
 Also note that bucket names do not contain any protocol prefix when used on AWS. For local S3 installations, refer to their documentation for the name format and credentials.
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 88a0195858..63de235003 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use std::{ env, + num::{NonZeroU32, NonZeroUsize}, path::{Path, PathBuf}, str::FromStr, thread, @@ -50,6 +51,7 @@ struct CfgFileParams { auth_validation_public_key_path: Option, auth_type: Option, remote_storage_max_concurrent_sync: Option, + remote_storage_max_sync_errors: Option, ///////////////////////////////// //// Don't put `Option` and other "simple" values below. //// @@ -115,6 +117,7 @@ impl CfgFileParams { auth_type: get_arg("auth-type"), remote_storage, remote_storage_max_concurrent_sync: get_arg("remote-storage-max-concurrent-sync"), + remote_storage_max_sync_errors: get_arg("remote-storage-max-sync-errors"), } } @@ -140,6 +143,9 @@ impl CfgFileParams { remote_storage_max_concurrent_sync: self .remote_storage_max_concurrent_sync .or(other.remote_storage_max_concurrent_sync), + remote_storage_max_sync_errors: self + .remote_storage_max_sync_errors + .or(other.remote_storage_max_sync_errors), } } @@ -225,7 +231,11 @@ impl CfgFileParams { let max_concurrent_sync = match self.remote_storage_max_concurrent_sync.as_deref() { Some(number_str) => number_str.parse()?, - None => DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC_LIMITS, + None => NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC).unwrap(), + }; + let max_sync_errors = match self.remote_storage_max_sync_errors.as_deref() { + Some(number_str) => number_str.parse()?, + None => NonZeroU32::new(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS).unwrap(), }; let remote_storage_config = self.remote_storage.as_ref().map(|storage_params| { let storage = match storage_params.clone() { @@ -246,6 +256,7 @@ impl CfgFileParams { }; RemoteStorageConfig { max_concurrent_sync, + max_sync_errors, storage, } }); @@ -667,6 +678,9 @@ mod tests { remote_storage_max_concurrent_sync: Some( "remote_storage_max_concurrent_sync_VALUE".to_string(), ), + remote_storage_max_sync_errors: Some( + "remote_storage_max_sync_errors_VALUE".to_string(), + ), }; let toml_string = toml::to_string(¶ms).expect("Failed to serialize correct config"); @@ -686,6 +700,7 @@ pg_distrib_dir = 'pg_distrib_dir_VALUE' auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE' auth_type = 'auth_type_VALUE' remote_storage_max_concurrent_sync = 'remote_storage_max_concurrent_sync_VALUE' +remote_storage_max_sync_errors = 'remote_storage_max_sync_errors_VALUE' [remote_storage] local_path = 'remote_storage_local_VALUE' @@ -733,6 +748,9 @@ local_path = 'remote_storage_local_VALUE' remote_storage_max_concurrent_sync: Some( "remote_storage_max_concurrent_sync_VALUE".to_string(), ), + remote_storage_max_sync_errors: Some( + "remote_storage_max_sync_errors_VALUE".to_string(), + ), }; let toml_string = toml::to_string(¶ms).expect("Failed to serialize correct config"); @@ -752,6 +770,7 @@ pg_distrib_dir = 'pg_distrib_dir_VALUE' auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE' auth_type = 'auth_type_VALUE' remote_storage_max_concurrent_sync = 'remote_storage_max_concurrent_sync_VALUE' +remote_storage_max_sync_errors = 'remote_storage_max_sync_errors_VALUE' [remote_storage] bucket_name = 'bucket_name_VALUE' diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 0173c3ca5a..af046536e0 100644 --- a/pageserver/src/layered_repository.rs +++ 
b/pageserver/src/layered_repository.rs @@ -34,7 +34,7 @@ use std::time::{Duration, Instant}; use self::metadata::{metadata_path, TimelineMetadata}; use crate::page_cache; use crate::relish::*; -use crate::remote_storage::schedule_timeline_upload; +use crate::remote_storage::schedule_timeline_checkpoint_upload; use crate::repository::{GcResult, Repository, Timeline, TimelineWriter, WALRecord}; use crate::tenant_mgr; use crate::walreceiver; @@ -259,16 +259,37 @@ impl Repository for LayeredRepository { let timelines = self.timelines.lock().unwrap(); for (timelineid, timeline) in timelines.iter() { - walreceiver::stop_wal_receiver(*timelineid); - // Wait for syncing data to disk - trace!("repo shutdown. checkpoint timeline {}", timelineid); - timeline.checkpoint(CheckpointConfig::Forced)?; - - //TODO Wait for walredo process to shutdown too + shutdown_timeline(*timelineid, timeline.as_ref())?; } Ok(()) } + + fn unload_timeline(&self, timeline_id: ZTimelineId) -> Result<()> { + let mut timelines = self.timelines.lock().unwrap(); + let removed_timeline = match timelines.remove(&timeline_id) { + Some(timeline) => timeline, + None => { + warn!("Timeline {} not found, nothing to remove", timeline_id); + return Ok(()); + } + }; + drop(timelines); + shutdown_timeline(timeline_id, removed_timeline.as_ref())?; + + Ok(()) + } +} + +fn shutdown_timeline( + timelineid: ZTimelineId, + timeline: &LayeredTimeline, +) -> Result<(), anyhow::Error> { + walreceiver::stop_wal_receiver(timelineid); + trace!("repo shutdown. checkpoint timeline {}", timelineid); + timeline.checkpoint(CheckpointConfig::Forced)?; + //TODO Wait for walredo process to shutdown too + Ok(()) } /// Private functions @@ -318,7 +339,12 @@ impl LayeredRepository { .load_layer_map(disk_consistent_lsn) .context("failed to load layermap")?; if self.upload_relishes { - schedule_timeline_upload(self.tenantid, timelineid, loaded_layers, metadata); + schedule_timeline_checkpoint_upload( + self.tenantid, + timelineid, + loaded_layers, + metadata, + ); } // needs to be after load_layer_map @@ -1332,7 +1358,12 @@ impl LayeredTimeline { false, )?; if self.upload_relishes { - schedule_timeline_upload(self.tenantid, self.timelineid, layer_uploads, metadata); + schedule_timeline_checkpoint_upload( + self.tenantid, + self.timelineid, + layer_uploads, + metadata, + ); } // Also update the in-memory copy diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 6c76d60a82..660b913b0e 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -2,6 +2,7 @@ use layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; use zenith_utils::postgres_backend::AuthType; use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use std::num::{NonZeroU32, NonZeroUsize}; use std::path::PathBuf; use std::time::Duration; @@ -44,7 +45,8 @@ pub mod defaults { pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; - pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100; + pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 100; + pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; pub const DEFAULT_OPEN_MEM_LIMIT: usize = 128 * 1024 * 1024; pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192; @@ -186,8 +188,10 @@ pub enum CheckpointConfig { /// External backup storage configuration, enough for creating a client for that storage. 
 #[derive(Debug, Clone)]
 pub struct RemoteStorageConfig {
-    /// Limits the number of concurrent sync operations between pageserver and the remote storage.
-    pub max_concurrent_sync: usize,
+    /// Max allowed number of concurrent sync operations between the pageserver and the remote storage.
+    pub max_concurrent_sync: NonZeroUsize,
+    /// Max allowed errors before the sync task is considered failed and evicted.
+    pub max_sync_errors: NonZeroU32,
     /// The storage connection configuration.
     pub storage: RemoteStorageKind,
 }
diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs
index 6e960ed1b7..be7d421dbc 100644
--- a/pageserver/src/remote_storage.rs
+++ b/pageserver/src/remote_storage.rs
@@ -9,7 +9,7 @@
 //!
 //! * synchronization logic at the [`storage_sync`] module that keeps pageserver state (both the runtime one and the workdir files) and storage state in sync.
 //!
-//! * public API via to interact with the external world: [`run_storage_sync_thread`] and [`schedule_timeline_upload`]
+//! * public API to interact with the external world: [`run_storage_sync_thread`] and [`schedule_timeline_checkpoint_upload`]
 //!
 //! Here's a schematic overview of all interactions the backup service and the rest of the pageserver perform:
 //!
@@ -17,7 +17,7 @@
 //! |            | - - - (init async loop) - - - -> |                 |
 //! |            |                                  |                 |
 //! |            | -------------------------------> |      async      |
-//! | pageserver |  (schedule frozen layer upload)  | upload/download |
+//! | pageserver |   (schedule checkpoint upload)   | upload/download |
 //! |            |                                  |      loop       |
 //! |            | <------------------------------- |                 |
 //! |            |   (register downloaded layers)   |                 |
@@ -80,11 +80,17 @@ use std::{
 
 use anyhow::Context;
 use tokio::io;
+use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
-pub use self::storage_sync::schedule_timeline_upload;
+pub use self::storage_sync::schedule_timeline_checkpoint_upload;
 use self::{local_fs::LocalFs, rust_s3::S3};
 use crate::{PageServerConf, RemoteStorageKind};
 
+/// Any timeline has its own id and its own tenant it belongs to,
+/// so the sync processes group timelines by both for simplicity.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
+pub struct TimelineSyncId(ZTenantId, ZTimelineId);
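The switch from `usize` to `NonZeroUsize`/`NonZeroU32` moves the "must be positive" invariant out of a runtime `ensure!` check (previously in `spawn_storage_sync_thread`) and into the type itself. Below is a minimal standalone sketch of the parse-or-default logic, mirroring the `CfgFileParams` conversion earlier in this patch; the function name is hypothetical, and the default constants are the ones from `DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC` and `DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS`:

```rust
use std::num::{NonZeroU32, NonZeroUsize};

// Hypothetical helper, extracted for illustration: `FromStr` for the
// `NonZero*` types rejects "0" outright, so an invalid configuration fails
// at startup instead of spawning a sync loop that can make no progress.
fn parse_limits(
    concurrent: Option<&str>,
    errors: Option<&str>,
) -> anyhow::Result<(NonZeroUsize, NonZeroU32)> {
    let max_concurrent_sync = match concurrent {
        Some(s) => s.parse()?, // errors on "0" and on non-numeric input
        None => NonZeroUsize::new(100).unwrap(), // DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC
    };
    let max_sync_errors = match errors {
        Some(s) => s.parse()?,
        None => NonZeroU32::new(10).unwrap(), // DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS
    };
    Ok((max_concurrent_sync, max_sync_errors))
}

fn main() -> anyhow::Result<()> {
    println!("{:?}", parse_limits(None, Some("5"))?); // (100, 5)
    assert!(parse_limits(Some("0"), None).is_err()); // zero is rejected at parse time
    Ok(())
}
```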
 /// Based on the config, initiates the remote storage connection and starts a separate thread
 /// that ensures that the pageserver and the remote storage are in sync with each other.
 /// If no external storage configuration is given, no thread or storage initialization is done.
@@ -94,16 +100,19 @@ pub fn run_storage_sync_thread(
     match &config.remote_storage_config {
         Some(storage_config) => {
             let max_concurrent_sync = storage_config.max_concurrent_sync;
+            let max_sync_errors = storage_config.max_sync_errors;
             let handle = match &storage_config.storage {
                 RemoteStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread(
                     config,
                     LocalFs::new(root.clone(), &config.workdir)?,
                     max_concurrent_sync,
+                    max_sync_errors,
                 ),
                 RemoteStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread(
                     config,
                     S3::new(s3_config, &config.workdir)?,
                     max_concurrent_sync,
+                    max_sync_errors,
                 ),
             };
             handle.map(Some)
diff --git a/pageserver/src/remote_storage/README.md b/pageserver/src/remote_storage/README.md
index e9ef769fe6..c8e4ad242a 100644
--- a/pageserver/src/remote_storage/README.md
+++ b/pageserver/src/remote_storage/README.md
@@ -43,13 +43,6 @@ AWS S3 returns file checksums during the `list` operation, so that can be used t
 For now, due to this, we consider the local workdir files as the source of truth, never removing them and adjusting the remote files instead, if the image files mismatch.
 
-* no proper retry management
-
-Now, the storage sync attempts to redo the upload/download operation for the image files that failed.
-No proper task eviction or backpressure is implemented currently: the tasks will stay in the queue forever, reattempting the downloads.
-
-This will be fixed when more details on the file consistency model will be agreed on.
-
 * sad rust-s3 api
 
 rust-s3 is not very pleasant to use:
diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs
index e16ffaf898..f10304fbd4 100644
--- a/pageserver/src/remote_storage/storage_sync.rs
+++ b/pageserver/src/remote_storage/storage_sync.rs
@@ -13,7 +13,7 @@
 //! pageserver writes them.
 //!
 //! The list construction is currently the only place where the storage sync can return an [`Err`] to the user.
-//! New upload tasks are accepted via [`schedule_timeline_upload`] function disregarding of the corresponding loop startup,
+//! New upload tasks are accepted via the [`schedule_timeline_checkpoint_upload`] function regardless of whether the corresponding loop is started;
 //! it's up to the caller to avoid scheduling new file uploads if the loop was not enabled.
 //! After the initial state is loaded into memory and the loop starts, any further [`Err`] results do not stop the loop, but rather
 //! reschedule the same task, with possibly fewer files to sync in it.
@@ -59,12 +59,12 @@
 //! This is subject to change in the near future, but requires more changes to [`crate::tenant_mgr`] before it can happen.
 
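As the module docs note, upload tasks are accepted whether or not the loop is running. A hypothetical in-crate call site sketch (the real caller is `LayeredTimeline::checkpoint` in `layered_repository.rs`; the function name and the exact import paths here are assumptions for illustration):

```rust
use std::path::PathBuf;

use crate::layered_repository::metadata::TimelineMetadata;
use crate::remote_storage::schedule_timeline_checkpoint_upload;
use zenith_utils::zid::{ZTenantId, ZTimelineId};

// Fire-and-forget: the freshly flushed layer files are handed to the sync
// loop's queue. If `run_storage_sync_thread` never started the loop, the
// push fails with a warning and the files are simply never uploaded.
fn on_checkpoint_finished(
    tenant_id: ZTenantId,
    timeline_id: ZTimelineId,
    new_layers: Vec<PathBuf>, // paths of layer files added by this checkpoint
    metadata: TimelineMetadata, // records the new disk_consistent_lsn
) {
    schedule_timeline_checkpoint_upload(tenant_id, timeline_id, new_layers, metadata);
}
```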
 use std::{
-    cmp::Ordering,
-    collections::{hash_map, BinaryHeap, HashMap, HashSet},
+    collections::{hash_map, HashMap, HashSet},
+    num::{NonZeroU32, NonZeroUsize},
+    ops::DerefMut,
     path::{Path, PathBuf},
-    sync::Mutex,
+    sync::Arc,
     thread,
-    time::Duration,
 };
 
 use anyhow::{anyhow, ensure, Context};
@@ -73,18 +73,21 @@ use lazy_static::lazy_static;
 use tokio::{
     fs,
     io::{self, AsyncWriteExt},
-    sync::Semaphore,
+    sync::{
+        mpsc::{self, UnboundedReceiver},
+        Mutex,
+    },
     time::Instant,
 };
 use tracing::*;
 
-use super::RemoteStorage;
+use super::{RemoteStorage, TimelineSyncId};
 use crate::{
     layered_repository::{
         metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME},
         TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME,
     },
-    tenant_mgr::register_timeline_download,
+    tenant_mgr::{perform_post_timeline_sync_steps, PostTimelineSyncStep},
     PageServerConf,
 };
 use zenith_metrics::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge};
@@ -112,41 +115,134 @@ lazy_static! {
         .expect("failed to register pageserver image sync time histogram vec");
 }
 
-lazy_static! {
-    static ref SYNC_QUEUE: Mutex<BinaryHeap<SyncTask>> = Mutex::new(BinaryHeap::new());
+/// Wraps the mpsc channel bits into a queue interface.
+/// The mpsc approach was picked to allow blocking the sync loop when no tasks are present, to avoid meaningless spinning.
+mod sync_queue {
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    use anyhow::anyhow;
+    use once_cell::sync::OnceCell;
+    use tokio::sync::mpsc::{error::TryRecvError, UnboundedReceiver, UnboundedSender};
+    use tracing::{debug, warn};
+
+    use super::SyncTask;
+
+    static SENDER: OnceCell<UnboundedSender<SyncTask>> = OnceCell::new();
+    static LENGTH: AtomicUsize = AtomicUsize::new(0);
+
+    pub fn init(sender: UnboundedSender<SyncTask>) -> anyhow::Result<()> {
+        SENDER
+            .set(sender)
+            .map_err(|_| anyhow!("sync queue was already initialized"))?;
+        Ok(())
+    }
+
+    pub fn push(new_task: SyncTask) -> bool {
+        if let Some(sender) = SENDER.get() {
+            match sender.send(new_task) {
+                Err(e) => {
+                    warn!(
+                        "Failed to enqueue a sync task: the receiver is dropped: {}",
+                        e
+                    );
+                    false
+                }
+                Ok(()) => {
+                    LENGTH.fetch_add(1, Ordering::Relaxed);
+                    true
+                }
+            }
+        } else {
+            warn!("Failed to enqueue a sync task: the receiver is not initialized");
+            false
+        }
+    }
+
+    pub async fn next_task(receiver: &mut UnboundedReceiver<SyncTask>) -> Option<SyncTask> {
+        let task = receiver.recv().await;
+        // Decrement the counter only when a task was actually received:
+        // a closed channel returns `None` and must not underflow the length.
+        if task.is_some() {
+            LENGTH.fetch_sub(1, Ordering::Relaxed);
+        }
+        task
+    }
+
+    pub async fn next_task_batch(
+        receiver: &mut UnboundedReceiver<SyncTask>,
+        mut max_batch_size: usize,
+    ) -> Vec<SyncTask> {
+        let mut tasks = Vec::with_capacity(max_batch_size);
+
+        if max_batch_size == 0 {
+            return tasks;
+        }
+
+        loop {
+            match receiver.try_recv() {
+                Ok(new_task) => {
+                    max_batch_size -= 1;
+                    LENGTH.fetch_sub(1, Ordering::Relaxed);
+                    tasks.push(new_task);
+                    if max_batch_size == 0 {
+                        break;
+                    }
+                }
+                Err(TryRecvError::Disconnected) => {
+                    debug!("Sender disconnected, batch collection aborted");
+                    break;
+                }
+                Err(TryRecvError::Empty) => {
+                    debug!("No more data in the sync queue, task batch is not full");
+                    break;
+                }
+            }
+        }
+
+        tasks
+    }
+
+    pub fn len() -> usize {
+        LENGTH.load(Ordering::Relaxed)
+    }
 }
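A standalone model of the batching behaviour implemented above, with `SyncTask` simplified to `String` so the sketch runs on its own: block on the first item, then drain whatever is already queued via `try_recv` without waiting (the real module also guards `max_batch_size == 0` and maintains the length counter):

```rust
use tokio::sync::mpsc::{self, error::TryRecvError, UnboundedReceiver};

async fn next_batch(receiver: &mut UnboundedReceiver<String>, max: usize) -> Vec<String> {
    let mut batch = Vec::with_capacity(max);
    // First task: await, so an idle loop sleeps instead of spinning.
    match receiver.recv().await {
        Some(task) => batch.push(task),
        None => return batch, // all senders dropped
    }
    // Remaining tasks: take only what is already queued, never waiting.
    while batch.len() < max {
        match receiver.try_recv() {
            Ok(task) => batch.push(task),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    batch
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (sender, mut receiver) = mpsc::unbounded_channel();
    for i in 0..3 {
        sender.send(format!("task-{}", i)).unwrap();
    }
    // Prints ["task-0", "task-1", "task-2"]: everything queued, up to the cap.
    println!("{:?}", next_batch(&mut receiver, 10).await);
}
```

This is why one loop iteration can process up to `max_concurrent_sync` tasks without either busy-waiting on an empty queue or stalling until a full batch arrives.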
 
-/// An image sync task to store in the priority queue.
-/// The task priority is defined by its [`PartialOrd`] derive:
-/// * lower enum variants are of more priority compared to the higher ones
-/// * for the same enum variants, "natural" comparison happens for their data
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
-enum SyncTask {
+/// A task to run in the async download/upload loop.
+/// Limited by the number of retries: after a certain threshold the failing task gets evicted and the timeline disabled.
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct SyncTask {
+    sync_id: TimelineSyncId,
+    retries: u32,
+    kind: SyncKind,
+}
+
+impl SyncTask {
+    fn new(sync_id: TimelineSyncId, retries: u32, kind: SyncKind) -> Self {
+        Self {
+            sync_id,
+            retries,
+            kind,
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Clone)]
+enum SyncKind {
     /// Regular image download, that is not critical for running, but still needed.
     Download(RemoteTimeline),
     /// A checkpoint outcome with possible local file updates that need actualization in the remote storage.
     /// Not necessarily fresher than the one already uploaded.
-    Upload(LocalTimeline),
-    /// Every image that's not present locally but found remotely during sync loop start.
-    /// Treated as "lost state" that pageserver needs to recover fully before it's ready to work.
-    UrgentDownload(RemoteTimeline),
+    Upload(NewCheckpoint),
 }
 
-/// Local timeline files for upload.
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
-struct LocalTimeline {
-    tenant_id: ZTenantId,
-    timeline_id: ZTimelineId,
-    /// Relish file paths in the pageserver workdir.
+/// Local timeline files for upload, appearing after a new checkpoint.
+/// The current checkpoint design assumes that new files are only added; no deletions or amendments happen.
+#[derive(Debug, PartialEq, Eq, Clone)]
+struct NewCheckpoint {
+    /// Relish file paths in the pageserver workdir that were added for the corresponding checkpoint.
     layers: Vec<PathBuf>,
     metadata: TimelineMetadata,
 }
 
 /// Info about the remote image files.
-#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 struct RemoteTimeline {
-    tenant_id: ZTenantId,
-    timeline_id: ZTimelineId,
     /// Same paths as in [`NewCheckpoint`], pointing at the download
     /// destination of every of the remote timeline layers.
     layers: Vec<PathBuf>,
@@ -165,46 +261,60 @@ impl RemoteTimeline {
     }
 }
 
-/// Adds the new image as an upload sync task to the queue.
+/// Adds the new checkpoint files as an upload sync task to the queue.
 /// Ensure that the loop is started, otherwise the task is never processed.
-pub fn schedule_timeline_upload(
+pub fn schedule_timeline_checkpoint_upload(
     tenant_id: ZTenantId,
     timeline_id: ZTimelineId,
     layers: Vec<PathBuf>,
     metadata: TimelineMetadata,
 ) {
-    SYNC_QUEUE
-        .lock()
-        .unwrap()
-        .push(SyncTask::Upload(LocalTimeline {
-            tenant_id,
-            timeline_id,
-            layers,
-            metadata,
-        }))
+    if layers.is_empty() {
+        debug!("Skipping empty layers upload task");
+        return;
+    }
+
+    if !sync_queue::push(SyncTask::new(
+        TimelineSyncId(tenant_id, timeline_id),
+        0,
+        SyncKind::Upload(NewCheckpoint { layers, metadata }),
+    )) {
+        // `sync_queue::push` has already logged why the push failed (the queue
+        // is either not initialized or its receiver is dropped).
+        warn!(
+            "Could not send an upload task for tenant {}, timeline {}",
+            tenant_id, timeline_id
+        )
+    }
 }
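A runnable sketch of the retry policy that the new `SyncTask::retries` counter feeds (the constants mirror `process_task` further below and `DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS`; the function is extracted here only for illustration): capped exponential backoff between reattempts, eviction once the error threshold is crossed.

```rust
// Wait time before the n-th reattempt: 2^(n-1) seconds, capped at 30s.
fn backoff_seconds(retries: u32) -> f64 {
    2.0_f64.powf(retries as f64 - 1.0).min(30.0)
}

fn main() {
    let max_sync_errors = 10; // DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS
    for retries in 1..=11u32 {
        if retries > max_sync_errors {
            // The loop returns `PostTimelineSyncStep::Evict` and the
            // timeline gets unloaded from the pageserver.
            println!("retry #{}: evict the task", retries);
        } else {
            // 1s, 2s, 4s, 8s, 16s, then capped at 30s.
            println!("retry #{}: wait {}s and reattempt", retries, backoff_seconds(retries));
        }
    }
}
```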
 /// Uses the given remote storage to start the storage sync loop.
 /// See the module docs for the loop step description.
 pub(super) fn spawn_storage_sync_thread<
-    P: std::fmt::Debug,
-    S: 'static + RemoteStorage<StoragePath = P>,
+    P: std::fmt::Debug + Send + Sync + 'static,
+    S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
 >(
     config: &'static PageServerConf,
     remote_storage: S,
-    max_concurrent_sync: usize,
+    max_concurrent_sync: NonZeroUsize,
+    max_sync_errors: NonZeroU32,
 ) -> anyhow::Result<thread::JoinHandle<anyhow::Result<()>>> {
-    ensure!(
-        max_concurrent_sync > 0,
-        "Got 0 as max concurrent synchronizations allowed, cannot initialize a storage sync thread"
-    );
+    let (sender, receiver) = mpsc::unbounded_channel();
+    sync_queue::init(sender)?;
 
     let handle = thread::Builder::new()
         .name("Queue based remote storage sync".to_string())
         .spawn(move || {
-            let concurrent_sync_limit = Semaphore::new(max_concurrent_sync);
-            let thread_result = storage_sync_loop(config, remote_storage, &concurrent_sync_limit);
-            concurrent_sync_limit.close();
+            let thread_result = storage_sync_loop(
                config,
                receiver,
                remote_storage,
                max_concurrent_sync,
                max_sync_errors,
            );
             if let Err(e) = &thread_result {
                 error!("Failed to run storage sync thread: {:#}", e);
             }
@@ -213,112 +323,206 @@ pub(super) fn spawn_storage_sync_thread<
     Ok(handle)
 }
 
-fn storage_sync_loop<P, S: 'static + RemoteStorage<StoragePath = P>>(
+fn storage_sync_loop<
+    P: std::fmt::Debug + Send + Sync + 'static,
+    S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
+>(
     config: &'static PageServerConf,
+    mut receiver: UnboundedReceiver<SyncTask>,
     remote_storage: S,
-    concurrent_sync_limit: &Semaphore,
+    max_concurrent_sync: NonZeroUsize,
+    max_sync_errors: NonZeroU32,
 ) -> anyhow::Result<()> {
     let runtime = tokio::runtime::Builder::new_current_thread()
         .enable_all()
         .build()?;
 
-    let mut remote_timelines = runtime
+    let remote_timelines = runtime
         .block_on(fetch_existing_uploads(&remote_storage))
         .context("Failed to determine previously uploaded timelines")?;
 
-    let urgent_downloads = latest_timelines(&remote_timelines)
+    schedule_first_tasks(config, &remote_timelines);
+
+    // Keep the state shared between the async loop tasks behind an `Arc`:
+    // `tokio::spawn` requires its futures to be `'static`, so they cannot borrow loop-local data.
+    let remote_timelines_and_storage = Arc::new((Mutex::new(remote_timelines), remote_storage));
+
+    while !crate::tenant_mgr::shutdown_requested() {
+        let registration_steps = runtime.block_on(loop_step(
+            config,
+            &mut receiver,
+            Arc::clone(&remote_timelines_and_storage),
+            max_concurrent_sync,
+            max_sync_errors,
+        ));
+        // Perform the batched timeline registrations outside of the async loop step,
+        // so that the external registration code cannot block the running sync tasks.
+        perform_post_timeline_sync_steps(config, registration_steps);
+    }
+
+    debug!("Shutdown requested, stopping");
+    Ok(())
+}
+
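The `Arc` comment in `storage_sync_loop` above reflects a standard tokio constraint; here is a self-contained illustration (names are placeholders) of why the shared state must be owned rather than borrowed — `tokio::spawn` only accepts `'static` futures:

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Shared pair, analogous to `(Mutex<HashMap<..>>, RemoteStorage)` above.
    let shared = Arc::new((Mutex::new(Vec::<u32>::new()), "storage-handle"));

    let mut handles = Vec::new();
    for i in 0..4u32 {
        // Each task gets its own clone, moved into the future: no borrows,
        // so the future is `'static` and `tokio::spawn` accepts it.
        let shared = Arc::clone(&shared);
        handles.push(tokio::spawn(async move {
            // The async `Mutex` may be held across `.await` points safely.
            shared.0.lock().await.push(i);
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    println!("{:?}", shared.0.lock().await); // some permutation of [0, 1, 2, 3]
}
```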
+async fn loop_step<
+    P: std::fmt::Debug + Send + Sync + 'static,
+    S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
+>(
+    config: &'static PageServerConf,
+    receiver: &mut UnboundedReceiver<SyncTask>,
+    remote_timelines_and_storage: Arc<(Mutex<HashMap<TimelineSyncId, RemoteTimeline>>, S)>,
+    max_concurrent_sync: NonZeroUsize,
+    max_sync_errors: NonZeroU32,
+) -> HashMap<(ZTenantId, ZTimelineId), PostTimelineSyncStep> {
+    let max_concurrent_sync = max_concurrent_sync.get();
+    let mut next_tasks = Vec::with_capacity(max_concurrent_sync);
+
+    // Request the first task in a blocking fashion, so that an empty queue
+    // suspends the loop instead of letting it spin.
+    if let Some(first_task) = sync_queue::next_task(receiver).await {
+        next_tasks.push(first_task);
+    } else {
+        debug!("Shutdown requested, stopping");
+        return HashMap::new();
+    };
+    next_tasks.extend(
+        sync_queue::next_task_batch(receiver, max_concurrent_sync - 1)
+            .await
+            .into_iter(),
+    );
+
+    let remaining_queue_length = sync_queue::len();
+    debug!(
+        "Processing {} tasks, more tasks left to process: {}",
+        next_tasks.len(),
+        remaining_queue_length
+    );
+    REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64);
+
+    let mut task_batch = next_tasks
+        .into_iter()
+        .map(|task| async {
+            let sync_id = task.sync_id;
+            let extra_step = match tokio::spawn(process_task(
+                config,
+                Arc::clone(&remote_timelines_and_storage),
+                task,
+                max_sync_errors,
+            ))
+            .await
+            {
+                Ok(extra_step) => extra_step,
+                Err(e) => {
+                    error!(
+                        "Failed to process storage sync task for tenant {}, timeline {}: {:#}",
+                        sync_id.0, sync_id.1, e
+                    );
+                    None
+                }
+            };
+            (sync_id, extra_step)
+        })
+        .collect::<FuturesUnordered<_>>();
+
+    let mut extra_sync_steps = HashMap::with_capacity(max_concurrent_sync);
+    while let Some((sync_id, extra_step)) = task_batch.next().await {
+        let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+        debug!(
+            "Finished storage sync task for tenant {}, timeline {}",
+            tenant_id, timeline_id
+        );
+        if let Some(extra_step) = extra_step {
+            extra_sync_steps.insert((tenant_id, timeline_id), extra_step);
+        }
+    }
+
+    extra_sync_steps
+}
+
+type TaskSharedRemotes<S> = (Mutex<HashMap<TimelineSyncId, RemoteTimeline>>, S);
+
+async fn process_task<
+    P: std::fmt::Debug + Send + Sync + 'static,
+    S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
+>(
+    config: &'static PageServerConf,
+    remote_timelines_and_storage: Arc<TaskSharedRemotes<S>>,
+    task: SyncTask,
+    max_sync_errors: NonZeroU32,
+) -> Option<PostTimelineSyncStep> {
+    let (remote_timelines, remote_storage) = remote_timelines_and_storage.as_ref();
+    if task.retries > max_sync_errors.get() {
+        error!(
+            "Evicting task {:?} that failed {} times, exceeding the error threshold",
+            task.kind, task.retries
+        );
+        return Some(PostTimelineSyncStep::Evict);
+    }
+
+    if task.retries > 0 {
+        // Capped exponential backoff: 1s, 2s, 4s, ... up to 30s between reattempts.
+        let seconds_to_wait = 2.0_f64.powf(task.retries as f64 - 1.0).min(30.0);
+        debug!(
+            "Waiting {} seconds before starting the task",
+            seconds_to_wait
+        );
+        tokio::time::sleep(tokio::time::Duration::from_secs_f64(seconds_to_wait)).await;
+    }
+
+    let sync_start = Instant::now();
+    match task.kind {
+        SyncKind::Download(download_data) => {
+            let sync_status = download_timeline(
+                config,
+                remote_storage,
+                task.sync_id,
+                download_data,
+                task.retries + 1,
+            )
+            .await;
+            register_sync_status(sync_start, "download", sync_status);
+            Some(PostTimelineSyncStep::RegisterDownload)
+        }
+        SyncKind::Upload(layer_upload) => {
+            let sync_status = upload_timeline(
+                config,
+                remote_timelines.lock().await.deref_mut(),
+                remote_storage,
+                task.sync_id,
+                layer_upload,
+                task.retries + 1,
+            )
+            .await;
register_sync_status(sync_start, "upload", sync_status); + None + } + } +} + +fn schedule_first_tasks( + config: &'static PageServerConf, + remote_timelines: &HashMap, +) { + latest_timelines(remote_timelines) .iter() - .filter_map(|(&tenant_id, &timeline_id)| remote_timelines.get(&(tenant_id, timeline_id))) - .filter(|latest_remote_timeline| { - let tenant_id = latest_remote_timeline.tenant_id; - let timeline_id = latest_remote_timeline.timeline_id; - let exists_locally = config.timeline_path(&timeline_id, &tenant_id).exists(); + .filter(|sync_id| { + let exists_locally = config.timeline_path(&sync_id.1, &sync_id.0).exists(); if exists_locally { debug!( "Timeline with tenant id {}, timeline id {} exists locally, not downloading", - tenant_id, timeline_id + sync_id.0, sync_id.1 ); false } else { true } }) - .cloned() - .map(SyncTask::UrgentDownload) - .collect::>(); - info!( - "Will download {} timelines to restore state", - urgent_downloads.len() - ); - let mut accessor = SYNC_QUEUE.lock().unwrap(); - accessor.extend(urgent_downloads.into_iter()); - drop(accessor); - - while !crate::tenant_mgr::shutdown_requested() { - let mut queue_accessor = SYNC_QUEUE.lock().unwrap(); - let next_task = queue_accessor.pop(); - let remaining_queue_length = queue_accessor.len(); - drop(queue_accessor); - - match next_task { - Some(task) => { - debug!( - "Processing a new task, more tasks left to process: {}", - remaining_queue_length - ); - REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64); - - runtime.block_on(async { - let sync_start = Instant::now(); - match task { - SyncTask::Download(download_data) => { - let sync_status = download_timeline( - config, - concurrent_sync_limit, - &remote_storage, - download_data, - false, - ) - .await; - register_sync_status(sync_start, "download", sync_status); - } - SyncTask::UrgentDownload(download_data) => { - let sync_status = download_timeline( - config, - concurrent_sync_limit, - &remote_storage, - download_data, - true, - ) - .await; - register_sync_status(sync_start, "download", sync_status); - } - SyncTask::Upload(layer_upload) => { - let sync_status = upload_timeline( - config, - concurrent_sync_limit, - &mut remote_timelines, - &remote_storage, - layer_upload, - ) - .await; - register_sync_status(sync_start, "upload", sync_status); - } - } - }) - } - None => { - trace!("No storage sync tasks found"); - thread::sleep(Duration::from_secs(1)); - continue; - } - }; - } - debug!("Queue based remote storage sync thread shut down"); - Ok(()) -} - -fn add_to_queue(task: SyncTask) { - SYNC_QUEUE.lock().unwrap().push(task) + .filter_map(|&sync_id| { + let remote_timeline = remote_timelines.get(&sync_id)?; + Some(SyncTask::new( + sync_id, + 0, + SyncKind::Download(remote_timeline.clone()), + )) + }) + .for_each(|task| { + sync_queue::push(task); + }); } fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option) { @@ -333,33 +537,33 @@ fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Optio } fn latest_timelines( - remote_timelines: &HashMap<(ZTenantId, ZTimelineId), RemoteTimeline>, -) -> HashMap { + remote_timelines: &HashMap, +) -> HashSet { let mut latest_timelines_for_tenants = HashMap::with_capacity(remote_timelines.len()); - for (&(remote_tenant_id, remote_timeline_id), remote_timeline_data) in remote_timelines { + for (&sync_id, remote_timeline_data) in remote_timelines { let (latest_timeline_id, timeline_metadata) = latest_timelines_for_tenants - .entry(remote_tenant_id) - .or_insert_with(|| 
(remote_timeline_id, remote_timeline_data.metadata.clone())); - if latest_timeline_id != &remote_timeline_id + .entry(sync_id.0) + .or_insert_with(|| (sync_id.1, remote_timeline_data.metadata.clone())); + if latest_timeline_id != &sync_id.1 && timeline_metadata .as_ref() .map(|metadata| metadata.disk_consistent_lsn()) < remote_timeline_data.disk_consistent_lsn() { - *latest_timeline_id = remote_timeline_id; + *latest_timeline_id = sync_id.1; *timeline_metadata = remote_timeline_data.metadata.clone(); } } latest_timelines_for_tenants .into_iter() - .map(|(tenant_id, (timeline_id, _))| (tenant_id, timeline_id)) + .map(|(tenant_id, (timeline_id, _))| TimelineSyncId(tenant_id, timeline_id)) .collect() } async fn fetch_existing_uploads>( remote_storage: &S, -) -> anyhow::Result> { +) -> anyhow::Result> { let uploaded_files = remote_storage .list() .await @@ -393,25 +597,19 @@ async fn fetch_existing_uploads((local_path, tenant_id, timeline_id, metadata)) + let sync_id = parse_sync_id(&local_path).map_err(|e| (e, remote_path))?; + Ok::<_, (anyhow::Error, P)>((local_path, sync_id, metadata)) }) .collect::>(); let mut fetched = HashMap::new(); while let Some(fetch_result) = data_fetches.next().await { match fetch_result { - Ok((local_path, tenant_id, timeline_id, remote_metadata)) => { - let remote_timeline = - fetched - .entry((tenant_id, timeline_id)) - .or_insert_with(|| RemoteTimeline { - tenant_id, - timeline_id, - layers: Vec::new(), - metadata: None, - }); + Ok((local_path, sync_id, remote_metadata)) => { + let remote_timeline = fetched.entry(sync_id).or_insert_with(|| RemoteTimeline { + layers: Vec::new(), + metadata: None, + }); if remote_metadata.is_some() { remote_timeline.metadata = remote_metadata; } else { @@ -431,7 +629,7 @@ async fn fetch_existing_uploads anyhow::Result<(ZTenantId, ZTimelineId)> { +fn parse_sync_id(path: &Path) -> anyhow::Result { let mut segments = path .iter() .flat_map(|segment| segment.to_str()) @@ -494,19 +692,17 @@ fn parse_ids_from_path(path: &Path) -> anyhow::Result<(ZTenantId, ZTimelineId)> ) })?; - Ok((tenant_id, timeline_id)) + Ok(TimelineSyncId(tenant_id, timeline_id)) } -async fn download_timeline<'a, P, S: 'static + RemoteStorage>( +async fn download_timeline>( config: &'static PageServerConf, - concurrent_sync_limit: &'a Semaphore, - remote_storage: &'a S, + remote_storage: &S, + sync_id: TimelineSyncId, remote_timeline: RemoteTimeline, - urgent: bool, + current_retry: u32, ) -> Option { - let timeline_id = remote_timeline.timeline_id; - let tenant_id = remote_timeline.tenant_id; - debug!("Downloading layers for timeline {}", timeline_id); + debug!("Downloading layers for timeline {}", sync_id.1); let new_metadata = if let Some(metadata) = remote_timeline.metadata { metadata @@ -518,65 +714,54 @@ async fn download_timeline<'a, P, S: 'static + RemoteStorage>( let sync_result = synchronize_layers( config, - concurrent_sync_limit, remote_storage, remote_timeline.layers.into_iter(), SyncOperation::Download, &new_metadata, - tenant_id, - timeline_id, + sync_id, ) .await; match sync_result { - SyncResult::Success { .. } => { - register_timeline_download(config, tenant_id, timeline_id); - Some(true) - } + SyncResult::Success { .. } => Some(true), SyncResult::MetadataSyncError { .. 
 } => {
             let download = RemoteTimeline {
                 layers: Vec::new(),
                 metadata: Some(new_metadata),
-                tenant_id,
-                timeline_id,
             };
-            add_to_queue(if urgent {
-                SyncTask::UrgentDownload(download)
-            } else {
-                SyncTask::Download(download)
-            });
+            sync_queue::push(SyncTask::new(
+                sync_id,
+                current_retry,
+                SyncKind::Download(download),
+            ));
             Some(false)
         }
         SyncResult::LayerSyncError { not_synced, .. } => {
             let download = RemoteTimeline {
                 layers: not_synced,
                 metadata: Some(new_metadata),
-                tenant_id,
-                timeline_id,
             };
-            add_to_queue(if urgent {
-                SyncTask::UrgentDownload(download)
-            } else {
-                SyncTask::Download(download)
-            });
+            sync_queue::push(SyncTask::new(
+                sync_id,
+                current_retry,
+                SyncKind::Download(download),
+            ));
             Some(false)
         }
     }
 }
 
-#[allow(clippy::unnecessary_filter_map)]
 async fn upload_timeline<'a, P, S: 'static + RemoteStorage<StoragePath = P>>(
     config: &'static PageServerConf,
-    concurrent_sync_limit: &'a Semaphore,
-    remote_timelines: &'a mut HashMap<(ZTenantId, ZTimelineId), RemoteTimeline>,
+    remote_timelines: &'a mut HashMap<TimelineSyncId, RemoteTimeline>,
     remote_storage: &'a S,
-    mut new_upload: LocalTimeline,
+    sync_id: TimelineSyncId,
+    mut new_upload: NewCheckpoint,
+    current_retry: u32,
 ) -> Option<bool> {
-    let tenant_id = new_upload.tenant_id;
-    let timeline_id = new_upload.timeline_id;
-    debug!("Uploading layers for timeline {}", timeline_id);
+    debug!("Uploading layers for timeline {}", sync_id.1);
 
-    if let hash_map::Entry::Occupied(o) = remote_timelines.entry((tenant_id, timeline_id)) {
+    if let hash_map::Entry::Occupied(o) = remote_timelines.entry(sync_id) {
         let uploaded_timeline_files = o.get();
         let uploaded_layers = uploaded_timeline_files
             .layers
@@ -590,47 +775,42 @@ async fn upload_timeline<'a, P, S: 'static + RemoteStorage<StoragePath = P>>(
             Some(remote_metadata) => {
                 let new_lsn = new_upload.metadata.disk_consistent_lsn();
                 let remote_lsn = remote_metadata.disk_consistent_lsn();
-                match new_lsn.cmp(&remote_lsn) {
-                    Ordering::Equal | Ordering::Less => {
-                        warn!(
+                if new_lsn <= remote_lsn {
+                    warn!(
                         "Received a timeline with LSN {} that's not later than the one from remote storage {}, not uploading",
                         new_lsn, remote_lsn
                     );
-                        return None;
-                    }
-                    Ordering::Greater => debug!(
-                        "Received a timeline with newer LSN {} (storage LSN {}), updating the upload",
-                        new_lsn, remote_lsn
-                    ),
+                    return None;
+                } else {
+                    debug!(
+                        "Received a timeline with newer LSN {} (storage LSN {}), updating the upload",
+                        new_lsn, remote_lsn
+                    )
                 }
             }
         }
     }
-    let LocalTimeline {
+    let NewCheckpoint {
         layers: new_layers,
         metadata: new_metadata,
         ..
} = new_upload; let sync_result = synchronize_layers( config, - concurrent_sync_limit, remote_storage, new_layers.into_iter(), SyncOperation::Upload, &new_metadata, - tenant_id, - timeline_id, + sync_id, ) .await; let entry_to_update = remote_timelines - .entry((tenant_id, timeline_id)) + .entry(sync_id) .or_insert_with(|| RemoteTimeline { layers: Vec::new(), metadata: Some(new_metadata.clone()), - tenant_id, - timeline_id, }); match sync_result { SyncResult::Success { synced } => { @@ -640,22 +820,26 @@ async fn upload_timeline<'a, P, S: 'static + RemoteStorage>( } SyncResult::MetadataSyncError { synced } => { entry_to_update.layers.extend(synced.into_iter()); - add_to_queue(SyncTask::Upload(LocalTimeline { - tenant_id, - timeline_id, - layers: Vec::new(), - metadata: new_metadata, - })); + sync_queue::push(SyncTask::new( + sync_id, + current_retry, + SyncKind::Upload(NewCheckpoint { + layers: Vec::new(), + metadata: new_metadata, + }), + )); Some(false) } SyncResult::LayerSyncError { synced, not_synced } => { entry_to_update.layers.extend(synced.into_iter()); - add_to_queue(SyncTask::Upload(LocalTimeline { - tenant_id, - timeline_id, - layers: not_synced, - metadata: new_metadata, - })); + sync_queue::push(SyncTask::new( + sync_id, + current_retry, + SyncKind::Upload(NewCheckpoint { + layers: not_synced, + metadata: new_metadata, + }), + )); Some(false) } } @@ -695,26 +879,19 @@ enum SyncResult { #[allow(clippy::too_many_arguments)] async fn synchronize_layers<'a, P, S: 'static + RemoteStorage>( config: &'static PageServerConf, - concurrent_sync_limit: &'a Semaphore, remote_storage: &'a S, layers: impl Iterator, sync_operation: SyncOperation, new_metadata: &'a TimelineMetadata, - tenant_id: ZTenantId, - timeline_id: ZTimelineId, + sync_id: TimelineSyncId, ) -> SyncResult { let mut sync_operations = layers .into_iter() .map(|layer_path| async move { - let permit = concurrent_sync_limit - .acquire() - .await - .expect("Semaphore should not be closed yet"); let sync_result = match sync_operation { SyncOperation::Download => download(remote_storage, &layer_path).await, SyncOperation::Upload => upload(remote_storage, &layer_path).await, }; - drop(permit); (layer_path, sync_result) }) .collect::>(); @@ -748,8 +925,7 @@ async fn synchronize_layers<'a, P, S: 'static + RemoteStorage>( remote_storage, sync_operation, new_metadata, - tenant_id, - timeline_id, + sync_id, ) .await { @@ -775,12 +951,11 @@ async fn sync_metadata<'a, P, S: 'static + RemoteStorage>( remote_storage: &'a S, sync_operation: SyncOperation, new_metadata: &'a TimelineMetadata, - tenant_id: ZTenantId, - timeline_id: ZTimelineId, + sync_id: TimelineSyncId, ) -> anyhow::Result<()> { debug!("Synchronizing ({:?}) metadata file", sync_operation); - let local_metadata_path = metadata_path(config, timeline_id, tenant_id); + let local_metadata_path = metadata_path(config, sync_id.1, sync_id.0); let new_metadata_bytes = new_metadata.to_bytes()?; match sync_operation { SyncOperation::Download => { @@ -909,10 +1084,6 @@ mod tests { const CORRUPT_METADATA_TIMELINE_ID: ZTimelineId = ZTimelineId::from_array(hex!("314db9af91fbc02dda586880a3216c61")); - lazy_static! 
{ - static ref LIMIT: Semaphore = Semaphore::new(100); - } - #[tokio::test] async fn upload_new_timeline() -> anyhow::Result<()> { let repo_harness = RepoHarness::create("upload_new_timeline")?; @@ -932,15 +1103,19 @@ mod tests { upload_metadata.clone(), )?; let expected_layers = upload.layers.clone(); - ensure_correct_timeline_upload(&repo_harness, &mut remote_timelines, &storage, upload) - .await; + ensure_correct_timeline_upload( + &repo_harness, + &mut remote_timelines, + &storage, + TIMELINE_ID, + upload, + ) + .await; let mut expected_uploads = HashMap::new(); expected_uploads.insert( - (repo_harness.tenant_id, TIMELINE_ID), + TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID), RemoteTimeline { - tenant_id: repo_harness.tenant_id, - timeline_id: TIMELINE_ID, layers: expected_layers, metadata: Some(upload_metadata), }, @@ -953,6 +1128,7 @@ mod tests { #[tokio::test] async fn reupload_timeline() -> anyhow::Result<()> { let repo_harness = RepoHarness::create("reupload_timeline")?; + let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID); let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; let mut remote_timelines = HashMap::new(); @@ -968,6 +1144,7 @@ mod tests { &repo_harness, &mut remote_timelines, &storage, + TIMELINE_ID, first_timeline, ) .await; @@ -981,13 +1158,13 @@ mod tests { create_local_timeline(&repo_harness, TIMELINE_ID, &["b", "c"], new_upload_metadata)?; upload_timeline( repo_harness.conf, - &LIMIT, &mut remote_timelines, &storage, + sync_id, new_upload.clone(), + 0, ) .await; - assert_sync_queue_contents(SyncTask::Upload(new_upload), false); assert_timelines_equal(after_first_uploads, remote_timelines.clone()); let second_upload_metadata = dummy_metadata(Lsn(0x40)); @@ -1006,6 +1183,7 @@ mod tests { &repo_harness, &mut remote_timelines, &storage, + TIMELINE_ID, second_timeline, ) .await; @@ -1016,10 +1194,8 @@ mod tests { expected_layers.dedup(); expected_uploads.insert( - (repo_harness.tenant_id, TIMELINE_ID), + TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID), RemoteTimeline { - tenant_id: repo_harness.tenant_id, - timeline_id: TIMELINE_ID, layers: expected_layers, metadata: Some(second_upload_metadata.clone()), }, @@ -1042,6 +1218,7 @@ mod tests { &repo_harness, &mut remote_timelines, &storage, + TIMELINE_ID, third_timeline, ) .await; @@ -1053,10 +1230,8 @@ mod tests { expected_layers.dedup(); expected_uploads.insert( - (repo_harness.tenant_id, TIMELINE_ID), + TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID), RemoteTimeline { - tenant_id: repo_harness.tenant_id, - timeline_id: TIMELINE_ID, layers: expected_layers, metadata: Some(third_upload_metadata), }, @@ -1069,6 +1244,7 @@ mod tests { #[tokio::test] async fn reupload_missing_metadata() -> anyhow::Result<()> { let repo_harness = RepoHarness::create("reupload_missing_metadata")?; + let sync_id = TimelineSyncId(repo_harness.tenant_id, NO_METADATA_TIMELINE_ID); let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; let mut remote_timelines = store_timelines_with_incorrect_metadata(&repo_harness, &storage).await?; @@ -1077,19 +1253,15 @@ mod tests { fetch_existing_uploads(&storage).await?, ); - let old_remote_timeline = remote_timelines - .get(&(repo_harness.tenant_id, NO_METADATA_TIMELINE_ID)) - .unwrap() - .clone(); + let old_remote_timeline = remote_timelines.get(&sync_id).unwrap().clone(); let updated_metadata = dummy_metadata(Lsn(0x100)); create_local_metadata(&repo_harness, NO_METADATA_TIMELINE_ID, &updated_metadata)?; 
 ensure_correct_timeline_upload(
             &repo_harness,
             &mut remote_timelines,
             &storage,
-            LocalTimeline {
-                tenant_id: repo_harness.tenant_id,
-                timeline_id: NO_METADATA_TIMELINE_ID,
+            NO_METADATA_TIMELINE_ID,
+            NewCheckpoint {
                 layers: old_remote_timeline.layers.clone(),
                 metadata: updated_metadata.clone(),
             },
         )
         .await;
@@ -1102,10 +1274,7 @@ mod tests {
             ..old_remote_timeline
         };
         expected_timeline.layers.sort();
-        let mut updated_timeline = reuploaded_timelines
-            .get(&(repo_harness.tenant_id, NO_METADATA_TIMELINE_ID))
-            .unwrap()
-            .clone();
+        let mut updated_timeline = reuploaded_timelines.get(&sync_id).unwrap().clone();
         updated_timeline.layers.sort();
         assert_eq!(expected_timeline, updated_timeline);
 
@@ -1115,6 +1284,7 @@ mod tests {
     #[tokio::test]
     async fn test_upload_with_errors() -> anyhow::Result<()> {
         let repo_harness = RepoHarness::create("test_upload_with_errors")?;
+        let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
         let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
         let mut remote_timelines = HashMap::new();
 
@@ -1122,24 +1292,22 @@ mod tests {
         assert!(!local_path.exists());
         assert!(fetch_existing_uploads(&storage).await?.is_empty());
 
-        let timeline_without_local_files = LocalTimeline {
-            tenant_id: repo_harness.tenant_id,
-            timeline_id: TIMELINE_ID,
+        let timeline_without_local_files = NewCheckpoint {
             layers: vec![local_path],
             metadata: dummy_metadata(Lsn(0x30)),
         };
 
         upload_timeline(
             repo_harness.conf,
-            &LIMIT,
             &mut remote_timelines,
             &storage,
+            sync_id,
             timeline_without_local_files.clone(),
+            0,
         )
         .await;
 
         assert!(fetch_existing_uploads(&storage).await?.is_empty());
-        assert_sync_queue_contents(SyncTask::Upload(timeline_without_local_files), true);
         assert!(!repo_harness.timeline_path(&TIMELINE_ID).exists());
 
         Ok(())
@@ -1148,6 +1316,7 @@ mod tests {
     #[tokio::test]
     async fn test_download_timeline() -> anyhow::Result<()> {
         let repo_harness = RepoHarness::create("test_download_timeline")?;
+        let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
         let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
         let mut remote_timelines =
             store_timelines_with_incorrect_metadata(&repo_harness, &storage).await?;
@@ -1165,51 +1334,43 @@ mod tests {
             &repo_harness,
             &mut remote_timelines,
             &storage,
+            TIMELINE_ID,
             regular_timeline,
         )
         .await;
         fs::remove_dir_all(&regular_timeline_path)?;
 
-        let remote_regular_timeline = remote_timelines
-            .get(&(repo_harness.tenant_id, TIMELINE_ID))
-            .unwrap()
-            .clone();
+        let remote_regular_timeline = remote_timelines.get(&sync_id).unwrap().clone();
 
         download_timeline(
             repo_harness.conf,
-            &LIMIT,
             &storage,
+            sync_id,
             remote_regular_timeline.clone(),
-            true,
+            0,
         )
         .await;
         download_timeline(
             repo_harness.conf,
-            &LIMIT,
             &storage,
+            sync_id,
             remote_regular_timeline.clone(),
-            true,
+            0,
         )
         .await;
         download_timeline(
             repo_harness.conf,
-            &LIMIT,
             &storage,
-            remote_timelines
-                .get(&(repo_harness.tenant_id, NO_METADATA_TIMELINE_ID))
-                .unwrap()
-                .clone(),
-            true,
+            TimelineSyncId(repo_harness.tenant_id, NO_METADATA_TIMELINE_ID),
+            remote_timelines
+                .get(&TimelineSyncId(repo_harness.tenant_id, NO_METADATA_TIMELINE_ID))
+                .unwrap()
+                .clone(),
+            0,
         )
         .await;
         download_timeline(
             repo_harness.conf,
-            &LIMIT,
             &storage,
-            remote_timelines
-                .get(&(repo_harness.tenant_id, CORRUPT_METADATA_TIMELINE_ID))
-                .unwrap()
-                .clone(),
-            true,
+            TimelineSyncId(repo_harness.tenant_id, CORRUPT_METADATA_TIMELINE_ID),
+            remote_timelines
+                .get(&TimelineSyncId(repo_harness.tenant_id, CORRUPT_METADATA_TIMELINE_ID))
+                .unwrap()
+                .clone(),
+            0,
        )
        .await;
 
@@ -1220,7 +1381,7 @@ mod tests {
         assert!(!repo_harness
             .timeline_path(&CORRUPT_METADATA_TIMELINE_ID)
             .exists());
-        assert_timeline_files_match(&repo_harness, remote_regular_timeline);
+
assert_timeline_files_match(&repo_harness, TIMELINE_ID, remote_regular_timeline); Ok(()) } @@ -1228,6 +1389,7 @@ mod tests { #[tokio::test] async fn metadata_file_sync() -> anyhow::Result<()> { let repo_harness = RepoHarness::create("metadata_file_sync")?; + let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID); let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; let mut remote_timelines = HashMap::new(); @@ -1244,10 +1406,11 @@ mod tests { upload_timeline( repo_harness.conf, - &LIMIT, &mut remote_timelines, &storage, + sync_id, new_upload.clone(), + 0, ) .await; assert_timelines_equal( @@ -1255,10 +1418,7 @@ mod tests { fetch_existing_uploads(&storage).await?, ); - let remote_timeline = remote_timelines - .get(&(repo_harness.tenant_id, TIMELINE_ID)) - .unwrap() - .clone(); + let remote_timeline = remote_timelines.get(&sync_id).unwrap().clone(); assert_eq!( remote_timeline.metadata.as_ref(), Some(&uploaded_metadata), @@ -1267,10 +1427,10 @@ mod tests { download_timeline( repo_harness.conf, - &LIMIT, &storage, + sync_id, remote_timeline.clone(), - false, + 0, ) .await; let downloaded_metadata_bytes = tokio::fs::read(&metadata_local_path) @@ -1286,113 +1446,22 @@ mod tests { Ok(()) } - #[test] - fn queue_order_test() { - let repo_harness = RepoHarness::create("queue_order_test").unwrap(); - - let tenant_id = repo_harness.tenant_id; - let timeline_id = TIMELINE_ID; - let layers = Vec::new(); - let smaller_lsn_metadata = dummy_metadata(Lsn(0x200)); - let bigger_lsn_metadata = dummy_metadata(Lsn(0x300)); - assert!(bigger_lsn_metadata > smaller_lsn_metadata); - - for metadata in [bigger_lsn_metadata.clone(), smaller_lsn_metadata.clone()] { - add_to_queue(SyncTask::Upload(LocalTimeline { - tenant_id, - timeline_id, - layers: layers.clone(), - metadata: metadata.clone(), - })); - add_to_queue(SyncTask::Download(RemoteTimeline { - tenant_id, - timeline_id, - layers: layers.clone(), - metadata: Some(metadata.clone()), - })); - add_to_queue(SyncTask::UrgentDownload(RemoteTimeline { - tenant_id, - timeline_id, - layers: layers.clone(), - metadata: Some(metadata), - })); - } - - let mut queue_accessor = SYNC_QUEUE.lock().unwrap(); - let mut ordered_tasks = Vec::with_capacity(queue_accessor.len()); - while let Some(task) = queue_accessor.pop() { - let task_lsn = match &task { - SyncTask::Upload(LocalTimeline { metadata, .. 
}) => { - Some(metadata.disk_consistent_lsn()) - } - SyncTask::UrgentDownload(remote_timeline) | SyncTask::Download(remote_timeline) => { - remote_timeline.disk_consistent_lsn() - } - }; - - if let Some(task_lsn) = task_lsn { - if task_lsn == smaller_lsn_metadata.disk_consistent_lsn() - || task_lsn == bigger_lsn_metadata.disk_consistent_lsn() - { - ordered_tasks.push(task); - } - } - } - drop(queue_accessor); - - let expected_ordered_tasks = vec![ - SyncTask::UrgentDownload(RemoteTimeline { - tenant_id, - timeline_id, - layers: layers.clone(), - metadata: Some(bigger_lsn_metadata.clone()), - }), - SyncTask::UrgentDownload(RemoteTimeline { - tenant_id, - timeline_id, - layers: layers.clone(), - metadata: Some(smaller_lsn_metadata.clone()), - }), - SyncTask::Upload(LocalTimeline { - tenant_id, - timeline_id, - layers: layers.clone(), - metadata: bigger_lsn_metadata.clone(), - }), - SyncTask::Upload(LocalTimeline { - tenant_id, - timeline_id, - layers: layers.clone(), - metadata: smaller_lsn_metadata.clone(), - }), - SyncTask::Download(RemoteTimeline { - tenant_id, - timeline_id, - layers: layers.clone(), - metadata: Some(bigger_lsn_metadata), - }), - SyncTask::Download(RemoteTimeline { - tenant_id, - timeline_id, - layers, - metadata: Some(smaller_lsn_metadata), - }), - ]; - assert_eq!(expected_ordered_tasks, ordered_tasks); - } - - async fn ensure_correct_timeline_upload<'a>( + #[track_caller] + async fn ensure_correct_timeline_upload( harness: &RepoHarness, - remote_timelines: &'a mut HashMap<(ZTenantId, ZTimelineId), RemoteTimeline>, - remote_storage: &'a LocalFs, - new_upload: LocalTimeline, + remote_timelines: &mut HashMap, + remote_storage: &LocalFs, + timeline_id: ZTimelineId, + new_upload: NewCheckpoint, ) { + let sync_id = TimelineSyncId(harness.tenant_id, timeline_id); upload_timeline( harness.conf, - &LIMIT, remote_timelines, remote_storage, + sync_id, new_upload.clone(), + 0, ) .await; assert_timelines_equal( @@ -1400,12 +1469,7 @@ mod tests { fetch_existing_uploads(remote_storage).await.unwrap(), ); - let new_remote_files = remote_timelines - .get(&(new_upload.tenant_id, new_upload.timeline_id)) - .unwrap() - .clone(); - assert_eq!(new_remote_files.tenant_id, new_upload.tenant_id); - assert_eq!(new_remote_files.timeline_id, new_upload.timeline_id); + let new_remote_files = remote_timelines.get(&sync_id).unwrap().clone(); assert_eq!( new_remote_files.metadata, Some(new_upload.metadata.clone()), @@ -1424,14 +1488,13 @@ mod tests { ); } - assert_timeline_files_match(harness, new_remote_files); - assert_sync_queue_contents(SyncTask::Upload(new_upload), false); + assert_timeline_files_match(harness, timeline_id, new_remote_files); } #[track_caller] fn assert_timelines_equal( - mut expected: HashMap<(ZTenantId, ZTimelineId), RemoteTimeline>, - mut actual: HashMap<(ZTenantId, ZTimelineId), RemoteTimeline>, + mut expected: HashMap, + mut actual: HashMap, ) { let expected_sorted = expected .iter_mut() @@ -1455,37 +1518,19 @@ mod tests { ); } - #[track_caller] - fn assert_sync_queue_contents(task: SyncTask, expected_in_queue: bool) { - let mut queue_accessor = SYNC_QUEUE.lock().unwrap(); - let queue_tasks = queue_accessor.drain().collect::>(); - drop(queue_accessor); - - if expected_in_queue { - assert!( - queue_tasks.contains(&task), - "Sync queue should contain task {:?}", - task - ); - } else { - assert!( - !queue_tasks.contains(&task), - "Sync queue has unexpected task {:?}", - task - ); - } - } - - fn assert_timeline_files_match(harness: &RepoHarness, remote_files: RemoteTimeline) { 
-        let local_timeline_dir = harness.timeline_path(&remote_files.timeline_id);
+    fn assert_timeline_files_match(
+        harness: &RepoHarness,
+        timeline_id: ZTimelineId,
+        remote_files: RemoteTimeline,
+    ) {
+        let local_timeline_dir = harness.timeline_path(&timeline_id);
         let local_paths = fs::read_dir(&local_timeline_dir)
             .unwrap()
             .map(|dir| dir.unwrap().path())
             .collect::<BTreeSet<_>>();
         let mut reported_remote_files = remote_files.layers.into_iter().collect::<BTreeSet<_>>();
         if let Some(remote_metadata) = remote_files.metadata {
-            let local_metadata_path =
-                metadata_path(harness.conf, remote_files.timeline_id, harness.tenant_id);
+            let local_metadata_path = metadata_path(harness.conf, timeline_id, harness.tenant_id);
             let local_metadata = TimelineMetadata::from_bytes(
                 &fs::read(&local_metadata_path).expect("Failed to read metadata file when comparing remote and local image files")
             ).expect("Failed to parse metadata file contents when comparing remote and local image files");
@@ -1527,13 +1572,14 @@ mod tests {
     async fn store_timelines_with_incorrect_metadata(
         harness: &RepoHarness,
         storage: &LocalFs,
-    ) -> anyhow::Result<HashMap<(ZTenantId, ZTimelineId), RemoteTimeline>> {
+    ) -> anyhow::Result<HashMap<TimelineSyncId, RemoteTimeline>> {
         let mut remote_timelines = HashMap::new();
 
         ensure_correct_timeline_upload(
             harness,
             &mut remote_timelines,
             storage,
+            NO_METADATA_TIMELINE_ID,
             create_local_timeline(
                 harness,
                 NO_METADATA_TIMELINE_ID,
@@ -1546,6 +1592,7 @@ mod tests {
             harness,
             &mut remote_timelines,
             storage,
+            CORRUPT_METADATA_TIMELINE_ID,
             create_local_timeline(
                 harness,
                 CORRUPT_METADATA_TIMELINE_ID,
@@ -1585,7 +1632,7 @@ mod tests {
         timeline_id: ZTimelineId,
         filenames: &[&str],
         metadata: TimelineMetadata,
-    ) -> anyhow::Result<LocalTimeline> {
+    ) -> anyhow::Result<NewCheckpoint> {
         let timeline_path = harness.timeline_path(&timeline_id);
         fs::create_dir_all(&timeline_path)?;
 
@@ -1598,12 +1645,7 @@ mod tests {
 
         create_local_metadata(harness, timeline_id, &metadata)?;
 
-        Ok(LocalTimeline {
-            tenant_id: harness.tenant_id,
-            timeline_id,
-            layers,
-            metadata,
-        })
+        Ok(NewCheckpoint { layers, metadata })
     }
 
     fn create_local_metadata(
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 7057dc9b3b..14047661d1 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -16,6 +16,9 @@ use zenith_utils::zid::ZTimelineId;
 pub trait Repository: Send + Sync {
     fn shutdown(&self) -> Result<()>;
 
+    /// Stops all timeline-related processes in the repository and removes the timeline data from memory.
+    fn unload_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
+
     /// Get Timeline handle for given zenith timeline ID.
     fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
 
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index a2689d6bbf..ef35762831 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -9,7 +9,7 @@ use crate::walredo::PostgresRedoManager;
 use crate::PageServerConf;
 use anyhow::{anyhow, bail, Context, Result};
 use lazy_static::lazy_static;
-use log::{debug, info};
+use log::*;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::fmt;
@@ -49,6 +49,17 @@ pub enum TenantState {
     Stopping,
 }
 
+/// A remote storage timeline synchronization event that needs another step
+/// to be fully completed.
+#[derive(Debug)]
+pub enum PostTimelineSyncStep {
+    /// The timeline cannot be synchronized anymore due to sync issues.
+    /// Needs to be removed from the pageserver, to avoid further data divergence.
+    Evict,
+    /// A new timeline got downloaded and needs to be loaded into the pageserver.
+    RegisterDownload,
+}
+
 impl fmt::Display for TenantState {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
@@ -104,39 +115,57 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
     tenant.state = TenantState::Idle;
 }
 
-pub fn register_timeline_download(
+pub fn perform_post_timeline_sync_steps(
     conf: &'static PageServerConf,
-    tenant_id: ZTenantId,
-    timeline_id: ZTimelineId,
+    post_sync_steps: HashMap<(ZTenantId, ZTimelineId), PostTimelineSyncStep>,
 ) {
-    log::info!(
-        "Registering new download, tenant id {}, timeline id: {}",
-        tenant_id,
-        timeline_id
-    );
+    if post_sync_steps.is_empty() {
+        return;
+    }
+
+    info!("Performing {} post-sync steps", post_sync_steps.len());
+    trace!("Steps: {:?}", post_sync_steps);
     {
         let mut m = access_tenants();
-        let tenant = m.entry(tenant_id).or_insert_with(|| Tenant {
-            state: TenantState::Downloading,
-            repo: None,
-        });
-        tenant.state = TenantState::Downloading;
-        match &tenant.repo {
-            Some(repo) => {
-                init_timeline(repo.as_ref(), timeline_id);
-                tenant.state = TenantState::Idle;
-                return;
+        for &(tenant_id, timeline_id) in post_sync_steps.keys() {
+            let tenant = m.entry(tenant_id).or_insert_with(|| Tenant {
+                state: TenantState::Downloading,
+                repo: None,
+            });
+            tenant.state = TenantState::Downloading;
+            match &tenant.repo {
+                Some(repo) => {
+                    init_timeline(repo.as_ref(), timeline_id);
+                    tenant.state = TenantState::Idle;
+                    // Move on to the next key: an early return here would skip
+                    // the remaining registrations and the post-sync loop below.
+                    continue;
+                }
+                None => log::warn!("Initialize new repo"),
             }
-            None => log::warn!("Initialize new repo"),
+            tenant.state = TenantState::Idle;
         }
-        tenant.state = TenantState::Idle;
     }
 
-    // init repo updates Tenant state
-    init_repo(conf, tenant_id);
-    let new_repo = get_repository_for_tenant(tenant_id).unwrap();
-    init_timeline(new_repo.as_ref(), timeline_id);
+    for ((tenant_id, timeline_id), post_sync_step) in post_sync_steps {
+        match post_sync_step {
+            PostTimelineSyncStep::Evict => {
+                if let Err(e) = get_repository_for_tenant(tenant_id)
+                    .and_then(|repo| repo.unload_timeline(timeline_id))
+                {
+                    error!(
+                        "Failed to remove repository for tenant {}, timeline {}: {:#}",
+                        tenant_id, timeline_id, e
+                    )
+                }
+            }
+            PostTimelineSyncStep::RegisterDownload => {
+                // init_repo updates the Tenant state
+                init_repo(conf, tenant_id);
+                let new_repo = get_repository_for_tenant(tenant_id).unwrap();
+                init_timeline(new_repo.as_ref(), timeline_id);
+            }
+        }
+    }
 }
 
 fn init_timeline(repo: &dyn Repository, timeline_id: ZTimelineId) {