diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 840917ef68..8d1962fa29 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -35,6 +35,7 @@ use utils::backoff; use utils::backoff::exponential_backoff_duration_seconds; use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind}; +use crate::DownloadKind; use crate::{ config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject, RemotePath, RemoteStorage, StorageMetadata, @@ -49,10 +50,17 @@ pub struct AzureBlobStorage { concurrency_limiter: ConcurrencyLimiter, // Per-request timeout. Accessible for tests. pub timeout: Duration, + + // Alternative timeout used for metadata objects which are expected to be small + pub small_timeout: Duration, } impl AzureBlobStorage { - pub fn new(azure_config: &AzureConfig, timeout: Duration) -> Result { + pub fn new( + azure_config: &AzureConfig, + timeout: Duration, + small_timeout: Duration, + ) -> Result { debug!( "Creating azure remote storage for azure container {}", azure_config.container_name @@ -94,6 +102,7 @@ impl AzureBlobStorage { max_keys_per_list_response, concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()), timeout, + small_timeout, }) } @@ -133,6 +142,7 @@ impl AzureBlobStorage { async fn download_for_builder( &self, builder: GetBlobBuilder, + timeout: Duration, cancel: &CancellationToken, ) -> Result { let kind = RequestKind::Get; @@ -156,7 +166,7 @@ impl AzureBlobStorage { .map_err(to_download_error); // apply per request timeout - let response = tokio_stream::StreamExt::timeout(response, self.timeout); + let response = tokio_stream::StreamExt::timeout(response, timeout); // flatten let response = response.map(|res| match res { @@ -415,7 +425,7 @@ impl RemoteStorage for AzureBlobStorage { let blob_client = self.client.blob_client(self.relative_path_to_name(key)); let properties_future = blob_client.get_properties().into_future(); - let properties_future = tokio::time::timeout(self.timeout, properties_future); + let properties_future = tokio::time::timeout(self.small_timeout, properties_future); let res = tokio::select! { res = properties_future => res, @@ -521,7 +531,12 @@ impl RemoteStorage for AzureBlobStorage { }); } - self.download_for_builder(builder, cancel).await + let timeout = match opts.kind { + DownloadKind::Small => self.small_timeout, + DownloadKind::Large => self.timeout, + }; + + self.download_for_builder(builder, timeout, cancel).await } async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> { diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs index e99ae4f747..f6ef31077c 100644 --- a/libs/remote_storage/src/config.rs +++ b/libs/remote_storage/src/config.rs @@ -24,6 +24,13 @@ pub struct RemoteStorageConfig { skip_serializing_if = "is_default_timeout" )] pub timeout: Duration, + /// Alternative timeout used for metadata objects which are expected to be small + #[serde( + with = "humantime_serde", + default = "default_small_timeout", + skip_serializing_if = "is_default_small_timeout" + )] + pub small_timeout: Duration, } impl RemoteStorageKind { @@ -40,10 +47,18 @@ fn default_timeout() -> Duration { RemoteStorageConfig::DEFAULT_TIMEOUT } +fn default_small_timeout() -> Duration { + RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT +} + fn is_default_timeout(d: &Duration) -> bool { *d == RemoteStorageConfig::DEFAULT_TIMEOUT } +fn is_default_small_timeout(d: &Duration) -> bool { + *d == RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT +} + /// A kind of a remote storage to connect to, with its connection configuration. #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] #[serde(untagged)] @@ -184,6 +199,7 @@ fn serialize_storage_class( impl RemoteStorageConfig { pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120); + pub const DEFAULT_SMALL_TIMEOUT: Duration = std::time::Duration::from_secs(30); pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result { Ok(utils::toml_edit_ext::deserialize_item(toml)?) @@ -219,7 +235,8 @@ timeout = '5s'"; storage: RemoteStorageKind::LocalFs { local_path: Utf8PathBuf::from(".") }, - timeout: Duration::from_secs(5) + timeout: Duration::from_secs(5), + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT } ); } @@ -247,7 +264,8 @@ timeout = '5s'"; max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, upload_storage_class: Some(StorageClass::IntelligentTiering), }), - timeout: Duration::from_secs(7) + timeout: Duration::from_secs(7), + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT } ); } @@ -299,7 +317,8 @@ timeout = '5s'"; concurrency_limit: default_remote_storage_azure_concurrency_limit(), max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, }), - timeout: Duration::from_secs(7) + timeout: Duration::from_secs(7), + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT } ); } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 719608dd5f..0ece29d99e 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -178,6 +178,15 @@ pub struct DownloadOpts { /// The end of the byte range to download, or unbounded. Must be after the /// start bound. pub byte_end: Bound, + /// Indicate whether we're downloading something small or large: this indirectly controls + /// timeouts: for something like an index/manifest/heatmap, we should time out faster than + /// for layer files + pub kind: DownloadKind, +} + +pub enum DownloadKind { + Large, + Small, } impl Default for DownloadOpts { @@ -186,6 +195,7 @@ impl Default for DownloadOpts { etag: Default::default(), byte_start: Bound::Unbounded, byte_end: Bound::Unbounded, + kind: DownloadKind::Large, } } } @@ -584,6 +594,10 @@ impl GenericRemoteStorage> { impl GenericRemoteStorage { pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result { let timeout = storage_config.timeout; + + // If somkeone overrides timeout to be small without adjusting small_timeout, then adjust it automatically + let small_timeout = std::cmp::min(storage_config.small_timeout, timeout); + Ok(match &storage_config.storage { RemoteStorageKind::LocalFs { local_path: path } => { info!("Using fs root '{path}' as a remote storage"); @@ -606,7 +620,11 @@ impl GenericRemoteStorage { .unwrap_or(""); info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'", azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container); - Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?)) + Self::AzureBlob(Arc::new(AzureBlobStorage::new( + azure_config, + timeout, + small_timeout, + )?)) } }) } diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs index 3a20649490..92d579fec8 100644 --- a/libs/remote_storage/tests/test_real_azure.rs +++ b/libs/remote_storage/tests/test_real_azure.rs @@ -219,7 +219,8 @@ async fn create_azure_client( concurrency_limit: NonZeroUsize::new(100).unwrap(), max_keys_per_list_response, }), - timeout: Duration::from_secs(120), + timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, }; Ok(Arc::new( GenericRemoteStorage::from_config(&remote_storage_config) diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 3e99a65fac..e60ec18c93 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -396,6 +396,7 @@ async fn create_s3_client( upload_storage_class: None, }), timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, }; Ok(Arc::new( GenericRemoteStorage::from_config(&remote_storage_config) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index e74c8ecf5a..1d508f5fe9 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -838,6 +838,7 @@ mod test { local_path: remote_fs_dir.clone(), }, timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, }; let storage = GenericRemoteStorage::from_config(&storage_config) .await diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 339a3ca1bb..cd0690bb1a 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5423,6 +5423,7 @@ pub(crate) mod harness { local_path: remote_fs_dir.clone(), }, timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, }; let remote_storage = GenericRemoteStorage::from_config(&config).await.unwrap(); let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone())); diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index d632e595ad..739615be9c 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -30,7 +30,9 @@ use crate::tenant::Generation; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}; use crate::TEMP_FILE_SUFFIX; -use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath}; +use remote_storage::{ + DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, +}; use utils::crashsafe::path_with_suffix_extension; use utils::id::{TenantId, TimelineId}; use utils::pausable_failpoint; @@ -345,12 +347,13 @@ pub async fn list_remote_timelines( async fn do_download_remote_path_retry_forever( storage: &GenericRemoteStorage, remote_path: &RemotePath, + download_opts: DownloadOpts, cancel: &CancellationToken, ) -> Result<(Vec, SystemTime), DownloadError> { download_retry_forever( || async { let download = storage - .download(remote_path, &DownloadOpts::default(), cancel) + .download(remote_path, &download_opts, cancel) .await?; let mut bytes = Vec::new(); @@ -377,8 +380,13 @@ async fn do_download_tenant_manifest( ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> { let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation); + let download_opts = DownloadOpts { + kind: DownloadKind::Small, + ..Default::default() + }; + let (manifest_bytes, manifest_bytes_mtime) = - do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?; + do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?; let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes) .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}")) @@ -398,8 +406,13 @@ async fn do_download_index_part( timeline_id.expect("A timeline ID is always provided when downloading an index"); let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation); + let download_opts = DownloadOpts { + kind: DownloadKind::Small, + ..Default::default() + }; + let (index_part_bytes, index_part_mtime) = - do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?; + do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?; let index_part: IndexPart = serde_json::from_slice(&index_part_bytes) .with_context(|| format!("deserialize index part file at {remote_path:?}")) diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 7443261a9c..8d771dc405 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -49,7 +49,7 @@ use futures::Future; use metrics::UIntGauge; use pageserver_api::models::SecondaryProgress; use pageserver_api::shard::TenantShardId; -use remote_storage::{DownloadError, DownloadOpts, Etag, GenericRemoteStorage}; +use remote_storage::{DownloadError, DownloadKind, DownloadOpts, Etag, GenericRemoteStorage}; use tokio_util::sync::CancellationToken; use tracing::{info_span, instrument, warn, Instrument}; @@ -946,6 +946,7 @@ impl<'a> TenantDownloader<'a> { let cancel = &self.secondary_state.cancel; let opts = DownloadOpts { etag: prev_etag.cloned(), + kind: DownloadKind::Small, ..Default::default() }; diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs index 8d5ab1780f..bc4d148a29 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs @@ -4,7 +4,8 @@ use anyhow::Context; use bytes::Bytes; use postgres_ffi::ControlFileData; use remote_storage::{ - Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingObject, RemotePath, + Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing, + ListingObject, RemotePath, }; use serde::de::DeserializeOwned; use tokio_util::sync::CancellationToken; @@ -239,6 +240,7 @@ impl RemoteStorageWrapper { .download( path, &DownloadOpts { + kind: DownloadKind::Large, etag: None, byte_start: Bound::Included(start_inclusive), byte_end: Bound::Excluded(end_exclusive) diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index e328c6de79..b375eb886e 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -486,6 +486,7 @@ mod tests { upload_storage_class: None, }), timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, }) ); assert_eq!(parquet_upload.parquet_upload_row_group_size, 100); @@ -545,6 +546,7 @@ mod tests { local_path: tmpdir.to_path_buf(), }, timeout: std::time::Duration::from_secs(120), + small_timeout: std::time::Duration::from_secs(30), }; let storage = GenericRemoteStorage::from_config(&remote_storage_config) .await