diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 220d4ef115..24c1248304 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -29,7 +29,6 @@ use http_types::{StatusCode, Url}; use tokio_util::sync::CancellationToken; use tracing::debug; -use crate::RemoteStorageActivity; use crate::{ error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata, @@ -526,10 +525,6 @@ impl RemoteStorage for AzureBlobStorage { // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview Err(TimeTravelError::Unimplemented) } - - fn activity(&self) -> RemoteStorageActivity { - self.concurrency_limiter.activity() - } } pin_project_lite::pin_project! { diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index f024021507..708662f20f 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -263,17 +263,6 @@ pub trait RemoteStorage: Send + Sync + 'static { done_if_after: SystemTime, cancel: &CancellationToken, ) -> Result<(), TimeTravelError>; - - /// Query how busy we currently are: may be used by callers which wish to politely - /// back off if there are already a lot of operations underway. - fn activity(&self) -> RemoteStorageActivity; -} - -pub struct RemoteStorageActivity { - pub read_available: usize, - pub read_total: usize, - pub write_available: usize, - pub write_total: usize, } /// DownloadStream is sensitive to the timeout and cancellation used with the original @@ -455,15 +444,6 @@ impl GenericRemoteStorage> { } } } - - pub fn activity(&self) -> RemoteStorageActivity { - match self { - Self::LocalFs(s) => s.activity(), - Self::AwsS3(s) => s.activity(), - Self::AzureBlob(s) => s.activity(), - Self::Unreliable(s) => s.activity(), - } - } } impl GenericRemoteStorage { @@ -794,9 +774,6 @@ struct ConcurrencyLimiter { // The helps to ensure we don't exceed the thresholds. write: Arc, read: Arc, - - write_total: usize, - read_total: usize, } impl ConcurrencyLimiter { @@ -825,21 +802,10 @@ impl ConcurrencyLimiter { Arc::clone(self.for_kind(kind)).acquire_owned().await } - fn activity(&self) -> RemoteStorageActivity { - RemoteStorageActivity { - read_available: self.read.available_permits(), - read_total: self.read_total, - write_available: self.write.available_permits(), - write_total: self.write_total, - } - } - fn new(limit: usize) -> ConcurrencyLimiter { Self { read: Arc::new(Semaphore::new(limit)), write: Arc::new(Semaphore::new(limit)), - read_total: limit, - write_total: limit, } } } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index f12f6590a3..1f7bcfc982 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken}; use utils::crashsafe::path_with_suffix_extension; use crate::{ - Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorageActivity, - TimeTravelError, TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR, + Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel, + REMOTE_STORAGE_PREFIX_SEPARATOR, }; use super::{RemoteStorage, StorageMetadata}; @@ -605,16 +605,6 @@ impl RemoteStorage for LocalFs { ) -> Result<(), TimeTravelError> { Err(TimeTravelError::Unimplemented) } - - fn activity(&self) -> RemoteStorageActivity { - // LocalFS has no concurrency limiting: give callers the impression that plenty of units are available - RemoteStorageActivity { - read_available: 16, - read_total: 16, - write_available: 16, - write_total: 16, - } - } } fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf { diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 0f6772b274..c3d6c75e20 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -47,8 +47,8 @@ use utils::backoff; use super::StorageMetadata; use crate::{ error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, - Listing, ListingMode, RemotePath, RemoteStorage, RemoteStorageActivity, S3Config, - TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, + Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel, + MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, }; pub(super) mod metrics; @@ -975,10 +975,6 @@ impl RemoteStorage for S3Bucket { } Ok(()) } - - fn activity(&self) -> RemoteStorageActivity { - self.concurrency_limiter.activity() - } } /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`]. diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 66522e04ca..c467a2d196 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken; use crate::{ Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage, - RemoteStorageActivity, StorageMetadata, TimeTravelError, + StorageMetadata, TimeTravelError, }; pub struct UnreliableWrapper { @@ -213,8 +213,4 @@ impl RemoteStorage for UnreliableWrapper { .time_travel_recover(prefix, timestamp, done_if_after, cancel) .await } - - fn activity(&self) -> RemoteStorageActivity { - self.inner.activity() - } } diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 789f1a0fa9..0ec1bd649b 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -45,10 +45,10 @@ use crate::tenant::{ use camino::Utf8PathBuf; use chrono::format::{DelayedFormat, StrftimeItems}; -use futures::{Future, StreamExt}; +use futures::Future; use pageserver_api::models::SecondaryProgress; use pageserver_api::shard::TenantShardId; -use remote_storage::{DownloadError, Etag, GenericRemoteStorage, RemoteStorageActivity}; +use remote_storage::{DownloadError, Etag, GenericRemoteStorage}; use tokio_util::sync::CancellationToken; use tracing::{info_span, instrument, warn, Instrument}; @@ -67,12 +67,6 @@ use super::{ /// download, if the uploader populated it. const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000); -/// Range of concurrency we may use when downloading layers within a timeline. This is independent -/// for each tenant we're downloading: the concurrency of _tenants_ is defined separately in -/// `PageServerConf::secondary_download_concurrency` -const MAX_LAYER_CONCURRENCY: usize = 16; -const MIN_LAYER_CONCURRENCY: usize = 1; - pub(super) async fn downloader_task( tenant_manager: Arc, remote_storage: GenericRemoteStorage, @@ -81,15 +75,14 @@ pub(super) async fn downloader_task( cancel: CancellationToken, root_ctx: RequestContext, ) { - // How many tenants' secondary download operations we will run concurrently - let tenant_concurrency = tenant_manager.get_conf().secondary_download_concurrency; + let concurrency = tenant_manager.get_conf().secondary_download_concurrency; let generator = SecondaryDownloader { tenant_manager, remote_storage, root_ctx, }; - let mut scheduler = Scheduler::new(generator, tenant_concurrency); + let mut scheduler = Scheduler::new(generator, concurrency); scheduler .run(command_queue, background_jobs_can_start, cancel) @@ -414,7 +407,7 @@ impl JobGenerator { - tracing::debug!("Shut down while downloading"); + tracing::info!("Shut down while downloading"); }, Err(UpdateError::Deserialize(e)) => { tracing::error!("Corrupt content while downloading tenant: {e}"); @@ -848,8 +841,6 @@ impl<'a> TenantDownloader<'a> { tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len()); - let mut download_futs = Vec::new(); - // Download heatmap layers that are not present on local disk, or update their // access time if they are already present. for layer in timeline.layers { @@ -922,31 +913,14 @@ impl<'a> TenantDownloader<'a> { } } - download_futs.push(self.download_layer( - tenant_shard_id, - &timeline.timeline_id, - layer, - ctx, - )); - } - - // Break up layer downloads into chunks, so that for each chunk we can re-check how much - // concurrency to use based on activity level of remote storage. - while !download_futs.is_empty() { - let chunk = - download_futs.split_off(download_futs.len().saturating_sub(MAX_LAYER_CONCURRENCY)); - - let concurrency = Self::layer_concurrency(self.remote_storage.activity()); - - let mut result_stream = futures::stream::iter(chunk).buffered(concurrency); - let mut result_stream = std::pin::pin!(result_stream); - while let Some(result) = result_stream.next().await { - match result { - Err(e) => return Err(e), - Ok(None) => { - // No error, but we didn't download the layer. Don't mark it touched - } - Ok(Some(layer)) => touched.push(layer), + match self + .download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx) + .await? + { + Some(layer) => touched.push(layer), + None => { + // Not an error but we didn't download it: remote layer is missing. Don't add it to the list of + // things to consider touched. } } } @@ -1081,19 +1055,6 @@ impl<'a> TenantDownloader<'a> { Ok(Some(layer)) } - - /// Calculate the currently allowed parallelism of layer download tasks, based on activity level of the remote storage - fn layer_concurrency(activity: RemoteStorageActivity) -> usize { - // When less than 75% of units are available, use minimum concurrency. Else, do a linear mapping - // of our concurrency range to the units available within the remaining 25%. - let clamp_at = (activity.read_total * 3) / 4; - if activity.read_available > clamp_at { - (MAX_LAYER_CONCURRENCY * (activity.read_available - clamp_at)) - / (activity.read_total - clamp_at) - } else { - MIN_LAYER_CONCURRENCY - } - } } /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline @@ -1217,58 +1178,3 @@ async fn init_timeline_state( detail } - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn layer_concurrency() { - // Totally idle - assert_eq!( - TenantDownloader::layer_concurrency(RemoteStorageActivity { - read_available: 16, - read_total: 16, - write_available: 16, - write_total: 16 - }), - MAX_LAYER_CONCURRENCY - ); - - // Totally busy - assert_eq!( - TenantDownloader::layer_concurrency(RemoteStorageActivity { - read_available: 0, - read_total: 16, - - write_available: 16, - write_total: 16 - }), - MIN_LAYER_CONCURRENCY - ); - - // Edge of the range at which we interpolate - assert_eq!( - TenantDownloader::layer_concurrency(RemoteStorageActivity { - read_available: 12, - read_total: 16, - - write_available: 16, - write_total: 16 - }), - MIN_LAYER_CONCURRENCY - ); - - // Midpoint of the range in which we interpolate - assert_eq!( - TenantDownloader::layer_concurrency(RemoteStorageActivity { - read_available: 14, - read_total: 16, - - write_available: 16, - write_total: 16 - }), - MAX_LAYER_CONCURRENCY / 2 - ); - } -}