diff --git a/Cargo.lock b/Cargo.lock index 978cd20d12..3797e4e76b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1899,7 +1899,7 @@ dependencies = [ "libc", "log", "nix", - "parking_lot", + "parking_lot 0.11.2", "symbolic-demangle", "tempfile", "thiserror", diff --git a/docs/settings.md b/docs/settings.md index 69aadc602f..530876a42a 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -156,6 +156,9 @@ access_key_id = 'SOMEKEYAAAAASADSAH*#' # Secret access key to connect to the bucket ("password" part of the credentials) secret_access_key = 'SOMEsEcReTsd292v' + +# S3 API query limit to avoid getting errors/throttling from AWS. +concurrency_limit = 100 ``` ###### General remote storage configuration @@ -167,8 +170,8 @@ Besides, there are parameters common for all types of remote storage that can be ```toml [remote_storage] -# Max number of concurrent connections to open for uploading to or downloading from the remote storage. -max_concurrent_sync = 100 +# Max number of timelines that can be synchronized (layers uploaded or downloaded) with the remote storage at the same time. +max_concurrent_timelines_sync = 50 # Max number of errors a single task can have before it's considered failed and not attempted to run anymore. max_sync_errors = 10 diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index df4d9910ee..8bfe8b57ec 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -4,8 +4,7 @@ //! file, or on the command line. //! See also `settings.md` for better description on every parameter. -use anyhow::{bail, ensure, Context, Result}; -use std::convert::TryInto; +use anyhow::{anyhow, bail, ensure, Context, Result}; use std::env; use std::num::{NonZeroU32, NonZeroUsize}; use std::path::{Path, PathBuf}; @@ -34,8 +33,18 @@ pub mod defaults { pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s"; pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; - pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 10; + /// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage. + /// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency + /// during start (where local and remote timelines are compared and initial sync tasks are scheduled) and timeline attach. + /// Both cases may trigger a timeline download that might fetch a lot of layers; this concurrency is limited internally by the storage clients, if needed.
+ pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_TIMELINES_SYNC: usize = 50; pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; + /// Currently, sync happens with AWS S3, that has two limits on requests per second: + /// ~200 RPS for IAM services + /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html + /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests + /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/ + pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192; pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100; @@ -127,7 +136,7 @@ impl FromStr for ProfilingConfig { let result = match s { "disabled" => ProfilingConfig::Disabled, "page_requests" => ProfilingConfig::PageRequests, - _ => bail!("invalid value \"{}\" for profiling option, valid values are \"disabled\" and \"page_requests\"", s), + _ => bail!("invalid value \"{s}\" for profiling option, valid values are \"disabled\" and \"page_requests\""), }; Ok(result) } @@ -269,36 +278,36 @@ impl PageServerConfigBuilder { Ok(PageServerConf { listen_pg_addr: self .listen_pg_addr - .ok_or(anyhow::anyhow!("missing listen_pg_addr"))?, + .ok_or(anyhow!("missing listen_pg_addr"))?, listen_http_addr: self .listen_http_addr - .ok_or(anyhow::anyhow!("missing listen_http_addr"))?, + .ok_or(anyhow!("missing listen_http_addr"))?, wait_lsn_timeout: self .wait_lsn_timeout - .ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?, + .ok_or(anyhow!("missing wait_lsn_timeout"))?, wal_redo_timeout: self .wal_redo_timeout - .ok_or(anyhow::anyhow!("missing wal_redo_timeout"))?, - superuser: self.superuser.ok_or(anyhow::anyhow!("missing superuser"))?, + .ok_or(anyhow!("missing wal_redo_timeout"))?, + superuser: self.superuser.ok_or(anyhow!("missing superuser"))?, page_cache_size: self .page_cache_size - .ok_or(anyhow::anyhow!("missing page_cache_size"))?, + .ok_or(anyhow!("missing page_cache_size"))?, max_file_descriptors: self .max_file_descriptors - .ok_or(anyhow::anyhow!("missing max_file_descriptors"))?, - workdir: self.workdir.ok_or(anyhow::anyhow!("missing workdir"))?, + .ok_or(anyhow!("missing max_file_descriptors"))?, + workdir: self.workdir.ok_or(anyhow!("missing workdir"))?, pg_distrib_dir: self .pg_distrib_dir - .ok_or(anyhow::anyhow!("missing pg_distrib_dir"))?, - auth_type: self.auth_type.ok_or(anyhow::anyhow!("missing auth_type"))?, + .ok_or(anyhow!("missing pg_distrib_dir"))?, + auth_type: self.auth_type.ok_or(anyhow!("missing auth_type"))?, auth_validation_public_key_path: self .auth_validation_public_key_path - .ok_or(anyhow::anyhow!("missing auth_validation_public_key_path"))?, + .ok_or(anyhow!("missing auth_validation_public_key_path"))?, remote_storage_config: self .remote_storage_config - .ok_or(anyhow::anyhow!("missing remote_storage_config"))?, - id: self.id.ok_or(anyhow::anyhow!("missing id"))?, - profiling: self.profiling.ok_or(anyhow::anyhow!("missing profiling"))?, + .ok_or(anyhow!("missing remote_storage_config"))?, + id: self.id.ok_or(anyhow!("missing id"))?, + profiling: self.profiling.ok_or(anyhow!("missing profiling"))?, // TenantConf is handled separately default_tenant_conf: TenantConf::default(), }) @@ -309,7 +318,7 @@ impl PageServerConfigBuilder { #[derive(Debug, Clone, PartialEq, Eq)] pub struct RemoteStorageConfig { /// Max allowed number of concurrent sync operations between pageserver and the remote storage. 
- pub max_concurrent_sync: NonZeroUsize, + pub max_concurrent_timelines_sync: NonZeroUsize, /// Max allowed errors before the sync task is considered failed and evicted. pub max_sync_errors: NonZeroU32, /// The storage connection configuration. @@ -350,6 +359,9 @@ pub struct S3Config { /// /// Example: `http://127.0.0.1:5000` pub endpoint: Option, + /// AWS S3 has various limits on its API calls; we must not exceed them. + /// See [`defaults::DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details. + pub concurrency_limit: NonZeroUsize, } impl std::fmt::Debug for S3Config { @@ -358,6 +370,7 @@ impl std::fmt::Debug for S3Config { .field("bucket_name", &self.bucket_name) .field("bucket_region", &self.bucket_region) .field("prefix_in_bucket", &self.prefix_in_bucket) + .field("concurrency_limit", &self.concurrency_limit) .finish() } } @@ -431,7 +444,7 @@ impl PageServerConf { } "id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)), "profiling" => builder.profiling(parse_toml_from_str(key, item)?), - _ => bail!("unrecognized pageserver option '{}'", key), + _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -509,32 +522,23 @@ impl PageServerConf { let bucket_name = toml.get("bucket_name"); let bucket_region = toml.get("bucket_region"); - let max_concurrent_sync: NonZeroUsize = if let Some(s) = toml.get("max_concurrent_sync") { - parse_toml_u64("max_concurrent_sync", s) - .and_then(|toml_u64| { - toml_u64.try_into().with_context(|| { - format!("'max_concurrent_sync' value {} is too large", toml_u64) - }) - }) - .ok() - .and_then(NonZeroUsize::new) - .context("'max_concurrent_sync' must be a non-zero positive integer")? - } else { - NonZeroUsize::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC).unwrap() - }; - let max_sync_errors: NonZeroU32 = if let Some(s) = toml.get("max_sync_errors") { - parse_toml_u64("max_sync_errors", s) - .and_then(|toml_u64| { - toml_u64.try_into().with_context(|| { - format!("'max_sync_errors' value {} is too large", toml_u64) - }) - }) - .ok() - .and_then(NonZeroU32::new) - .context("'max_sync_errors' must be a non-zero positive integer")? - } else { - NonZeroU32::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS).unwrap() - }; + let max_concurrent_timelines_sync = NonZeroUsize::new( + parse_optional_integer("max_concurrent_timelines_sync", toml)? + .unwrap_or(defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_TIMELINES_SYNC), + ) + .context("Failed to parse 'max_concurrent_timelines_sync' as a positive integer")?; + + let max_sync_errors = NonZeroU32::new( + parse_optional_integer("max_sync_errors", toml)? + .unwrap_or(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS), + ) + .context("Failed to parse 'max_sync_errors' as a positive integer")?; + + let concurrency_limit = NonZeroUsize::new( + parse_optional_integer("concurrency_limit", toml)?
+ .unwrap_or(defaults::DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT), + ) + .context("Failed to parse 'concurrency_limit' as a positive integer")?; let storage = match (local_path, bucket_name, bucket_region) { (None, None, None) => bail!("no 'local_path' nor 'bucket_name' option"), @@ -565,6 +569,7 @@ impl PageServerConf { .get("endpoint") .map(|endpoint| parse_toml_string("endpoint", endpoint)) .transpose()?, + concurrency_limit, }), (Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from( parse_toml_string("local_path", local_path)?, @@ -573,7 +578,7 @@ impl PageServerConf { }; Ok(RemoteStorageConfig { - max_concurrent_sync, + max_concurrent_timelines_sync, max_sync_errors, storage, }) @@ -581,7 +586,7 @@ impl PageServerConf { #[cfg(test)] pub fn test_repo_dir(test_name: &str) -> PathBuf { - PathBuf::from(format!("../tmp_check/test_{}", test_name)) + PathBuf::from(format!("../tmp_check/test_{test_name}")) } #[cfg(test)] @@ -611,7 +616,7 @@ impl PageServerConf { fn parse_toml_string(name: &str, item: &Item) -> Result { let s = item .as_str() - .with_context(|| format!("configure option {} is not a string", name))?; + .with_context(|| format!("configure option {name} is not a string"))?; Ok(s.to_string()) } @@ -620,17 +625,34 @@ fn parse_toml_u64(name: &str, item: &Item) -> Result { // for our use, though. let i: i64 = item .as_integer() - .with_context(|| format!("configure option {} is not an integer", name))?; + .with_context(|| format!("configure option {name} is not an integer"))?; if i < 0 { - bail!("configure option {} cannot be negative", name); + bail!("configure option {name} cannot be negative"); } Ok(i as u64) } +fn parse_optional_integer(name: &str, item: &toml_edit::Item) -> anyhow::Result> +where + I: TryFrom, + E: std::error::Error + Send + Sync + 'static, +{ + let toml_integer = match item.get(name) { + Some(item) => item + .as_integer() + .with_context(|| format!("configure option {name} is not an integer"))?, + None => return Ok(None), + }; + + I::try_from(toml_integer) + .map(Some) + .with_context(|| format!("configure option {name} is too large")) +} + fn parse_toml_duration(name: &str, item: &Item) -> Result { let s = item .as_str() - .with_context(|| format!("configure option {} is not a string", name))?; + .with_context(|| format!("configure option {name} is not a string"))?; Ok(humantime::parse_duration(s)?) 
} @@ -641,7 +663,7 @@ where { let v = item .as_str() - .with_context(|| format!("configure option {} is not a string", name))?; + .with_context(|| format!("configure option {name} is not a string"))?; T::from_str(v) } @@ -679,10 +701,8 @@ id = 10 let config_string = format!("pg_distrib_dir='{}'\nid=10", pg_distrib_dir.display()); let toml = config_string.parse()?; - let parsed_config = - PageServerConf::parse_and_validate(&toml, &workdir).unwrap_or_else(|e| { - panic!("Failed to parse config '{}', reason: {}", config_string, e) - }); + let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir) + .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}")); assert_eq!( parsed_config, @@ -715,16 +735,13 @@ id = 10 let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?; let config_string = format!( - "{}pg_distrib_dir='{}'", - ALL_BASE_VALUES_TOML, + "{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'", pg_distrib_dir.display() ); let toml = config_string.parse()?; - let parsed_config = - PageServerConf::parse_and_validate(&toml, &workdir).unwrap_or_else(|e| { - panic!("Failed to parse config '{}', reason: {}", config_string, e) - }); + let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir) + .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}")); assert_eq!( parsed_config, @@ -772,37 +789,33 @@ local_path = '{}'"#, for remote_storage_config_str in identical_toml_declarations { let config_string = format!( - r#"{} + r#"{ALL_BASE_VALUES_TOML} pg_distrib_dir='{}' -{}"#, - ALL_BASE_VALUES_TOML, +{remote_storage_config_str}"#, pg_distrib_dir.display(), - remote_storage_config_str, ); let toml = config_string.parse()?; let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir) - .unwrap_or_else(|e| { - panic!("Failed to parse config '{}', reason: {}", config_string, e) - }) + .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}")) .remote_storage_config .expect("Should have remote storage config for the local FS"); assert_eq!( - parsed_remote_storage_config, - RemoteStorageConfig { - max_concurrent_sync: NonZeroUsize::new( - defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC - ) - .unwrap(), - max_sync_errors: NonZeroU32::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS) + parsed_remote_storage_config, + RemoteStorageConfig { + max_concurrent_timelines_sync: NonZeroUsize::new( + defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_TIMELINES_SYNC + ) .unwrap(), - storage: RemoteStorageKind::LocalFs(local_storage_path.clone()), - }, - "Remote storage config should correctly parse the local FS config and fill other storage defaults" - ); + max_sync_errors: NonZeroU32::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS) + .unwrap(), + storage: RemoteStorageKind::LocalFs(local_storage_path.clone()), + }, + "Remote storage config should correctly parse the local FS config and fill other storage defaults" + ); } Ok(()) } @@ -818,52 +831,49 @@ pg_distrib_dir='{}' let access_key_id = "SOMEKEYAAAAASADSAH*#".to_string(); let secret_access_key = "SOMEsEcReTsd292v".to_string(); let endpoint = "http://localhost:5000".to_string(); - let max_concurrent_sync = NonZeroUsize::new(111).unwrap(); + let max_concurrent_timelines_sync = NonZeroUsize::new(111).unwrap(); let max_sync_errors = NonZeroU32::new(222).unwrap(); + let s3_concurrency_limit = NonZeroUsize::new(333).unwrap(); let identical_toml_declarations = &[ format!( r#"[remote_storage] -max_concurrent_sync = {} -max_sync_errors 
= {} -bucket_name = '{}' -bucket_region = '{}' -prefix_in_bucket = '{}' -access_key_id = '{}' -secret_access_key = '{}' -endpoint = '{}'"#, - max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key, endpoint +max_concurrent_timelines_sync = {max_concurrent_timelines_sync} +max_sync_errors = {max_sync_errors} +bucket_name = '{bucket_name}' +bucket_region = '{bucket_region}' +prefix_in_bucket = '{prefix_in_bucket}' +access_key_id = '{access_key_id}' +secret_access_key = '{secret_access_key}' +endpoint = '{endpoint}' +concurrency_limit = {s3_concurrency_limit}"# ), format!( - "remote_storage={{max_concurrent_sync={}, max_sync_errors={}, bucket_name='{}', bucket_region='{}', prefix_in_bucket='{}', access_key_id='{}', secret_access_key='{}', endpoint='{}'}}", - max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key, endpoint + "remote_storage={{max_concurrent_timelines_sync={max_concurrent_timelines_sync}, max_sync_errors={max_sync_errors}, bucket_name='{bucket_name}',\ + bucket_region='{bucket_region}', prefix_in_bucket='{prefix_in_bucket}', access_key_id='{access_key_id}', secret_access_key='{secret_access_key}', endpoint='{endpoint}', concurrency_limit={s3_concurrency_limit}}}", ), ]; for remote_storage_config_str in identical_toml_declarations { let config_string = format!( - r#"{} + r#"{ALL_BASE_VALUES_TOML} pg_distrib_dir='{}' -{}"#, - ALL_BASE_VALUES_TOML, +{remote_storage_config_str}"#, pg_distrib_dir.display(), - remote_storage_config_str, ); let toml = config_string.parse()?; let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir) - .unwrap_or_else(|e| { - panic!("Failed to parse config '{}', reason: {}", config_string, e) - }) + .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}")) .remote_storage_config .expect("Should have remote storage config for S3"); assert_eq!( parsed_remote_storage_config, RemoteStorageConfig { - max_concurrent_sync, + max_concurrent_timelines_sync, max_sync_errors, storage: RemoteStorageKind::AwsS3(S3Config { bucket_name: bucket_name.clone(), @@ -871,7 +881,8 @@ pg_distrib_dir='{}' access_key_id: Some(access_key_id.clone()), secret_access_key: Some(secret_access_key.clone()), prefix_in_bucket: Some(prefix_in_bucket.clone()), - endpoint: Some(endpoint.clone()) + endpoint: Some(endpoint.clone()), + concurrency_limit: s3_concurrency_limit, }), }, "Remote storage config should correctly parse the S3 config" diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs index 8a09f7b9ca..39595b7167 100644 --- a/pageserver/src/remote_storage.rs +++ b/pageserver/src/remote_storage.rs @@ -161,7 +161,7 @@ pub fn start_local_timeline_sync( config, local_timeline_files, LocalFs::new(root.clone(), &config.workdir)?, - storage_config.max_concurrent_sync, + storage_config.max_concurrent_timelines_sync, storage_config.max_sync_errors, ) }, @@ -172,7 +172,7 @@ pub fn start_local_timeline_sync( config, local_timeline_files, S3Bucket::new(s3_config, &config.workdir)?, - storage_config.max_concurrent_sync, + storage_config.max_concurrent_timelines_sync, storage_config.max_sync_errors, ) }, diff --git a/pageserver/src/remote_storage/s3_bucket.rs b/pageserver/src/remote_storage/s3_bucket.rs index b69634a1b6..73d828d150 100644 --- a/pageserver/src/remote_storage/s3_bucket.rs +++ b/pageserver/src/remote_storage/s3_bucket.rs @@ -15,7 +15,7 @@ use rusoto_s3::{ DeleteObjectRequest, 
GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, StreamingBody, S3, }; -use tokio::io; +use tokio::{io, sync::Semaphore}; use tokio_util::io::ReaderStream; use tracing::debug; @@ -65,6 +65,10 @@ pub struct S3Bucket { client: S3Client, bucket_name: String, prefix_in_bucket: Option, + // Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded. + // The same goes for IAM, which is queried before every S3 request, if enabled; IAM has an even lower RPS threshold. + // This semaphore helps ensure we don't exceed those thresholds. + concurrency_limiter: Semaphore, } impl S3Bucket { @@ -119,6 +123,7 @@ impl S3Bucket { pageserver_workdir, bucket_name: aws_config.bucket_name.clone(), prefix_in_bucket, + concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()), }) } } @@ -147,6 +152,11 @@ impl RemoteStorage for S3Bucket { let mut continuation_token = None; loop { + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 list")?; let fetch_response = self .client .list_objects_v2(ListObjectsV2Request { @@ -180,6 +190,11 @@ impl RemoteStorage for S3Bucket { to: &Self::StoragePath, metadata: Option, ) -> anyhow::Result<()> { + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 upload")?; self.client .put_object(PutObjectRequest { body: Some(StreamingBody::new_with_size( @@ -200,6 +215,11 @@ impl RemoteStorage for S3Bucket { from: &Self::StoragePath, to: &mut (impl io::AsyncWrite + Unpin + Send + Sync), ) -> anyhow::Result> { + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 download")?; let object_output = self .client .get_object(GetObjectRequest { @@ -231,6 +251,11 @@ impl RemoteStorage for S3Bucket { Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive), None => format!("bytes={}-", start_inclusive), }); + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 range download")?; let object_output = self .client .get_object(GetObjectRequest { @@ -250,6 +275,11 @@ impl RemoteStorage for S3Bucket { } async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> { + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 delete")?; self.client .delete_object(DeleteObjectRequest { bucket: self.bucket_name.clone(), @@ -433,6 +463,7 @@ mod tests { client: S3Client::new("us-east-1".parse().unwrap()), bucket_name: "dummy-bucket".to_string(), prefix_in_bucket: Some("dummy_prefix/".to_string()), + concurrency_limiter: Semaphore::new(1), } } diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index 4d1ec2e225..20012f32d7 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -62,7 +62,7 @@ pub mod index; mod upload; use std::{ - collections::{hash_map, HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, fmt::Debug, num::{NonZeroU32, NonZeroUsize}, ops::ControlFlow, @@ -132,7 +132,9 @@ lazy_static! { /// mpsc approach was picked to allow blocking the sync loop if no tasks are present, to avoid meaningless spinning.
mod sync_queue { use std::{ - collections::{hash_map, HashMap}, + collections::{hash_map, HashMap, HashSet}, + num::NonZeroUsize, + ops::ControlFlow, sync::atomic::{AtomicUsize, Ordering}, }; @@ -179,7 +181,7 @@ mod sync_queue { /// Polls a new task from the queue, using its receiver counterpart. /// Does not block if the queue is empty, returning [`None`] instead. /// Needed to correctly track the queue length. - pub async fn next_task( + async fn next_task( receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, ) -> Option<(ZTenantTimelineId, SyncTask)> { let task = receiver.recv().await; @@ -195,15 +197,29 @@ mod sync_queue { /// or two (download and upload, if both were found in the queue during batch construction). pub async fn next_task_batch( receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, - mut max_batch_size: usize, - ) -> HashMap { - if max_batch_size == 0 { - return HashMap::new(); - } - let mut tasks: HashMap = - HashMap::with_capacity(max_batch_size); + max_timelines_to_sync: NonZeroUsize, + ) -> ControlFlow<(), HashMap> { + // request the first task in blocking fashion to do less meaningless work + let (first_sync_id, first_task) = if let Some(first_task) = next_task(receiver).await { + first_task + } else { + debug!("Queue sender part was dropped, aborting"); + return ControlFlow::Break(()); + }; + + let max_timelines_to_sync = max_timelines_to_sync.get(); + let mut batched_timelines = HashSet::with_capacity(max_timelines_to_sync); + batched_timelines.insert(first_sync_id.timeline_id); + + let mut tasks = HashMap::new(); + tasks.insert(first_sync_id, first_task); loop { + if batched_timelines.len() >= max_timelines_to_sync { + debug!("Filled a full task batch with {max_timelines_to_sync} timeline sync operations"); + break; + } + match receiver.try_recv() { Ok((sync_id, new_task)) => { LENGTH.fetch_sub(1, Ordering::Relaxed); @@ -216,24 +232,23 @@ mod sync_queue { v.insert(new_task); } } - - max_batch_size -= 1; - if max_batch_size == 0 { - break; - } + batched_timelines.insert(sync_id.timeline_id); } Err(TryRecvError::Disconnected) => { debug!("Sender disconnected, batch collection aborted"); break; } Err(TryRecvError::Empty) => { - debug!("No more data in the sync queue, task batch is not full"); + debug!( + "No more data in the sync queue, task batch is not full, length: {}, max allowed size: {max_timelines_to_sync}", + batched_timelines.len() + ); break; } } } - tasks + ControlFlow::Continue(tasks) } /// Length of the queue, assuming that all receiver counterparts were only called using the queue api. 
@@ -455,7 +470,7 @@ pub(super) fn spawn_storage_sync_thread( conf: &'static PageServerConf, local_timeline_files: HashMap)>, storage: S, - max_concurrent_sync: NonZeroUsize, + max_concurrent_timelines_sync: NonZeroUsize, max_sync_errors: NonZeroU32, ) -> anyhow::Result where @@ -497,7 +512,7 @@ where receiver, Arc::new(storage), loop_index, - max_concurrent_sync, + max_concurrent_timelines_sync, max_sync_errors, ); Ok(()) @@ -517,7 +532,7 @@ fn storage_sync_loop( mut receiver: UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, storage: Arc, index: RemoteIndex, - max_concurrent_sync: NonZeroUsize, + max_concurrent_timelines_sync: NonZeroUsize, max_sync_errors: NonZeroU32, ) where P: Debug + Send + Sync + 'static, @@ -534,7 +549,7 @@ fn storage_sync_loop( &mut receiver, storage, loop_index, - max_concurrent_sync, + max_concurrent_timelines_sync, max_sync_errors, ) .instrument(info_span!("storage_sync_loop_step")) => step, @@ -568,34 +583,19 @@ async fn loop_step( receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, storage: Arc, index: RemoteIndex, - max_concurrent_sync: NonZeroUsize, + max_concurrent_timelines_sync: NonZeroUsize, max_sync_errors: NonZeroU32, ) -> ControlFlow<(), HashMap>> where P: Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, { - let max_concurrent_sync = max_concurrent_sync.get(); - - // request the first task in blocking fashion to do less meaningless work - let (first_sync_id, first_task) = - if let Some(first_task) = sync_queue::next_task(receiver).await { - first_task - } else { - return ControlFlow::Break(()); + let batched_tasks = + match sync_queue::next_task_batch(receiver, max_concurrent_timelines_sync).await { + ControlFlow::Continue(batch) => batch, + ControlFlow::Break(()) => return ControlFlow::Break(()), }; - let mut batched_tasks = sync_queue::next_task_batch(receiver, max_concurrent_sync - 1).await; - match batched_tasks.entry(first_sync_id) { - hash_map::Entry::Occupied(o) => { - let current = o.remove(); - batched_tasks.insert(first_sync_id, current.merge(first_task)); - } - hash_map::Entry::Vacant(v) => { - v.insert(first_task); - } - } - let remaining_queue_length = sync_queue::len(); REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64); if remaining_queue_length > 0 || !batched_tasks.is_empty() { @@ -623,7 +623,7 @@ where let mut new_timeline_states: HashMap< ZTenantId, HashMap, - > = HashMap::with_capacity(max_concurrent_sync); + > = HashMap::with_capacity(max_concurrent_timelines_sync.get()); while let Some((sync_id, state_update)) = sync_results.next().await { debug!("Finished storage sync task for sync id {sync_id}"); if let Some(state_update) = state_update {
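
Editor's note: the sketch below is not part of the patch. It is a minimal, self-contained illustration of the throttling pattern the `S3Bucket` changes above rely on: a `tokio::sync::Semaphore` sized from `concurrency_limit` caps how many requests are in flight at once, and the permit is held for the duration of each request. It assumes `tokio` (with `rt`, `macros`, `time`, `sync` features) and `anyhow`, both already used by the patch; `fake_s3_request` and the numbers are made up for the example. Unlike the patch, which calls `acquire()` on a semaphore owned by the struct, this sketch uses `Arc` plus `acquire_owned()` because the permits cross spawned-task boundaries.

```rust
use std::sync::Arc;

use anyhow::Context;
use tokio::sync::Semaphore;

// Stand-in for a real S3 call such as `client.get_object(...)` or `client.put_object(...)`.
async fn fake_s3_request(id: usize) -> anyhow::Result<()> {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    println!("request {id} done");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Allow at most 100 concurrent requests, mirroring the default `concurrency_limit`.
    let limiter = Arc::new(Semaphore::new(100));

    let mut handles = Vec::new();
    for id in 0..500 {
        let limiter = Arc::clone(&limiter);
        handles.push(tokio::spawn(async move {
            // `acquire_owned` returns a permit that is released when dropped,
            // i.e. only after the request below has completed.
            let _permit = limiter
                .acquire_owned()
                .await
                .context("Concurrency limiter semaphore got closed")?;
            fake_s3_request(id).await
        }));
    }

    for handle in handles {
        handle.await??;
    }
    Ok(())
}
```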