mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 12:00:42 +00:00
pageserver: download small objects using a smaller timeout (#9938)
## Problem It appears that the Azure storage API tends to hang TCP connections more than S3 does. Currently we use a 2 minute timeout for all downloads. This is large because sometimes the objects we download are large. However, waiting 2 minutes when doing something like downloading a manifest on tenant attach is problematic, because when someone is doing a "create tenant, create timeline" workflow, that 2 minutes is long enough for them reasonably to give up creating that timeline. Rather than propagate oversized timeouts further up the stack, we should use a different timeout for objects that we expect to be small. Closes: https://github.com/neondatabase/neon/issues/9836 ## Summary of changes - Add a `small_timeout` configuration attribute to remote storage, defaulting to 30 seconds (still a very generous period to do something like download an index) - Add a DownloadKind parameter to DownloadOpts, so that callers can indicate whether they expect the object to be small or large. - In the azure client, use small timeout for HEAD requests, and for GET requests if DownloadKind::Small is used. - Use DownloadKind::Small for manifests, indices, and heatmap downloads. This PR intentionally does not make the equivalent change to the S3 client, to reduce blast radius in case this has unexpected consequences (we could accomplish the same thing by editing lots of configs, but just skipping the code is simpler for right now)
This commit is contained in:
@@ -35,6 +35,7 @@ use utils::backoff;
|
||||
use utils::backoff::exponential_backoff_duration_seconds;
|
||||
|
||||
use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
|
||||
use crate::DownloadKind;
|
||||
use crate::{
|
||||
config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError,
|
||||
DownloadOpts, Listing, ListingMode, ListingObject, RemotePath, RemoteStorage, StorageMetadata,
|
||||
@@ -49,10 +50,17 @@ pub struct AzureBlobStorage {
|
||||
concurrency_limiter: ConcurrencyLimiter,
|
||||
// Per-request timeout. Accessible for tests.
|
||||
pub timeout: Duration,
|
||||
|
||||
// Alternative timeout used for metadata objects which are expected to be small
|
||||
pub small_timeout: Duration,
|
||||
}
|
||||
|
||||
impl AzureBlobStorage {
|
||||
pub fn new(azure_config: &AzureConfig, timeout: Duration) -> Result<Self> {
|
||||
pub fn new(
|
||||
azure_config: &AzureConfig,
|
||||
timeout: Duration,
|
||||
small_timeout: Duration,
|
||||
) -> Result<Self> {
|
||||
debug!(
|
||||
"Creating azure remote storage for azure container {}",
|
||||
azure_config.container_name
|
||||
@@ -94,6 +102,7 @@ impl AzureBlobStorage {
|
||||
max_keys_per_list_response,
|
||||
concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
|
||||
timeout,
|
||||
small_timeout,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -133,6 +142,7 @@ impl AzureBlobStorage {
|
||||
async fn download_for_builder(
|
||||
&self,
|
||||
builder: GetBlobBuilder,
|
||||
timeout: Duration,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
let kind = RequestKind::Get;
|
||||
@@ -156,7 +166,7 @@ impl AzureBlobStorage {
|
||||
.map_err(to_download_error);
|
||||
|
||||
// apply per request timeout
|
||||
let response = tokio_stream::StreamExt::timeout(response, self.timeout);
|
||||
let response = tokio_stream::StreamExt::timeout(response, timeout);
|
||||
|
||||
// flatten
|
||||
let response = response.map(|res| match res {
|
||||
@@ -415,7 +425,7 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
let blob_client = self.client.blob_client(self.relative_path_to_name(key));
|
||||
let properties_future = blob_client.get_properties().into_future();
|
||||
|
||||
let properties_future = tokio::time::timeout(self.timeout, properties_future);
|
||||
let properties_future = tokio::time::timeout(self.small_timeout, properties_future);
|
||||
|
||||
let res = tokio::select! {
|
||||
res = properties_future => res,
|
||||
@@ -521,7 +531,12 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
});
|
||||
}
|
||||
|
||||
self.download_for_builder(builder, cancel).await
|
||||
let timeout = match opts.kind {
|
||||
DownloadKind::Small => self.small_timeout,
|
||||
DownloadKind::Large => self.timeout,
|
||||
};
|
||||
|
||||
self.download_for_builder(builder, timeout, cancel).await
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
|
||||
@@ -24,6 +24,13 @@ pub struct RemoteStorageConfig {
|
||||
skip_serializing_if = "is_default_timeout"
|
||||
)]
|
||||
pub timeout: Duration,
|
||||
/// Alternative timeout used for metadata objects which are expected to be small
|
||||
#[serde(
|
||||
with = "humantime_serde",
|
||||
default = "default_small_timeout",
|
||||
skip_serializing_if = "is_default_small_timeout"
|
||||
)]
|
||||
pub small_timeout: Duration,
|
||||
}
|
||||
|
||||
impl RemoteStorageKind {
|
||||
@@ -40,10 +47,18 @@ fn default_timeout() -> Duration {
|
||||
RemoteStorageConfig::DEFAULT_TIMEOUT
|
||||
}
|
||||
|
||||
fn default_small_timeout() -> Duration {
|
||||
RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
|
||||
}
|
||||
|
||||
fn is_default_timeout(d: &Duration) -> bool {
|
||||
*d == RemoteStorageConfig::DEFAULT_TIMEOUT
|
||||
}
|
||||
|
||||
fn is_default_small_timeout(d: &Duration) -> bool {
|
||||
*d == RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
|
||||
}
|
||||
|
||||
/// A kind of a remote storage to connect to, with its connection configuration.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
|
||||
#[serde(untagged)]
|
||||
@@ -184,6 +199,7 @@ fn serialize_storage_class<S: serde::Serializer>(
|
||||
|
||||
impl RemoteStorageConfig {
|
||||
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
|
||||
pub const DEFAULT_SMALL_TIMEOUT: Duration = std::time::Duration::from_secs(30);
|
||||
|
||||
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
|
||||
Ok(utils::toml_edit_ext::deserialize_item(toml)?)
|
||||
@@ -219,7 +235,8 @@ timeout = '5s'";
|
||||
storage: RemoteStorageKind::LocalFs {
|
||||
local_path: Utf8PathBuf::from(".")
|
||||
},
|
||||
timeout: Duration::from_secs(5)
|
||||
timeout: Duration::from_secs(5),
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -247,7 +264,8 @@ timeout = '5s'";
|
||||
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
upload_storage_class: Some(StorageClass::IntelligentTiering),
|
||||
}),
|
||||
timeout: Duration::from_secs(7)
|
||||
timeout: Duration::from_secs(7),
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -299,7 +317,8 @@ timeout = '5s'";
|
||||
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
|
||||
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
}),
|
||||
timeout: Duration::from_secs(7)
|
||||
timeout: Duration::from_secs(7),
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -178,6 +178,15 @@ pub struct DownloadOpts {
|
||||
/// The end of the byte range to download, or unbounded. Must be after the
|
||||
/// start bound.
|
||||
pub byte_end: Bound<u64>,
|
||||
/// Indicate whether we're downloading something small or large: this indirectly controls
|
||||
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
|
||||
/// for layer files
|
||||
pub kind: DownloadKind,
|
||||
}
|
||||
|
||||
pub enum DownloadKind {
|
||||
Large,
|
||||
Small,
|
||||
}
|
||||
|
||||
impl Default for DownloadOpts {
|
||||
@@ -186,6 +195,7 @@ impl Default for DownloadOpts {
|
||||
etag: Default::default(),
|
||||
byte_start: Bound::Unbounded,
|
||||
byte_end: Bound::Unbounded,
|
||||
kind: DownloadKind::Large,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -584,6 +594,10 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
impl GenericRemoteStorage {
|
||||
pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
|
||||
let timeout = storage_config.timeout;
|
||||
|
||||
// If somkeone overrides timeout to be small without adjusting small_timeout, then adjust it automatically
|
||||
let small_timeout = std::cmp::min(storage_config.small_timeout, timeout);
|
||||
|
||||
Ok(match &storage_config.storage {
|
||||
RemoteStorageKind::LocalFs { local_path: path } => {
|
||||
info!("Using fs root '{path}' as a remote storage");
|
||||
@@ -606,7 +620,11 @@ impl GenericRemoteStorage {
|
||||
.unwrap_or("<AZURE_STORAGE_ACCOUNT>");
|
||||
info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
|
||||
azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
|
||||
Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?))
|
||||
Self::AzureBlob(Arc::new(AzureBlobStorage::new(
|
||||
azure_config,
|
||||
timeout,
|
||||
small_timeout,
|
||||
)?))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -219,7 +219,8 @@ async fn create_azure_client(
|
||||
concurrency_limit: NonZeroUsize::new(100).unwrap(),
|
||||
max_keys_per_list_response,
|
||||
}),
|
||||
timeout: Duration::from_secs(120),
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
|
||||
};
|
||||
Ok(Arc::new(
|
||||
GenericRemoteStorage::from_config(&remote_storage_config)
|
||||
|
||||
@@ -396,6 +396,7 @@ async fn create_s3_client(
|
||||
upload_storage_class: None,
|
||||
}),
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
|
||||
};
|
||||
Ok(Arc::new(
|
||||
GenericRemoteStorage::from_config(&remote_storage_config)
|
||||
|
||||
@@ -838,6 +838,7 @@ mod test {
|
||||
local_path: remote_fs_dir.clone(),
|
||||
},
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
|
||||
};
|
||||
let storage = GenericRemoteStorage::from_config(&storage_config)
|
||||
.await
|
||||
|
||||
@@ -5423,6 +5423,7 @@ pub(crate) mod harness {
|
||||
local_path: remote_fs_dir.clone(),
|
||||
},
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
|
||||
};
|
||||
let remote_storage = GenericRemoteStorage::from_config(&config).await.unwrap();
|
||||
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
|
||||
|
||||
@@ -30,7 +30,9 @@ use crate::tenant::Generation;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
|
||||
use remote_storage::{
|
||||
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
|
||||
};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::pausable_failpoint;
|
||||
@@ -345,12 +347,13 @@ pub async fn list_remote_timelines(
|
||||
async fn do_download_remote_path_retry_forever(
|
||||
storage: &GenericRemoteStorage,
|
||||
remote_path: &RemotePath,
|
||||
download_opts: DownloadOpts,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(Vec<u8>, SystemTime), DownloadError> {
|
||||
download_retry_forever(
|
||||
|| async {
|
||||
let download = storage
|
||||
.download(remote_path, &DownloadOpts::default(), cancel)
|
||||
.download(remote_path, &download_opts, cancel)
|
||||
.await?;
|
||||
|
||||
let mut bytes = Vec::new();
|
||||
@@ -377,8 +380,13 @@ async fn do_download_tenant_manifest(
|
||||
) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
|
||||
let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
|
||||
|
||||
let download_opts = DownloadOpts {
|
||||
kind: DownloadKind::Small,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let (manifest_bytes, manifest_bytes_mtime) =
|
||||
do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;
|
||||
do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
|
||||
|
||||
let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
|
||||
.with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
|
||||
@@ -398,8 +406,13 @@ async fn do_download_index_part(
|
||||
timeline_id.expect("A timeline ID is always provided when downloading an index");
|
||||
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
|
||||
|
||||
let download_opts = DownloadOpts {
|
||||
kind: DownloadKind::Small,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let (index_part_bytes, index_part_mtime) =
|
||||
do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;
|
||||
do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
|
||||
|
||||
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
|
||||
.with_context(|| format!("deserialize index part file at {remote_path:?}"))
|
||||
|
||||
@@ -49,7 +49,7 @@ use futures::Future;
|
||||
use metrics::UIntGauge;
|
||||
use pageserver_api::models::SecondaryProgress;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{DownloadError, DownloadOpts, Etag, GenericRemoteStorage};
|
||||
use remote_storage::{DownloadError, DownloadKind, DownloadOpts, Etag, GenericRemoteStorage};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, instrument, warn, Instrument};
|
||||
@@ -946,6 +946,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
let cancel = &self.secondary_state.cancel;
|
||||
let opts = DownloadOpts {
|
||||
etag: prev_etag.cloned(),
|
||||
kind: DownloadKind::Small,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
||||
@@ -4,7 +4,8 @@ use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::ControlFileData;
|
||||
use remote_storage::{
|
||||
Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingObject, RemotePath,
|
||||
Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing,
|
||||
ListingObject, RemotePath,
|
||||
};
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -239,6 +240,7 @@ impl RemoteStorageWrapper {
|
||||
.download(
|
||||
path,
|
||||
&DownloadOpts {
|
||||
kind: DownloadKind::Large,
|
||||
etag: None,
|
||||
byte_start: Bound::Included(start_inclusive),
|
||||
byte_end: Bound::Excluded(end_exclusive)
|
||||
|
||||
@@ -486,6 +486,7 @@ mod tests {
|
||||
upload_storage_class: None,
|
||||
}),
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
|
||||
})
|
||||
);
|
||||
assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
|
||||
@@ -545,6 +546,7 @@ mod tests {
|
||||
local_path: tmpdir.to_path_buf(),
|
||||
},
|
||||
timeout: std::time::Duration::from_secs(120),
|
||||
small_timeout: std::time::Duration::from_secs(30),
|
||||
};
|
||||
let storage = GenericRemoteStorage::from_config(&remote_storage_config)
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user