From eb919cab88b8a28eb423b33eb07a858acbd61eab Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Fri, 9 Feb 2024 14:52:58 +0200
Subject: [PATCH] prepare to move timeouts and cancellation handling to
 remote_storage (#6696)

This PR contains preliminary cleanups and refactoring around
`remote_storage` for the next PR, which will move timeout and
cancellation handling into `remote_storage`.

Summary:
- smaller drive-by fixes
- code simplification
- refactor common parts, such as `DownloadError::is_permanent`
- align the error types of `RemoteStorage::list_*` so that more call
  sites can use the `download_retry` helper

Cc: #6096
---
 libs/remote_storage/src/lib.rs                | 26 ++++++-
 libs/remote_storage/src/local_fs.rs           | 50 ++++++++----
 libs/remote_storage/src/s3_bucket.rs          | 77 ++++++-------------
 libs/remote_storage/src/simulate_failures.rs  | 28 ++++---
 libs/remote_storage/src/support.rs            | 33 ++++++++
 pageserver/src/task_mgr.rs                    |  4 +-
 pageserver/src/tenant.rs                      |  4 +-
 .../src/tenant/remote_timeline_client.rs      | 35 ++++-----
 .../tenant/remote_timeline_client/download.rs | 59 +++++---------
 pageserver/src/tenant/secondary/downloader.rs |  2 +-
 10 files changed, 175 insertions(+), 143 deletions(-)
 create mode 100644 libs/remote_storage/src/support.rs

diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index e64b1de6f9..b6648931ac 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -13,6 +13,7 @@ mod azure_blob;
 mod local_fs;
 mod s3_bucket;
 mod simulate_failures;
+mod support;
 
 use std::{
     collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc, time::SystemTime,
@@ -170,7 +171,10 @@ pub trait RemoteStorage: Send + Sync + 'static {
     ///     whereas,
     ///     list_prefixes("foo/bar/") = ["cat", "dog"]
     /// See `test_real_s3.rs` for more details.
-    async fn list_files(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
+    async fn list_files(
+        &self,
+        prefix: Option<&RemotePath>,
+    ) -> Result<Vec<RemotePath>, DownloadError> {
         let result = self.list(prefix, ListingMode::NoDelimiter).await?.keys;
         Ok(result)
     }
@@ -179,7 +183,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
         &self,
         prefix: Option<&RemotePath>,
         _mode: ListingMode,
-    ) -> anyhow::Result<Listing>;
+    ) -> Result<Listing, DownloadError>;
 
     /// Streams the local file contents into the remote storage entry.
     async fn upload(
@@ -269,6 +273,19 @@ impl std::fmt::Display for DownloadError {
 
 impl std::error::Error for DownloadError {}
 
+impl DownloadError {
+    /// Returns true if the error should not be retried with backoff
+    pub fn is_permanent(&self) -> bool {
+        use DownloadError::*;
+        match self {
+            BadInput(_) => true,
+            NotFound => true,
+            Cancelled => true,
+            Other(_) => false,
+        }
+    }
+}
+
 #[derive(Debug)]
 pub enum TimeTravelError {
     /// Validation or other error happened due to user input.
@@ -336,7 +353,10 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
     // A function for listing all the files in a "directory"
     // Example:
     // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
-    pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
+    pub async fn list_files(
+        &self,
+        folder: Option<&RemotePath>,
+    ) -> Result<Vec<RemotePath>, DownloadError> {
         match self {
             Self::LocalFs(s) => s.list_files(folder).await,
             Self::AwsS3(s) => s.list_files(folder).await,
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index 36ec15e1b1..3ebea76181 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -18,9 +18,7 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
 use tracing::*;
 use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
 
-use crate::{
-    Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath, TimeTravelError,
-};
+use crate::{Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError};
 
 use super::{RemoteStorage, StorageMetadata};
 
@@ -365,27 +363,33 @@ impl RemoteStorage for LocalFs {
                     format!("Failed to open source file {target_path:?} to use in the download")
                 })
                 .map_err(DownloadError::Other)?;
+
+            let len = source
+                .metadata()
+                .await
+                .context("query file length")
+                .map_err(DownloadError::Other)?
+                .len();
+
             source
                 .seek(io::SeekFrom::Start(start_inclusive))
                 .await
                 .context("Failed to seek to the range start in a local storage file")
                 .map_err(DownloadError::Other)?;
+
             let metadata = self
                 .read_storage_metadata(&target_path)
                 .await
                 .map_err(DownloadError::Other)?;
 
-            let download_stream: DownloadStream = match end_exclusive {
-                Some(end_exclusive) => Box::pin(ReaderStream::new(
-                    source.take(end_exclusive - start_inclusive),
-                )),
-                None => Box::pin(ReaderStream::new(source)),
-            };
+            let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
+            let source = ReaderStream::new(source);
+
             Ok(Download {
                 metadata,
                 last_modified: None,
                 etag: None,
-                download_stream,
+                download_stream: Box::pin(source),
             })
         } else {
             Err(DownloadError::NotFound)
@@ -514,10 +518,8 @@ mod fs_tests {
     use futures_util::Stream;
     use std::{collections::HashMap, io::Write};
 
-    async fn read_and_assert_remote_file_contents(
+    async fn read_and_check_metadata(
         storage: &LocalFs,
-        #[allow(clippy::ptr_arg)]
-        // have to use &Utf8PathBuf due to `storage.local_path` parameter requirements
         remote_storage_path: &RemotePath,
         expected_metadata: Option<&StorageMetadata>,
     ) -> anyhow::Result<String> {
@@ -596,7 +598,7 @@ mod fs_tests {
         let upload_name = "upload_1";
         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
 
-        let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
+        let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
         assert_eq!(
             dummy_contents(upload_name),
             contents,
@@ -618,7 +620,7 @@ mod fs_tests {
         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
 
         let full_range_download_contents =
-            read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
+            read_and_check_metadata(&storage, &upload_target, None).await?;
         assert_eq!(
             dummy_contents(upload_name),
             full_range_download_contents,
@@ -660,6 +662,22 @@ mod fs_tests {
             "Second part bytes should be returned when requested"
         );
 
+        let suffix_bytes = storage
+            .download_byte_range(&upload_target, 13, None)
+            .await?
+ .download_stream; + let suffix_bytes = aggregate(suffix_bytes).await?; + let suffix = std::str::from_utf8(&suffix_bytes)?; + assert_eq!(upload_name, suffix); + + let all_bytes = storage + .download_byte_range(&upload_target, 0, None) + .await? + .download_stream; + let all_bytes = aggregate(all_bytes).await?; + let all_bytes = std::str::from_utf8(&all_bytes)?; + assert_eq!(dummy_contents("upload_1"), all_bytes); + Ok(()) } @@ -736,7 +754,7 @@ mod fs_tests { upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?; let full_range_download_contents = - read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?; + read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?; assert_eq!( dummy_contents(upload_name), full_range_download_contents, diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index c9ad9ef225..2b33a6ffd1 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -45,8 +45,9 @@ use utils::backoff; use super::StorageMetadata; use crate::{ - ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, - S3Config, TimeTravelError, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, + support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, + RemotePath, RemoteStorage, S3Config, TimeTravelError, MAX_KEYS_PER_DELETE, + REMOTE_STORAGE_PREFIX_SEPARATOR, }; pub(super) mod metrics; @@ -63,7 +64,6 @@ pub struct S3Bucket { concurrency_limiter: ConcurrencyLimiter, } -#[derive(Default)] struct GetObjectRequest { bucket: String, key: String, @@ -232,24 +232,8 @@ impl S3Bucket { let started_at = ScopeGuard::into_inner(started_at); - match get_object { - Ok(object_output) => { - let metadata = object_output.metadata().cloned().map(StorageMetadata); - let etag = object_output.e_tag.clone(); - let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok()); - - let body = object_output.body; - let body = ByteStreamAsStream::from(body); - let body = PermitCarrying::new(permit, body); - let body = TimedDownload::new(started_at, body); - - Ok(Download { - metadata, - etag, - last_modified, - download_stream: Box::pin(body), - }) - } + let object_output = match get_object { + Ok(object_output) => object_output, Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => { // Count this in the AttemptOutcome::Ok bucket, because 404 is not // an error: we expect to sometimes fetch an object and find it missing, @@ -259,7 +243,7 @@ impl S3Bucket { AttemptOutcome::Ok, started_at, ); - Err(DownloadError::NotFound) + return Err(DownloadError::NotFound); } Err(e) => { metrics::BUCKET_METRICS.req_seconds.observe_elapsed( @@ -268,11 +252,27 @@ impl S3Bucket { started_at, ); - Err(DownloadError::Other( + return Err(DownloadError::Other( anyhow::Error::new(e).context("download s3 object"), - )) + )); } - } + }; + + let metadata = object_output.metadata().cloned().map(StorageMetadata); + let etag = object_output.e_tag; + let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok()); + + let body = object_output.body; + let body = ByteStreamAsStream::from(body); + let body = PermitCarrying::new(permit, body); + let body = TimedDownload::new(started_at, body); + + Ok(Download { + metadata, + etag, + last_modified, + download_stream: Box::pin(body), + }) } async fn delete_oids( @@ -354,33 +354,6 @@ impl Stream for ByteStreamAsStream { // sense and 
Stream::size_hint does not really
 }
 
-pin_project_lite::pin_project! {
-    /// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
-    struct PermitCarrying<S> {
-        permit: tokio::sync::OwnedSemaphorePermit,
-        #[pin]
-        inner: S,
-    }
-}
-
-impl<S> PermitCarrying<S> {
-    fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
-        Self { permit, inner }
-    }
-}
-
-impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for PermitCarrying<S> {
-    type Item = <S as Stream>::Item;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.project().inner.poll_next(cx)
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.inner.size_hint()
-    }
-}
-
 pin_project_lite::pin_project! {
     /// Times and tracks the outcome of the request.
     struct TimedDownload<S> {
diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs
index 82d5a61fda..14bdb5ed4d 100644
--- a/libs/remote_storage/src/simulate_failures.rs
+++ b/libs/remote_storage/src/simulate_failures.rs
@@ -60,7 +60,7 @@ impl UnreliableWrapper {
     /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
     /// attempts, let the operation go ahead, and clear the counter.
     ///
-    fn attempt(&self, op: RemoteOp) -> Result<u64, DownloadError> {
+    fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
         let mut attempts = self.attempts.lock().unwrap();
 
         match attempts.entry(op) {
@@ -78,13 +78,13 @@ impl UnreliableWrapper {
                 } else {
                     let error =
                         anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
-                    Err(DownloadError::Other(error))
+                    Err(error)
                 }
             }
             Entry::Vacant(e) => {
                 let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
                 e.insert(1);
-                Err(DownloadError::Other(error))
+                Err(error)
             }
         }
     }
@@ -105,12 +105,17 @@ impl RemoteStorage for UnreliableWrapper {
         &self,
         prefix: Option<&RemotePath>,
     ) -> Result<Vec<RemotePath>, DownloadError> {
-        self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
+        self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
+            .map_err(DownloadError::Other)?;
         self.inner.list_prefixes(prefix).await
     }
 
-    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
-        self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
+    async fn list_files(
+        &self,
+        folder: Option<&RemotePath>,
+    ) -> Result<Vec<RemotePath>, DownloadError> {
+        self.attempt(RemoteOp::ListPrefixes(folder.cloned()))
+            .map_err(DownloadError::Other)?;
         self.inner.list_files(folder).await
     }
 
@@ -119,7 +124,8 @@ impl RemoteStorage for UnreliableWrapper {
         prefix: Option<&RemotePath>,
         mode: ListingMode,
     ) -> Result<Listing, DownloadError> {
-        self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
+        self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
+            .map_err(DownloadError::Other)?;
         self.inner.list(prefix, mode).await
     }
 
@@ -137,7 +143,8 @@ impl RemoteStorage for UnreliableWrapper {
     }
 
     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
-        self.attempt(RemoteOp::Download(from.clone()))?;
+        self.attempt(RemoteOp::Download(from.clone()))
+            .map_err(DownloadError::Other)?;
         self.inner.download(from).await
     }
 
@@ -150,7 +157,8 @@ impl RemoteStorage for UnreliableWrapper {
         // Note: We treat any download_byte_range as an "attempt" of the same
         // operation. We don't pay attention to the ranges. That's good enough
        // for now.
-        self.attempt(RemoteOp::Download(from.clone()))?;
+        self.attempt(RemoteOp::Download(from.clone()))
+            .map_err(DownloadError::Other)?;
         self.inner
             .download_byte_range(from, start_inclusive, end_exclusive)
             .await
@@ -193,7 +201,7 @@ impl RemoteStorage for UnreliableWrapper {
         cancel: &CancellationToken,
     ) -> Result<(), TimeTravelError> {
         self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
-            .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
+            .map_err(TimeTravelError::Other)?;
         self.inner
             .time_travel_recover(prefix, timestamp, done_if_after, cancel)
             .await
diff --git a/libs/remote_storage/src/support.rs b/libs/remote_storage/src/support.rs
new file mode 100644
index 0000000000..4688a484a5
--- /dev/null
+++ b/libs/remote_storage/src/support.rs
@@ -0,0 +1,33 @@
+use std::{
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use futures_util::Stream;
+
+pin_project_lite::pin_project! {
+    /// A stream adapter which carries a permit for the lifetime of the value.
+    pub(crate) struct PermitCarrying<S> {
+        permit: tokio::sync::OwnedSemaphorePermit,
+        #[pin]
+        inner: S,
+    }
+}
+
+impl<S> PermitCarrying<S> {
+    pub(crate) fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
+        Self { permit, inner }
+    }
+}
+
+impl<S: Stream> Stream for PermitCarrying<S> {
+    type Item = <S as Stream>::Item;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.project().inner.poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.inner.size_hint()
+    }
+}
diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs
index 5a06a97525..3cec5fa850 100644
--- a/pageserver/src/task_mgr.rs
+++ b/pageserver/src/task_mgr.rs
@@ -576,8 +576,8 @@ pub fn shutdown_token() -> CancellationToken {
 
 /// Has the current task been requested to shut down?
 pub fn is_shutdown_requested() -> bool {
-    if let Ok(cancel) = SHUTDOWN_TOKEN.try_with(|t| t.clone()) {
-        cancel.is_cancelled()
+    if let Ok(true_or_false) = SHUTDOWN_TOKEN.try_with(|t| t.is_cancelled()) {
+        true_or_false
     } else {
         if !cfg!(test) {
             warn!("is_shutdown_requested() called in an unexpected task or thread");
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index f086f46213..4446c410b0 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -1377,7 +1377,7 @@ impl Tenant {
             async move {
                 debug!("starting index part download");
 
-                let index_part = client.download_index_file(cancel_clone).await;
+                let index_part = client.download_index_file(&cancel_clone).await;
 
                 debug!("finished index part download");
 
@@ -2434,7 +2434,7 @@ impl Tenant {
        // operation is rare, so it's simpler to just download it (and robustly guarantees that the index
        // we use here really is the remotely persistent one).
        let result = tl_client
-            .download_index_file(self.cancel.clone())
+            .download_index_file(&self.cancel)
             .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
             .await?;
         let index_part = match result {
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 152c9a2b7d..0c7dd68c3f 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -217,6 +217,7 @@ use crate::metrics::{
 };
 use crate::task_mgr::shutdown_token;
 use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
+use crate::tenant::remote_timeline_client::download::download_retry;
 use crate::tenant::storage_layer::AsLayerDesc;
 use crate::tenant::upload_queue::Delete;
 use crate::tenant::TIMELINES_SEGMENT_NAME;
@@ -262,6 +263,11 @@ pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
 /// Default buffer size when interfacing with [`tokio::fs::File`].
 pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
 
+/// This timeout is intended to deal with hangs in lower layers, e.g. stuck TCP flows. It is not
+/// intended to be snappy enough for prompt shutdown, as we have a CancellationToken for that.
+pub(crate) const UPLOAD_TIMEOUT: Duration = Duration::from_secs(120);
+pub(crate) const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(120);
+
 pub enum MaybeDeletedIndexPart {
     IndexPart(IndexPart),
     Deleted(IndexPart),
@@ -325,11 +331,6 @@ pub struct RemoteTimelineClient {
     cancel: CancellationToken,
 }
 
-/// This timeout is intended to deal with hangs in lower layers, e.g. stuck TCP flows. It is not
-/// intended to be snappy enough for prompt shutdown, as we have a CancellationToken for that.
-const UPLOAD_TIMEOUT: Duration = Duration::from_secs(120);
-const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(120);
-
 /// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to anyhow.
 ///
 /// This is a convenience for the various upload functions. In future
@@ -506,7 +507,7 @@ impl RemoteTimelineClient {
     /// Download index file
     pub async fn download_index_file(
         &self,
-        cancel: CancellationToken,
+        cancel: &CancellationToken,
     ) -> Result<MaybeDeletedIndexPart, DownloadError> {
         let _unfinished_gauge_guard = self.metrics.call_begin(
             &RemoteOpFileKind::Index,
@@ -1147,22 +1148,17 @@ impl RemoteTimelineClient {
 
         let cancel = shutdown_token();
 
-        let remaining = backoff::retry(
+        let remaining = download_retry(
             || async {
                 self.storage_impl
                     .list_files(Some(&timeline_storage_path))
                    .await
             },
-            |_e| false,
-            FAILED_DOWNLOAD_WARN_THRESHOLD,
-            FAILED_REMOTE_OP_RETRIES,
-            "list_prefixes",
+            "list remaining files",
             &cancel,
         )
         .await
-        .ok_or_else(|| anyhow::anyhow!("Cancelled!"))
-        .and_then(|x| x)
-        .context("list prefixes")?;
+        .context("list remaining files")?;
 
         // We will delete the current index_part object last, since it acts as a deletion
         // marker via its deleted_at attribute
@@ -1351,6 +1347,7 @@ impl RemoteTimelineClient {
     /// queue.
     ///
     async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
+        let cancel = shutdown_token();
         // Loop to retry until it completes.
         loop {
             // If we're requested to shut down, close up shop and exit.
             // the Future, but we're not 100% sure if the remote storage library
             // is cancellation safe, so we don't dare to do that. Hopefully, the
            // upload finishes or times out soon enough.
-            if task_mgr::is_shutdown_requested() {
+            if cancel.is_cancelled() {
                 info!("upload task cancelled by shutdown request");
                 match self.stop() {
                     Ok(()) => {}
@@ -1473,7 +1470,7 @@ impl RemoteTimelineClient {
                 retries,
                 DEFAULT_BASE_BACKOFF_SECONDS,
                 DEFAULT_MAX_BACKOFF_SECONDS,
-                &shutdown_token(),
+                &cancel,
             )
             .await;
         }
@@ -1990,7 +1987,7 @@ mod tests {
         // Download back the index.json, and check that the list of files is correct
         let initial_index_part = match client
-            .download_index_file(CancellationToken::new())
+            .download_index_file(&CancellationToken::new())
             .await
             .unwrap()
         {
@@ -2084,7 +2081,7 @@ mod tests {
         // Download back the index.json, and check that the list of files is correct
         let index_part = match client
-            .download_index_file(CancellationToken::new())
+            .download_index_file(&CancellationToken::new())
             .await
             .unwrap()
         {
@@ -2286,7 +2283,7 @@ mod tests {
         let client = test_state.build_client(get_generation);
 
         let download_r = client
-            .download_index_file(CancellationToken::new())
+            .download_index_file(&CancellationToken::new())
             .await
             .expect("download should always succeed");
         assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs
index 6c1125746b..33287fc8f4 100644
--- a/pageserver/src/tenant/remote_timeline_client/download.rs
+++ b/pageserver/src/tenant/remote_timeline_client/download.rs
@@ -216,16 +216,15 @@ pub async fn list_remote_timelines(
         anyhow::bail!("storage-sync-list-remote-timelines");
     });
 
-    let cancel_inner = cancel.clone();
     let listing = download_retry_forever(
         || {
             download_cancellable(
-                &cancel_inner,
+                &cancel,
                 storage.list(Some(&remote_path), ListingMode::WithDelimiter),
             )
         },
         &format!("list timelines for {tenant_shard_id}"),
-        cancel,
+        &cancel,
     )
     .await?;
 
@@ -258,19 +257,18 @@ async fn do_download_index_part(
     tenant_shard_id: &TenantShardId,
     timeline_id: &TimelineId,
     index_generation: Generation,
-    cancel: CancellationToken,
+    cancel: &CancellationToken,
 ) -> Result<IndexPart, DownloadError> {
     use futures::stream::StreamExt;
 
     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
 
-    let cancel_inner = cancel.clone();
     let index_part_bytes = download_retry_forever(
         || async {
             // Cancellation: it is safe to cancel this future because we're just downloading into
             // a memory buffer, not touching local disk.
             let index_part_download =
-                download_cancellable(&cancel_inner, storage.download(&remote_path)).await?;
+                download_cancellable(cancel, storage.download(&remote_path)).await?;
 
             let mut index_part_bytes = Vec::new();
             let mut stream = std::pin::pin!(index_part_download.download_stream);
@@ -288,7 +286,7 @@ async fn do_download_index_part(
     .await?;
 
     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
-        .with_context(|| format!("download index part file at {remote_path:?}"))
+        .with_context(|| format!("deserialize index part file at {remote_path:?}"))
         .map_err(DownloadError::Other)?;
 
     Ok(index_part)
@@ -305,7 +303,7 @@ pub(super) async fn download_index_part(
     tenant_shard_id: &TenantShardId,
     timeline_id: &TimelineId,
     my_generation: Generation,
-    cancel: CancellationToken,
+    cancel: &CancellationToken,
 ) -> Result<IndexPart, DownloadError> {
     debug_assert_current_span_has_tenant_and_timeline_id();
 
@@ -325,14 +323,8 @@ pub(super) async fn download_index_part(
     // index in our generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
- let res = do_download_index_part( - storage, - tenant_shard_id, - timeline_id, - my_generation, - cancel.clone(), - ) - .await; + let res = + do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await; match res { Ok(index_part) => { tracing::debug!( @@ -357,7 +349,7 @@ pub(super) async fn download_index_part( tenant_shard_id, timeline_id, my_generation.previous(), - cancel.clone(), + cancel, ) .await; match res { @@ -379,18 +371,13 @@ pub(super) async fn download_index_part( // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent // to constructing a full index path with no generation, because the generation is a suffix. let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none()); - let indices = backoff::retry( + + let indices = download_retry( || async { storage.list_files(Some(&index_prefix)).await }, - |_| false, - FAILED_DOWNLOAD_WARN_THRESHOLD, - FAILED_REMOTE_OP_RETRIES, - "listing index_part files", - &cancel, + "list index_part files", + cancel, ) - .await - .ok_or_else(|| anyhow::anyhow!("Cancelled")) - .and_then(|x| x) - .map_err(DownloadError::Other)?; + .await?; // General case logic for which index to use: the latest index whose generation // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md @@ -447,8 +434,6 @@ pub(crate) async fn download_initdb_tar_zst( "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}" )); - let cancel_inner = cancel.clone(); - let file = download_retry( || async { let file = OpenOptions::new() @@ -461,13 +446,11 @@ pub(crate) async fn download_initdb_tar_zst( .with_context(|| format!("tempfile creation {temp_path}")) .map_err(DownloadError::Other)?; - let download = match download_cancellable(&cancel_inner, storage.download(&remote_path)) - .await + let download = match download_cancellable(cancel, storage.download(&remote_path)).await { Ok(dl) => dl, Err(DownloadError::NotFound) => { - download_cancellable(&cancel_inner, storage.download(&remote_preserved_path)) - .await? + download_cancellable(cancel, storage.download(&remote_preserved_path)).await? } Err(other) => Err(other)?, }; @@ -516,7 +499,7 @@ pub(crate) async fn download_initdb_tar_zst( /// with backoff. 
///
/// (See similar logic for uploads in `perform_upload_task`)
-async fn download_retry<T, O, F>(
+pub(super) async fn download_retry<T, O, F>(
     op: O,
     description: &str,
     cancel: &CancellationToken,
 ) -> Result<T, DownloadError>
 where
     O: FnMut() -> F,
     F: Future<Output = Result<T, DownloadError>>,
 {
     backoff::retry(
         op,
-        |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
+        DownloadError::is_permanent,
         FAILED_DOWNLOAD_WARN_THRESHOLD,
         FAILED_REMOTE_OP_RETRIES,
         description,
         cancel,
     )
     .await
@@ -541,7 +524,7 @@
 async fn download_retry_forever<T, O, F>(
     op: O,
     description: &str,
-    cancel: CancellationToken,
+    cancel: &CancellationToken,
 ) -> Result<T, DownloadError>
 where
     O: FnMut() -> F,
     F: Future<Output = Result<T, DownloadError>>,
 {
     backoff::retry(
         op,
-        |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
+        DownloadError::is_permanent,
         FAILED_DOWNLOAD_WARN_THRESHOLD,
         u32::MAX,
         description,
-        &cancel,
+        cancel,
     )
     .await
     .ok_or_else(|| DownloadError::Cancelled)
diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs
index 9330edf946..0666e104f8 100644
--- a/pageserver/src/tenant/secondary/downloader.rs
+++ b/pageserver/src/tenant/secondary/downloader.rs
@@ -533,7 +533,7 @@ impl<'a> TenantDownloader<'a> {
                     .map_err(UpdateError::from)?;
                 let mut heatmap_bytes = Vec::new();
                 let mut body = tokio_util::io::StreamReader::new(download.download_stream);
-                let _size = tokio::io::copy(&mut body, &mut heatmap_bytes).await?;
+                let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
                 Ok(heatmap_bytes)
             },
             |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
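--
Note (illustrative, not part of the patch): the retry/error-classification
pattern that `DownloadError::is_permanent` and `download_retry` consolidate
can be sketched in isolation as below. `DownloadError` and `retry` here are
simplified stand-ins for `remote_storage::DownloadError` and
`utils::backoff::retry`, with the backoff sleeps, warning threshold, and
cancellation token omitted; the names and the `max_attempts` parameter are
assumptions for the sketch, not the crate's actual API.

    use std::future::Future;

    #[derive(Debug)]
    pub enum DownloadError {
        BadInput(String),
        NotFound,
        Cancelled,
        Other(String),
    }

    impl DownloadError {
        /// Returns true if the error should not be retried with backoff.
        pub fn is_permanent(&self) -> bool {
            use DownloadError::*;
            matches!(self, BadInput(_) | NotFound | Cancelled)
        }
    }

    /// Retry `op` until it succeeds, fails permanently, or the attempt
    /// budget runs out. The real helper also sleeps with exponential
    /// backoff between attempts and returns `Cancelled` when its
    /// CancellationToken fires.
    pub async fn retry<T, O, F>(mut op: O, max_attempts: u32) -> Result<T, DownloadError>
    where
        O: FnMut() -> F,
        F: Future<Output = Result<T, DownloadError>>,
    {
        let mut attempt = 1;
        loop {
            match op().await {
                Ok(value) => return Ok(value),
                // Give up immediately on permanent errors, and on the
                // last allowed attempt for transient ones.
                Err(e) if e.is_permanent() || attempt >= max_attempts => return Err(e),
                Err(_) => attempt += 1,
            }
        }
    }

With this shape, call sites pass `DownloadError::is_permanent` as the give-up
predicate instead of repeating `matches!(e, BadInput(_) | NotFound)` at each
caller, which is the alignment this patch performs on `download_retry` and
`download_retry_forever`.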