diff --git a/Cargo.lock b/Cargo.lock index 45a313a72b..74cd2c8d2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4436,6 +4436,7 @@ dependencies = [ "futures", "futures-util", "http-types", + "humantime", "hyper", "itertools", "metrics", @@ -4447,6 +4448,7 @@ dependencies = [ "serde_json", "test-context", "tokio", + "tokio-stream", "tokio-util", "toml_edit", "tracing", diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 2cc59a947b..15f3cd3b80 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -15,11 +15,13 @@ aws-sdk-s3.workspace = true aws-credential-types.workspace = true bytes.workspace = true camino.workspace = true +humantime.workspace = true hyper = { workspace = true, features = ["stream"] } futures.workspace = true serde.workspace = true serde_json.workspace = true tokio = { workspace = true, features = ["sync", "fs", "io-util"] } +tokio-stream.workspace = true tokio-util = { workspace = true, features = ["compat"] } toml_edit.workspace = true tracing.workspace = true diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index df6d45dde1..12ec680cb6 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -22,16 +22,15 @@ use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerCl use bytes::Bytes; use futures::stream::Stream; use futures_util::StreamExt; +use futures_util::TryStreamExt; use http_types::{StatusCode, Url}; -use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::debug; -use crate::s3_bucket::RequestKind; -use crate::TimeTravelError; use crate::{ - AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, - RemoteStorage, StorageMetadata, + error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download, + DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata, + TimeTravelError, TimeoutOrCancel, }; pub struct AzureBlobStorage { @@ -39,10 +38,12 @@ pub struct AzureBlobStorage { prefix_in_container: Option, max_keys_per_list_response: Option, concurrency_limiter: ConcurrencyLimiter, + // Per-request timeout. Accessible for tests. + pub timeout: Duration, } impl AzureBlobStorage { - pub fn new(azure_config: &AzureConfig) -> Result { + pub fn new(azure_config: &AzureConfig, timeout: Duration) -> Result { debug!( "Creating azure remote storage for azure container {}", azure_config.container_name @@ -79,6 +80,7 @@ impl AzureBlobStorage { prefix_in_container: azure_config.prefix_in_container.to_owned(), max_keys_per_list_response, concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()), + timeout, }) } @@ -121,8 +123,11 @@ impl AzureBlobStorage { async fn download_for_builder( &self, builder: GetBlobBuilder, + cancel: &CancellationToken, ) -> Result { - let mut response = builder.into_stream(); + let kind = RequestKind::Get; + + let _permit = self.permit(kind, cancel).await?; let mut etag = None; let mut last_modified = None; @@ -130,39 +135,70 @@ impl AzureBlobStorage { // TODO give proper streaming response instead of buffering into RAM // https://github.com/neondatabase/neon/issues/5563 - let mut bufs = Vec::new(); - while let Some(part) = response.next().await { - let part = part.map_err(to_download_error)?; - let etag_str: &str = part.blob.properties.etag.as_ref(); - if etag.is_none() { - etag = Some(etag.unwrap_or_else(|| etag_str.to_owned())); + let download = async { + let response = builder + // convert to concrete Pageable + .into_stream() + // convert to TryStream + .into_stream() + .map_err(to_download_error); + + // apply per request timeout + let response = tokio_stream::StreamExt::timeout(response, self.timeout); + + // flatten + let response = response.map(|res| match res { + Ok(res) => res, + Err(_elapsed) => Err(DownloadError::Timeout), + }); + + let mut response = std::pin::pin!(response); + + let mut bufs = Vec::new(); + while let Some(part) = response.next().await { + let part = part?; + let etag_str: &str = part.blob.properties.etag.as_ref(); + if etag.is_none() { + etag = Some(etag.unwrap_or_else(|| etag_str.to_owned())); + } + if last_modified.is_none() { + last_modified = Some(part.blob.properties.last_modified.into()); + } + if let Some(blob_meta) = part.blob.metadata { + metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned()))); + } + let data = part + .data + .collect() + .await + .map_err(|e| DownloadError::Other(e.into()))?; + bufs.push(data); } - if last_modified.is_none() { - last_modified = Some(part.blob.properties.last_modified.into()); - } - if let Some(blob_meta) = part.blob.metadata { - metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned()))); - } - let data = part - .data - .collect() - .await - .map_err(|e| DownloadError::Other(e.into()))?; - bufs.push(data); + Ok(Download { + download_stream: Box::pin(futures::stream::iter(bufs.into_iter().map(Ok))), + etag, + last_modified, + metadata: Some(StorageMetadata(metadata)), + }) + }; + + tokio::select! { + bufs = download => bufs, + _ = cancel.cancelled() => Err(DownloadError::Cancelled), } - Ok(Download { - download_stream: Box::pin(futures::stream::iter(bufs.into_iter().map(Ok))), - etag, - last_modified, - metadata: Some(StorageMetadata(metadata)), - }) } - async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> { - self.concurrency_limiter - .acquire(kind) - .await - .expect("semaphore is never closed") + async fn permit( + &self, + kind: RequestKind, + cancel: &CancellationToken, + ) -> Result, Cancelled> { + let acquire = self.concurrency_limiter.acquire(kind); + + tokio::select! { + permit = acquire => Ok(permit.expect("never closed")), + _ = cancel.cancelled() => Err(Cancelled), + } } } @@ -192,66 +228,87 @@ impl RemoteStorage for AzureBlobStorage { prefix: Option<&RemotePath>, mode: ListingMode, max_keys: Option, + cancel: &CancellationToken, ) -> anyhow::Result { - // get the passed prefix or if it is not set use prefix_in_bucket value - let list_prefix = prefix - .map(|p| self.relative_path_to_name(p)) - .or_else(|| self.prefix_in_container.clone()) - .map(|mut p| { - // required to end with a separator - // otherwise request will return only the entry of a prefix - if matches!(mode, ListingMode::WithDelimiter) - && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) - { - p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); - } - p + let _permit = self.permit(RequestKind::List, cancel).await?; + + let op = async { + // get the passed prefix or if it is not set use prefix_in_bucket value + let list_prefix = prefix + .map(|p| self.relative_path_to_name(p)) + .or_else(|| self.prefix_in_container.clone()) + .map(|mut p| { + // required to end with a separator + // otherwise request will return only the entry of a prefix + if matches!(mode, ListingMode::WithDelimiter) + && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) + { + p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); + } + p + }); + + let mut builder = self.client.list_blobs(); + + if let ListingMode::WithDelimiter = mode { + builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()); + } + + if let Some(prefix) = list_prefix { + builder = builder.prefix(Cow::from(prefix.to_owned())); + } + + if let Some(limit) = self.max_keys_per_list_response { + builder = builder.max_results(MaxResults::new(limit)); + } + + let response = builder.into_stream(); + let response = response.into_stream().map_err(to_download_error); + let response = tokio_stream::StreamExt::timeout(response, self.timeout); + let response = response.map(|res| match res { + Ok(res) => res, + Err(_elapsed) => Err(DownloadError::Timeout), }); - let mut builder = self.client.list_blobs(); + let mut response = std::pin::pin!(response); - if let ListingMode::WithDelimiter = mode { - builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()); - } + let mut res = Listing::default(); - if let Some(prefix) = list_prefix { - builder = builder.prefix(Cow::from(prefix.to_owned())); - } + let mut max_keys = max_keys.map(|mk| mk.get()); + while let Some(entry) = response.next().await { + let entry = entry?; + let prefix_iter = entry + .blobs + .prefixes() + .map(|prefix| self.name_to_relative_path(&prefix.name)); + res.prefixes.extend(prefix_iter); - if let Some(limit) = self.max_keys_per_list_response { - builder = builder.max_results(MaxResults::new(limit)); - } + let blob_iter = entry + .blobs + .blobs() + .map(|k| self.name_to_relative_path(&k.name)); - let mut response = builder.into_stream(); - let mut res = Listing::default(); - // NonZeroU32 doesn't support subtraction apparently - let mut max_keys = max_keys.map(|mk| mk.get()); - while let Some(l) = response.next().await { - let entry = l.map_err(to_download_error)?; - let prefix_iter = entry - .blobs - .prefixes() - .map(|prefix| self.name_to_relative_path(&prefix.name)); - res.prefixes.extend(prefix_iter); + for key in blob_iter { + res.keys.push(key); - let blob_iter = entry - .blobs - .blobs() - .map(|k| self.name_to_relative_path(&k.name)); - - for key in blob_iter { - res.keys.push(key); - if let Some(mut mk) = max_keys { - assert!(mk > 0); - mk -= 1; - if mk == 0 { - return Ok(res); // limit reached + if let Some(mut mk) = max_keys { + assert!(mk > 0); + mk -= 1; + if mk == 0 { + return Ok(res); // limit reached + } + max_keys = Some(mk); } - max_keys = Some(mk); } } + + Ok(res) + }; + + tokio::select! { + res = op => res, + _ = cancel.cancelled() => Err(DownloadError::Cancelled), } - Ok(res) } async fn upload( @@ -260,35 +317,52 @@ impl RemoteStorage for AzureBlobStorage { data_size_bytes: usize, to: &RemotePath, metadata: Option, + cancel: &CancellationToken, ) -> anyhow::Result<()> { - let _permit = self.permit(RequestKind::Put).await; - let blob_client = self.client.blob_client(self.relative_path_to_name(to)); + let _permit = self.permit(RequestKind::Put, cancel).await?; - let from: Pin> + Send + Sync + 'static>> = - Box::pin(from); + let op = async { + let blob_client = self.client.blob_client(self.relative_path_to_name(to)); - let from = NonSeekableStream::new(from, data_size_bytes); + let from: Pin> + Send + Sync + 'static>> = + Box::pin(from); - let body = azure_core::Body::SeekableStream(Box::new(from)); + let from = NonSeekableStream::new(from, data_size_bytes); - let mut builder = blob_client.put_block_blob(body); + let body = azure_core::Body::SeekableStream(Box::new(from)); - if let Some(metadata) = metadata { - builder = builder.metadata(to_azure_metadata(metadata)); + let mut builder = blob_client.put_block_blob(body); + + if let Some(metadata) = metadata { + builder = builder.metadata(to_azure_metadata(metadata)); + } + + let fut = builder.into_future(); + let fut = tokio::time::timeout(self.timeout, fut); + + match fut.await { + Ok(Ok(_response)) => Ok(()), + Ok(Err(azure)) => Err(azure.into()), + Err(_timeout) => Err(TimeoutOrCancel::Cancel.into()), + } + }; + + tokio::select! { + res = op => res, + _ = cancel.cancelled() => Err(TimeoutOrCancel::Cancel.into()), } - - let _response = builder.into_future().await?; - - Ok(()) } - async fn download(&self, from: &RemotePath) -> Result { - let _permit = self.permit(RequestKind::Get).await; + async fn download( + &self, + from: &RemotePath, + cancel: &CancellationToken, + ) -> Result { let blob_client = self.client.blob_client(self.relative_path_to_name(from)); let builder = blob_client.get(); - self.download_for_builder(builder).await + self.download_for_builder(builder, cancel).await } async fn download_byte_range( @@ -296,8 +370,8 @@ impl RemoteStorage for AzureBlobStorage { from: &RemotePath, start_inclusive: u64, end_exclusive: Option, + cancel: &CancellationToken, ) -> Result { - let _permit = self.permit(RequestKind::Get).await; let blob_client = self.client.blob_client(self.relative_path_to_name(from)); let mut builder = blob_client.get(); @@ -309,82 +383,113 @@ impl RemoteStorage for AzureBlobStorage { }; builder = builder.range(range); - self.download_for_builder(builder).await + self.download_for_builder(builder, cancel).await } - async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { - let _permit = self.permit(RequestKind::Delete).await; - let blob_client = self.client.blob_client(self.relative_path_to_name(path)); + async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> { + self.delete_objects(std::array::from_ref(path), cancel) + .await + } - let builder = blob_client.delete(); + async fn delete_objects<'a>( + &self, + paths: &'a [RemotePath], + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + let _permit = self.permit(RequestKind::Delete, cancel).await?; - match builder.into_future().await { - Ok(_response) => Ok(()), - Err(e) => { - if let Some(http_err) = e.as_http_error() { - if http_err.status() == StatusCode::NotFound { - return Ok(()); + let op = async { + // TODO batch requests are also not supported by the SDK + // https://github.com/Azure/azure-sdk-for-rust/issues/1068 + // https://github.com/Azure/azure-sdk-for-rust/issues/1249 + for path in paths { + let blob_client = self.client.blob_client(self.relative_path_to_name(path)); + + let request = blob_client.delete().into_future(); + + let res = tokio::time::timeout(self.timeout, request).await; + + match res { + Ok(Ok(_response)) => continue, + Ok(Err(e)) => { + if let Some(http_err) = e.as_http_error() { + if http_err.status() == StatusCode::NotFound { + continue; + } + } + return Err(e.into()); } + Err(_elapsed) => return Err(TimeoutOrCancel::Timeout.into()), } - Err(anyhow::Error::new(e)) } + + Ok(()) + }; + + tokio::select! { + res = op => res, + _ = cancel.cancelled() => Err(TimeoutOrCancel::Cancel.into()), } } - async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { - // Permit is already obtained by inner delete function + async fn copy( + &self, + from: &RemotePath, + to: &RemotePath, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + let _permit = self.permit(RequestKind::Copy, cancel).await?; - // TODO batch requests are also not supported by the SDK - // https://github.com/Azure/azure-sdk-for-rust/issues/1068 - // https://github.com/Azure/azure-sdk-for-rust/issues/1249 - for path in paths { - self.delete(path).await?; - } - Ok(()) - } + let timeout = tokio::time::sleep(self.timeout); - async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> { - let _permit = self.permit(RequestKind::Copy).await; - let blob_client = self.client.blob_client(self.relative_path_to_name(to)); + let mut copy_status = None; - let source_url = format!( - "{}/{}", - self.client.url()?, - self.relative_path_to_name(from) - ); - let builder = blob_client.copy(Url::from_str(&source_url)?); + let op = async { + let blob_client = self.client.blob_client(self.relative_path_to_name(to)); - let result = builder.into_future().await?; + let source_url = format!( + "{}/{}", + self.client.url()?, + self.relative_path_to_name(from) + ); - let mut copy_status = result.copy_status; - let start_time = Instant::now(); - const MAX_WAIT_TIME: Duration = Duration::from_secs(60); - loop { - match copy_status { - CopyStatus::Aborted => { - anyhow::bail!("Received abort for copy from {from} to {to}."); + let builder = blob_client.copy(Url::from_str(&source_url)?); + let copy = builder.into_future(); + + let result = copy.await?; + + copy_status = Some(result.copy_status); + loop { + match copy_status.as_ref().expect("we always set it to Some") { + CopyStatus::Aborted => { + anyhow::bail!("Received abort for copy from {from} to {to}."); + } + CopyStatus::Failed => { + anyhow::bail!("Received failure response for copy from {from} to {to}."); + } + CopyStatus::Success => return Ok(()), + CopyStatus::Pending => (), } - CopyStatus::Failed => { - anyhow::bail!("Received failure response for copy from {from} to {to}."); - } - CopyStatus::Success => return Ok(()), - CopyStatus::Pending => (), + // The copy is taking longer. Waiting a second and then re-trying. + // TODO estimate time based on copy_progress and adjust time based on that + tokio::time::sleep(Duration::from_millis(1000)).await; + let properties = blob_client.get_properties().into_future().await?; + let Some(status) = properties.blob.properties.copy_status else { + tracing::warn!("copy_status for copy is None!, from={from}, to={to}"); + return Ok(()); + }; + copy_status = Some(status); } - // The copy is taking longer. Waiting a second and then re-trying. - // TODO estimate time based on copy_progress and adjust time based on that - tokio::time::sleep(Duration::from_millis(1000)).await; - let properties = blob_client.get_properties().into_future().await?; - let Some(status) = properties.blob.properties.copy_status else { - tracing::warn!("copy_status for copy is None!, from={from}, to={to}"); - return Ok(()); - }; - if start_time.elapsed() > MAX_WAIT_TIME { - anyhow::bail!("Copy from from {from} to {to} took longer than limit MAX_WAIT_TIME={}s. copy_pogress={:?}.", - MAX_WAIT_TIME.as_secs_f32(), - properties.blob.properties.copy_progress, - ); - } - copy_status = status; + }; + + tokio::select! { + res = op => res, + _ = cancel.cancelled() => Err(anyhow::Error::new(TimeoutOrCancel::Cancel)), + _ = timeout => { + let e = anyhow::Error::new(TimeoutOrCancel::Timeout); + let e = e.context(format!("Timeout, last status: {copy_status:?}")); + Err(e) + }, } } diff --git a/libs/remote_storage/src/error.rs b/libs/remote_storage/src/error.rs new file mode 100644 index 0000000000..96f044e087 --- /dev/null +++ b/libs/remote_storage/src/error.rs @@ -0,0 +1,181 @@ +/// Reasons for downloads or listings to fail. +#[derive(Debug)] +pub enum DownloadError { + /// Validation or other error happened due to user input. + BadInput(anyhow::Error), + /// The file was not found in the remote storage. + NotFound, + /// A cancellation token aborted the download, typically during + /// tenant detach or process shutdown. + Cancelled, + /// A timeout happened while executing the request. Possible reasons: + /// - stuck tcp connection + /// + /// Concurrency control is not timed within timeout. + Timeout, + /// The file was found in the remote storage, but the download failed. + Other(anyhow::Error), +} + +impl std::fmt::Display for DownloadError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DownloadError::BadInput(e) => { + write!(f, "Failed to download a remote file due to user input: {e}") + } + DownloadError::NotFound => write!(f, "No file found for the remote object id given"), + DownloadError::Cancelled => write!(f, "Cancelled, shutting down"), + DownloadError::Timeout => write!(f, "timeout"), + DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"), + } + } +} + +impl std::error::Error for DownloadError {} + +impl DownloadError { + /// Returns true if the error should not be retried with backoff + pub fn is_permanent(&self) -> bool { + use DownloadError::*; + match self { + BadInput(_) | NotFound | Cancelled => true, + Timeout | Other(_) => false, + } + } +} + +#[derive(Debug)] +pub enum TimeTravelError { + /// Validation or other error happened due to user input. + BadInput(anyhow::Error), + /// The used remote storage does not have time travel recovery implemented + Unimplemented, + /// The number of versions/deletion markers is above our limit. + TooManyVersions, + /// A cancellation token aborted the process, typically during + /// request closure or process shutdown. + Cancelled, + /// Other errors + Other(anyhow::Error), +} + +impl std::fmt::Display for TimeTravelError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TimeTravelError::BadInput(e) => { + write!( + f, + "Failed to time travel recover a prefix due to user input: {e}" + ) + } + TimeTravelError::Unimplemented => write!( + f, + "time travel recovery is not implemented for the current storage backend" + ), + TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"), + TimeTravelError::TooManyVersions => { + write!(f, "Number of versions/delete markers above limit") + } + TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"), + } + } +} + +impl std::error::Error for TimeTravelError {} + +/// Plain cancelled error. +/// +/// By design this type does not not implement `std::error::Error` so it cannot be put as the root +/// cause of `std::io::Error` or `anyhow::Error`. It should never need to be exposed out of this +/// crate. +/// +/// It exists to implement permit acquiring in `{Download,TimeTravel}Error` and `anyhow::Error` returning +/// operations and ensuring that those get converted to proper versions with just `?`. +#[derive(Debug)] +pub(crate) struct Cancelled; + +impl From for anyhow::Error { + fn from(_: Cancelled) -> Self { + anyhow::Error::new(TimeoutOrCancel::Cancel) + } +} + +impl From for TimeTravelError { + fn from(_: Cancelled) -> Self { + TimeTravelError::Cancelled + } +} + +impl From for TimeoutOrCancel { + fn from(_: Cancelled) -> Self { + TimeoutOrCancel::Cancel + } +} + +impl From for DownloadError { + fn from(_: Cancelled) -> Self { + DownloadError::Cancelled + } +} + +/// This type is used at as the root cause for timeouts and cancellations with `anyhow::Error` returning +/// RemoteStorage methods. +/// +/// For use with `utils::backoff::retry` and `anyhow::Error` returning operations there is +/// `TimeoutOrCancel::caused_by_cancel` method to query "proper form" errors. +#[derive(Debug)] +pub enum TimeoutOrCancel { + Timeout, + Cancel, +} + +impl std::fmt::Display for TimeoutOrCancel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use TimeoutOrCancel::*; + match self { + Timeout => write!(f, "timeout"), + Cancel => write!(f, "cancel"), + } + } +} + +impl std::error::Error for TimeoutOrCancel {} + +impl TimeoutOrCancel { + pub fn caused(error: &anyhow::Error) -> Option<&Self> { + error.root_cause().downcast_ref() + } + + /// Returns true if the error was caused by [`TimeoutOrCancel::Cancel`]. + pub fn caused_by_cancel(error: &anyhow::Error) -> bool { + Self::caused(error).is_some_and(Self::is_cancel) + } + + pub fn is_cancel(&self) -> bool { + matches!(self, TimeoutOrCancel::Cancel) + } + + pub fn is_timeout(&self) -> bool { + matches!(self, TimeoutOrCancel::Timeout) + } +} + +/// This conversion is used when [`crate::support::DownloadStream`] notices a cancellation or +/// timeout to wrap it in an `std::io::Error`. +impl From for std::io::Error { + fn from(value: TimeoutOrCancel) -> Self { + let e = DownloadError::from(value); + std::io::Error::other(e) + } +} + +impl From for DownloadError { + fn from(value: TimeoutOrCancel) -> Self { + use TimeoutOrCancel::*; + + match value { + Timeout => DownloadError::Timeout, + Cancel => DownloadError::Cancelled, + } + } +} diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 5a0b74e406..b0b69f9155 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -10,6 +10,7 @@ #![deny(clippy::undocumented_unsafe_blocks)] mod azure_blob; +mod error; mod local_fs; mod s3_bucket; mod simulate_failures; @@ -21,7 +22,7 @@ use std::{ num::{NonZeroU32, NonZeroUsize}, pin::Pin, sync::Arc, - time::SystemTime, + time::{Duration, SystemTime}, }; use anyhow::{bail, Context}; @@ -41,6 +42,8 @@ pub use self::{ }; use s3_bucket::RequestKind; +pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel}; + /// Currently, sync happens with AWS S3, that has two limits on requests per second: /// ~200 RPS for IAM services /// @@ -158,9 +161,10 @@ pub trait RemoteStorage: Send + Sync + 'static { async fn list_prefixes( &self, prefix: Option<&RemotePath>, + cancel: &CancellationToken, ) -> Result, DownloadError> { let result = self - .list(prefix, ListingMode::WithDelimiter, None) + .list(prefix, ListingMode::WithDelimiter, None, cancel) .await? .prefixes; Ok(result) @@ -182,9 +186,10 @@ pub trait RemoteStorage: Send + Sync + 'static { &self, prefix: Option<&RemotePath>, max_keys: Option, + cancel: &CancellationToken, ) -> Result, DownloadError> { let result = self - .list(prefix, ListingMode::NoDelimiter, max_keys) + .list(prefix, ListingMode::NoDelimiter, max_keys, cancel) .await? .keys; Ok(result) @@ -195,9 +200,13 @@ pub trait RemoteStorage: Send + Sync + 'static { prefix: Option<&RemotePath>, _mode: ListingMode, max_keys: Option, + cancel: &CancellationToken, ) -> Result; /// Streams the local file contents into remote into the remote storage entry. + /// + /// If the operation fails because of timeout or cancellation, the root cause of the error will be + /// set to `TimeoutOrCancel`. async fn upload( &self, from: impl Stream> + Send + Sync + 'static, @@ -206,27 +215,61 @@ pub trait RemoteStorage: Send + Sync + 'static { data_size_bytes: usize, to: &RemotePath, metadata: Option, + cancel: &CancellationToken, ) -> anyhow::Result<()>; - /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer. + /// Streams the remote storage entry contents. + /// + /// The returned download stream will obey initial timeout and cancellation signal by erroring + /// on whichever happens first. Only one of the reasons will fail the stream, which is usually + /// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out. + /// /// Returns the metadata, if any was stored with the file previously. - async fn download(&self, from: &RemotePath) -> Result; + async fn download( + &self, + from: &RemotePath, + cancel: &CancellationToken, + ) -> Result; - /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer. + /// Streams a given byte range of the remote storage entry contents. + /// + /// The returned download stream will obey initial timeout and cancellation signal by erroring + /// on whichever happens first. Only one of the reasons will fail the stream, which is usually + /// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out. + /// /// Returns the metadata, if any was stored with the file previously. async fn download_byte_range( &self, from: &RemotePath, start_inclusive: u64, end_exclusive: Option, + cancel: &CancellationToken, ) -> Result; - async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>; + /// Delete a single path from remote storage. + /// + /// If the operation fails because of timeout or cancellation, the root cause of the error will be + /// set to `TimeoutOrCancel`. In such situation it is unknown if the deletion went through. + async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()>; - async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>; + /// Delete a multiple paths from remote storage. + /// + /// If the operation fails because of timeout or cancellation, the root cause of the error will be + /// set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went + /// through. + async fn delete_objects<'a>( + &self, + paths: &'a [RemotePath], + cancel: &CancellationToken, + ) -> anyhow::Result<()>; /// Copy a remote object inside a bucket from one path to another. - async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>; + async fn copy( + &self, + from: &RemotePath, + to: &RemotePath, + cancel: &CancellationToken, + ) -> anyhow::Result<()>; /// Resets the content of everything with the given prefix to the given state async fn time_travel_recover( @@ -238,7 +281,13 @@ pub trait RemoteStorage: Send + Sync + 'static { ) -> Result<(), TimeTravelError>; } -pub type DownloadStream = Pin> + Unpin + Send + Sync>>; +/// DownloadStream is sensitive to the timeout and cancellation used with the original +/// [`RemoteStorage::download`] request. The type yields `std::io::Result` to be compatible +/// with `tokio::io::copy_buf`. +// This has 'static because safekeepers do not use cancellation tokens (yet) +pub type DownloadStream = + Pin> + Send + Sync + 'static>>; + pub struct Download { pub download_stream: DownloadStream, /// The last time the file was modified (`last-modified` HTTP header) @@ -257,86 +306,6 @@ impl Debug for Download { } } -#[derive(Debug)] -pub enum DownloadError { - /// Validation or other error happened due to user input. - BadInput(anyhow::Error), - /// The file was not found in the remote storage. - NotFound, - /// A cancellation token aborted the download, typically during - /// tenant detach or process shutdown. - Cancelled, - /// The file was found in the remote storage, but the download failed. - Other(anyhow::Error), -} - -impl std::fmt::Display for DownloadError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - DownloadError::BadInput(e) => { - write!(f, "Failed to download a remote file due to user input: {e}") - } - DownloadError::Cancelled => write!(f, "Cancelled, shutting down"), - DownloadError::NotFound => write!(f, "No file found for the remote object id given"), - DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"), - } - } -} - -impl std::error::Error for DownloadError {} - -impl DownloadError { - /// Returns true if the error should not be retried with backoff - pub fn is_permanent(&self) -> bool { - use DownloadError::*; - match self { - BadInput(_) => true, - NotFound => true, - Cancelled => true, - Other(_) => false, - } - } -} - -#[derive(Debug)] -pub enum TimeTravelError { - /// Validation or other error happened due to user input. - BadInput(anyhow::Error), - /// The used remote storage does not have time travel recovery implemented - Unimplemented, - /// The number of versions/deletion markers is above our limit. - TooManyVersions, - /// A cancellation token aborted the process, typically during - /// request closure or process shutdown. - Cancelled, - /// Other errors - Other(anyhow::Error), -} - -impl std::fmt::Display for TimeTravelError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TimeTravelError::BadInput(e) => { - write!( - f, - "Failed to time travel recover a prefix due to user input: {e}" - ) - } - TimeTravelError::Unimplemented => write!( - f, - "time travel recovery is not implemented for the current storage backend" - ), - TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"), - TimeTravelError::TooManyVersions => { - write!(f, "Number of versions/delete markers above limit") - } - TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"), - } - } -} - -impl std::error::Error for TimeTravelError {} - /// Every storage, currently supported. /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics. #[derive(Clone)] @@ -354,12 +323,13 @@ impl GenericRemoteStorage> { prefix: Option<&RemotePath>, mode: ListingMode, max_keys: Option, + cancel: &CancellationToken, ) -> anyhow::Result { match self { - Self::LocalFs(s) => s.list(prefix, mode, max_keys).await, - Self::AwsS3(s) => s.list(prefix, mode, max_keys).await, - Self::AzureBlob(s) => s.list(prefix, mode, max_keys).await, - Self::Unreliable(s) => s.list(prefix, mode, max_keys).await, + Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await, + Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await, + Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await, + Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await, } } @@ -372,12 +342,13 @@ impl GenericRemoteStorage> { &self, folder: Option<&RemotePath>, max_keys: Option, + cancel: &CancellationToken, ) -> Result, DownloadError> { match self { - Self::LocalFs(s) => s.list_files(folder, max_keys).await, - Self::AwsS3(s) => s.list_files(folder, max_keys).await, - Self::AzureBlob(s) => s.list_files(folder, max_keys).await, - Self::Unreliable(s) => s.list_files(folder, max_keys).await, + Self::LocalFs(s) => s.list_files(folder, max_keys, cancel).await, + Self::AwsS3(s) => s.list_files(folder, max_keys, cancel).await, + Self::AzureBlob(s) => s.list_files(folder, max_keys, cancel).await, + Self::Unreliable(s) => s.list_files(folder, max_keys, cancel).await, } } @@ -387,36 +358,43 @@ impl GenericRemoteStorage> { pub async fn list_prefixes( &self, prefix: Option<&RemotePath>, + cancel: &CancellationToken, ) -> Result, DownloadError> { match self { - Self::LocalFs(s) => s.list_prefixes(prefix).await, - Self::AwsS3(s) => s.list_prefixes(prefix).await, - Self::AzureBlob(s) => s.list_prefixes(prefix).await, - Self::Unreliable(s) => s.list_prefixes(prefix).await, + Self::LocalFs(s) => s.list_prefixes(prefix, cancel).await, + Self::AwsS3(s) => s.list_prefixes(prefix, cancel).await, + Self::AzureBlob(s) => s.list_prefixes(prefix, cancel).await, + Self::Unreliable(s) => s.list_prefixes(prefix, cancel).await, } } + /// See [`RemoteStorage::upload`] pub async fn upload( &self, from: impl Stream> + Send + Sync + 'static, data_size_bytes: usize, to: &RemotePath, metadata: Option, + cancel: &CancellationToken, ) -> anyhow::Result<()> { match self { - Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await, - Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await, - Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await, - Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await, + Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await, + Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await, + Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await, + Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await, } } - pub async fn download(&self, from: &RemotePath) -> Result { + pub async fn download( + &self, + from: &RemotePath, + cancel: &CancellationToken, + ) -> Result { match self { - Self::LocalFs(s) => s.download(from).await, - Self::AwsS3(s) => s.download(from).await, - Self::AzureBlob(s) => s.download(from).await, - Self::Unreliable(s) => s.download(from).await, + Self::LocalFs(s) => s.download(from, cancel).await, + Self::AwsS3(s) => s.download(from, cancel).await, + Self::AzureBlob(s) => s.download(from, cancel).await, + Self::Unreliable(s) => s.download(from, cancel).await, } } @@ -425,54 +403,72 @@ impl GenericRemoteStorage> { from: &RemotePath, start_inclusive: u64, end_exclusive: Option, + cancel: &CancellationToken, ) -> Result { match self { Self::LocalFs(s) => { - s.download_byte_range(from, start_inclusive, end_exclusive) + s.download_byte_range(from, start_inclusive, end_exclusive, cancel) .await } Self::AwsS3(s) => { - s.download_byte_range(from, start_inclusive, end_exclusive) + s.download_byte_range(from, start_inclusive, end_exclusive, cancel) .await } Self::AzureBlob(s) => { - s.download_byte_range(from, start_inclusive, end_exclusive) + s.download_byte_range(from, start_inclusive, end_exclusive, cancel) .await } Self::Unreliable(s) => { - s.download_byte_range(from, start_inclusive, end_exclusive) + s.download_byte_range(from, start_inclusive, end_exclusive, cancel) .await } } } - pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { + /// See [`RemoteStorage::delete`] + pub async fn delete( + &self, + path: &RemotePath, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { match self { - Self::LocalFs(s) => s.delete(path).await, - Self::AwsS3(s) => s.delete(path).await, - Self::AzureBlob(s) => s.delete(path).await, - Self::Unreliable(s) => s.delete(path).await, + Self::LocalFs(s) => s.delete(path, cancel).await, + Self::AwsS3(s) => s.delete(path, cancel).await, + Self::AzureBlob(s) => s.delete(path, cancel).await, + Self::Unreliable(s) => s.delete(path, cancel).await, } } - pub async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + /// See [`RemoteStorage::delete_objects`] + pub async fn delete_objects( + &self, + paths: &[RemotePath], + cancel: &CancellationToken, + ) -> anyhow::Result<()> { match self { - Self::LocalFs(s) => s.delete_objects(paths).await, - Self::AwsS3(s) => s.delete_objects(paths).await, - Self::AzureBlob(s) => s.delete_objects(paths).await, - Self::Unreliable(s) => s.delete_objects(paths).await, + Self::LocalFs(s) => s.delete_objects(paths, cancel).await, + Self::AwsS3(s) => s.delete_objects(paths, cancel).await, + Self::AzureBlob(s) => s.delete_objects(paths, cancel).await, + Self::Unreliable(s) => s.delete_objects(paths, cancel).await, } } - pub async fn copy_object(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> { + /// See [`RemoteStorage::copy`] + pub async fn copy_object( + &self, + from: &RemotePath, + to: &RemotePath, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { match self { - Self::LocalFs(s) => s.copy(from, to).await, - Self::AwsS3(s) => s.copy(from, to).await, - Self::AzureBlob(s) => s.copy(from, to).await, - Self::Unreliable(s) => s.copy(from, to).await, + Self::LocalFs(s) => s.copy(from, to, cancel).await, + Self::AwsS3(s) => s.copy(from, to, cancel).await, + Self::AzureBlob(s) => s.copy(from, to, cancel).await, + Self::Unreliable(s) => s.copy(from, to, cancel).await, } } + /// See [`RemoteStorage::time_travel_recover`]. pub async fn time_travel_recover( &self, prefix: Option<&RemotePath>, @@ -503,10 +499,11 @@ impl GenericRemoteStorage> { impl GenericRemoteStorage { pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result { + let timeout = storage_config.timeout; Ok(match &storage_config.storage { - RemoteStorageKind::LocalFs(root) => { - info!("Using fs root '{root}' as a remote storage"); - Self::LocalFs(LocalFs::new(root.clone())?) + RemoteStorageKind::LocalFs(path) => { + info!("Using fs root '{path}' as a remote storage"); + Self::LocalFs(LocalFs::new(path.clone(), timeout)?) } RemoteStorageKind::AwsS3(s3_config) => { // The profile and access key id are only printed here for debugging purposes, @@ -516,12 +513,12 @@ impl GenericRemoteStorage { std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "".into()); info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}", s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint); - Self::AwsS3(Arc::new(S3Bucket::new(s3_config)?)) + Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout)?)) } RemoteStorageKind::AzureContainer(azure_config) => { info!("Using azure container '{}' in region '{}' as a remote storage, prefix in container: '{:?}'", azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container); - Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?)) + Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?)) } }) } @@ -530,18 +527,15 @@ impl GenericRemoteStorage { Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first))) } - /// Takes storage object contents and its size and uploads to remote storage, - /// mapping `from_path` to the corresponding remote object id in the storage. - /// - /// The storage object does not have to be present on the `from_path`, - /// this path is used for the remote object id conversion only. + /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata. pub async fn upload_storage_object( &self, from: impl Stream> + Send + Sync + 'static, from_size_bytes: usize, to: &RemotePath, + cancel: &CancellationToken, ) -> anyhow::Result<()> { - self.upload(from, from_size_bytes, to, None) + self.upload(from, from_size_bytes, to, None, cancel) .await .with_context(|| { format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}") @@ -554,10 +548,11 @@ impl GenericRemoteStorage { &self, byte_range: Option<(u64, Option)>, from: &RemotePath, + cancel: &CancellationToken, ) -> Result { match byte_range { - Some((start, end)) => self.download_byte_range(from, start, end).await, - None => self.download(from).await, + Some((start, end)) => self.download_byte_range(from, start, end, cancel).await, + None => self.download(from, cancel).await, } } } @@ -572,6 +567,9 @@ pub struct StorageMetadata(HashMap); pub struct RemoteStorageConfig { /// The storage connection configuration. pub storage: RemoteStorageKind, + /// A common timeout enforced for all requests after concurrency limiter permit has been + /// acquired. + pub timeout: Duration, } /// A kind of a remote storage to connect to, with its connection configuration. @@ -656,6 +654,8 @@ impl Debug for AzureConfig { } impl RemoteStorageConfig { + pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120); + pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result> { let local_path = toml.get("local_path"); let bucket_name = toml.get("bucket_name"); @@ -685,6 +685,27 @@ impl RemoteStorageConfig { .map(|endpoint| parse_toml_string("endpoint", endpoint)) .transpose()?; + let timeout = toml + .get("timeout") + .map(|timeout| { + timeout + .as_str() + .ok_or_else(|| anyhow::Error::msg("timeout was not a string")) + }) + .transpose() + .and_then(|timeout| { + timeout + .map(humantime::parse_duration) + .transpose() + .map_err(anyhow::Error::new) + }) + .context("parse timeout")? + .unwrap_or(Self::DEFAULT_TIMEOUT); + + if timeout < Duration::from_secs(1) { + bail!("timeout was specified as {timeout:?} which is too low"); + } + let storage = match ( local_path, bucket_name, @@ -746,7 +767,7 @@ impl RemoteStorageConfig { } }; - Ok(Some(RemoteStorageConfig { storage })) + Ok(Some(RemoteStorageConfig { storage, timeout })) } } @@ -842,4 +863,24 @@ mod tests { let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths"); assert_eq!(err.to_string(), "Path \"/\" is not relative"); } + + #[test] + fn parse_localfs_config_with_timeout() { + let input = "local_path = '.' +timeout = '5s'"; + + let toml = input.parse::().unwrap(); + + let config = RemoteStorageConfig::from_toml(toml.as_item()) + .unwrap() + .expect("it exists"); + + assert_eq!( + config, + RemoteStorageConfig { + storage: RemoteStorageKind::LocalFs(Utf8PathBuf::from(".")), + timeout: Duration::from_secs(5) + } + ); + } } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index e88111e8e2..6f847cf9d7 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -5,7 +5,12 @@ //! volume is mounted to the local FS. use std::{ - borrow::Cow, future::Future, io::ErrorKind, num::NonZeroU32, pin::Pin, time::SystemTime, + borrow::Cow, + future::Future, + io::ErrorKind, + num::NonZeroU32, + pin::Pin, + time::{Duration, SystemTime}, }; use anyhow::{bail, ensure, Context}; @@ -20,7 +25,9 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken}; use tracing::*; use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty}; -use crate::{Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError}; +use crate::{ + Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel, +}; use super::{RemoteStorage, StorageMetadata}; @@ -29,12 +36,13 @@ const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp"; #[derive(Debug, Clone)] pub struct LocalFs { storage_root: Utf8PathBuf, + timeout: Duration, } impl LocalFs { /// Attempts to create local FS storage, along with its root directory. /// Storage root will be created (if does not exist) and transformed into an absolute path (if passed as relative). - pub fn new(mut storage_root: Utf8PathBuf) -> anyhow::Result { + pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result { if !storage_root.exists() { std::fs::create_dir_all(&storage_root).with_context(|| { format!("Failed to create all directories in the given root path {storage_root:?}") @@ -46,7 +54,10 @@ impl LocalFs { })?; } - Ok(Self { storage_root }) + Ok(Self { + storage_root, + timeout, + }) } // mirrors S3Bucket::s3_object_to_relative_path @@ -157,80 +168,14 @@ impl LocalFs { Ok(files) } -} -impl RemoteStorage for LocalFs { - async fn list( - &self, - prefix: Option<&RemotePath>, - mode: ListingMode, - max_keys: Option, - ) -> Result { - let mut result = Listing::default(); - - if let ListingMode::NoDelimiter = mode { - let keys = self - .list_recursive(prefix) - .await - .map_err(DownloadError::Other)?; - - result.keys = keys - .into_iter() - .filter(|k| { - let path = k.with_base(&self.storage_root); - !path.is_dir() - }) - .collect(); - if let Some(max_keys) = max_keys { - result.keys.truncate(max_keys.get() as usize); - } - - return Ok(result); - } - - let path = match prefix { - Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)), - None => Cow::Borrowed(&self.storage_root), - }; - - let prefixes_to_filter = get_all_files(path.as_ref(), false) - .await - .map_err(DownloadError::Other)?; - - // filter out empty directories to mirror s3 behavior. - for prefix in prefixes_to_filter { - if prefix.is_dir() - && is_directory_empty(&prefix) - .await - .map_err(DownloadError::Other)? - { - continue; - } - - let stripped = prefix - .strip_prefix(&self.storage_root) - .context("Failed to strip prefix") - .and_then(RemotePath::new) - .expect( - "We list files for storage root, hence should be able to remote the prefix", - ); - - if prefix.is_dir() { - result.prefixes.push(stripped); - } else { - result.keys.push(stripped); - } - } - - Ok(result) - } - - async fn upload( + async fn upload0( &self, data: impl Stream> + Send + Sync, data_size_bytes: usize, to: &RemotePath, metadata: Option, + cancel: &CancellationToken, ) -> anyhow::Result<()> { let target_file_path = to.with_base(&self.storage_root); create_target_directory(&target_file_path).await?; @@ -265,9 +210,26 @@ impl RemoteStorage for LocalFs { let mut buffer_to_read = data.take(from_size_bytes); // alternatively we could just write the bytes to a file, but local_fs is a testing utility - let bytes_read = io::copy_buf(&mut buffer_to_read, &mut destination) - .await - .with_context(|| { + let copy = io::copy_buf(&mut buffer_to_read, &mut destination); + + let bytes_read = tokio::select! { + biased; + _ = cancel.cancelled() => { + let file = destination.into_inner(); + // wait for the inflight operation(s) to complete so that there could be a next + // attempt right away and our writes are not directed to their file. + file.into_std().await; + + // TODO: leave the temp or not? leaving is probably less racy. enabled truncate at + // least. + fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?; + return Err(TimeoutOrCancel::Cancel.into()); + } + read = copy => read, + }; + + let bytes_read = + bytes_read.with_context(|| { format!( "Failed to upload file (write temp) to the local storage at '{temp_file_path}'", ) @@ -299,6 +261,9 @@ impl RemoteStorage for LocalFs { })?; if let Some(storage_metadata) = metadata { + // FIXME: we must not be using metadata much, since this would forget the old metadata + // for new writes? or perhaps metadata is sticky; could consider removing if it's never + // used. let storage_metadata_path = storage_metadata_path(&target_file_path); fs::write( &storage_metadata_path, @@ -315,8 +280,131 @@ impl RemoteStorage for LocalFs { Ok(()) } +} - async fn download(&self, from: &RemotePath) -> Result { +impl RemoteStorage for LocalFs { + async fn list( + &self, + prefix: Option<&RemotePath>, + mode: ListingMode, + max_keys: Option, + cancel: &CancellationToken, + ) -> Result { + let op = async { + let mut result = Listing::default(); + + if let ListingMode::NoDelimiter = mode { + let keys = self + .list_recursive(prefix) + .await + .map_err(DownloadError::Other)?; + + result.keys = keys + .into_iter() + .filter(|k| { + let path = k.with_base(&self.storage_root); + !path.is_dir() + }) + .collect(); + + if let Some(max_keys) = max_keys { + result.keys.truncate(max_keys.get() as usize); + } + + return Ok(result); + } + + let path = match prefix { + Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)), + None => Cow::Borrowed(&self.storage_root), + }; + + let prefixes_to_filter = get_all_files(path.as_ref(), false) + .await + .map_err(DownloadError::Other)?; + + // filter out empty directories to mirror s3 behavior. + for prefix in prefixes_to_filter { + if prefix.is_dir() + && is_directory_empty(&prefix) + .await + .map_err(DownloadError::Other)? + { + continue; + } + + let stripped = prefix + .strip_prefix(&self.storage_root) + .context("Failed to strip prefix") + .and_then(RemotePath::new) + .expect( + "We list files for storage root, hence should be able to remote the prefix", + ); + + if prefix.is_dir() { + result.prefixes.push(stripped); + } else { + result.keys.push(stripped); + } + } + + Ok(result) + }; + + let timeout = async { + tokio::time::sleep(self.timeout).await; + Err(DownloadError::Timeout) + }; + + let cancelled = async { + cancel.cancelled().await; + Err(DownloadError::Cancelled) + }; + + tokio::select! { + res = op => res, + res = timeout => res, + res = cancelled => res, + } + } + + async fn upload( + &self, + data: impl Stream> + Send + Sync, + data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + let cancel = cancel.child_token(); + + let op = self.upload0(data, data_size_bytes, to, metadata, &cancel); + let mut op = std::pin::pin!(op); + + // race the upload0 to the timeout; if it goes over, do a graceful shutdown + let (res, timeout) = tokio::select! { + res = &mut op => (res, false), + _ = tokio::time::sleep(self.timeout) => { + cancel.cancel(); + (op.await, true) + } + }; + + match res { + Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => { + // we caused this cancel (or they happened simultaneously) -- swap it out to + // Timeout + Err(TimeoutOrCancel::Timeout.into()) + } + res => res, + } + } + + async fn download( + &self, + from: &RemotePath, + cancel: &CancellationToken, + ) -> Result { let target_path = from.with_base(&self.storage_root); if file_exists(&target_path).map_err(DownloadError::BadInput)? { let source = ReaderStream::new( @@ -334,6 +422,10 @@ impl RemoteStorage for LocalFs { .read_storage_metadata(&target_path) .await .map_err(DownloadError::Other)?; + + let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone()); + let source = crate::support::DownloadStream::new(cancel_or_timeout, source); + Ok(Download { metadata, last_modified: None, @@ -350,6 +442,7 @@ impl RemoteStorage for LocalFs { from: &RemotePath, start_inclusive: u64, end_exclusive: Option, + cancel: &CancellationToken, ) -> Result { if let Some(end_exclusive) = end_exclusive { if end_exclusive <= start_inclusive { @@ -391,6 +484,9 @@ impl RemoteStorage for LocalFs { let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive); let source = ReaderStream::new(source); + let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone()); + let source = crate::support::DownloadStream::new(cancel_or_timeout, source); + Ok(Download { metadata, last_modified: None, @@ -402,7 +498,7 @@ impl RemoteStorage for LocalFs { } } - async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { + async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> { let file_path = path.with_base(&self.storage_root); match fs::remove_file(&file_path).await { Ok(()) => Ok(()), @@ -414,14 +510,23 @@ impl RemoteStorage for LocalFs { } } - async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + async fn delete_objects<'a>( + &self, + paths: &'a [RemotePath], + cancel: &CancellationToken, + ) -> anyhow::Result<()> { for path in paths { - self.delete(path).await? + self.delete(path, cancel).await? } Ok(()) } - async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> { + async fn copy( + &self, + from: &RemotePath, + to: &RemotePath, + _cancel: &CancellationToken, + ) -> anyhow::Result<()> { let from_path = from.with_base(&self.storage_root); let to_path = to.with_base(&self.storage_root); create_target_directory(&to_path).await?; @@ -528,8 +633,9 @@ mod fs_tests { remote_storage_path: &RemotePath, expected_metadata: Option<&StorageMetadata>, ) -> anyhow::Result { + let cancel = CancellationToken::new(); let download = storage - .download(remote_storage_path) + .download(remote_storage_path, &cancel) .await .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?; ensure!( @@ -544,16 +650,16 @@ mod fs_tests { #[tokio::test] async fn upload_file() -> anyhow::Result<()> { - let storage = create_storage()?; + let (storage, cancel) = create_storage()?; - let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?; + let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?; assert_eq!( storage.list_all().await?, vec![target_path_1.clone()], "Should list a single file after first upload" ); - let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?; + let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?; assert_eq!( list_files_sorted(&storage).await?, vec![target_path_1.clone(), target_path_2.clone()], @@ -565,7 +671,7 @@ mod fs_tests { #[tokio::test] async fn upload_file_negatives() -> anyhow::Result<()> { - let storage = create_storage()?; + let (storage, cancel) = create_storage()?; let id = RemotePath::new(Utf8Path::new("dummy"))?; let content = Bytes::from_static(b"12345"); @@ -574,34 +680,34 @@ mod fs_tests { // Check that you get an error if the size parameter doesn't match the actual // size of the stream. storage - .upload(content(), 0, &id, None) + .upload(content(), 0, &id, None, &cancel) .await .expect_err("upload with zero size succeeded"); storage - .upload(content(), 4, &id, None) + .upload(content(), 4, &id, None, &cancel) .await .expect_err("upload with too short size succeeded"); storage - .upload(content(), 6, &id, None) + .upload(content(), 6, &id, None, &cancel) .await .expect_err("upload with too large size succeeded"); // Correct size is 5, this should succeed. - storage.upload(content(), 5, &id, None).await?; + storage.upload(content(), 5, &id, None, &cancel).await?; Ok(()) } - fn create_storage() -> anyhow::Result { + fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> { let storage_root = tempdir()?.path().to_path_buf(); - LocalFs::new(storage_root) + LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new())) } #[tokio::test] async fn download_file() -> anyhow::Result<()> { - let storage = create_storage()?; + let (storage, cancel) = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?; let contents = read_and_check_metadata(&storage, &upload_target, None).await?; assert_eq!( @@ -611,7 +717,7 @@ mod fs_tests { ); let non_existing_path = "somewhere/else"; - match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?).await { + match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await { Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"), } @@ -620,9 +726,9 @@ mod fs_tests { #[tokio::test] async fn download_file_range_positive() -> anyhow::Result<()> { - let storage = create_storage()?; + let (storage, cancel) = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?; let full_range_download_contents = read_and_check_metadata(&storage, &upload_target, None).await?; @@ -636,7 +742,12 @@ mod fs_tests { let (first_part_local, second_part_local) = uploaded_bytes.split_at(3); let first_part_download = storage - .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64)) + .download_byte_range( + &upload_target, + 0, + Some(first_part_local.len() as u64), + &cancel, + ) .await?; assert!( first_part_download.metadata.is_none(), @@ -654,6 +765,7 @@ mod fs_tests { &upload_target, first_part_local.len() as u64, Some((first_part_local.len() + second_part_local.len()) as u64), + &cancel, ) .await?; assert!( @@ -668,7 +780,7 @@ mod fs_tests { ); let suffix_bytes = storage - .download_byte_range(&upload_target, 13, None) + .download_byte_range(&upload_target, 13, None, &cancel) .await? .download_stream; let suffix_bytes = aggregate(suffix_bytes).await?; @@ -676,7 +788,7 @@ mod fs_tests { assert_eq!(upload_name, suffix); let all_bytes = storage - .download_byte_range(&upload_target, 0, None) + .download_byte_range(&upload_target, 0, None, &cancel) .await? .download_stream; let all_bytes = aggregate(all_bytes).await?; @@ -688,9 +800,9 @@ mod fs_tests { #[tokio::test] async fn download_file_range_negative() -> anyhow::Result<()> { - let storage = create_storage()?; + let (storage, cancel) = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?; let start = 1_000_000_000; let end = start + 1; @@ -699,6 +811,7 @@ mod fs_tests { &upload_target, start, Some(end), // exclusive end + &cancel, ) .await { @@ -715,7 +828,7 @@ mod fs_tests { let end = 234; assert!(start > end, "Should test an incorrect range"); match storage - .download_byte_range(&upload_target, start, Some(end)) + .download_byte_range(&upload_target, start, Some(end), &cancel) .await { Ok(_) => panic!("Should not allow downloading wrong ranges"), @@ -732,15 +845,15 @@ mod fs_tests { #[tokio::test] async fn delete_file() -> anyhow::Result<()> { - let storage = create_storage()?; + let (storage, cancel) = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?; - storage.delete(&upload_target).await?; + storage.delete(&upload_target, &cancel).await?; assert!(storage.list_all().await?.is_empty()); storage - .delete(&upload_target) + .delete(&upload_target, &cancel) .await .expect("Should allow deleting non-existing storage files"); @@ -749,14 +862,14 @@ mod fs_tests { #[tokio::test] async fn file_with_metadata() -> anyhow::Result<()> { - let storage = create_storage()?; + let (storage, cancel) = create_storage()?; let upload_name = "upload_1"; let metadata = StorageMetadata(HashMap::from([ ("one".to_string(), "1".to_string()), ("two".to_string(), "2".to_string()), ])); let upload_target = - upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?; + upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?; let full_range_download_contents = read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?; @@ -770,7 +883,12 @@ mod fs_tests { let (first_part_local, _) = uploaded_bytes.split_at(3); let partial_download_with_metadata = storage - .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64)) + .download_byte_range( + &upload_target, + 0, + Some(first_part_local.len() as u64), + &cancel, + ) .await?; let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?; assert_eq!( @@ -791,16 +909,20 @@ mod fs_tests { #[tokio::test] async fn list() -> anyhow::Result<()> { // No delimiter: should recursively list everything - let storage = create_storage()?; - let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?; - let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?; + let (storage, cancel) = create_storage()?; + let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?; + let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?; - let listing = storage.list(None, ListingMode::NoDelimiter, None).await?; + let listing = storage + .list(None, ListingMode::NoDelimiter, None, &cancel) + .await?; assert!(listing.prefixes.is_empty()); assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec()); // Delimiter: should only go one deep - let listing = storage.list(None, ListingMode::WithDelimiter, None).await?; + let listing = storage + .list(None, ListingMode::WithDelimiter, None, &cancel) + .await?; assert_eq!( listing.prefixes, @@ -814,6 +936,7 @@ mod fs_tests { Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()), ListingMode::WithDelimiter, None, + &cancel, ) .await?; assert_eq!( @@ -826,10 +949,75 @@ mod fs_tests { Ok(()) } + #[tokio::test] + async fn overwrite_shorter_file() -> anyhow::Result<()> { + let (storage, cancel) = create_storage()?; + + let path = RemotePath::new("does/not/matter/file".into())?; + + let body = Bytes::from_static(b"long file contents is long"); + { + let len = body.len(); + let body = + futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone()))); + storage.upload(body, len, &path, None, &cancel).await?; + } + + let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?; + assert_eq!(body, read); + + let shorter = Bytes::from_static(b"shorter body"); + { + let len = shorter.len(); + let body = + futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone()))); + storage.upload(body, len, &path, None, &cancel).await?; + } + + let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?; + assert_eq!(shorter, read); + Ok(()) + } + + #[tokio::test] + async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> { + let (storage, cancel) = create_storage()?; + + let path = RemotePath::new("does/not/matter/file".into())?; + + let body = Bytes::from_static(b"long file contents is long"); + { + let len = body.len(); + let body = + futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone()))); + let cancel = cancel.child_token(); + cancel.cancel(); + let e = storage + .upload(body, len, &path, None, &cancel) + .await + .unwrap_err(); + + assert!(TimeoutOrCancel::caused_by_cancel(&e)); + } + + { + let len = body.len(); + let body = + futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone()))); + storage.upload(body, len, &path, None, &cancel).await?; + } + + let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?; + assert_eq!(body, read); + + Ok(()) + } + async fn upload_dummy_file( storage: &LocalFs, name: &str, metadata: Option, + cancel: &CancellationToken, ) -> anyhow::Result { let from_path = storage .storage_root @@ -851,7 +1039,9 @@ mod fs_tests { let file = tokio_util::io::ReaderStream::new(file); - storage.upload(file, size, &relative_path, metadata).await?; + storage + .upload(file, size, &relative_path, metadata, cancel) + .await?; Ok(relative_path) } diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index dee5750cac..af70dc7ca2 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -11,7 +11,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::SystemTime, + time::{Duration, SystemTime}, }; use anyhow::{anyhow, Context as _}; @@ -46,9 +46,9 @@ use utils::backoff; use super::StorageMetadata; use crate::{ - support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, - RemotePath, RemoteStorage, S3Config, TimeTravelError, MAX_KEYS_PER_DELETE, - REMOTE_STORAGE_PREFIX_SEPARATOR, + error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, + Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel, + MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, }; pub(super) mod metrics; @@ -63,6 +63,8 @@ pub struct S3Bucket { prefix_in_bucket: Option, max_keys_per_list_response: Option, concurrency_limiter: ConcurrencyLimiter, + // Per-request timeout. Accessible for tests. + pub timeout: Duration, } struct GetObjectRequest { @@ -72,7 +74,7 @@ struct GetObjectRequest { } impl S3Bucket { /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided. - pub fn new(aws_config: &S3Config) -> anyhow::Result { + pub fn new(aws_config: &S3Config, timeout: Duration) -> anyhow::Result { tracing::debug!( "Creating s3 remote storage for S3 bucket {}", aws_config.bucket_name @@ -152,6 +154,7 @@ impl S3Bucket { max_keys_per_list_response: aws_config.max_keys_per_list_response, prefix_in_bucket, concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()), + timeout, }) } @@ -185,40 +188,55 @@ impl S3Bucket { } } - async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> { + async fn permit( + &self, + kind: RequestKind, + cancel: &CancellationToken, + ) -> Result, Cancelled> { let started_at = start_counting_cancelled_wait(kind); - let permit = self - .concurrency_limiter - .acquire(kind) - .await - .expect("semaphore is never closed"); + let acquire = self.concurrency_limiter.acquire(kind); + + let permit = tokio::select! { + permit = acquire => permit.expect("semaphore is never closed"), + _ = cancel.cancelled() => return Err(Cancelled), + }; let started_at = ScopeGuard::into_inner(started_at); metrics::BUCKET_METRICS .wait_seconds .observe_elapsed(kind, started_at); - permit + Ok(permit) } - async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit { + async fn owned_permit( + &self, + kind: RequestKind, + cancel: &CancellationToken, + ) -> Result { let started_at = start_counting_cancelled_wait(kind); - let permit = self - .concurrency_limiter - .acquire_owned(kind) - .await - .expect("semaphore is never closed"); + let acquire = self.concurrency_limiter.acquire_owned(kind); + + let permit = tokio::select! { + permit = acquire => permit.expect("semaphore is never closed"), + _ = cancel.cancelled() => return Err(Cancelled), + }; let started_at = ScopeGuard::into_inner(started_at); metrics::BUCKET_METRICS .wait_seconds .observe_elapsed(kind, started_at); - permit + Ok(permit) } - async fn download_object(&self, request: GetObjectRequest) -> Result { + async fn download_object( + &self, + request: GetObjectRequest, + cancel: &CancellationToken, + ) -> Result { let kind = RequestKind::Get; - let permit = self.owned_permit(kind).await; + + let permit = self.owned_permit(kind, cancel).await?; let started_at = start_measuring_requests(kind); @@ -228,8 +246,13 @@ impl S3Bucket { .bucket(request.bucket) .key(request.key) .set_range(request.range) - .send() - .await; + .send(); + + let get_object = tokio::select! { + res = get_object => res, + _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout), + _ = cancel.cancelled() => return Err(DownloadError::Cancelled), + }; let started_at = ScopeGuard::into_inner(started_at); @@ -259,6 +282,10 @@ impl S3Bucket { } }; + // even if we would have no timeout left, continue anyways. the caller can decide to ignore + // the errors considering timeouts and cancellation. + let remaining = self.timeout.saturating_sub(started_at.elapsed()); + let metadata = object_output.metadata().cloned().map(StorageMetadata); let etag = object_output.e_tag; let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok()); @@ -268,6 +295,9 @@ impl S3Bucket { let body = PermitCarrying::new(permit, body); let body = TimedDownload::new(started_at, body); + let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone()); + let body = crate::support::DownloadStream::new(cancel_or_timeout, body); + Ok(Download { metadata, etag, @@ -278,33 +308,44 @@ impl S3Bucket { async fn delete_oids( &self, - kind: RequestKind, + _permit: &tokio::sync::SemaphorePermit<'_>, delete_objects: &[ObjectIdentifier], + cancel: &CancellationToken, ) -> anyhow::Result<()> { + let kind = RequestKind::Delete; + let mut cancel = std::pin::pin!(cancel.cancelled()); + for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) { let started_at = start_measuring_requests(kind); - let resp = self + let req = self .client .delete_objects() .bucket(self.bucket_name.clone()) .delete( Delete::builder() .set_objects(Some(chunk.to_vec())) - .build()?, + .build() + .context("build request")?, ) - .send() - .await; + .send(); + + let resp = tokio::select! { + resp = req => resp, + _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()), + _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()), + }; let started_at = ScopeGuard::into_inner(started_at); metrics::BUCKET_METRICS .req_seconds .observe_elapsed(kind, &resp, started_at); - let resp = resp?; + let resp = resp.context("request deletion")?; metrics::BUCKET_METRICS .deleted_objects_total .inc_by(chunk.len() as u64); + if let Some(errors) = resp.errors { // Log a bounded number of the errors within the response: // these requests can carry 1000 keys so logging each one @@ -320,9 +361,10 @@ impl S3Bucket { ); } - return Err(anyhow::format_err!( - "Failed to delete {} objects", - errors.len() + return Err(anyhow::anyhow!( + "Failed to delete {}/{} objects", + errors.len(), + chunk.len(), )); } } @@ -410,6 +452,7 @@ impl RemoteStorage for S3Bucket { prefix: Option<&RemotePath>, mode: ListingMode, max_keys: Option, + cancel: &CancellationToken, ) -> Result { let kind = RequestKind::List; // s3 sdk wants i32 @@ -431,10 +474,11 @@ impl RemoteStorage for S3Bucket { p }); + let _permit = self.permit(kind, cancel).await?; + let mut continuation_token = None; loop { - let _guard = self.permit(kind).await; let started_at = start_measuring_requests(kind); // min of two Options, returning Some if one is value and another is @@ -456,9 +500,15 @@ impl RemoteStorage for S3Bucket { request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()); } - let response = request - .send() - .await + let request = request.send(); + + let response = tokio::select! { + res = request => res, + _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout), + _ = cancel.cancelled() => return Err(DownloadError::Cancelled), + }; + + let response = response .context("Failed to list S3 prefixes") .map_err(DownloadError::Other); @@ -511,16 +561,17 @@ impl RemoteStorage for S3Bucket { from_size_bytes: usize, to: &RemotePath, metadata: Option, + cancel: &CancellationToken, ) -> anyhow::Result<()> { let kind = RequestKind::Put; - let _guard = self.permit(kind).await; + let _permit = self.permit(kind, cancel).await?; let started_at = start_measuring_requests(kind); let body = Body::wrap_stream(from); let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body)); - let res = self + let upload = self .client .put_object() .bucket(self.bucket_name.clone()) @@ -528,22 +579,40 @@ impl RemoteStorage for S3Bucket { .set_metadata(metadata.map(|m| m.0)) .content_length(from_size_bytes.try_into()?) .body(bytes_stream) - .send() - .await; + .send(); - let started_at = ScopeGuard::into_inner(started_at); - metrics::BUCKET_METRICS - .req_seconds - .observe_elapsed(kind, &res, started_at); + let upload = tokio::time::timeout(self.timeout, upload); - res?; + let res = tokio::select! { + res = upload => res, + _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), + }; - Ok(()) + if let Ok(inner) = &res { + // do not incl. timeouts as errors in metrics but cancellations + let started_at = ScopeGuard::into_inner(started_at); + metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, inner, started_at); + } + + match res { + Ok(Ok(_put)) => Ok(()), + Ok(Err(sdk)) => Err(sdk.into()), + Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()), + } } - async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> { + async fn copy( + &self, + from: &RemotePath, + to: &RemotePath, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { let kind = RequestKind::Copy; - let _guard = self.permit(kind).await; + let _permit = self.permit(kind, cancel).await?; + + let timeout = tokio::time::sleep(self.timeout); let started_at = start_measuring_requests(kind); @@ -554,14 +623,19 @@ impl RemoteStorage for S3Bucket { self.relative_path_to_s3_object(from) ); - let res = self + let op = self .client .copy_object() .bucket(self.bucket_name.clone()) .key(self.relative_path_to_s3_object(to)) .copy_source(copy_source) - .send() - .await; + .send(); + + let res = tokio::select! { + res = op => res, + _ = timeout => return Err(TimeoutOrCancel::Timeout.into()), + _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), + }; let started_at = ScopeGuard::into_inner(started_at); metrics::BUCKET_METRICS @@ -573,14 +647,21 @@ impl RemoteStorage for S3Bucket { Ok(()) } - async fn download(&self, from: &RemotePath) -> Result { + async fn download( + &self, + from: &RemotePath, + cancel: &CancellationToken, + ) -> Result { // if prefix is not none then download file `prefix/from` // if prefix is none then download file `from` - self.download_object(GetObjectRequest { - bucket: self.bucket_name.clone(), - key: self.relative_path_to_s3_object(from), - range: None, - }) + self.download_object( + GetObjectRequest { + bucket: self.bucket_name.clone(), + key: self.relative_path_to_s3_object(from), + range: None, + }, + cancel, + ) .await } @@ -589,6 +670,7 @@ impl RemoteStorage for S3Bucket { from: &RemotePath, start_inclusive: u64, end_exclusive: Option, + cancel: &CancellationToken, ) -> Result { // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 // and needs both ends to be exclusive @@ -598,31 +680,39 @@ impl RemoteStorage for S3Bucket { None => format!("bytes={start_inclusive}-"), }); - self.download_object(GetObjectRequest { - bucket: self.bucket_name.clone(), - key: self.relative_path_to_s3_object(from), - range, - }) + self.download_object( + GetObjectRequest { + bucket: self.bucket_name.clone(), + key: self.relative_path_to_s3_object(from), + range, + }, + cancel, + ) .await } - async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { - let kind = RequestKind::Delete; - let _guard = self.permit(kind).await; + async fn delete_objects<'a>( + &self, + paths: &'a [RemotePath], + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + let kind = RequestKind::Delete; + let permit = self.permit(kind, cancel).await?; let mut delete_objects = Vec::with_capacity(paths.len()); for path in paths { let obj_id = ObjectIdentifier::builder() .set_key(Some(self.relative_path_to_s3_object(path))) - .build()?; + .build() + .context("convert path to oid")?; delete_objects.push(obj_id); } - self.delete_oids(kind, &delete_objects).await + self.delete_oids(&permit, &delete_objects, cancel).await } - async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { + async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> { let paths = std::array::from_ref(path); - self.delete_objects(paths).await + self.delete_objects(paths, cancel).await } async fn time_travel_recover( @@ -633,7 +723,7 @@ impl RemoteStorage for S3Bucket { cancel: &CancellationToken, ) -> Result<(), TimeTravelError> { let kind = RequestKind::TimeTravel; - let _guard = self.permit(kind).await; + let permit = self.permit(kind, cancel).await?; let timestamp = DateTime::from(timestamp); let done_if_after = DateTime::from(done_if_after); @@ -647,7 +737,7 @@ impl RemoteStorage for S3Bucket { let warn_threshold = 3; let max_retries = 10; - let is_permanent = |_e: &_| false; + let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled); let mut key_marker = None; let mut version_id_marker = None; @@ -656,15 +746,19 @@ impl RemoteStorage for S3Bucket { loop { let response = backoff::retry( || async { - self.client + let op = self + .client .list_object_versions() .bucket(self.bucket_name.clone()) .set_prefix(prefix.clone()) .set_key_marker(key_marker.clone()) .set_version_id_marker(version_id_marker.clone()) - .send() - .await - .map_err(|e| TimeTravelError::Other(e.into())) + .send(); + + tokio::select! { + res = op => res.map_err(|e| TimeTravelError::Other(e.into())), + _ = cancel.cancelled() => Err(TimeTravelError::Cancelled), + } }, is_permanent, warn_threshold, @@ -786,14 +880,18 @@ impl RemoteStorage for S3Bucket { backoff::retry( || async { - self.client + let op = self + .client .copy_object() .bucket(self.bucket_name.clone()) .key(key) .copy_source(&source_id) - .send() - .await - .map_err(|e| TimeTravelError::Other(e.into())) + .send(); + + tokio::select! { + res = op => res.map_err(|e| TimeTravelError::Other(e.into())), + _ = cancel.cancelled() => Err(TimeTravelError::Cancelled), + } }, is_permanent, warn_threshold, @@ -824,10 +922,18 @@ impl RemoteStorage for S3Bucket { let oid = ObjectIdentifier::builder() .key(key.to_owned()) .build() - .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?; - self.delete_oids(kind, &[oid]) + .map_err(|e| TimeTravelError::Other(e.into()))?; + + self.delete_oids(&permit, &[oid], cancel) .await - .map_err(TimeTravelError::Other)?; + .map_err(|e| { + // delete_oid0 will use TimeoutOrCancel + if TimeoutOrCancel::caused_by_cancel(&e) { + TimeTravelError::Cancelled + } else { + TimeTravelError::Other(e) + } + })?; } } } @@ -963,7 +1069,8 @@ mod tests { concurrency_limit: NonZeroUsize::new(100).unwrap(), max_keys_per_list_response: Some(5), }; - let storage = S3Bucket::new(&config).expect("remote storage init"); + let storage = + S3Bucket::new(&config, std::time::Duration::ZERO).expect("remote storage init"); for (test_path_idx, test_path) in all_paths.iter().enumerate() { let result = storage.relative_path_to_s3_object(test_path); let expected = expected_outputs[prefix_idx][test_path_idx]; diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 3dfa16b64e..f5344d3ae2 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -90,11 +90,16 @@ impl UnreliableWrapper { } } - async fn delete_inner(&self, path: &RemotePath, attempt: bool) -> anyhow::Result<()> { + async fn delete_inner( + &self, + path: &RemotePath, + attempt: bool, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { if attempt { self.attempt(RemoteOp::Delete(path.clone()))?; } - self.inner.delete(path).await + self.inner.delete(path, cancel).await } } @@ -105,20 +110,22 @@ impl RemoteStorage for UnreliableWrapper { async fn list_prefixes( &self, prefix: Option<&RemotePath>, + cancel: &CancellationToken, ) -> Result, DownloadError> { self.attempt(RemoteOp::ListPrefixes(prefix.cloned())) .map_err(DownloadError::Other)?; - self.inner.list_prefixes(prefix).await + self.inner.list_prefixes(prefix, cancel).await } async fn list_files( &self, folder: Option<&RemotePath>, max_keys: Option, + cancel: &CancellationToken, ) -> Result, DownloadError> { self.attempt(RemoteOp::ListPrefixes(folder.cloned())) .map_err(DownloadError::Other)?; - self.inner.list_files(folder, max_keys).await + self.inner.list_files(folder, max_keys, cancel).await } async fn list( @@ -126,10 +133,11 @@ impl RemoteStorage for UnreliableWrapper { prefix: Option<&RemotePath>, mode: ListingMode, max_keys: Option, + cancel: &CancellationToken, ) -> Result { self.attempt(RemoteOp::ListPrefixes(prefix.cloned())) .map_err(DownloadError::Other)?; - self.inner.list(prefix, mode, max_keys).await + self.inner.list(prefix, mode, max_keys, cancel).await } async fn upload( @@ -140,15 +148,22 @@ impl RemoteStorage for UnreliableWrapper { data_size_bytes: usize, to: &RemotePath, metadata: Option, + cancel: &CancellationToken, ) -> anyhow::Result<()> { self.attempt(RemoteOp::Upload(to.clone()))?; - self.inner.upload(data, data_size_bytes, to, metadata).await + self.inner + .upload(data, data_size_bytes, to, metadata, cancel) + .await } - async fn download(&self, from: &RemotePath) -> Result { + async fn download( + &self, + from: &RemotePath, + cancel: &CancellationToken, + ) -> Result { self.attempt(RemoteOp::Download(from.clone())) .map_err(DownloadError::Other)?; - self.inner.download(from).await + self.inner.download(from, cancel).await } async fn download_byte_range( @@ -156,6 +171,7 @@ impl RemoteStorage for UnreliableWrapper { from: &RemotePath, start_inclusive: u64, end_exclusive: Option, + cancel: &CancellationToken, ) -> Result { // Note: We treat any download_byte_range as an "attempt" of the same // operation. We don't pay attention to the ranges. That's good enough @@ -163,20 +179,24 @@ impl RemoteStorage for UnreliableWrapper { self.attempt(RemoteOp::Download(from.clone())) .map_err(DownloadError::Other)?; self.inner - .download_byte_range(from, start_inclusive, end_exclusive) + .download_byte_range(from, start_inclusive, end_exclusive, cancel) .await } - async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { - self.delete_inner(path, true).await + async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> { + self.delete_inner(path, true, cancel).await } - async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + async fn delete_objects<'a>( + &self, + paths: &'a [RemotePath], + cancel: &CancellationToken, + ) -> anyhow::Result<()> { self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?; let mut error_counter = 0; for path in paths { // Dont record attempt because it was already recorded above - if (self.delete_inner(path, false).await).is_err() { + if (self.delete_inner(path, false, cancel).await).is_err() { error_counter += 1; } } @@ -189,11 +209,16 @@ impl RemoteStorage for UnreliableWrapper { Ok(()) } - async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> { + async fn copy( + &self, + from: &RemotePath, + to: &RemotePath, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { // copy is equivalent to download + upload self.attempt(RemoteOp::Download(from.clone()))?; self.attempt(RemoteOp::Upload(to.clone()))?; - self.inner.copy_object(from, to).await + self.inner.copy_object(from, to, cancel).await } async fn time_travel_recover( diff --git a/libs/remote_storage/src/support.rs b/libs/remote_storage/src/support.rs index 4688a484a5..20f193c6c8 100644 --- a/libs/remote_storage/src/support.rs +++ b/libs/remote_storage/src/support.rs @@ -1,9 +1,15 @@ use std::{ + future::Future, pin::Pin, task::{Context, Poll}, + time::Duration, }; +use bytes::Bytes; use futures_util::Stream; +use tokio_util::sync::CancellationToken; + +use crate::TimeoutOrCancel; pin_project_lite::pin_project! { /// An `AsyncRead` adapter which carries a permit for the lifetime of the value. @@ -31,3 +37,133 @@ impl Stream for PermitCarrying { self.inner.size_hint() } } + +pin_project_lite::pin_project! { + pub(crate) struct DownloadStream { + hit: bool, + #[pin] + cancellation: F, + #[pin] + inner: S, + } +} + +impl DownloadStream { + pub(crate) fn new(cancellation: F, inner: S) -> Self { + Self { + cancellation, + hit: false, + inner, + } + } +} + +/// See documentation on [`crate::DownloadStream`] on rationale why `std::io::Error` is used. +impl Stream for DownloadStream +where + std::io::Error: From, + F: Future, + S: Stream>, +{ + type Item = ::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + if !*this.hit { + if let Poll::Ready(e) = this.cancellation.poll(cx) { + *this.hit = true; + let e = Err(std::io::Error::from(e)); + return Poll::Ready(Some(e)); + } + } + + this.inner.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +/// Fires only on the first cancel or timeout, not on both. +pub(crate) async fn cancel_or_timeout( + timeout: Duration, + cancel: CancellationToken, +) -> TimeoutOrCancel { + tokio::select! { + _ = tokio::time::sleep(timeout) => TimeoutOrCancel::Timeout, + _ = cancel.cancelled() => TimeoutOrCancel::Cancel, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::DownloadError; + use futures::stream::StreamExt; + + #[tokio::test(start_paused = true)] + async fn cancelled_download_stream() { + let inner = futures::stream::pending(); + let timeout = Duration::from_secs(120); + let cancel = CancellationToken::new(); + + let stream = DownloadStream::new(cancel_or_timeout(timeout, cancel.clone()), inner); + let mut stream = std::pin::pin!(stream); + + let mut first = stream.next(); + + tokio::select! { + _ = &mut first => unreachable!("we haven't yet cancelled nor is timeout passed"), + _ = tokio::time::sleep(Duration::from_secs(1)) => {}, + } + + cancel.cancel(); + + let e = first.await.expect("there must be some").unwrap_err(); + assert!(matches!(e.kind(), std::io::ErrorKind::Other), "{e:?}"); + let inner = e.get_ref().expect("inner should be set"); + assert!( + inner + .downcast_ref::() + .is_some_and(|e| matches!(e, DownloadError::Cancelled)), + "{inner:?}" + ); + + tokio::select! { + _ = stream.next() => unreachable!("no timeout ever happens as we were already cancelled"), + _ = tokio::time::sleep(Duration::from_secs(121)) => {}, + } + } + + #[tokio::test(start_paused = true)] + async fn timeouted_download_stream() { + let inner = futures::stream::pending(); + let timeout = Duration::from_secs(120); + let cancel = CancellationToken::new(); + + let stream = DownloadStream::new(cancel_or_timeout(timeout, cancel.clone()), inner); + let mut stream = std::pin::pin!(stream); + + // because the stream uses 120s timeout we are paused, we advance to 120s right away. + let first = stream.next(); + + let e = first.await.expect("there must be some").unwrap_err(); + assert!(matches!(e.kind(), std::io::ErrorKind::Other), "{e:?}"); + let inner = e.get_ref().expect("inner should be set"); + assert!( + inner + .downcast_ref::() + .is_some_and(|e| matches!(e, DownloadError::Timeout)), + "{inner:?}" + ); + + cancel.cancel(); + + tokio::select! { + _ = stream.next() => unreachable!("no cancellation ever happens because we already timed out"), + _ = tokio::time::sleep(Duration::from_secs(121)) => {}, + } + } +} diff --git a/libs/remote_storage/tests/common/mod.rs b/libs/remote_storage/tests/common/mod.rs index bca117ed1a..da9dc08d8d 100644 --- a/libs/remote_storage/tests/common/mod.rs +++ b/libs/remote_storage/tests/common/mod.rs @@ -10,6 +10,7 @@ use futures::stream::Stream; use once_cell::sync::OnceCell; use remote_storage::{Download, GenericRemoteStorage, RemotePath}; use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; static LOGGING_DONE: OnceCell<()> = OnceCell::new(); @@ -58,8 +59,12 @@ pub(crate) async fn upload_simple_remote_data( ) -> ControlFlow, HashSet> { info!("Creating {upload_tasks_count} remote files"); let mut upload_tasks = JoinSet::new(); + let cancel = CancellationToken::new(); + for i in 1..upload_tasks_count + 1 { let task_client = Arc::clone(client); + let cancel = cancel.clone(); + upload_tasks.spawn(async move { let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i)); let blob_path = RemotePath::new( @@ -69,7 +74,9 @@ pub(crate) async fn upload_simple_remote_data( debug!("Creating remote item {i} at path {blob_path:?}"); let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into()); - task_client.upload(data, len, &blob_path, None).await?; + task_client + .upload(data, len, &blob_path, None, &cancel) + .await?; Ok::<_, anyhow::Error>(blob_path) }); @@ -107,13 +114,15 @@ pub(crate) async fn cleanup( "Removing {} objects from the remote storage during cleanup", objects_to_delete.len() ); + let cancel = CancellationToken::new(); let mut delete_tasks = JoinSet::new(); for object_to_delete in objects_to_delete { let task_client = Arc::clone(client); + let cancel = cancel.clone(); delete_tasks.spawn(async move { debug!("Deleting remote item at path {object_to_delete:?}"); task_client - .delete(&object_to_delete) + .delete(&object_to_delete, &cancel) .await .with_context(|| format!("{object_to_delete:?} removal")) }); @@ -141,8 +150,12 @@ pub(crate) async fn upload_remote_data( ) -> ControlFlow { info!("Creating {upload_tasks_count} remote files"); let mut upload_tasks = JoinSet::new(); + let cancel = CancellationToken::new(); + for i in 1..upload_tasks_count + 1 { let task_client = Arc::clone(client); + let cancel = cancel.clone(); + upload_tasks.spawn(async move { let prefix = format!("{base_prefix_str}/sub_prefix_{i}/"); let blob_prefix = RemotePath::new(Utf8Path::new(&prefix)) @@ -152,7 +165,9 @@ pub(crate) async fn upload_remote_data( let (data, data_len) = upload_stream(format!("remote blob data {i}").into_bytes().into()); - task_client.upload(data, data_len, &blob_path, None).await?; + task_client + .upload(data, data_len, &blob_path, None, &cancel) + .await?; Ok::<_, anyhow::Error>((blob_prefix, blob_path)) }); diff --git a/libs/remote_storage/tests/common/tests.rs b/libs/remote_storage/tests/common/tests.rs index 6d062f3898..72f6f956e0 100644 --- a/libs/remote_storage/tests/common/tests.rs +++ b/libs/remote_storage/tests/common/tests.rs @@ -4,6 +4,7 @@ use remote_storage::RemotePath; use std::sync::Arc; use std::{collections::HashSet, num::NonZeroU32}; use test_context::test_context; +use tokio_util::sync::CancellationToken; use tracing::debug; use crate::common::{download_to_vec, upload_stream, wrap_stream}; @@ -45,13 +46,15 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a } }; + let cancel = CancellationToken::new(); + let test_client = Arc::clone(&ctx.enabled.client); let expected_remote_prefixes = ctx.remote_prefixes.clone(); let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix)) .context("common_prefix construction")?; let root_remote_prefixes = test_client - .list_prefixes(None) + .list_prefixes(None, &cancel) .await .context("client list root prefixes failure")? .into_iter() @@ -62,7 +65,7 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a ); let nested_remote_prefixes = test_client - .list_prefixes(Some(&base_prefix)) + .list_prefixes(Some(&base_prefix), &cancel) .await .context("client list nested prefixes failure")? .into_iter() @@ -99,11 +102,12 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a anyhow::bail!("S3 init failed: {e:?}") } }; + let cancel = CancellationToken::new(); let test_client = Arc::clone(&ctx.enabled.client); let base_prefix = RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?; let root_files = test_client - .list_files(None, None) + .list_files(None, None, &cancel) .await .context("client list root files failure")? .into_iter() @@ -117,13 +121,13 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a // Test that max_keys limit works. In total there are about 21 files (see // upload_simple_remote_data call in test_real_s3.rs). let limited_root_files = test_client - .list_files(None, Some(NonZeroU32::new(2).unwrap())) + .list_files(None, Some(NonZeroU32::new(2).unwrap()), &cancel) .await .context("client list root files failure")?; assert_eq!(limited_root_files.len(), 2); let nested_remote_files = test_client - .list_files(Some(&base_prefix), None) + .list_files(Some(&base_prefix), None, &cancel) .await .context("client list nested files failure")? .into_iter() @@ -150,12 +154,17 @@ async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Resu MaybeEnabledStorage::Disabled => return Ok(()), }; + let cancel = CancellationToken::new(); + let path = RemotePath::new(Utf8Path::new( format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(), )) .with_context(|| "RemotePath conversion")?; - ctx.client.delete(&path).await.expect("should succeed"); + ctx.client + .delete(&path, &cancel) + .await + .expect("should succeed"); Ok(()) } @@ -168,6 +177,8 @@ async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<( MaybeEnabledStorage::Disabled => return Ok(()), }; + let cancel = CancellationToken::new(); + let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str())) .with_context(|| "RemotePath conversion")?; @@ -178,21 +189,21 @@ async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<( .with_context(|| "RemotePath conversion")?; let (data, len) = upload_stream("remote blob data1".as_bytes().into()); - ctx.client.upload(data, len, &path1, None).await?; + ctx.client.upload(data, len, &path1, None, &cancel).await?; let (data, len) = upload_stream("remote blob data2".as_bytes().into()); - ctx.client.upload(data, len, &path2, None).await?; + ctx.client.upload(data, len, &path2, None, &cancel).await?; let (data, len) = upload_stream("remote blob data3".as_bytes().into()); - ctx.client.upload(data, len, &path3, None).await?; + ctx.client.upload(data, len, &path3, None, &cancel).await?; - ctx.client.delete_objects(&[path1, path2]).await?; + ctx.client.delete_objects(&[path1, path2], &cancel).await?; - let prefixes = ctx.client.list_prefixes(None).await?; + let prefixes = ctx.client.list_prefixes(None, &cancel).await?; assert_eq!(prefixes.len(), 1); - ctx.client.delete_objects(&[path3]).await?; + ctx.client.delete_objects(&[path3], &cancel).await?; Ok(()) } @@ -204,6 +215,8 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result< return Ok(()); }; + let cancel = CancellationToken::new(); + let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str())) .with_context(|| "RemotePath conversion")?; @@ -211,47 +224,56 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result< let (data, len) = wrap_stream(orig.clone()); - ctx.client.upload(data, len, &path, None).await?; + ctx.client.upload(data, len, &path, None, &cancel).await?; // Normal download request - let dl = ctx.client.download(&path).await?; + let dl = ctx.client.download(&path, &cancel).await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig); // Full range (end specified) let dl = ctx .client - .download_byte_range(&path, 0, Some(len as u64)) + .download_byte_range(&path, 0, Some(len as u64), &cancel) .await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig); // partial range (end specified) - let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?; + let dl = ctx + .client + .download_byte_range(&path, 4, Some(10), &cancel) + .await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig[4..10]); // partial range (end beyond real end) let dl = ctx .client - .download_byte_range(&path, 8, Some(len as u64 * 100)) + .download_byte_range(&path, 8, Some(len as u64 * 100), &cancel) .await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig[8..]); // Partial range (end unspecified) - let dl = ctx.client.download_byte_range(&path, 4, None).await?; + let dl = ctx + .client + .download_byte_range(&path, 4, None, &cancel) + .await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig[4..]); // Full range (end unspecified) - let dl = ctx.client.download_byte_range(&path, 0, None).await?; + let dl = ctx + .client + .download_byte_range(&path, 0, None, &cancel) + .await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig); debug!("Cleanup: deleting file at path {path:?}"); ctx.client - .delete(&path) + .delete(&path, &cancel) .await .with_context(|| format!("{path:?} removal"))?; @@ -265,6 +287,8 @@ async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> { return Ok(()); }; + let cancel = CancellationToken::new(); + let path = RemotePath::new(Utf8Path::new( format!("{}/file_to_copy", ctx.base_prefix).as_str(), )) @@ -278,18 +302,18 @@ async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> { let (data, len) = wrap_stream(orig.clone()); - ctx.client.upload(data, len, &path, None).await?; + ctx.client.upload(data, len, &path, None, &cancel).await?; // Normal download request - ctx.client.copy_object(&path, &path_dest).await?; + ctx.client.copy_object(&path, &path_dest, &cancel).await?; - let dl = ctx.client.download(&path_dest).await?; + let dl = ctx.client.download(&path_dest, &cancel).await?; let buf = download_to_vec(dl).await?; assert_eq!(&buf, &orig); debug!("Cleanup: deleting file at path {path:?}"); ctx.client - .delete_objects(&[path.clone(), path_dest.clone()]) + .delete_objects(&[path.clone(), path_dest.clone()], &cancel) .await .with_context(|| format!("{path:?} removal"))?; diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs index 6f9a1ec6f7..6adddf52a9 100644 --- a/libs/remote_storage/tests/test_real_azure.rs +++ b/libs/remote_storage/tests/test_real_azure.rs @@ -1,9 +1,9 @@ -use std::collections::HashSet; use std::env; use std::num::NonZeroUsize; use std::ops::ControlFlow; use std::sync::Arc; use std::time::UNIX_EPOCH; +use std::{collections::HashSet, time::Duration}; use anyhow::Context; use remote_storage::{ @@ -39,6 +39,17 @@ impl EnabledAzure { base_prefix: BASE_PREFIX, } } + + #[allow(unused)] // this will be needed when moving the timeout integration tests back + fn configure_request_timeout(&mut self, timeout: Duration) { + match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") { + GenericRemoteStorage::AzureBlob(azure) => { + let azure = Arc::get_mut(azure).expect("inner Arc::get_mut"); + azure.timeout = timeout; + } + _ => unreachable!(), + } + } } enum MaybeEnabledStorage { @@ -213,6 +224,7 @@ fn create_azure_client( concurrency_limit: NonZeroUsize::new(100).unwrap(), max_keys_per_list_response, }), + timeout: Duration::from_secs(120), }; Ok(Arc::new( GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?, diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 3dc8347c83..e927b40e80 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -1,5 +1,6 @@ use std::env; use std::fmt::{Debug, Display}; +use std::future::Future; use std::num::NonZeroUsize; use std::ops::ControlFlow; use std::sync::Arc; @@ -9,9 +10,10 @@ use std::{collections::HashSet, time::SystemTime}; use crate::common::{download_to_vec, upload_stream}; use anyhow::Context; use camino::Utf8Path; -use futures_util::Future; +use futures_util::StreamExt; use remote_storage::{ - GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config, + DownloadError, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, + S3Config, }; use test_context::test_context; use test_context::AsyncTestContext; @@ -27,7 +29,6 @@ use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_re use utils::backoff; const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE"; - const BASE_PREFIX: &str = "test"; #[test_context(MaybeEnabledStorage)] @@ -69,8 +70,11 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: ret } - async fn list_files(client: &Arc) -> anyhow::Result> { - Ok(retry(|| client.list_files(None, None)) + async fn list_files( + client: &Arc, + cancel: &CancellationToken, + ) -> anyhow::Result> { + Ok(retry(|| client.list_files(None, None, cancel)) .await .context("list root files failure")? .into_iter() @@ -90,11 +94,11 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: retry(|| { let (data, len) = upload_stream("remote blob data1".as_bytes().into()); - ctx.client.upload(data, len, &path1, None) + ctx.client.upload(data, len, &path1, None, &cancel) }) .await?; - let t0_files = list_files(&ctx.client).await?; + let t0_files = list_files(&ctx.client, &cancel).await?; let t0 = time_point().await; println!("at t0: {t0_files:?}"); @@ -102,17 +106,17 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: retry(|| { let (data, len) = upload_stream(old_data.as_bytes().into()); - ctx.client.upload(data, len, &path2, None) + ctx.client.upload(data, len, &path2, None, &cancel) }) .await?; - let t1_files = list_files(&ctx.client).await?; + let t1_files = list_files(&ctx.client, &cancel).await?; let t1 = time_point().await; println!("at t1: {t1_files:?}"); // A little check to ensure that our clock is not too far off from the S3 clock { - let dl = retry(|| ctx.client.download(&path2)).await?; + let dl = retry(|| ctx.client.download(&path2, &cancel)).await?; let last_modified = dl.last_modified.unwrap(); let half_wt = WAIT_TIME.mul_f32(0.5); let t0_hwt = t0 + half_wt; @@ -125,7 +129,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: retry(|| { let (data, len) = upload_stream("remote blob data3".as_bytes().into()); - ctx.client.upload(data, len, &path3, None) + ctx.client.upload(data, len, &path3, None, &cancel) }) .await?; @@ -133,12 +137,12 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: retry(|| { let (data, len) = upload_stream(new_data.as_bytes().into()); - ctx.client.upload(data, len, &path2, None) + ctx.client.upload(data, len, &path2, None, &cancel) }) .await?; - retry(|| ctx.client.delete(&path1)).await?; - let t2_files = list_files(&ctx.client).await?; + retry(|| ctx.client.delete(&path1, &cancel)).await?; + let t2_files = list_files(&ctx.client, &cancel).await?; let t2 = time_point().await; println!("at t2: {t2_files:?}"); @@ -147,10 +151,10 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: ctx.client .time_travel_recover(None, t2, t_final, &cancel) .await?; - let t2_files_recovered = list_files(&ctx.client).await?; + let t2_files_recovered = list_files(&ctx.client, &cancel).await?; println!("after recovery to t2: {t2_files_recovered:?}"); assert_eq!(t2_files, t2_files_recovered); - let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2).await?).await?; + let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?; assert_eq!(path2_recovered_t2, new_data.as_bytes()); // after recovery to t1: path1 is back, path2 has the old content @@ -158,10 +162,10 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: ctx.client .time_travel_recover(None, t1, t_final, &cancel) .await?; - let t1_files_recovered = list_files(&ctx.client).await?; + let t1_files_recovered = list_files(&ctx.client, &cancel).await?; println!("after recovery to t1: {t1_files_recovered:?}"); assert_eq!(t1_files, t1_files_recovered); - let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2).await?).await?; + let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?; assert_eq!(path2_recovered_t1, old_data.as_bytes()); // after recovery to t0: everything is gone except for path1 @@ -169,14 +173,14 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: ctx.client .time_travel_recover(None, t0, t_final, &cancel) .await?; - let t0_files_recovered = list_files(&ctx.client).await?; + let t0_files_recovered = list_files(&ctx.client, &cancel).await?; println!("after recovery to t0: {t0_files_recovered:?}"); assert_eq!(t0_files, t0_files_recovered); // cleanup let paths = &[path1, path2, path3]; - retry(|| ctx.client.delete_objects(paths)).await?; + retry(|| ctx.client.delete_objects(paths, &cancel)).await?; Ok(()) } @@ -197,6 +201,16 @@ impl EnabledS3 { base_prefix: BASE_PREFIX, } } + + fn configure_request_timeout(&mut self, timeout: Duration) { + match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") { + GenericRemoteStorage::AwsS3(s3) => { + let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut"); + s3.timeout = timeout; + } + _ => unreachable!(), + } + } } enum MaybeEnabledStorage { @@ -370,8 +384,169 @@ fn create_s3_client( concurrency_limit: NonZeroUsize::new(100).unwrap(), max_keys_per_list_response, }), + timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, }; Ok(Arc::new( GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?, )) } + +#[test_context(MaybeEnabledStorage)] +#[tokio::test] +async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) { + let MaybeEnabledStorage::Enabled(ctx) = ctx else { + return; + }; + + let cancel = CancellationToken::new(); + + let path = RemotePath::new(Utf8Path::new( + format!("{}/file_to_copy", ctx.base_prefix).as_str(), + )) + .unwrap(); + + let len = upload_large_enough_file(&ctx.client, &path, &cancel).await; + + let timeout = std::time::Duration::from_secs(5); + + ctx.configure_request_timeout(timeout); + + let started_at = std::time::Instant::now(); + let mut stream = ctx + .client + .download(&path, &cancel) + .await + .expect("download succeeds") + .download_stream; + + if started_at.elapsed().mul_f32(0.9) >= timeout { + tracing::warn!( + elapsed_ms = started_at.elapsed().as_millis(), + "timeout might be too low, consumed most of it during headers" + ); + } + + let first = stream + .next() + .await + .expect("should have the first blob") + .expect("should have succeeded"); + + tracing::info!(len = first.len(), "downloaded first chunk"); + + assert!( + first.len() < len, + "uploaded file is too small, we downloaded all on first chunk" + ); + + tokio::time::sleep(timeout).await; + + { + let started_at = std::time::Instant::now(); + let next = stream + .next() + .await + .expect("stream should not have ended yet"); + + tracing::info!( + next.is_err = next.is_err(), + elapsed_ms = started_at.elapsed().as_millis(), + "received item after timeout" + ); + + let e = next.expect_err("expected an error, but got a chunk?"); + + let inner = e.get_ref().expect("std::io::Error::inner should be set"); + assert!( + inner + .downcast_ref::() + .is_some_and(|e| matches!(e, DownloadError::Timeout)), + "{inner:?}" + ); + } + + ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT); + + ctx.client.delete_objects(&[path], &cancel).await.unwrap() +} + +#[test_context(MaybeEnabledStorage)] +#[tokio::test] +async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) { + let MaybeEnabledStorage::Enabled(ctx) = ctx else { + return; + }; + + let cancel = CancellationToken::new(); + + let path = RemotePath::new(Utf8Path::new( + format!("{}/file_to_copy", ctx.base_prefix).as_str(), + )) + .unwrap(); + + let len = upload_large_enough_file(&ctx.client, &path, &cancel).await; + + { + let mut stream = ctx + .client + .download(&path, &cancel) + .await + .expect("download succeeds") + .download_stream; + + let first = stream + .next() + .await + .expect("should have the first blob") + .expect("should have succeeded"); + + tracing::info!(len = first.len(), "downloaded first chunk"); + + assert!( + first.len() < len, + "uploaded file is too small, we downloaded all on first chunk" + ); + + cancel.cancel(); + + let next = stream.next().await.expect("stream should have more"); + + let e = next.expect_err("expected an error, but got a chunk?"); + + let inner = e.get_ref().expect("std::io::Error::inner should be set"); + assert!( + inner + .downcast_ref::() + .is_some_and(|e| matches!(e, DownloadError::Cancelled)), + "{inner:?}" + ); + } + + let cancel = CancellationToken::new(); + + ctx.client.delete_objects(&[path], &cancel).await.unwrap(); +} + +/// Upload a long enough file so that we cannot download it in single chunk +/// +/// For s3 the first chunk seems to be less than 10kB, so this has a bit of a safety margin +async fn upload_large_enough_file( + client: &GenericRemoteStorage, + path: &RemotePath, + cancel: &CancellationToken, +) -> usize { + let header = bytes::Bytes::from_static("remote blob data content".as_bytes()); + let body = bytes::Bytes::from(vec![0u8; 1024]); + let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128)); + + let len = contents.clone().fold(0, |acc, next| acc + next.len()); + + let contents = futures::stream::iter(contents.map(std::io::Result::Ok)); + + client + .upload(contents, len, path, None, cancel) + .await + .expect("upload succeeds"); + + len +} diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 1989bef817..6d71ff1dd4 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -1359,6 +1359,7 @@ broker_endpoint = '{broker_endpoint}' parsed_remote_storage_config, RemoteStorageConfig { storage: RemoteStorageKind::LocalFs(local_storage_path.clone()), + timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, }, "Remote storage config should correctly parse the local FS config and fill other storage defaults" ); @@ -1426,6 +1427,7 @@ broker_endpoint = '{broker_endpoint}' concurrency_limit: s3_concurrency_limit, max_keys_per_list_response: None, }), + timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, }, "Remote storage config should correctly parse the S3 config" ); diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 81938b14b3..62ba702db7 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -867,6 +867,7 @@ mod test { let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?; let storage_config = RemoteStorageConfig { storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), + timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, }; let storage = GenericRemoteStorage::from_config(&storage_config).unwrap(); @@ -1170,6 +1171,7 @@ pub(crate) mod mock { pub struct ConsumerState { rx: tokio::sync::mpsc::UnboundedReceiver, executor_rx: tokio::sync::mpsc::Receiver, + cancel: CancellationToken, } impl ConsumerState { @@ -1183,7 +1185,7 @@ pub(crate) mod mock { match msg { DeleterMessage::Delete(objects) => { for path in objects { - match remote_storage.delete(&path).await { + match remote_storage.delete(&path, &self.cancel).await { Ok(_) => { debug!("Deleted {path}"); } @@ -1216,7 +1218,7 @@ pub(crate) mod mock { for path in objects { info!("Executing deletion {path}"); - match remote_storage.delete(&path).await { + match remote_storage.delete(&path, &self.cancel).await { Ok(_) => { debug!("Deleted {path}"); } @@ -1266,7 +1268,11 @@ pub(crate) mod mock { executor_tx, executed, remote_storage, - consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }), + consumer: std::sync::Mutex::new(ConsumerState { + rx, + executor_rx, + cancel: CancellationToken::new(), + }), lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())), } } diff --git a/pageserver/src/deletion_queue/deleter.rs b/pageserver/src/deletion_queue/deleter.rs index a75c73f2b1..1f04bc0410 100644 --- a/pageserver/src/deletion_queue/deleter.rs +++ b/pageserver/src/deletion_queue/deleter.rs @@ -8,6 +8,7 @@ use remote_storage::GenericRemoteStorage; use remote_storage::RemotePath; +use remote_storage::TimeoutOrCancel; use remote_storage::MAX_KEYS_PER_DELETE; use std::time::Duration; use tokio_util::sync::CancellationToken; @@ -71,9 +72,11 @@ impl Deleter { Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute")) }); - self.remote_storage.delete_objects(&self.accumulator).await + self.remote_storage + .delete_objects(&self.accumulator, &self.cancel) + .await }, - |_| false, + TimeoutOrCancel::caused_by_cancel, 3, 10, "executing deletion batch", diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 88f4ae7086..e500a6123c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -25,6 +25,7 @@ use pageserver_api::shard::ShardIdentity; use pageserver_api::shard::TenantShardId; use remote_storage::DownloadError; use remote_storage::GenericRemoteStorage; +use remote_storage::TimeoutOrCancel; use std::fmt; use storage_broker::BrokerClientChannel; use tokio::io::BufReader; @@ -3339,7 +3340,7 @@ impl Tenant { &self.cancel, ) .await - .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel)) .and_then(|x| x) } @@ -3389,8 +3390,10 @@ impl Tenant { ); let dest_path = &remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id); + + // if this fails, it will get retried by retried control plane requests storage - .copy_object(source_path, dest_path) + .copy_object(source_path, dest_path, &self.cancel) .await .context("copy initdb tar")?; } @@ -4031,6 +4034,7 @@ pub(crate) mod harness { std::fs::create_dir_all(&remote_fs_dir).unwrap(); let config = RemoteStorageConfig { storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), + timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, }; let remote_storage = GenericRemoteStorage::from_config(&config).unwrap(); let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone())); diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index 0e192b577c..b64be8dcc5 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; use pageserver_api::{models::TenantState, shard::TenantShardId}; -use remote_storage::{GenericRemoteStorage, RemotePath}; +use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel}; use tokio::sync::OwnedMutexGuard; use tokio_util::sync::CancellationToken; use tracing::{error, instrument, Instrument}; @@ -84,17 +84,17 @@ async fn create_remote_delete_mark( let data = bytes::Bytes::from_static(data); let stream = futures::stream::once(futures::future::ready(Ok(data))); remote_storage - .upload(stream, 0, &remote_mark_path, None) + .upload(stream, 0, &remote_mark_path, None, cancel) .await }, - |_e| false, + TimeoutOrCancel::caused_by_cancel, FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "mark_upload", cancel, ) .await - .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel)) .and_then(|x| x) .context("mark_upload")?; @@ -184,15 +184,15 @@ async fn remove_tenant_remote_delete_mark( if let Some(remote_storage) = remote_storage { let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?; backoff::retry( - || async { remote_storage.delete(&path).await }, - |_e| false, + || async { remote_storage.delete(&path, cancel).await }, + TimeoutOrCancel::caused_by_cancel, FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "remove_tenant_remote_delete_mark", cancel, ) .await - .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel)) .and_then(|x| x) .context("remove_tenant_remote_delete_mark")?; } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 483f53d5c8..91e1179e53 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -196,14 +196,12 @@ pub(crate) use upload::upload_initdb_dir; use utils::backoff::{ self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; -use utils::timeout::{timeout_cancellable, TimeoutCancellableError}; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::Duration; -use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; +use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath, TimeoutOrCancel}; use std::ops::DerefMut; use tracing::{debug, error, info, instrument, warn}; use tracing::{info_span, Instrument}; @@ -263,11 +261,6 @@ pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst"; /// Default buffer size when interfacing with [`tokio::fs::File`]. pub(crate) const BUFFER_SIZE: usize = 32 * 1024; -/// This timeout is intended to deal with hangs in lower layers, e.g. stuck TCP flows. It is not -/// intended to be snappy enough for prompt shutdown, as we have a CancellationToken for that. -pub(crate) const UPLOAD_TIMEOUT: Duration = Duration::from_secs(120); -pub(crate) const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(120); - pub enum MaybeDeletedIndexPart { IndexPart(IndexPart), Deleted(IndexPart), @@ -331,40 +324,6 @@ pub struct RemoteTimelineClient { cancel: CancellationToken, } -/// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to anyhow. -/// -/// This is a convenience for the various upload functions. In future -/// the anyhow::Error result should be replaced with a more structured type that -/// enables callers to avoid handling shutdown as an error. -async fn upload_cancellable(cancel: &CancellationToken, future: F) -> anyhow::Result<()> -where - F: std::future::Future>, -{ - match timeout_cancellable(UPLOAD_TIMEOUT, cancel, future).await { - Ok(Ok(())) => Ok(()), - Ok(Err(e)) => Err(e), - Err(TimeoutCancellableError::Timeout) => Err(anyhow::anyhow!("Timeout")), - Err(TimeoutCancellableError::Cancelled) => Err(anyhow::anyhow!("Shutting down")), - } -} -/// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to DownloaDError. -async fn download_cancellable( - cancel: &CancellationToken, - future: F, -) -> Result -where - F: std::future::Future>, -{ - match timeout_cancellable(DOWNLOAD_TIMEOUT, cancel, future).await { - Ok(Ok(r)) => Ok(r), - Ok(Err(e)) => Err(e), - Err(TimeoutCancellableError::Timeout) => { - Err(DownloadError::Other(anyhow::anyhow!("Timed out"))) - } - Err(TimeoutCancellableError::Cancelled) => Err(DownloadError::Cancelled), - } -} - impl RemoteTimelineClient { /// /// Create a remote storage client for given timeline @@ -1050,7 +1009,7 @@ impl RemoteTimelineClient { &self.cancel, ) .await - .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel)) .and_then(|x| x)?; // all good, disarm the guard and mark as success @@ -1082,14 +1041,14 @@ impl RemoteTimelineClient { upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel) .await }, - |_e| false, + TimeoutOrCancel::caused_by_cancel, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "preserve_initdb_tar_zst", &cancel.clone(), ) .await - .ok_or_else(|| anyhow::anyhow!("Cancellled")) + .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel)) .and_then(|x| x) .context("backing up initdb archive")?; Ok(()) @@ -1151,7 +1110,7 @@ impl RemoteTimelineClient { let remaining = download_retry( || async { self.storage_impl - .list_files(Some(&timeline_storage_path), None) + .list_files(Some(&timeline_storage_path), None, &cancel) .await }, "list remaining files", @@ -1445,6 +1404,10 @@ impl RemoteTimelineClient { Ok(()) => { break; } + Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => { + // loop around to do the proper stopping + continue; + } Err(e) => { let retries = task.retries.fetch_add(1, Ordering::SeqCst); diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index e755cd08f3..43f5e6c182 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -11,16 +11,14 @@ use camino::{Utf8Path, Utf8PathBuf}; use pageserver_api::shard::TenantShardId; use tokio::fs::{self, File, OpenOptions}; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +use tokio_util::io::StreamReader; use tokio_util::sync::CancellationToken; use tracing::warn; -use utils::timeout::timeout_cancellable; use utils::{backoff, crashsafe}; use crate::config::PageServerConf; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; -use crate::tenant::remote_timeline_client::{ - download_cancellable, remote_layer_path, remote_timelines_path, DOWNLOAD_TIMEOUT, -}; +use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerFileName; use crate::tenant::Generation; use crate::virtual_file::on_fatal_io_error; @@ -83,15 +81,13 @@ pub async fn download_layer_file<'a>( .with_context(|| format!("create a destination file for layer '{temp_file_path}'")) .map_err(DownloadError::Other)?; - // Cancellation safety: it is safe to cancel this future, because it isn't writing to a local - // file: the write to local file doesn't start until after the request header is returned - // and we start draining the body stream below - let download = download_cancellable(cancel, storage.download(&remote_path)) + let download = storage + .download(&remote_path, cancel) .await .with_context(|| { format!( - "open a download stream for layer with remote storage path '{remote_path:?}'" - ) + "open a download stream for layer with remote storage path '{remote_path:?}'" + ) }) .map_err(DownloadError::Other)?; @@ -100,43 +96,26 @@ pub async fn download_layer_file<'a>( let mut reader = tokio_util::io::StreamReader::new(download.download_stream); - // Cancellation safety: it is safe to cancel this future because it is writing into a temporary file, - // and we will unlink the temporary file if there is an error. This unlink is important because we - // are in a retry loop, and we wouldn't want to leave behind a rogue write I/O to a file that - // we will imminiently try and write to again. - let bytes_amount: u64 = match timeout_cancellable( - DOWNLOAD_TIMEOUT, - cancel, - tokio::io::copy_buf(&mut reader, &mut destination_file), - ) - .await - .with_context(|| { - format!( + let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file) + .await + .with_context(|| format!( "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}" - ) - }) - .map_err(DownloadError::Other)? - { - Ok(b) => Ok(b), + )) + .map_err(DownloadError::Other); + + match bytes_amount { + Ok(bytes_amount) => { + let destination_file = destination_file.into_inner(); + Ok((destination_file, bytes_amount)) + } Err(e) => { - // Remove incomplete files: on restart Timeline would do this anyway, but we must - // do it here for the retry case. if let Err(e) = tokio::fs::remove_file(&temp_file_path).await { on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}")); } + Err(e) } } - .with_context(|| { - format!( - "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}" - ) - }) - .map_err(DownloadError::Other)?; - - let destination_file = destination_file.into_inner(); - - Ok((destination_file, bytes_amount)) }, &format!("download {remote_path:?}"), cancel, @@ -218,9 +197,11 @@ pub async fn list_remote_timelines( let listing = download_retry_forever( || { - download_cancellable( + storage.list( + Some(&remote_path), + ListingMode::WithDelimiter, + None, &cancel, - storage.list(Some(&remote_path), ListingMode::WithDelimiter, None), ) }, &format!("list timelines for {tenant_shard_id}"), @@ -259,26 +240,23 @@ async fn do_download_index_part( index_generation: Generation, cancel: &CancellationToken, ) -> Result { - use futures::stream::StreamExt; - let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation); let index_part_bytes = download_retry_forever( || async { - // Cancellation: if is safe to cancel this future because we're just downloading into - // a memory buffer, not touching local disk. - let index_part_download = - download_cancellable(cancel, storage.download(&remote_path)).await?; + let download = storage.download(&remote_path, cancel).await?; - let mut index_part_bytes = Vec::new(); - let mut stream = std::pin::pin!(index_part_download.download_stream); - while let Some(chunk) = stream.next().await { - let chunk = chunk - .with_context(|| format!("download index part at {remote_path:?}")) - .map_err(DownloadError::Other)?; - index_part_bytes.extend_from_slice(&chunk[..]); - } - Ok(index_part_bytes) + let mut bytes = Vec::new(); + + let stream = download.download_stream; + let mut stream = StreamReader::new(stream); + + tokio::io::copy_buf(&mut stream, &mut bytes) + .await + .with_context(|| format!("download index part at {remote_path:?}")) + .map_err(DownloadError::Other)?; + + Ok(bytes) }, &format!("download {remote_path:?}"), cancel, @@ -373,7 +351,7 @@ pub(super) async fn download_index_part( let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none()); let indices = download_retry( - || async { storage.list_files(Some(&index_prefix), None).await }, + || async { storage.list_files(Some(&index_prefix), None, cancel).await }, "list index_part files", cancel, ) @@ -446,11 +424,10 @@ pub(crate) async fn download_initdb_tar_zst( .with_context(|| format!("tempfile creation {temp_path}")) .map_err(DownloadError::Other)?; - let download = match download_cancellable(cancel, storage.download(&remote_path)).await - { + let download = match storage.download(&remote_path, cancel).await { Ok(dl) => dl, Err(DownloadError::NotFound) => { - download_cancellable(cancel, storage.download(&remote_preserved_path)).await? + storage.download(&remote_preserved_path, cancel).await? } Err(other) => Err(other)?, }; @@ -460,6 +437,7 @@ pub(crate) async fn download_initdb_tar_zst( // TODO: this consumption of the response body should be subject to timeout + cancellation, but // not without thinking carefully about how to recover safely from cancelling a write to // local storage (e.g. by writing into a temp file as we do in download_layer) + // FIXME: flip the weird error wrapping tokio::io::copy_buf(&mut download, &mut writer) .await .with_context(|| format!("download initdb.tar.zst at {remote_path:?}")) diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index c17e27b446..137fe48b73 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -16,7 +16,7 @@ use crate::{ config::PageServerConf, tenant::remote_timeline_client::{ index::IndexPart, remote_index_path, remote_initdb_archive_path, - remote_initdb_preserved_archive_path, remote_path, upload_cancellable, + remote_initdb_preserved_archive_path, remote_path, }, }; use remote_storage::{GenericRemoteStorage, TimeTravelError}; @@ -49,16 +49,15 @@ pub(crate) async fn upload_index_part<'a>( let index_part_bytes = bytes::Bytes::from(index_part_bytes); let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation); - upload_cancellable( - cancel, - storage.upload_storage_object( + storage + .upload_storage_object( futures::stream::once(futures::future::ready(Ok(index_part_bytes))), index_part_size, &remote_path, - ), - ) - .await - .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'")) + cancel, + ) + .await + .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'")) } /// Attempts to upload given layer files. @@ -115,11 +114,10 @@ pub(super) async fn upload_timeline_layer<'a>( let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE); - upload_cancellable(cancel, storage.upload(reader, fs_size, &storage_path, None)) + storage + .upload(reader, fs_size, &storage_path, None, cancel) .await - .with_context(|| format!("upload layer from local path '{source_path}'"))?; - - Ok(()) + .with_context(|| format!("upload layer from local path '{source_path}'")) } /// Uploads the given `initdb` data to the remote storage. @@ -139,12 +137,10 @@ pub(crate) async fn upload_initdb_dir( let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE); let remote_path = remote_initdb_archive_path(tenant_id, timeline_id); - upload_cancellable( - cancel, - storage.upload_storage_object(file, size as usize, &remote_path), - ) - .await - .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'")) + storage + .upload_storage_object(file, size as usize, &remote_path, cancel) + .await + .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'")) } pub(crate) async fn preserve_initdb_archive( @@ -155,7 +151,8 @@ pub(crate) async fn preserve_initdb_archive( ) -> anyhow::Result<()> { let source_path = remote_initdb_archive_path(tenant_id, timeline_id); let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id); - upload_cancellable(cancel, storage.copy_object(&source_path, &dest_path)) + storage + .copy_object(&source_path, &dest_path, cancel) .await .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'")) } diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index c23416a7f0..6966cf7709 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -523,12 +523,13 @@ impl<'a> TenantDownloader<'a> { tracing::debug!("Downloading heatmap for secondary tenant",); let heatmap_path = remote_heatmap_path(tenant_shard_id); + let cancel = &self.secondary_state.cancel; let heatmap_bytes = backoff::retry( || async { let download = self .remote_storage - .download(&heatmap_path) + .download(&heatmap_path, cancel) .await .map_err(UpdateError::from)?; let mut heatmap_bytes = Vec::new(); @@ -540,7 +541,7 @@ impl<'a> TenantDownloader<'a> { FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "download heatmap", - &self.secondary_state.cancel, + cancel, ) .await .ok_or_else(|| UpdateError::Cancelled) diff --git a/pageserver/src/tenant/secondary/heatmap_uploader.rs b/pageserver/src/tenant/secondary/heatmap_uploader.rs index 806e3fb0e8..660459a733 100644 --- a/pageserver/src/tenant/secondary/heatmap_uploader.rs +++ b/pageserver/src/tenant/secondary/heatmap_uploader.rs @@ -21,18 +21,17 @@ use futures::Future; use md5; use pageserver_api::shard::TenantShardId; use rand::Rng; -use remote_storage::GenericRemoteStorage; +use remote_storage::{GenericRemoteStorage, TimeoutOrCancel}; use super::{ + heatmap::HeatMapTenant, scheduler::{self, JobGenerator, RunningJob, SchedulingResult, TenantBackgroundJobs}, - CommandRequest, + CommandRequest, UploadCommand, }; use tokio_util::sync::CancellationToken; use tracing::{info_span, instrument, Instrument}; use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop}; -use super::{heatmap::HeatMapTenant, UploadCommand}; - pub(super) async fn heatmap_uploader_task( tenant_manager: Arc, remote_storage: GenericRemoteStorage, @@ -417,10 +416,10 @@ async fn upload_tenant_heatmap( || async { let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone()))); remote_storage - .upload_storage_object(bytes, size, &path) + .upload_storage_object(bytes, size, &path, cancel) .await }, - |_| false, + TimeoutOrCancel::caused_by_cancel, 3, u32::MAX, "Uploading heatmap", diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index ad22829183..d941445c2d 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -13,7 +13,7 @@ use parquet::{ }, record::RecordWriter, }; -use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig}; +use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel}; use tokio::{sync::mpsc, time}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, Span}; @@ -314,20 +314,23 @@ async fn upload_parquet( let path = RemotePath::from_string(&format!( "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet" ))?; + let cancel = CancellationToken::new(); backoff::retry( || async { let stream = futures::stream::once(futures::future::ready(Ok(data.clone()))); - storage.upload(stream, data.len(), &path, None).await + storage + .upload(stream, data.len(), &path, None, &cancel) + .await }, - |_e| false, + TimeoutOrCancel::caused_by_cancel, FAILED_UPLOAD_WARN_THRESHOLD, FAILED_UPLOAD_MAX_RETRIES, "request_data_upload", // we don't want cancellation to interrupt here, so we make a dummy cancel token - &CancellationToken::new(), + &cancel, ) .await - .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel)) .and_then(|x| x) .context("request_data_upload")?; @@ -413,7 +416,8 @@ mod tests { ) .unwrap(), max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, - }) + }), + timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, }) ); assert_eq!(parquet_upload.parquet_upload_row_group_size, 100); @@ -466,6 +470,7 @@ mod tests { ) -> Vec<(u64, usize, i64)> { let remote_storage_config = RemoteStorageConfig { storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()), + timeout: std::time::Duration::from_secs(120), }; let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap(); diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index dbdc742d26..944d80f777 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -511,7 +511,11 @@ async fn backup_object( let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE); - storage.upload_storage_object(file, size, target_file).await + let cancel = CancellationToken::new(); + + storage + .upload_storage_object(file, size, target_file, &cancel) + .await } pub async fn read_object( @@ -526,8 +530,10 @@ pub async fn read_object( info!("segment download about to start from remote path {file_path:?} at offset {offset}"); + let cancel = CancellationToken::new(); + let download = storage - .download_storage_object(Some((offset, None)), file_path) + .download_storage_object(Some((offset, None)), file_path, &cancel) .await .with_context(|| { format!("Failed to open WAL segment download stream for remote path {file_path:?}") @@ -559,7 +565,8 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> { // Note: listing segments might take a long time if there are many of them. // We don't currently have http requests timeout cancellation, but if/once // we have listing should get streaming interface to make progress. - let token = CancellationToken::new(); // not really used + + let cancel = CancellationToken::new(); // not really used backoff::retry( || async { // Do list-delete in batch_size batches to make progress even if there a lot of files. @@ -567,7 +574,7 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> { // I'm not sure deleting while iterating is expected in s3. loop { let files = storage - .list_files(Some(&remote_path), Some(batch_size)) + .list_files(Some(&remote_path), Some(batch_size), &cancel) .await?; if files.is_empty() { return Ok(()); // done @@ -580,14 +587,15 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> { files.first().unwrap().object_name().unwrap_or(""), files.last().unwrap().object_name().unwrap_or("") ); - storage.delete_objects(&files).await?; + storage.delete_objects(&files, &cancel).await?; } }, + // consider TimeoutOrCancel::caused_by_cancel when using cancellation |_| false, 3, 10, "executing WAL segments deletion batch", - &token, + &cancel, ) .await .ok_or_else(|| anyhow::anyhow!("canceled")) @@ -617,7 +625,12 @@ pub async fn copy_s3_segments( let remote_path = RemotePath::new(&relative_dst_path)?; - let files = storage.list_files(Some(&remote_path), None).await?; + let cancel = CancellationToken::new(); + + let files = storage + .list_files(Some(&remote_path), None, &cancel) + .await?; + let uploaded_segments = &files .iter() .filter_map(|file| file.object_name().map(ToOwned::to_owned)) @@ -645,7 +658,7 @@ pub async fn copy_s3_segments( let from = RemotePath::new(&relative_src_path.join(&segment_name))?; let to = RemotePath::new(&relative_dst_path.join(&segment_name))?; - storage.copy_object(&from, &to).await?; + storage.copy_object(&from, &to, &cancel).await?; } info!(