Limit concurrent S3 and IAM interactions

Kirill Bulatov
2022-04-25 16:29:23 +03:00
committed by Kirill Bulatov
parent eabf6f89e4
commit 778744d35c
6 changed files with 195 additions and 150 deletions

Cargo.lock generated

@@ -1899,7 +1899,7 @@ dependencies = [
"libc",
"log",
"nix",
"parking_lot",
"parking_lot 0.11.2",
"symbolic-demangle",
"tempfile",
"thiserror",


@@ -156,6 +156,9 @@ access_key_id = 'SOMEKEYAAAAASADSAH*#'
# Secret access key to connect to the bucket ("password" part of the credentials)
secret_access_key = 'SOMEsEcReTsd292v'
# Limit on concurrent S3 API requests, to avoid errors or throttling from AWS.
concurrency_limit = 100
```
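For illustration, a hedged sketch of a consolidated S3 `[remote_storage]` section with the new limit in place could look like the one below; the bucket name and region are placeholders, and the credentials reuse the sample values from above.
```toml
[remote_storage]
# Placeholder bucket coordinates; substitute your own.
bucket_name = 'some-bucket'
bucket_region = 'eu-north-1'
access_key_id = 'SOMEKEYAAAAASADSAH*#'
secret_access_key = 'SOMEsEcReTsd292v'
# Cap on simultaneous S3/IAM requests, to stay under the AWS rate limits.
concurrency_limit = 100
```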
###### General remote storage configuration
@@ -167,8 +170,8 @@ Besides, there are parameters common for all types of remote storage that can be
```toml
[remote_storage]
# Max number of concurrent connections to open for uploading to or downloading from the remote storage.
max_concurrent_sync = 100
# Max number of timelines synchronized (layers uploaded or downloaded) with the remote storage at the same time.
max_concurrent_timelines_sync = 50
# Max number of errors a single task may accumulate before it is considered failed and no longer retried.
max_sync_errors = 10


@@ -4,8 +4,7 @@
//! file, or on the command line.
//! See also `settings.md` for a better description of every parameter.
use anyhow::{bail, ensure, Context, Result};
use std::convert::TryInto;
use anyhow::{anyhow, bail, ensure, Context, Result};
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
@@ -34,8 +33,18 @@ pub mod defaults {
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 10;
/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
/// During regular work, the pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
/// during start (when local and remote timelines are compared and initial sync tasks are scheduled) and timeline attach.
/// Both cases may trigger timeline downloads that fetch many layers; that concurrency is limited internally by the storage clients, if needed.
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_TIMELINES_SYNC: usize = 50;
pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
/// Currently, sync happens with AWS S3, which has two limits on requests per second:
/// ~200 RPS for IAM services
/// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html
/// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
@@ -127,7 +136,7 @@ impl FromStr for ProfilingConfig {
let result = match s {
"disabled" => ProfilingConfig::Disabled,
"page_requests" => ProfilingConfig::PageRequests,
_ => bail!("invalid value \"{}\" for profiling option, valid values are \"disabled\" and \"page_requests\"", s),
_ => bail!("invalid value \"{s}\" for profiling option, valid values are \"disabled\" and \"page_requests\""),
};
Ok(result)
}
@@ -269,36 +278,36 @@ impl PageServerConfigBuilder {
Ok(PageServerConf {
listen_pg_addr: self
.listen_pg_addr
.ok_or(anyhow::anyhow!("missing listen_pg_addr"))?,
.ok_or(anyhow!("missing listen_pg_addr"))?,
listen_http_addr: self
.listen_http_addr
.ok_or(anyhow::anyhow!("missing listen_http_addr"))?,
.ok_or(anyhow!("missing listen_http_addr"))?,
wait_lsn_timeout: self
.wait_lsn_timeout
.ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
.ok_or(anyhow!("missing wait_lsn_timeout"))?,
wal_redo_timeout: self
.wal_redo_timeout
.ok_or(anyhow::anyhow!("missing wal_redo_timeout"))?,
superuser: self.superuser.ok_or(anyhow::anyhow!("missing superuser"))?,
.ok_or(anyhow!("missing wal_redo_timeout"))?,
superuser: self.superuser.ok_or(anyhow!("missing superuser"))?,
page_cache_size: self
.page_cache_size
.ok_or(anyhow::anyhow!("missing page_cache_size"))?,
.ok_or(anyhow!("missing page_cache_size"))?,
max_file_descriptors: self
.max_file_descriptors
.ok_or(anyhow::anyhow!("missing max_file_descriptors"))?,
workdir: self.workdir.ok_or(anyhow::anyhow!("missing workdir"))?,
.ok_or(anyhow!("missing max_file_descriptors"))?,
workdir: self.workdir.ok_or(anyhow!("missing workdir"))?,
pg_distrib_dir: self
.pg_distrib_dir
.ok_or(anyhow::anyhow!("missing pg_distrib_dir"))?,
auth_type: self.auth_type.ok_or(anyhow::anyhow!("missing auth_type"))?,
.ok_or(anyhow!("missing pg_distrib_dir"))?,
auth_type: self.auth_type.ok_or(anyhow!("missing auth_type"))?,
auth_validation_public_key_path: self
.auth_validation_public_key_path
.ok_or(anyhow::anyhow!("missing auth_validation_public_key_path"))?,
.ok_or(anyhow!("missing auth_validation_public_key_path"))?,
remote_storage_config: self
.remote_storage_config
.ok_or(anyhow::anyhow!("missing remote_storage_config"))?,
id: self.id.ok_or(anyhow::anyhow!("missing id"))?,
profiling: self.profiling.ok_or(anyhow::anyhow!("missing profiling"))?,
.ok_or(anyhow!("missing remote_storage_config"))?,
id: self.id.ok_or(anyhow!("missing id"))?,
profiling: self.profiling.ok_or(anyhow!("missing profiling"))?,
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
})
@@ -309,7 +318,7 @@ impl PageServerConfigBuilder {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteStorageConfig {
/// Max allowed number of timelines to be synchronized concurrently between the pageserver and the remote storage.
pub max_concurrent_sync: NonZeroUsize,
pub max_concurrent_timelines_sync: NonZeroUsize,
/// Max allowed errors before the sync task is considered failed and evicted.
pub max_sync_errors: NonZeroU32,
/// The storage connection configuration.
@@ -350,6 +359,9 @@ pub struct S3Config {
///
/// Example: `http://127.0.0.1:5000`
pub endpoint: Option<String>,
/// AWS S3 has various limits on its API calls, which we should not exceed.
/// See [`defaults::DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
pub concurrency_limit: NonZeroUsize,
}
impl std::fmt::Debug for S3Config {
@@ -358,6 +370,7 @@ impl std::fmt::Debug for S3Config {
.field("bucket_name", &self.bucket_name)
.field("bucket_region", &self.bucket_region)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.field("concurrency_limit", &self.concurrency_limit)
.finish()
}
}
@@ -431,7 +444,7 @@ impl PageServerConf {
}
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
_ => bail!("unrecognized pageserver option '{}'", key),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -509,32 +522,23 @@ impl PageServerConf {
let bucket_name = toml.get("bucket_name");
let bucket_region = toml.get("bucket_region");
let max_concurrent_sync: NonZeroUsize = if let Some(s) = toml.get("max_concurrent_sync") {
parse_toml_u64("max_concurrent_sync", s)
.and_then(|toml_u64| {
toml_u64.try_into().with_context(|| {
format!("'max_concurrent_sync' value {} is too large", toml_u64)
})
})
.ok()
.and_then(NonZeroUsize::new)
.context("'max_concurrent_sync' must be a non-zero positive integer")?
} else {
NonZeroUsize::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC).unwrap()
};
let max_sync_errors: NonZeroU32 = if let Some(s) = toml.get("max_sync_errors") {
parse_toml_u64("max_sync_errors", s)
.and_then(|toml_u64| {
toml_u64.try_into().with_context(|| {
format!("'max_sync_errors' value {} is too large", toml_u64)
})
})
.ok()
.and_then(NonZeroU32::new)
.context("'max_sync_errors' must be a non-zero positive integer")?
} else {
NonZeroU32::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS).unwrap()
};
let max_concurrent_timelines_sync = NonZeroUsize::new(
parse_optional_integer("max_concurrent_timelines_sync", toml)?
.unwrap_or(defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_TIMELINES_SYNC),
)
.context("Failed to parse 'max_concurrent_timelines_sync' as a positive integer")?;
let max_sync_errors = NonZeroU32::new(
parse_optional_integer("max_sync_errors", toml)?
.unwrap_or(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS),
)
.context("Failed to parse 'max_sync_errors' as a positive integer")?;
let concurrency_limit = NonZeroUsize::new(
parse_optional_integer("concurrency_limit", toml)?
.unwrap_or(defaults::DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT),
)
.context("Failed to parse 'concurrency_limit' as a positive integer")?;
let storage = match (local_path, bucket_name, bucket_region) {
(None, None, None) => bail!("no 'local_path' nor 'bucket_name' option"),
@@ -565,6 +569,7 @@ impl PageServerConf {
.get("endpoint")
.map(|endpoint| parse_toml_string("endpoint", endpoint))
.transpose()?,
concurrency_limit,
}),
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
parse_toml_string("local_path", local_path)?,
@@ -573,7 +578,7 @@ impl PageServerConf {
};
Ok(RemoteStorageConfig {
max_concurrent_sync,
max_concurrent_timelines_sync,
max_sync_errors,
storage,
})
@@ -581,7 +586,7 @@ impl PageServerConf {
#[cfg(test)]
pub fn test_repo_dir(test_name: &str) -> PathBuf {
PathBuf::from(format!("../tmp_check/test_{}", test_name))
PathBuf::from(format!("../tmp_check/test_{test_name}"))
}
#[cfg(test)]
@@ -611,7 +616,7 @@ impl PageServerConf {
fn parse_toml_string(name: &str, item: &Item) -> Result<String> {
let s = item
.as_str()
.with_context(|| format!("configure option {} is not a string", name))?;
.with_context(|| format!("configure option {name} is not a string"))?;
Ok(s.to_string())
}
@@ -620,17 +625,34 @@ fn parse_toml_u64(name: &str, item: &Item) -> Result<u64> {
// for our use, though.
let i: i64 = item
.as_integer()
.with_context(|| format!("configure option {} is not an integer", name))?;
.with_context(|| format!("configure option {name} is not an integer"))?;
if i < 0 {
bail!("configure option {} cannot be negative", name);
bail!("configure option {name} cannot be negative");
}
Ok(i as u64)
}
fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
where
I: TryFrom<i64, Error = E>,
E: std::error::Error + Send + Sync + 'static,
{
let toml_integer = match item.get(name) {
Some(item) => item
.as_integer()
.with_context(|| format!("configure option {name} is not an integer"))?,
None => return Ok(None),
};
I::try_from(toml_integer)
.map(Some)
.with_context(|| format!("configure option {name} is too large"))
}
fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> {
let s = item
.as_str()
.with_context(|| format!("configure option {} is not a string", name))?;
.with_context(|| format!("configure option {name} is not a string"))?;
Ok(humantime::parse_duration(s)?)
}
@@ -641,7 +663,7 @@ where
{
let v = item
.as_str()
.with_context(|| format!("configure option {} is not a string", name))?;
.with_context(|| format!("configure option {name} is not a string"))?;
T::from_str(v)
}
@@ -679,10 +701,8 @@ id = 10
let config_string = format!("pg_distrib_dir='{}'\nid=10", pg_distrib_dir.display());
let toml = config_string.parse()?;
let parsed_config =
PageServerConf::parse_and_validate(&toml, &workdir).unwrap_or_else(|e| {
panic!("Failed to parse config '{}', reason: {}", config_string, e)
});
let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"));
assert_eq!(
parsed_config,
@@ -715,16 +735,13 @@ id = 10
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let config_string = format!(
"{}pg_distrib_dir='{}'",
ALL_BASE_VALUES_TOML,
"{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'",
pg_distrib_dir.display()
);
let toml = config_string.parse()?;
let parsed_config =
PageServerConf::parse_and_validate(&toml, &workdir).unwrap_or_else(|e| {
panic!("Failed to parse config '{}', reason: {}", config_string, e)
});
let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"));
assert_eq!(
parsed_config,
@@ -772,37 +789,33 @@ local_path = '{}'"#,
for remote_storage_config_str in identical_toml_declarations {
let config_string = format!(
r#"{}
r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{}'
{}"#,
ALL_BASE_VALUES_TOML,
{remote_storage_config_str}"#,
pg_distrib_dir.display(),
remote_storage_config_str,
);
let toml = config_string.parse()?;
let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir)
.unwrap_or_else(|e| {
panic!("Failed to parse config '{}', reason: {}", config_string, e)
})
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"))
.remote_storage_config
.expect("Should have remote storage config for the local FS");
assert_eq!(
parsed_remote_storage_config,
RemoteStorageConfig {
max_concurrent_sync: NonZeroUsize::new(
defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC
)
.unwrap(),
max_sync_errors: NonZeroU32::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS)
parsed_remote_storage_config,
RemoteStorageConfig {
max_concurrent_timelines_sync: NonZeroUsize::new(
defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_TIMELINES_SYNC
)
.unwrap(),
storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
},
"Remote storage config should correctly parse the local FS config and fill other storage defaults"
);
max_sync_errors: NonZeroU32::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS)
.unwrap(),
storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
},
"Remote storage config should correctly parse the local FS config and fill other storage defaults"
);
}
Ok(())
}
@@ -818,52 +831,49 @@ pg_distrib_dir='{}'
let access_key_id = "SOMEKEYAAAAASADSAH*#".to_string();
let secret_access_key = "SOMEsEcReTsd292v".to_string();
let endpoint = "http://localhost:5000".to_string();
let max_concurrent_sync = NonZeroUsize::new(111).unwrap();
let max_concurrent_timelines_sync = NonZeroUsize::new(111).unwrap();
let max_sync_errors = NonZeroU32::new(222).unwrap();
let s3_concurrency_limit = NonZeroUsize::new(333).unwrap();
let identical_toml_declarations = &[
format!(
r#"[remote_storage]
max_concurrent_sync = {}
max_sync_errors = {}
bucket_name = '{}'
bucket_region = '{}'
prefix_in_bucket = '{}'
access_key_id = '{}'
secret_access_key = '{}'
endpoint = '{}'"#,
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key, endpoint
max_concurrent_timelines_sync = {max_concurrent_timelines_sync}
max_sync_errors = {max_sync_errors}
bucket_name = '{bucket_name}'
bucket_region = '{bucket_region}'
prefix_in_bucket = '{prefix_in_bucket}'
access_key_id = '{access_key_id}'
secret_access_key = '{secret_access_key}'
endpoint = '{endpoint}'
concurrency_limit = {s3_concurrency_limit}"#
),
format!(
"remote_storage={{max_concurrent_sync={}, max_sync_errors={}, bucket_name='{}', bucket_region='{}', prefix_in_bucket='{}', access_key_id='{}', secret_access_key='{}', endpoint='{}'}}",
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key, endpoint
"remote_storage={{max_concurrent_timelines_sync={max_concurrent_timelines_sync}, max_sync_errors={max_sync_errors}, bucket_name='{bucket_name}',\
bucket_region='{bucket_region}', prefix_in_bucket='{prefix_in_bucket}', access_key_id='{access_key_id}', secret_access_key='{secret_access_key}', endpoint='{endpoint}', concurrency_limit={s3_concurrency_limit}}}",
),
];
for remote_storage_config_str in identical_toml_declarations {
let config_string = format!(
r#"{}
r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{}'
{}"#,
ALL_BASE_VALUES_TOML,
{remote_storage_config_str}"#,
pg_distrib_dir.display(),
remote_storage_config_str,
);
let toml = config_string.parse()?;
let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir)
.unwrap_or_else(|e| {
panic!("Failed to parse config '{}', reason: {}", config_string, e)
})
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"))
.remote_storage_config
.expect("Should have remote storage config for S3");
assert_eq!(
parsed_remote_storage_config,
RemoteStorageConfig {
max_concurrent_sync,
max_concurrent_timelines_sync,
max_sync_errors,
storage: RemoteStorageKind::AwsS3(S3Config {
bucket_name: bucket_name.clone(),
@@ -871,7 +881,8 @@ pg_distrib_dir='{}'
access_key_id: Some(access_key_id.clone()),
secret_access_key: Some(secret_access_key.clone()),
prefix_in_bucket: Some(prefix_in_bucket.clone()),
endpoint: Some(endpoint.clone())
endpoint: Some(endpoint.clone()),
concurrency_limit: s3_concurrency_limit,
}),
},
"Remote storage config should correctly parse the S3 config"


@@ -161,7 +161,7 @@ pub fn start_local_timeline_sync(
config,
local_timeline_files,
LocalFs::new(root.clone(), &config.workdir)?,
storage_config.max_concurrent_sync,
storage_config.max_concurrent_timelines_sync,
storage_config.max_sync_errors,
)
},
@@ -172,7 +172,7 @@ pub fn start_local_timeline_sync(
config,
local_timeline_files,
S3Bucket::new(s3_config, &config.workdir)?,
storage_config.max_concurrent_sync,
storage_config.max_concurrent_timelines_sync,
storage_config.max_sync_errors,
)
},


@@ -15,7 +15,7 @@ use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client,
StreamingBody, S3,
};
use tokio::io;
use tokio::{io, sync::Semaphore};
use tokio_util::io::ReaderStream;
use tracing::debug;
@@ -65,6 +65,10 @@ pub struct S3Bucket {
client: S3Client,
bucket_name: String,
prefix_in_bucket: Option<String>,
// Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded.
// The same goes for IAM, which is queried before every S3 request, if enabled; IAM has an even lower RPS threshold.
// This semaphore helps to ensure we do not exceed those thresholds.
concurrency_limiter: Semaphore,
}
impl S3Bucket {
@@ -119,6 +123,7 @@ impl S3Bucket {
pageserver_workdir,
bucket_name: aws_config.bucket_name.clone(),
prefix_in_bucket,
concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
})
}
}
@@ -147,6 +152,11 @@ impl RemoteStorage for S3Bucket {
let mut continuation_token = None;
loop {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 list")?;
let fetch_response = self
.client
.list_objects_v2(ListObjectsV2Request {
@@ -180,6 +190,11 @@ impl RemoteStorage for S3Bucket {
to: &Self::StoragePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 upload")?;
self.client
.put_object(PutObjectRequest {
body: Some(StreamingBody::new_with_size(
@@ -200,6 +215,11 @@ impl RemoteStorage for S3Bucket {
from: &Self::StoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 download")?;
let object_output = self
.client
.get_object(GetObjectRequest {
@@ -231,6 +251,11 @@ impl RemoteStorage for S3Bucket {
Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
None => format!("bytes={}-", start_inclusive),
});
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 range download")?;
let object_output = self
.client
.get_object(GetObjectRequest {
@@ -250,6 +275,11 @@ impl RemoteStorage for S3Bucket {
}
async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 delete")?;
self.client
.delete_object(DeleteObjectRequest {
bucket: self.bucket_name.clone(),
@@ -433,6 +463,7 @@ mod tests {
client: S3Client::new("us-east-1".parse().unwrap()),
bucket_name: "dummy-bucket".to_string(),
prefix_in_bucket: Some("dummy_prefix/".to_string()),
concurrency_limiter: Semaphore::new(1),
}
}
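Every guarded call above follows the same tokio pattern: acquire a permit from the shared `Semaphore` and hold it for the duration of the request. A minimal standalone sketch of that pattern (not pageserver code; it assumes a `tokio` dependency with the `macros` and `rt-multi-thread` features) might look like this:
```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-in for an S3/IAM call: holds a permit while it "runs".
async fn limited_request(limiter: Arc<Semaphore>, id: usize) {
    // The permit is released automatically when `_permit` is dropped.
    let _permit = limiter
        .acquire()
        .await
        .expect("concurrency limiter semaphore got closed");
    println!("request {id} is running");
}

#[tokio::main]
async fn main() {
    // At most 100 requests in flight at once, mirroring the default limit above.
    let limiter = Arc::new(Semaphore::new(100));
    let handles: Vec<_> = (0..10)
        .map(|id| tokio::spawn(limited_request(Arc::clone(&limiter), id)))
        .collect();
    for handle in handles {
        handle.await.expect("request task panicked");
    }
}
```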


@@ -62,7 +62,7 @@ pub mod index;
mod upload;
use std::{
collections::{hash_map, HashMap, HashSet, VecDeque},
collections::{HashMap, HashSet, VecDeque},
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
ops::ControlFlow,
@@ -132,7 +132,9 @@ lazy_static! {
/// mpsc approach was picked to allow blocking the sync loop if no tasks are present, to avoid meaningless spinning.
mod sync_queue {
use std::{
collections::{hash_map, HashMap},
collections::{hash_map, HashMap, HashSet},
num::NonZeroUsize,
ops::ControlFlow,
sync::atomic::{AtomicUsize, Ordering},
};
@@ -179,7 +181,7 @@ mod sync_queue {
/// Polls a new task from the queue, using its receiver counterpart.
/// Does not block if the queue is empty, returning [`None`] instead.
/// Needed to correctly track the queue length.
pub async fn next_task(
async fn next_task(
receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>,
) -> Option<(ZTenantTimelineId, SyncTask)> {
let task = receiver.recv().await;
@@ -195,15 +197,29 @@ mod sync_queue {
/// or two (download and upload, if both were found in the queue during batch construction).
pub async fn next_task_batch(
receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>,
mut max_batch_size: usize,
) -> HashMap<ZTenantTimelineId, SyncTask> {
if max_batch_size == 0 {
return HashMap::new();
}
let mut tasks: HashMap<ZTenantTimelineId, SyncTask> =
HashMap::with_capacity(max_batch_size);
max_timelines_to_sync: NonZeroUsize,
) -> ControlFlow<(), HashMap<ZTenantTimelineId, SyncTask>> {
// Request the first task in a blocking fashion, to avoid meaningless spinning when the queue is empty
let (first_sync_id, first_task) = if let Some(first_task) = next_task(receiver).await {
first_task
} else {
debug!("Queue sender part was dropped, aborting");
return ControlFlow::Break(());
};
let max_timelines_to_sync = max_timelines_to_sync.get();
let mut batched_timelines = HashSet::with_capacity(max_timelines_to_sync);
batched_timelines.insert(first_sync_id.timeline_id);
let mut tasks = HashMap::new();
tasks.insert(first_sync_id, first_task);
loop {
if batched_timelines.len() >= max_timelines_to_sync {
debug!("Filled a full task batch with {max_timelines_to_sync} timeline sync operations");
break;
}
match receiver.try_recv() {
Ok((sync_id, new_task)) => {
LENGTH.fetch_sub(1, Ordering::Relaxed);
@@ -216,24 +232,23 @@ mod sync_queue {
v.insert(new_task);
}
}
max_batch_size -= 1;
if max_batch_size == 0 {
break;
}
batched_timelines.insert(sync_id.timeline_id);
}
Err(TryRecvError::Disconnected) => {
debug!("Sender disconnected, batch collection aborted");
break;
}
Err(TryRecvError::Empty) => {
debug!("No more data in the sync queue, task batch is not full");
debug!(
"No more data in the sync queue, task batch is not full, length: {}, max allowed size: {max_timelines_to_sync}",
batched_timelines.len()
);
break;
}
}
}
tasks
ControlFlow::Continue(tasks)
}
/// Length of the queue, assuming that all receiver counterparts were only called using the queue api.
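The batching scheme above (block for the first task, then drain the queue without blocking, merging tasks for timelines already in the batch) can be distilled into the hedged standalone sketch below; the key and task types are simplified stand-ins, not the pageserver ones.
```rust
use std::collections::HashMap;
use tokio::sync::mpsc::{error::TryRecvError, UnboundedReceiver};

/// Collects up to `max_batch` distinct keys: waits for the first item, then
/// drains whatever else is already queued, merging values that share a key.
async fn next_batch(
    receiver: &mut UnboundedReceiver<(u64, Vec<String>)>,
    max_batch: usize,
) -> Option<HashMap<u64, Vec<String>>> {
    // Block until at least one item arrives; `None` means every sender was dropped.
    let (first_key, first_items) = receiver.recv().await?;
    let mut batch = HashMap::from([(first_key, first_items)]);
    while batch.len() < max_batch {
        match receiver.try_recv() {
            // Merge items for a key that is already batched instead of overwriting them.
            Ok((key, items)) => batch.entry(key).or_default().extend(items),
            // Queue drained or closed: return whatever was collected so far.
            Err(TryRecvError::Empty | TryRecvError::Disconnected) => break,
        }
    }
    Some(batch)
}
```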
@@ -455,7 +470,7 @@ pub(super) fn spawn_storage_sync_thread<P, S>(
conf: &'static PageServerConf,
local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
storage: S,
max_concurrent_sync: NonZeroUsize,
max_concurrent_timelines_sync: NonZeroUsize,
max_sync_errors: NonZeroU32,
) -> anyhow::Result<SyncStartupData>
where
@@ -497,7 +512,7 @@ where
receiver,
Arc::new(storage),
loop_index,
max_concurrent_sync,
max_concurrent_timelines_sync,
max_sync_errors,
);
Ok(())
@@ -517,7 +532,7 @@ fn storage_sync_loop<P, S>(
mut receiver: UnboundedReceiver<(ZTenantTimelineId, SyncTask)>,
storage: Arc<S>,
index: RemoteIndex,
max_concurrent_sync: NonZeroUsize,
max_concurrent_timelines_sync: NonZeroUsize,
max_sync_errors: NonZeroU32,
) where
P: Debug + Send + Sync + 'static,
@@ -534,7 +549,7 @@ fn storage_sync_loop<P, S>(
&mut receiver,
storage,
loop_index,
max_concurrent_sync,
max_concurrent_timelines_sync,
max_sync_errors,
)
.instrument(info_span!("storage_sync_loop_step")) => step,
@@ -568,34 +583,19 @@ async fn loop_step<P, S>(
receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>,
storage: Arc<S>,
index: RemoteIndex,
max_concurrent_sync: NonZeroUsize,
max_concurrent_timelines_sync: NonZeroUsize,
max_sync_errors: NonZeroU32,
) -> ControlFlow<(), HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>>>
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
{
let max_concurrent_sync = max_concurrent_sync.get();
// request the first task in blocking fashion to do less meaningless work
let (first_sync_id, first_task) =
if let Some(first_task) = sync_queue::next_task(receiver).await {
first_task
} else {
return ControlFlow::Break(());
let batched_tasks =
match sync_queue::next_task_batch(receiver, max_concurrent_timelines_sync).await {
ControlFlow::Continue(batch) => batch,
ControlFlow::Break(()) => return ControlFlow::Break(()),
};
let mut batched_tasks = sync_queue::next_task_batch(receiver, max_concurrent_sync - 1).await;
match batched_tasks.entry(first_sync_id) {
hash_map::Entry::Occupied(o) => {
let current = o.remove();
batched_tasks.insert(first_sync_id, current.merge(first_task));
}
hash_map::Entry::Vacant(v) => {
v.insert(first_task);
}
}
let remaining_queue_length = sync_queue::len();
REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64);
if remaining_queue_length > 0 || !batched_tasks.is_empty() {
@@ -623,7 +623,7 @@ where
let mut new_timeline_states: HashMap<
ZTenantId,
HashMap<ZTimelineId, TimelineSyncStatusUpdate>,
> = HashMap::with_capacity(max_concurrent_sync);
> = HashMap::with_capacity(max_concurrent_timelines_sync.get());
while let Some((sync_id, state_update)) = sync_results.next().await {
debug!("Finished storage sync task for sync id {sync_id}");
if let Some(state_update) = state_update {