diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 24c1248304..220d4ef115 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -29,6 +29,7 @@ use http_types::{StatusCode, Url}; use tokio_util::sync::CancellationToken; use tracing::debug; +use crate::RemoteStorageActivity; use crate::{ error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata, @@ -525,6 +526,10 @@ impl RemoteStorage for AzureBlobStorage { // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview Err(TimeTravelError::Unimplemented) } + + fn activity(&self) -> RemoteStorageActivity { + self.concurrency_limiter.activity() + } } pin_project_lite::pin_project! { diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 708662f20f..f024021507 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -263,6 +263,17 @@ pub trait RemoteStorage: Send + Sync + 'static { done_if_after: SystemTime, cancel: &CancellationToken, ) -> Result<(), TimeTravelError>; + + /// Query how busy we currently are: may be used by callers which wish to politely + /// back off if there are already a lot of operations underway. + fn activity(&self) -> RemoteStorageActivity; +} + +pub struct RemoteStorageActivity { + pub read_available: usize, + pub read_total: usize, + pub write_available: usize, + pub write_total: usize, } /// DownloadStream is sensitive to the timeout and cancellation used with the original @@ -444,6 +455,15 @@ impl GenericRemoteStorage> { } } } + + pub fn activity(&self) -> RemoteStorageActivity { + match self { + Self::LocalFs(s) => s.activity(), + Self::AwsS3(s) => s.activity(), + Self::AzureBlob(s) => s.activity(), + Self::Unreliable(s) => s.activity(), + } + } } impl GenericRemoteStorage { @@ -774,6 +794,9 @@ struct ConcurrencyLimiter { // The helps to ensure we don't exceed the thresholds. write: Arc, read: Arc, + + write_total: usize, + read_total: usize, } impl ConcurrencyLimiter { @@ -802,10 +825,21 @@ impl ConcurrencyLimiter { Arc::clone(self.for_kind(kind)).acquire_owned().await } + fn activity(&self) -> RemoteStorageActivity { + RemoteStorageActivity { + read_available: self.read.available_permits(), + read_total: self.read_total, + write_available: self.write.available_permits(), + write_total: self.write_total, + } + } + fn new(limit: usize) -> ConcurrencyLimiter { Self { read: Arc::new(Semaphore::new(limit)), write: Arc::new(Semaphore::new(limit)), + read_total: limit, + write_total: limit, } } } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 1f7bcfc982..f12f6590a3 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken}; use utils::crashsafe::path_with_suffix_extension; use crate::{ - Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel, - REMOTE_STORAGE_PREFIX_SEPARATOR, + Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorageActivity, + TimeTravelError, TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR, }; use super::{RemoteStorage, StorageMetadata}; @@ -605,6 +605,16 @@ impl RemoteStorage for LocalFs { ) -> Result<(), TimeTravelError> { Err(TimeTravelError::Unimplemented) } + + fn activity(&self) -> RemoteStorageActivity { + // LocalFS has no concurrency limiting: give callers the impression that plenty of units are available + RemoteStorageActivity { + read_available: 16, + read_total: 16, + write_available: 16, + write_total: 16, + } + } } fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf { diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index c3d6c75e20..0f6772b274 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -47,8 +47,8 @@ use utils::backoff; use super::StorageMetadata; use crate::{ error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, - Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel, - MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, + Listing, ListingMode, RemotePath, RemoteStorage, RemoteStorageActivity, S3Config, + TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, }; pub(super) mod metrics; @@ -975,6 +975,10 @@ impl RemoteStorage for S3Bucket { } Ok(()) } + + fn activity(&self) -> RemoteStorageActivity { + self.concurrency_limiter.activity() + } } /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`]. diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index c467a2d196..66522e04ca 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken; use crate::{ Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage, - StorageMetadata, TimeTravelError, + RemoteStorageActivity, StorageMetadata, TimeTravelError, }; pub struct UnreliableWrapper { @@ -213,4 +213,8 @@ impl RemoteStorage for UnreliableWrapper { .time_travel_recover(prefix, timestamp, done_if_after, cancel) .await } + + fn activity(&self) -> RemoteStorageActivity { + self.inner.activity() + } } diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 2a8f83be95..c28e041fa2 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -45,10 +45,10 @@ use crate::tenant::{ use camino::Utf8PathBuf; use chrono::format::{DelayedFormat, StrftimeItems}; -use futures::Future; +use futures::{Future, StreamExt}; use pageserver_api::models::SecondaryProgress; use pageserver_api::shard::TenantShardId; -use remote_storage::{DownloadError, Etag, GenericRemoteStorage}; +use remote_storage::{DownloadError, Etag, GenericRemoteStorage, RemoteStorageActivity}; use tokio_util::sync::CancellationToken; use tracing::{info_span, instrument, warn, Instrument}; @@ -71,6 +71,12 @@ use super::{ /// `` const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000); +/// Range of concurrency we may use when downloading layers within a timeline. This is independent +/// for each tenant we're downloading: the concurrency of _tenants_ is defined separately in +/// `PageServerConf::secondary_download_concurrency` +const MAX_LAYER_CONCURRENCY: usize = 16; +const MIN_LAYER_CONCURRENCY: usize = 1; + pub(super) async fn downloader_task( tenant_manager: Arc, remote_storage: GenericRemoteStorage, @@ -79,14 +85,15 @@ pub(super) async fn downloader_task( cancel: CancellationToken, root_ctx: RequestContext, ) { - let concurrency = tenant_manager.get_conf().secondary_download_concurrency; + // How many tenants' secondary download operations we will run concurrently + let tenant_concurrency = tenant_manager.get_conf().secondary_download_concurrency; let generator = SecondaryDownloader { tenant_manager, remote_storage, root_ctx, }; - let mut scheduler = Scheduler::new(generator, concurrency); + let mut scheduler = Scheduler::new(generator, tenant_concurrency); scheduler .run(command_queue, background_jobs_can_start, cancel) @@ -792,6 +799,8 @@ impl<'a> TenantDownloader<'a> { tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len()); + let mut download_futs = Vec::new(); + // Download heatmap layers that are not present on local disk, or update their // access time if they are already present. for layer in timeline.layers { @@ -874,67 +883,33 @@ impl<'a> TenantDownloader<'a> { } } - // Failpoint for simulating slow remote storage - failpoint_support::sleep_millis_async!( - "secondary-layer-download-sleep", - &self.secondary_state.cancel - ); - - // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally - let downloaded_bytes = match download_layer_file( - self.conf, - self.remote_storage, - *tenant_shard_id, - timeline.timeline_id, - &layer.name, - &LayerFileMetadata::from(&layer.metadata), - &self.secondary_state.cancel, + download_futs.push(self.download_layer( + tenant_shard_id, + &timeline.timeline_id, + layer, ctx, - ) - .await - { - Ok(bytes) => bytes, - Err(DownloadError::NotFound) => { - // A heatmap might be out of date and refer to a layer that doesn't exist any more. - // This is harmless: continue to download the next layer. It is expected during compaction - // GC. - tracing::debug!( - "Skipped downloading missing layer {}, raced with compaction/gc?", - layer.name - ); - continue; + )); + } + + // Break up layer downloads into chunks, so that for each chunk we can re-check how much + // concurrency to use based on activity level of remote storage. + while !download_futs.is_empty() { + let chunk = + download_futs.split_off(download_futs.len().saturating_sub(MAX_LAYER_CONCURRENCY)); + + let concurrency = Self::layer_concurrency(self.remote_storage.activity()); + + let mut result_stream = futures::stream::iter(chunk).buffered(concurrency); + let mut result_stream = std::pin::pin!(result_stream); + while let Some(result) = result_stream.next().await { + match result { + Err(e) => return Err(e), + Ok(None) => { + // No error, but we didn't download the layer. Don't mark it touched + } + Ok(Some(layer)) => touched.push(layer), } - Err(e) => return Err(e.into()), - }; - - if downloaded_bytes != layer.metadata.file_size { - let local_path = local_layer_path( - self.conf, - tenant_shard_id, - &timeline.timeline_id, - &layer.name, - &layer.metadata.generation, - ); - - tracing::warn!( - "Downloaded layer {} with unexpected size {} != {}. Removing download.", - layer.name, - downloaded_bytes, - layer.metadata.file_size - ); - - tokio::fs::remove_file(&local_path) - .await - .or_else(fs_ext::ignore_not_found)?; - } else { - tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes); - let mut progress = self.secondary_state.progress.lock().unwrap(); - progress.bytes_downloaded += downloaded_bytes; - progress.layers_downloaded += 1; } - - SECONDARY_MODE.download_layer.inc(); - touched.push(layer) } // Write updates to state to record layers we just downloaded or touched. @@ -966,6 +941,90 @@ impl<'a> TenantDownloader<'a> { Ok(()) } + + async fn download_layer( + &self, + tenant_shard_id: &TenantShardId, + timeline_id: &TimelineId, + layer: HeatMapLayer, + ctx: &RequestContext, + ) -> Result, UpdateError> { + // Failpoint for simulating slow remote storage + failpoint_support::sleep_millis_async!( + "secondary-layer-download-sleep", + &self.secondary_state.cancel + ); + + // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally + let downloaded_bytes = match download_layer_file( + self.conf, + self.remote_storage, + *tenant_shard_id, + *timeline_id, + &layer.name, + &LayerFileMetadata::from(&layer.metadata), + &self.secondary_state.cancel, + ctx, + ) + .await + { + Ok(bytes) => bytes, + Err(DownloadError::NotFound) => { + // A heatmap might be out of date and refer to a layer that doesn't exist any more. + // This is harmless: continue to download the next layer. It is expected during compaction + // GC. + tracing::debug!( + "Skipped downloading missing layer {}, raced with compaction/gc?", + layer.name + ); + return Ok(None); + } + Err(e) => return Err(e.into()), + }; + + if downloaded_bytes != layer.metadata.file_size { + let local_path = local_layer_path( + self.conf, + tenant_shard_id, + timeline_id, + &layer.name, + &layer.metadata.generation, + ); + + tracing::warn!( + "Downloaded layer {} with unexpected size {} != {}. Removing download.", + layer.name, + downloaded_bytes, + layer.metadata.file_size + ); + + tokio::fs::remove_file(&local_path) + .await + .or_else(fs_ext::ignore_not_found)?; + } else { + tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes); + let mut progress = self.secondary_state.progress.lock().unwrap(); + progress.bytes_downloaded += downloaded_bytes; + progress.layers_downloaded += 1; + } + + SECONDARY_MODE.download_layer.inc(); + + Ok(Some(layer)) + } + + /// Calculate the currently allowed parallelism of layer download tasks, based on activity level of the remote storage + fn layer_concurrency(activity: RemoteStorageActivity) -> usize { + // When less than 75% of units are available, use minimum concurrency. Else, do a linear mapping + // of our concurrency range to the units available within the remaining 25%. + let clamp_at = (activity.read_total * 3) / 4; + if activity.read_available > clamp_at { + (MAX_LAYER_CONCURRENCY * (activity.read_available - clamp_at)) + / (activity.read_total - clamp_at) + } else { + MIN_LAYER_CONCURRENCY + } + } } /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline @@ -1092,3 +1151,58 @@ async fn init_timeline_state( detail } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn layer_concurrency() { + // Totally idle + assert_eq!( + TenantDownloader::layer_concurrency(RemoteStorageActivity { + read_available: 16, + read_total: 16, + write_available: 16, + write_total: 16 + }), + MAX_LAYER_CONCURRENCY + ); + + // Totally busy + assert_eq!( + TenantDownloader::layer_concurrency(RemoteStorageActivity { + read_available: 0, + read_total: 16, + + write_available: 16, + write_total: 16 + }), + MIN_LAYER_CONCURRENCY + ); + + // Edge of the range at which we interpolate + assert_eq!( + TenantDownloader::layer_concurrency(RemoteStorageActivity { + read_available: 12, + read_total: 16, + + write_available: 16, + write_total: 16 + }), + MIN_LAYER_CONCURRENCY + ); + + // Midpoint of the range in which we interpolate + assert_eq!( + TenantDownloader::layer_concurrency(RemoteStorageActivity { + read_available: 14, + read_total: 16, + + write_available: 16, + write_total: 16 + }), + MAX_LAYER_CONCURRENCY / 2 + ); + } +}