move timeouts and cancellation handling to remote_storage (#6697)

Cancellation and timeouts are handled at remote_storage callsites, if
they are. However they should always be handled, because we've had
transient problems with remote storage connections.

- Add cancellation token to the `trait RemoteStorage` methods
- For `download*`, `list*` methods there is
`DownloadError::{Cancelled,Timeout}`
- For the rest now using `anyhow::Error`, it will have root cause
`remote_storage::TimeoutOrCancel::{Cancel,Timeout}`
- Both types have `::is_permanent` equivalent which should be passed to
`backoff::retry`
- New generic RemoteStorageConfig option `timeout`, defaults to 120s
- Start counting timeouts only after acquiring concurrency limiter
permit
- Cancellable permit acquiring
- Download stream timeout or cancellation is communicated via an
`std::io::Error`
- Exit backoff::retry by marking cancellation errors permanent

Fixes: #6096
Closes: #4781

Co-authored-by: arpad-m <arpad-m@users.noreply.github.com>
This commit is contained in:
Joonas Koivunen
2024-02-15 01:24:07 +02:00
committed by GitHub
parent 024372a3db
commit 80854b98ff
25 changed files with 1712 additions and 726 deletions

2
Cargo.lock generated
View File

@@ -4436,6 +4436,7 @@ dependencies = [
"futures",
"futures-util",
"http-types",
"humantime",
"hyper",
"itertools",
"metrics",
@@ -4447,6 +4448,7 @@ dependencies = [
"serde_json",
"test-context",
"tokio",
"tokio-stream",
"tokio-util",
"toml_edit",
"tracing",

View File

@@ -15,11 +15,13 @@ aws-sdk-s3.workspace = true
aws-credential-types.workspace = true
bytes.workspace = true
camino.workspace = true
humantime.workspace = true
hyper = { workspace = true, features = ["stream"] }
futures.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["sync", "fs", "io-util"] }
tokio-stream.workspace = true
tokio-util = { workspace = true, features = ["compat"] }
toml_edit.workspace = true
tracing.workspace = true

View File

@@ -22,16 +22,15 @@ use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerCl
use bytes::Bytes;
use futures::stream::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http_types::{StatusCode, Url};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use crate::s3_bucket::RequestKind;
use crate::TimeTravelError;
use crate::{
AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath,
RemoteStorage, StorageMetadata,
error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download,
DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
TimeTravelError, TimeoutOrCancel,
};
pub struct AzureBlobStorage {
@@ -39,10 +38,12 @@ pub struct AzureBlobStorage {
prefix_in_container: Option<String>,
max_keys_per_list_response: Option<NonZeroU32>,
concurrency_limiter: ConcurrencyLimiter,
// Per-request timeout. Accessible for tests.
pub timeout: Duration,
}
impl AzureBlobStorage {
pub fn new(azure_config: &AzureConfig) -> Result<Self> {
pub fn new(azure_config: &AzureConfig, timeout: Duration) -> Result<Self> {
debug!(
"Creating azure remote storage for azure container {}",
azure_config.container_name
@@ -79,6 +80,7 @@ impl AzureBlobStorage {
prefix_in_container: azure_config.prefix_in_container.to_owned(),
max_keys_per_list_response,
concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
timeout,
})
}
@@ -121,8 +123,11 @@ impl AzureBlobStorage {
async fn download_for_builder(
&self,
builder: GetBlobBuilder,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
let mut response = builder.into_stream();
let kind = RequestKind::Get;
let _permit = self.permit(kind, cancel).await?;
let mut etag = None;
let mut last_modified = None;
@@ -130,39 +135,70 @@ impl AzureBlobStorage {
// TODO give proper streaming response instead of buffering into RAM
// https://github.com/neondatabase/neon/issues/5563
let mut bufs = Vec::new();
while let Some(part) = response.next().await {
let part = part.map_err(to_download_error)?;
let etag_str: &str = part.blob.properties.etag.as_ref();
if etag.is_none() {
etag = Some(etag.unwrap_or_else(|| etag_str.to_owned()));
let download = async {
let response = builder
// convert to concrete Pageable
.into_stream()
// convert to TryStream
.into_stream()
.map_err(to_download_error);
// apply per request timeout
let response = tokio_stream::StreamExt::timeout(response, self.timeout);
// flatten
let response = response.map(|res| match res {
Ok(res) => res,
Err(_elapsed) => Err(DownloadError::Timeout),
});
let mut response = std::pin::pin!(response);
let mut bufs = Vec::new();
while let Some(part) = response.next().await {
let part = part?;
let etag_str: &str = part.blob.properties.etag.as_ref();
if etag.is_none() {
etag = Some(etag.unwrap_or_else(|| etag_str.to_owned()));
}
if last_modified.is_none() {
last_modified = Some(part.blob.properties.last_modified.into());
}
if let Some(blob_meta) = part.blob.metadata {
metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned())));
}
let data = part
.data
.collect()
.await
.map_err(|e| DownloadError::Other(e.into()))?;
bufs.push(data);
}
if last_modified.is_none() {
last_modified = Some(part.blob.properties.last_modified.into());
}
if let Some(blob_meta) = part.blob.metadata {
metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned())));
}
let data = part
.data
.collect()
.await
.map_err(|e| DownloadError::Other(e.into()))?;
bufs.push(data);
Ok(Download {
download_stream: Box::pin(futures::stream::iter(bufs.into_iter().map(Ok))),
etag,
last_modified,
metadata: Some(StorageMetadata(metadata)),
})
};
tokio::select! {
bufs = download => bufs,
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}
Ok(Download {
download_stream: Box::pin(futures::stream::iter(bufs.into_iter().map(Ok))),
etag,
last_modified,
metadata: Some(StorageMetadata(metadata)),
})
}
async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
self.concurrency_limiter
.acquire(kind)
.await
.expect("semaphore is never closed")
async fn permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
let acquire = self.concurrency_limiter.acquire(kind);
tokio::select! {
permit = acquire => Ok(permit.expect("never closed")),
_ = cancel.cancelled() => Err(Cancelled),
}
}
}
@@ -192,66 +228,87 @@ impl RemoteStorage for AzureBlobStorage {
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> anyhow::Result<Listing, DownloadError> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
.map(|p| self.relative_path_to_name(p))
.or_else(|| self.prefix_in_container.clone())
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
}
p
let _permit = self.permit(RequestKind::List, cancel).await?;
let op = async {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
.map(|p| self.relative_path_to_name(p))
.or_else(|| self.prefix_in_container.clone())
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
}
p
});
let mut builder = self.client.list_blobs();
if let ListingMode::WithDelimiter = mode {
builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
if let Some(prefix) = list_prefix {
builder = builder.prefix(Cow::from(prefix.to_owned()));
}
if let Some(limit) = self.max_keys_per_list_response {
builder = builder.max_results(MaxResults::new(limit));
}
let response = builder.into_stream();
let response = response.into_stream().map_err(to_download_error);
let response = tokio_stream::StreamExt::timeout(response, self.timeout);
let response = response.map(|res| match res {
Ok(res) => res,
Err(_elapsed) => Err(DownloadError::Timeout),
});
let mut builder = self.client.list_blobs();
let mut response = std::pin::pin!(response);
if let ListingMode::WithDelimiter = mode {
builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let mut res = Listing::default();
if let Some(prefix) = list_prefix {
builder = builder.prefix(Cow::from(prefix.to_owned()));
}
let mut max_keys = max_keys.map(|mk| mk.get());
while let Some(entry) = response.next().await {
let entry = entry?;
let prefix_iter = entry
.blobs
.prefixes()
.map(|prefix| self.name_to_relative_path(&prefix.name));
res.prefixes.extend(prefix_iter);
if let Some(limit) = self.max_keys_per_list_response {
builder = builder.max_results(MaxResults::new(limit));
}
let blob_iter = entry
.blobs
.blobs()
.map(|k| self.name_to_relative_path(&k.name));
let mut response = builder.into_stream();
let mut res = Listing::default();
// NonZeroU32 doesn't support subtraction apparently
let mut max_keys = max_keys.map(|mk| mk.get());
while let Some(l) = response.next().await {
let entry = l.map_err(to_download_error)?;
let prefix_iter = entry
.blobs
.prefixes()
.map(|prefix| self.name_to_relative_path(&prefix.name));
res.prefixes.extend(prefix_iter);
for key in blob_iter {
res.keys.push(key);
let blob_iter = entry
.blobs
.blobs()
.map(|k| self.name_to_relative_path(&k.name));
for key in blob_iter {
res.keys.push(key);
if let Some(mut mk) = max_keys {
assert!(mk > 0);
mk -= 1;
if mk == 0 {
return Ok(res); // limit reached
if let Some(mut mk) = max_keys {
assert!(mk > 0);
mk -= 1;
if mk == 0 {
return Ok(res); // limit reached
}
max_keys = Some(mk);
}
max_keys = Some(mk);
}
}
Ok(res)
};
tokio::select! {
res = op => res,
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}
Ok(res)
}
async fn upload(
@@ -260,35 +317,52 @@ impl RemoteStorage for AzureBlobStorage {
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let _permit = self.permit(RequestKind::Put).await;
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
let _permit = self.permit(RequestKind::Put, cancel).await?;
let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
Box::pin(from);
let op = async {
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
let from = NonSeekableStream::new(from, data_size_bytes);
let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
Box::pin(from);
let body = azure_core::Body::SeekableStream(Box::new(from));
let from = NonSeekableStream::new(from, data_size_bytes);
let mut builder = blob_client.put_block_blob(body);
let body = azure_core::Body::SeekableStream(Box::new(from));
if let Some(metadata) = metadata {
builder = builder.metadata(to_azure_metadata(metadata));
let mut builder = blob_client.put_block_blob(body);
if let Some(metadata) = metadata {
builder = builder.metadata(to_azure_metadata(metadata));
}
let fut = builder.into_future();
let fut = tokio::time::timeout(self.timeout, fut);
match fut.await {
Ok(Ok(_response)) => Ok(()),
Ok(Err(azure)) => Err(azure.into()),
Err(_timeout) => Err(TimeoutOrCancel::Cancel.into()),
}
};
tokio::select! {
res = op => res,
_ = cancel.cancelled() => Err(TimeoutOrCancel::Cancel.into()),
}
let _response = builder.into_future().await?;
Ok(())
}
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
let _permit = self.permit(RequestKind::Get).await;
async fn download(
&self,
from: &RemotePath,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
let blob_client = self.client.blob_client(self.relative_path_to_name(from));
let builder = blob_client.get();
self.download_for_builder(builder).await
self.download_for_builder(builder, cancel).await
}
async fn download_byte_range(
@@ -296,8 +370,8 @@ impl RemoteStorage for AzureBlobStorage {
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
let _permit = self.permit(RequestKind::Get).await;
let blob_client = self.client.blob_client(self.relative_path_to_name(from));
let mut builder = blob_client.get();
@@ -309,82 +383,113 @@ impl RemoteStorage for AzureBlobStorage {
};
builder = builder.range(range);
self.download_for_builder(builder).await
self.download_for_builder(builder, cancel).await
}
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
let _permit = self.permit(RequestKind::Delete).await;
let blob_client = self.client.blob_client(self.relative_path_to_name(path));
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_objects(std::array::from_ref(path), cancel)
.await
}
let builder = blob_client.delete();
async fn delete_objects<'a>(
&self,
paths: &'a [RemotePath],
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let _permit = self.permit(RequestKind::Delete, cancel).await?;
match builder.into_future().await {
Ok(_response) => Ok(()),
Err(e) => {
if let Some(http_err) = e.as_http_error() {
if http_err.status() == StatusCode::NotFound {
return Ok(());
let op = async {
// TODO batch requests are also not supported by the SDK
// https://github.com/Azure/azure-sdk-for-rust/issues/1068
// https://github.com/Azure/azure-sdk-for-rust/issues/1249
for path in paths {
let blob_client = self.client.blob_client(self.relative_path_to_name(path));
let request = blob_client.delete().into_future();
let res = tokio::time::timeout(self.timeout, request).await;
match res {
Ok(Ok(_response)) => continue,
Ok(Err(e)) => {
if let Some(http_err) = e.as_http_error() {
if http_err.status() == StatusCode::NotFound {
continue;
}
}
return Err(e.into());
}
Err(_elapsed) => return Err(TimeoutOrCancel::Timeout.into()),
}
Err(anyhow::Error::new(e))
}
Ok(())
};
tokio::select! {
res = op => res,
_ = cancel.cancelled() => Err(TimeoutOrCancel::Cancel.into()),
}
}
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
// Permit is already obtained by inner delete function
async fn copy(
&self,
from: &RemotePath,
to: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let _permit = self.permit(RequestKind::Copy, cancel).await?;
// TODO batch requests are also not supported by the SDK
// https://github.com/Azure/azure-sdk-for-rust/issues/1068
// https://github.com/Azure/azure-sdk-for-rust/issues/1249
for path in paths {
self.delete(path).await?;
}
Ok(())
}
let timeout = tokio::time::sleep(self.timeout);
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
let _permit = self.permit(RequestKind::Copy).await;
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
let mut copy_status = None;
let source_url = format!(
"{}/{}",
self.client.url()?,
self.relative_path_to_name(from)
);
let builder = blob_client.copy(Url::from_str(&source_url)?);
let op = async {
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
let result = builder.into_future().await?;
let source_url = format!(
"{}/{}",
self.client.url()?,
self.relative_path_to_name(from)
);
let mut copy_status = result.copy_status;
let start_time = Instant::now();
const MAX_WAIT_TIME: Duration = Duration::from_secs(60);
loop {
match copy_status {
CopyStatus::Aborted => {
anyhow::bail!("Received abort for copy from {from} to {to}.");
let builder = blob_client.copy(Url::from_str(&source_url)?);
let copy = builder.into_future();
let result = copy.await?;
copy_status = Some(result.copy_status);
loop {
match copy_status.as_ref().expect("we always set it to Some") {
CopyStatus::Aborted => {
anyhow::bail!("Received abort for copy from {from} to {to}.");
}
CopyStatus::Failed => {
anyhow::bail!("Received failure response for copy from {from} to {to}.");
}
CopyStatus::Success => return Ok(()),
CopyStatus::Pending => (),
}
CopyStatus::Failed => {
anyhow::bail!("Received failure response for copy from {from} to {to}.");
}
CopyStatus::Success => return Ok(()),
CopyStatus::Pending => (),
// The copy is taking longer. Waiting a second and then re-trying.
// TODO estimate time based on copy_progress and adjust time based on that
tokio::time::sleep(Duration::from_millis(1000)).await;
let properties = blob_client.get_properties().into_future().await?;
let Some(status) = properties.blob.properties.copy_status else {
tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
return Ok(());
};
copy_status = Some(status);
}
// The copy is taking longer. Waiting a second and then re-trying.
// TODO estimate time based on copy_progress and adjust time based on that
tokio::time::sleep(Duration::from_millis(1000)).await;
let properties = blob_client.get_properties().into_future().await?;
let Some(status) = properties.blob.properties.copy_status else {
tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
return Ok(());
};
if start_time.elapsed() > MAX_WAIT_TIME {
anyhow::bail!("Copy from from {from} to {to} took longer than limit MAX_WAIT_TIME={}s. copy_pogress={:?}.",
MAX_WAIT_TIME.as_secs_f32(),
properties.blob.properties.copy_progress,
);
}
copy_status = status;
};
tokio::select! {
res = op => res,
_ = cancel.cancelled() => Err(anyhow::Error::new(TimeoutOrCancel::Cancel)),
_ = timeout => {
let e = anyhow::Error::new(TimeoutOrCancel::Timeout);
let e = e.context(format!("Timeout, last status: {copy_status:?}"));
Err(e)
},
}
}

View File

@@ -0,0 +1,181 @@
/// Reasons for downloads or listings to fail.
#[derive(Debug)]
pub enum DownloadError {
/// Validation or other error happened due to user input.
BadInput(anyhow::Error),
/// The file was not found in the remote storage.
NotFound,
/// A cancellation token aborted the download, typically during
/// tenant detach or process shutdown.
Cancelled,
/// A timeout happened while executing the request. Possible reasons:
/// - stuck tcp connection
///
/// Concurrency control is not timed within timeout.
Timeout,
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
impl std::fmt::Display for DownloadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DownloadError::BadInput(e) => {
write!(f, "Failed to download a remote file due to user input: {e}")
}
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
DownloadError::Timeout => write!(f, "timeout"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
}
}
}
impl std::error::Error for DownloadError {}
impl DownloadError {
/// Returns true if the error should not be retried with backoff
pub fn is_permanent(&self) -> bool {
use DownloadError::*;
match self {
BadInput(_) | NotFound | Cancelled => true,
Timeout | Other(_) => false,
}
}
}
#[derive(Debug)]
pub enum TimeTravelError {
/// Validation or other error happened due to user input.
BadInput(anyhow::Error),
/// The used remote storage does not have time travel recovery implemented
Unimplemented,
/// The number of versions/deletion markers is above our limit.
TooManyVersions,
/// A cancellation token aborted the process, typically during
/// request closure or process shutdown.
Cancelled,
/// Other errors
Other(anyhow::Error),
}
impl std::fmt::Display for TimeTravelError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TimeTravelError::BadInput(e) => {
write!(
f,
"Failed to time travel recover a prefix due to user input: {e}"
)
}
TimeTravelError::Unimplemented => write!(
f,
"time travel recovery is not implemented for the current storage backend"
),
TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"),
TimeTravelError::TooManyVersions => {
write!(f, "Number of versions/delete markers above limit")
}
TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"),
}
}
}
impl std::error::Error for TimeTravelError {}
/// Plain cancelled error.
///
/// By design this type does not not implement `std::error::Error` so it cannot be put as the root
/// cause of `std::io::Error` or `anyhow::Error`. It should never need to be exposed out of this
/// crate.
///
/// It exists to implement permit acquiring in `{Download,TimeTravel}Error` and `anyhow::Error` returning
/// operations and ensuring that those get converted to proper versions with just `?`.
#[derive(Debug)]
pub(crate) struct Cancelled;
impl From<Cancelled> for anyhow::Error {
fn from(_: Cancelled) -> Self {
anyhow::Error::new(TimeoutOrCancel::Cancel)
}
}
impl From<Cancelled> for TimeTravelError {
fn from(_: Cancelled) -> Self {
TimeTravelError::Cancelled
}
}
impl From<Cancelled> for TimeoutOrCancel {
fn from(_: Cancelled) -> Self {
TimeoutOrCancel::Cancel
}
}
impl From<Cancelled> for DownloadError {
fn from(_: Cancelled) -> Self {
DownloadError::Cancelled
}
}
/// This type is used at as the root cause for timeouts and cancellations with `anyhow::Error` returning
/// RemoteStorage methods.
///
/// For use with `utils::backoff::retry` and `anyhow::Error` returning operations there is
/// `TimeoutOrCancel::caused_by_cancel` method to query "proper form" errors.
#[derive(Debug)]
pub enum TimeoutOrCancel {
Timeout,
Cancel,
}
impl std::fmt::Display for TimeoutOrCancel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use TimeoutOrCancel::*;
match self {
Timeout => write!(f, "timeout"),
Cancel => write!(f, "cancel"),
}
}
}
impl std::error::Error for TimeoutOrCancel {}
impl TimeoutOrCancel {
pub fn caused(error: &anyhow::Error) -> Option<&Self> {
error.root_cause().downcast_ref()
}
/// Returns true if the error was caused by [`TimeoutOrCancel::Cancel`].
pub fn caused_by_cancel(error: &anyhow::Error) -> bool {
Self::caused(error).is_some_and(Self::is_cancel)
}
pub fn is_cancel(&self) -> bool {
matches!(self, TimeoutOrCancel::Cancel)
}
pub fn is_timeout(&self) -> bool {
matches!(self, TimeoutOrCancel::Timeout)
}
}
/// This conversion is used when [`crate::support::DownloadStream`] notices a cancellation or
/// timeout to wrap it in an `std::io::Error`.
impl From<TimeoutOrCancel> for std::io::Error {
fn from(value: TimeoutOrCancel) -> Self {
let e = DownloadError::from(value);
std::io::Error::other(e)
}
}
impl From<TimeoutOrCancel> for DownloadError {
fn from(value: TimeoutOrCancel) -> Self {
use TimeoutOrCancel::*;
match value {
Timeout => DownloadError::Timeout,
Cancel => DownloadError::Cancelled,
}
}
}

View File

@@ -10,6 +10,7 @@
#![deny(clippy::undocumented_unsafe_blocks)]
mod azure_blob;
mod error;
mod local_fs;
mod s3_bucket;
mod simulate_failures;
@@ -21,7 +22,7 @@ use std::{
num::{NonZeroU32, NonZeroUsize},
pin::Pin,
sync::Arc,
time::SystemTime,
time::{Duration, SystemTime},
};
use anyhow::{bail, Context};
@@ -41,6 +42,8 @@ pub use self::{
};
use s3_bucket::RequestKind;
pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
/// Currently, sync happens with AWS S3, that has two limits on requests per second:
/// ~200 RPS for IAM services
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
@@ -158,9 +161,10 @@ pub trait RemoteStorage: Send + Sync + 'static {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
cancel: &CancellationToken,
) -> Result<Vec<RemotePath>, DownloadError> {
let result = self
.list(prefix, ListingMode::WithDelimiter, None)
.list(prefix, ListingMode::WithDelimiter, None, cancel)
.await?
.prefixes;
Ok(result)
@@ -182,9 +186,10 @@ pub trait RemoteStorage: Send + Sync + 'static {
&self,
prefix: Option<&RemotePath>,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Vec<RemotePath>, DownloadError> {
let result = self
.list(prefix, ListingMode::NoDelimiter, max_keys)
.list(prefix, ListingMode::NoDelimiter, max_keys, cancel)
.await?
.keys;
Ok(result)
@@ -195,9 +200,13 @@ pub trait RemoteStorage: Send + Sync + 'static {
prefix: Option<&RemotePath>,
_mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Listing, DownloadError>;
/// Streams the local file contents into remote into the remote storage entry.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
/// set to `TimeoutOrCancel`.
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
@@ -206,27 +215,61 @@ pub trait RemoteStorage: Send + Sync + 'static {
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()>;
/// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Streams the remote storage entry contents.
///
/// The returned download stream will obey initial timeout and cancellation signal by erroring
/// on whichever happens first. Only one of the reasons will fail the stream, which is usually
/// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
///
/// Returns the metadata, if any was stored with the file previously.
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError>;
async fn download(
&self,
from: &RemotePath,
cancel: &CancellationToken,
) -> Result<Download, DownloadError>;
/// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Streams a given byte range of the remote storage entry contents.
///
/// The returned download stream will obey initial timeout and cancellation signal by erroring
/// on whichever happens first. Only one of the reasons will fail the stream, which is usually
/// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
///
/// Returns the metadata, if any was stored with the file previously.
async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError>;
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
/// Delete a single path from remote storage.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
/// set to `TimeoutOrCancel`. In such situation it is unknown if the deletion went through.
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()>;
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
/// Delete a multiple paths from remote storage.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
/// set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went
/// through.
async fn delete_objects<'a>(
&self,
paths: &'a [RemotePath],
cancel: &CancellationToken,
) -> anyhow::Result<()>;
/// Copy a remote object inside a bucket from one path to another.
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>;
async fn copy(
&self,
from: &RemotePath,
to: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()>;
/// Resets the content of everything with the given prefix to the given state
async fn time_travel_recover(
@@ -238,7 +281,13 @@ pub trait RemoteStorage: Send + Sync + 'static {
) -> Result<(), TimeTravelError>;
}
pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
/// DownloadStream is sensitive to the timeout and cancellation used with the original
/// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
/// with `tokio::io::copy_buf`.
// This has 'static because safekeepers do not use cancellation tokens (yet)
pub type DownloadStream =
Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>>;
pub struct Download {
pub download_stream: DownloadStream,
/// The last time the file was modified (`last-modified` HTTP header)
@@ -257,86 +306,6 @@ impl Debug for Download {
}
}
#[derive(Debug)]
pub enum DownloadError {
/// Validation or other error happened due to user input.
BadInput(anyhow::Error),
/// The file was not found in the remote storage.
NotFound,
/// A cancellation token aborted the download, typically during
/// tenant detach or process shutdown.
Cancelled,
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
impl std::fmt::Display for DownloadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DownloadError::BadInput(e) => {
write!(f, "Failed to download a remote file due to user input: {e}")
}
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
}
}
}
impl std::error::Error for DownloadError {}
impl DownloadError {
/// Returns true if the error should not be retried with backoff
pub fn is_permanent(&self) -> bool {
use DownloadError::*;
match self {
BadInput(_) => true,
NotFound => true,
Cancelled => true,
Other(_) => false,
}
}
}
#[derive(Debug)]
pub enum TimeTravelError {
/// Validation or other error happened due to user input.
BadInput(anyhow::Error),
/// The used remote storage does not have time travel recovery implemented
Unimplemented,
/// The number of versions/deletion markers is above our limit.
TooManyVersions,
/// A cancellation token aborted the process, typically during
/// request closure or process shutdown.
Cancelled,
/// Other errors
Other(anyhow::Error),
}
impl std::fmt::Display for TimeTravelError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TimeTravelError::BadInput(e) => {
write!(
f,
"Failed to time travel recover a prefix due to user input: {e}"
)
}
TimeTravelError::Unimplemented => write!(
f,
"time travel recovery is not implemented for the current storage backend"
),
TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"),
TimeTravelError::TooManyVersions => {
write!(f, "Number of versions/delete markers above limit")
}
TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"),
}
}
}
impl std::error::Error for TimeTravelError {}
/// Every storage, currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
#[derive(Clone)]
@@ -354,12 +323,13 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> anyhow::Result<Listing, DownloadError> {
match self {
Self::LocalFs(s) => s.list(prefix, mode, max_keys).await,
Self::AwsS3(s) => s.list(prefix, mode, max_keys).await,
Self::AzureBlob(s) => s.list(prefix, mode, max_keys).await,
Self::Unreliable(s) => s.list(prefix, mode, max_keys).await,
Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
}
}
@@ -372,12 +342,13 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
&self,
folder: Option<&RemotePath>,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Vec<RemotePath>, DownloadError> {
match self {
Self::LocalFs(s) => s.list_files(folder, max_keys).await,
Self::AwsS3(s) => s.list_files(folder, max_keys).await,
Self::AzureBlob(s) => s.list_files(folder, max_keys).await,
Self::Unreliable(s) => s.list_files(folder, max_keys).await,
Self::LocalFs(s) => s.list_files(folder, max_keys, cancel).await,
Self::AwsS3(s) => s.list_files(folder, max_keys, cancel).await,
Self::AzureBlob(s) => s.list_files(folder, max_keys, cancel).await,
Self::Unreliable(s) => s.list_files(folder, max_keys, cancel).await,
}
}
@@ -387,36 +358,43 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
pub async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
cancel: &CancellationToken,
) -> Result<Vec<RemotePath>, DownloadError> {
match self {
Self::LocalFs(s) => s.list_prefixes(prefix).await,
Self::AwsS3(s) => s.list_prefixes(prefix).await,
Self::AzureBlob(s) => s.list_prefixes(prefix).await,
Self::Unreliable(s) => s.list_prefixes(prefix).await,
Self::LocalFs(s) => s.list_prefixes(prefix, cancel).await,
Self::AwsS3(s) => s.list_prefixes(prefix, cancel).await,
Self::AzureBlob(s) => s.list_prefixes(prefix, cancel).await,
Self::Unreliable(s) => s.list_prefixes(prefix, cancel).await,
}
}
/// See [`RemoteStorage::upload`]
pub async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
}
}
pub async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
pub async fn download(
&self,
from: &RemotePath,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
match self {
Self::LocalFs(s) => s.download(from).await,
Self::AwsS3(s) => s.download(from).await,
Self::AzureBlob(s) => s.download(from).await,
Self::Unreliable(s) => s.download(from).await,
Self::LocalFs(s) => s.download(from, cancel).await,
Self::AwsS3(s) => s.download(from, cancel).await,
Self::AzureBlob(s) => s.download(from, cancel).await,
Self::Unreliable(s) => s.download(from, cancel).await,
}
}
@@ -425,54 +403,72 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
match self {
Self::LocalFs(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
Self::AwsS3(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
Self::AzureBlob(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
Self::Unreliable(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
}
}
pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
/// See [`RemoteStorage::delete`]
pub async fn delete(
&self,
path: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.delete(path).await,
Self::AwsS3(s) => s.delete(path).await,
Self::AzureBlob(s) => s.delete(path).await,
Self::Unreliable(s) => s.delete(path).await,
Self::LocalFs(s) => s.delete(path, cancel).await,
Self::AwsS3(s) => s.delete(path, cancel).await,
Self::AzureBlob(s) => s.delete(path, cancel).await,
Self::Unreliable(s) => s.delete(path, cancel).await,
}
}
pub async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
/// See [`RemoteStorage::delete_objects`]
pub async fn delete_objects(
&self,
paths: &[RemotePath],
cancel: &CancellationToken,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.delete_objects(paths).await,
Self::AwsS3(s) => s.delete_objects(paths).await,
Self::AzureBlob(s) => s.delete_objects(paths).await,
Self::Unreliable(s) => s.delete_objects(paths).await,
Self::LocalFs(s) => s.delete_objects(paths, cancel).await,
Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
}
}
pub async fn copy_object(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
/// See [`RemoteStorage::copy`]
pub async fn copy_object(
&self,
from: &RemotePath,
to: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.copy(from, to).await,
Self::AwsS3(s) => s.copy(from, to).await,
Self::AzureBlob(s) => s.copy(from, to).await,
Self::Unreliable(s) => s.copy(from, to).await,
Self::LocalFs(s) => s.copy(from, to, cancel).await,
Self::AwsS3(s) => s.copy(from, to, cancel).await,
Self::AzureBlob(s) => s.copy(from, to, cancel).await,
Self::Unreliable(s) => s.copy(from, to, cancel).await,
}
}
/// See [`RemoteStorage::time_travel_recover`].
pub async fn time_travel_recover(
&self,
prefix: Option<&RemotePath>,
@@ -503,10 +499,11 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
impl GenericRemoteStorage {
pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
let timeout = storage_config.timeout;
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs(root) => {
info!("Using fs root '{root}' as a remote storage");
Self::LocalFs(LocalFs::new(root.clone())?)
RemoteStorageKind::LocalFs(path) => {
info!("Using fs root '{path}' as a remote storage");
Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
}
RemoteStorageKind::AwsS3(s3_config) => {
// The profile and access key id are only printed here for debugging purposes,
@@ -516,12 +513,12 @@ impl GenericRemoteStorage {
std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
Self::AwsS3(Arc::new(S3Bucket::new(s3_config)?))
Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout)?))
}
RemoteStorageKind::AzureContainer(azure_config) => {
info!("Using azure container '{}' in region '{}' as a remote storage, prefix in container: '{:?}'",
azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?))
Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?))
}
})
}
@@ -530,18 +527,15 @@ impl GenericRemoteStorage {
Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
}
/// Takes storage object contents and its size and uploads to remote storage,
/// mapping `from_path` to the corresponding remote object id in the storage.
///
/// The storage object does not have to be present on the `from_path`,
/// this path is used for the remote object id conversion only.
/// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
pub async fn upload_storage_object(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
from_size_bytes: usize,
to: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.upload(from, from_size_bytes, to, None)
self.upload(from, from_size_bytes, to, None, cancel)
.await
.with_context(|| {
format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
@@ -554,10 +548,11 @@ impl GenericRemoteStorage {
&self,
byte_range: Option<(u64, Option<u64>)>,
from: &RemotePath,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
match byte_range {
Some((start, end)) => self.download_byte_range(from, start, end).await,
None => self.download(from).await,
Some((start, end)) => self.download_byte_range(from, start, end, cancel).await,
None => self.download(from, cancel).await,
}
}
}
@@ -572,6 +567,9 @@ pub struct StorageMetadata(HashMap<String, String>);
pub struct RemoteStorageConfig {
/// The storage connection configuration.
pub storage: RemoteStorageKind,
/// A common timeout enforced for all requests after concurrency limiter permit has been
/// acquired.
pub timeout: Duration,
}
/// A kind of a remote storage to connect to, with its connection configuration.
@@ -656,6 +654,8 @@ impl Debug for AzureConfig {
}
impl RemoteStorageConfig {
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let local_path = toml.get("local_path");
let bucket_name = toml.get("bucket_name");
@@ -685,6 +685,27 @@ impl RemoteStorageConfig {
.map(|endpoint| parse_toml_string("endpoint", endpoint))
.transpose()?;
let timeout = toml
.get("timeout")
.map(|timeout| {
timeout
.as_str()
.ok_or_else(|| anyhow::Error::msg("timeout was not a string"))
})
.transpose()
.and_then(|timeout| {
timeout
.map(humantime::parse_duration)
.transpose()
.map_err(anyhow::Error::new)
})
.context("parse timeout")?
.unwrap_or(Self::DEFAULT_TIMEOUT);
if timeout < Duration::from_secs(1) {
bail!("timeout was specified as {timeout:?} which is too low");
}
let storage = match (
local_path,
bucket_name,
@@ -746,7 +767,7 @@ impl RemoteStorageConfig {
}
};
Ok(Some(RemoteStorageConfig { storage }))
Ok(Some(RemoteStorageConfig { storage, timeout }))
}
}
@@ -842,4 +863,24 @@ mod tests {
let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
assert_eq!(err.to_string(), "Path \"/\" is not relative");
}
#[test]
fn parse_localfs_config_with_timeout() {
let input = "local_path = '.'
timeout = '5s'";
let toml = input.parse::<toml_edit::Document>().unwrap();
let config = RemoteStorageConfig::from_toml(toml.as_item())
.unwrap()
.expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs(Utf8PathBuf::from(".")),
timeout: Duration::from_secs(5)
}
);
}
}

View File

@@ -5,7 +5,12 @@
//! volume is mounted to the local FS.
use std::{
borrow::Cow, future::Future, io::ErrorKind, num::NonZeroU32, pin::Pin, time::SystemTime,
borrow::Cow,
future::Future,
io::ErrorKind,
num::NonZeroU32,
pin::Pin,
time::{Duration, SystemTime},
};
use anyhow::{bail, ensure, Context};
@@ -20,7 +25,9 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
use crate::{Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError};
use crate::{
Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
};
use super::{RemoteStorage, StorageMetadata};
@@ -29,12 +36,13 @@ const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
#[derive(Debug, Clone)]
pub struct LocalFs {
storage_root: Utf8PathBuf,
timeout: Duration,
}
impl LocalFs {
/// Attempts to create local FS storage, along with its root directory.
/// Storage root will be created (if does not exist) and transformed into an absolute path (if passed as relative).
pub fn new(mut storage_root: Utf8PathBuf) -> anyhow::Result<Self> {
pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
if !storage_root.exists() {
std::fs::create_dir_all(&storage_root).with_context(|| {
format!("Failed to create all directories in the given root path {storage_root:?}")
@@ -46,7 +54,10 @@ impl LocalFs {
})?;
}
Ok(Self { storage_root })
Ok(Self {
storage_root,
timeout,
})
}
// mirrors S3Bucket::s3_object_to_relative_path
@@ -157,80 +168,14 @@ impl LocalFs {
Ok(files)
}
}
impl RemoteStorage for LocalFs {
async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
) -> Result<Listing, DownloadError> {
let mut result = Listing::default();
if let ListingMode::NoDelimiter = mode {
let keys = self
.list_recursive(prefix)
.await
.map_err(DownloadError::Other)?;
result.keys = keys
.into_iter()
.filter(|k| {
let path = k.with_base(&self.storage_root);
!path.is_dir()
})
.collect();
if let Some(max_keys) = max_keys {
result.keys.truncate(max_keys.get() as usize);
}
return Ok(result);
}
let path = match prefix {
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
let prefixes_to_filter = get_all_files(path.as_ref(), false)
.await
.map_err(DownloadError::Other)?;
// filter out empty directories to mirror s3 behavior.
for prefix in prefixes_to_filter {
if prefix.is_dir()
&& is_directory_empty(&prefix)
.await
.map_err(DownloadError::Other)?
{
continue;
}
let stripped = prefix
.strip_prefix(&self.storage_root)
.context("Failed to strip prefix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
);
if prefix.is_dir() {
result.prefixes.push(stripped);
} else {
result.keys.push(stripped);
}
}
Ok(result)
}
async fn upload(
async fn upload0(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let target_file_path = to.with_base(&self.storage_root);
create_target_directory(&target_file_path).await?;
@@ -265,9 +210,26 @@ impl RemoteStorage for LocalFs {
let mut buffer_to_read = data.take(from_size_bytes);
// alternatively we could just write the bytes to a file, but local_fs is a testing utility
let bytes_read = io::copy_buf(&mut buffer_to_read, &mut destination)
.await
.with_context(|| {
let copy = io::copy_buf(&mut buffer_to_read, &mut destination);
let bytes_read = tokio::select! {
biased;
_ = cancel.cancelled() => {
let file = destination.into_inner();
// wait for the inflight operation(s) to complete so that there could be a next
// attempt right away and our writes are not directed to their file.
file.into_std().await;
// TODO: leave the temp or not? leaving is probably less racy. enabled truncate at
// least.
fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
return Err(TimeoutOrCancel::Cancel.into());
}
read = copy => read,
};
let bytes_read =
bytes_read.with_context(|| {
format!(
"Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
)
@@ -299,6 +261,9 @@ impl RemoteStorage for LocalFs {
})?;
if let Some(storage_metadata) = metadata {
// FIXME: we must not be using metadata much, since this would forget the old metadata
// for new writes? or perhaps metadata is sticky; could consider removing if it's never
// used.
let storage_metadata_path = storage_metadata_path(&target_file_path);
fs::write(
&storage_metadata_path,
@@ -315,8 +280,131 @@ impl RemoteStorage for LocalFs {
Ok(())
}
}
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
impl RemoteStorage for LocalFs {
async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Listing, DownloadError> {
let op = async {
let mut result = Listing::default();
if let ListingMode::NoDelimiter = mode {
let keys = self
.list_recursive(prefix)
.await
.map_err(DownloadError::Other)?;
result.keys = keys
.into_iter()
.filter(|k| {
let path = k.with_base(&self.storage_root);
!path.is_dir()
})
.collect();
if let Some(max_keys) = max_keys {
result.keys.truncate(max_keys.get() as usize);
}
return Ok(result);
}
let path = match prefix {
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
let prefixes_to_filter = get_all_files(path.as_ref(), false)
.await
.map_err(DownloadError::Other)?;
// filter out empty directories to mirror s3 behavior.
for prefix in prefixes_to_filter {
if prefix.is_dir()
&& is_directory_empty(&prefix)
.await
.map_err(DownloadError::Other)?
{
continue;
}
let stripped = prefix
.strip_prefix(&self.storage_root)
.context("Failed to strip prefix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
);
if prefix.is_dir() {
result.prefixes.push(stripped);
} else {
result.keys.push(stripped);
}
}
Ok(result)
};
let timeout = async {
tokio::time::sleep(self.timeout).await;
Err(DownloadError::Timeout)
};
let cancelled = async {
cancel.cancelled().await;
Err(DownloadError::Cancelled)
};
tokio::select! {
res = op => res,
res = timeout => res,
res = cancelled => res,
}
}
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let cancel = cancel.child_token();
let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
let mut op = std::pin::pin!(op);
// race the upload0 to the timeout; if it goes over, do a graceful shutdown
let (res, timeout) = tokio::select! {
res = &mut op => (res, false),
_ = tokio::time::sleep(self.timeout) => {
cancel.cancel();
(op.await, true)
}
};
match res {
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
// we caused this cancel (or they happened simultaneously) -- swap it out to
// Timeout
Err(TimeoutOrCancel::Timeout.into())
}
res => res,
}
}
async fn download(
&self,
from: &RemotePath,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
let target_path = from.with_base(&self.storage_root);
if file_exists(&target_path).map_err(DownloadError::BadInput)? {
let source = ReaderStream::new(
@@ -334,6 +422,10 @@ impl RemoteStorage for LocalFs {
.read_storage_metadata(&target_path)
.await
.map_err(DownloadError::Other)?;
let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
Ok(Download {
metadata,
last_modified: None,
@@ -350,6 +442,7 @@ impl RemoteStorage for LocalFs {
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
if let Some(end_exclusive) = end_exclusive {
if end_exclusive <= start_inclusive {
@@ -391,6 +484,9 @@ impl RemoteStorage for LocalFs {
let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
let source = ReaderStream::new(source);
let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
Ok(Download {
metadata,
last_modified: None,
@@ -402,7 +498,7 @@ impl RemoteStorage for LocalFs {
}
}
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
let file_path = path.with_base(&self.storage_root);
match fs::remove_file(&file_path).await {
Ok(()) => Ok(()),
@@ -414,14 +510,23 @@ impl RemoteStorage for LocalFs {
}
}
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
async fn delete_objects<'a>(
&self,
paths: &'a [RemotePath],
cancel: &CancellationToken,
) -> anyhow::Result<()> {
for path in paths {
self.delete(path).await?
self.delete(path, cancel).await?
}
Ok(())
}
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
async fn copy(
&self,
from: &RemotePath,
to: &RemotePath,
_cancel: &CancellationToken,
) -> anyhow::Result<()> {
let from_path = from.with_base(&self.storage_root);
let to_path = to.with_base(&self.storage_root);
create_target_directory(&to_path).await?;
@@ -528,8 +633,9 @@ mod fs_tests {
remote_storage_path: &RemotePath,
expected_metadata: Option<&StorageMetadata>,
) -> anyhow::Result<String> {
let cancel = CancellationToken::new();
let download = storage
.download(remote_storage_path)
.download(remote_storage_path, &cancel)
.await
.map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
ensure!(
@@ -544,16 +650,16 @@ mod fs_tests {
#[tokio::test]
async fn upload_file() -> anyhow::Result<()> {
let storage = create_storage()?;
let (storage, cancel) = create_storage()?;
let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
assert_eq!(
storage.list_all().await?,
vec![target_path_1.clone()],
"Should list a single file after first upload"
);
let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
assert_eq!(
list_files_sorted(&storage).await?,
vec![target_path_1.clone(), target_path_2.clone()],
@@ -565,7 +671,7 @@ mod fs_tests {
#[tokio::test]
async fn upload_file_negatives() -> anyhow::Result<()> {
let storage = create_storage()?;
let (storage, cancel) = create_storage()?;
let id = RemotePath::new(Utf8Path::new("dummy"))?;
let content = Bytes::from_static(b"12345");
@@ -574,34 +680,34 @@ mod fs_tests {
// Check that you get an error if the size parameter doesn't match the actual
// size of the stream.
storage
.upload(content(), 0, &id, None)
.upload(content(), 0, &id, None, &cancel)
.await
.expect_err("upload with zero size succeeded");
storage
.upload(content(), 4, &id, None)
.upload(content(), 4, &id, None, &cancel)
.await
.expect_err("upload with too short size succeeded");
storage
.upload(content(), 6, &id, None)
.upload(content(), 6, &id, None, &cancel)
.await
.expect_err("upload with too large size succeeded");
// Correct size is 5, this should succeed.
storage.upload(content(), 5, &id, None).await?;
storage.upload(content(), 5, &id, None, &cancel).await?;
Ok(())
}
fn create_storage() -> anyhow::Result<LocalFs> {
fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
let storage_root = tempdir()?.path().to_path_buf();
LocalFs::new(storage_root)
LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
}
#[tokio::test]
async fn download_file() -> anyhow::Result<()> {
let storage = create_storage()?;
let (storage, cancel) = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
assert_eq!(
@@ -611,7 +717,7 @@ mod fs_tests {
);
let non_existing_path = "somewhere/else";
match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?).await {
match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await {
Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
}
@@ -620,9 +726,9 @@ mod fs_tests {
#[tokio::test]
async fn download_file_range_positive() -> anyhow::Result<()> {
let storage = create_storage()?;
let (storage, cancel) = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
let full_range_download_contents =
read_and_check_metadata(&storage, &upload_target, None).await?;
@@ -636,7 +742,12 @@ mod fs_tests {
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
let first_part_download = storage
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
.download_byte_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&cancel,
)
.await?;
assert!(
first_part_download.metadata.is_none(),
@@ -654,6 +765,7 @@ mod fs_tests {
&upload_target,
first_part_local.len() as u64,
Some((first_part_local.len() + second_part_local.len()) as u64),
&cancel,
)
.await?;
assert!(
@@ -668,7 +780,7 @@ mod fs_tests {
);
let suffix_bytes = storage
.download_byte_range(&upload_target, 13, None)
.download_byte_range(&upload_target, 13, None, &cancel)
.await?
.download_stream;
let suffix_bytes = aggregate(suffix_bytes).await?;
@@ -676,7 +788,7 @@ mod fs_tests {
assert_eq!(upload_name, suffix);
let all_bytes = storage
.download_byte_range(&upload_target, 0, None)
.download_byte_range(&upload_target, 0, None, &cancel)
.await?
.download_stream;
let all_bytes = aggregate(all_bytes).await?;
@@ -688,9 +800,9 @@ mod fs_tests {
#[tokio::test]
async fn download_file_range_negative() -> anyhow::Result<()> {
let storage = create_storage()?;
let (storage, cancel) = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
let start = 1_000_000_000;
let end = start + 1;
@@ -699,6 +811,7 @@ mod fs_tests {
&upload_target,
start,
Some(end), // exclusive end
&cancel,
)
.await
{
@@ -715,7 +828,7 @@ mod fs_tests {
let end = 234;
assert!(start > end, "Should test an incorrect range");
match storage
.download_byte_range(&upload_target, start, Some(end))
.download_byte_range(&upload_target, start, Some(end), &cancel)
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
@@ -732,15 +845,15 @@ mod fs_tests {
#[tokio::test]
async fn delete_file() -> anyhow::Result<()> {
let storage = create_storage()?;
let (storage, cancel) = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
storage.delete(&upload_target).await?;
storage.delete(&upload_target, &cancel).await?;
assert!(storage.list_all().await?.is_empty());
storage
.delete(&upload_target)
.delete(&upload_target, &cancel)
.await
.expect("Should allow deleting non-existing storage files");
@@ -749,14 +862,14 @@ mod fs_tests {
#[tokio::test]
async fn file_with_metadata() -> anyhow::Result<()> {
let storage = create_storage()?;
let (storage, cancel) = create_storage()?;
let upload_name = "upload_1";
let metadata = StorageMetadata(HashMap::from([
("one".to_string(), "1".to_string()),
("two".to_string(), "2".to_string()),
]));
let upload_target =
upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;
upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;
let full_range_download_contents =
read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
@@ -770,7 +883,12 @@ mod fs_tests {
let (first_part_local, _) = uploaded_bytes.split_at(3);
let partial_download_with_metadata = storage
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
.download_byte_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&cancel,
)
.await?;
let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
assert_eq!(
@@ -791,16 +909,20 @@ mod fs_tests {
#[tokio::test]
async fn list() -> anyhow::Result<()> {
// No delimiter: should recursively list everything
let storage = create_storage()?;
let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;
let (storage, cancel) = create_storage()?;
let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;
let listing = storage.list(None, ListingMode::NoDelimiter, None).await?;
let listing = storage
.list(None, ListingMode::NoDelimiter, None, &cancel)
.await?;
assert!(listing.prefixes.is_empty());
assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());
// Delimiter: should only go one deep
let listing = storage.list(None, ListingMode::WithDelimiter, None).await?;
let listing = storage
.list(None, ListingMode::WithDelimiter, None, &cancel)
.await?;
assert_eq!(
listing.prefixes,
@@ -814,6 +936,7 @@ mod fs_tests {
Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
ListingMode::WithDelimiter,
None,
&cancel,
)
.await?;
assert_eq!(
@@ -826,10 +949,75 @@ mod fs_tests {
Ok(())
}
#[tokio::test]
async fn overwrite_shorter_file() -> anyhow::Result<()> {
let (storage, cancel) = create_storage()?;
let path = RemotePath::new("does/not/matter/file".into())?;
let body = Bytes::from_static(b"long file contents is long");
{
let len = body.len();
let body =
futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
storage.upload(body, len, &path, None, &cancel).await?;
}
let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
assert_eq!(body, read);
let shorter = Bytes::from_static(b"shorter body");
{
let len = shorter.len();
let body =
futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
storage.upload(body, len, &path, None, &cancel).await?;
}
let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
assert_eq!(shorter, read);
Ok(())
}
#[tokio::test]
async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
let (storage, cancel) = create_storage()?;
let path = RemotePath::new("does/not/matter/file".into())?;
let body = Bytes::from_static(b"long file contents is long");
{
let len = body.len();
let body =
futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
let cancel = cancel.child_token();
cancel.cancel();
let e = storage
.upload(body, len, &path, None, &cancel)
.await
.unwrap_err();
assert!(TimeoutOrCancel::caused_by_cancel(&e));
}
{
let len = body.len();
let body =
futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
storage.upload(body, len, &path, None, &cancel).await?;
}
let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
assert_eq!(body, read);
Ok(())
}
async fn upload_dummy_file(
storage: &LocalFs,
name: &str,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<RemotePath> {
let from_path = storage
.storage_root
@@ -851,7 +1039,9 @@ mod fs_tests {
let file = tokio_util::io::ReaderStream::new(file);
storage.upload(file, size, &relative_path, metadata).await?;
storage
.upload(file, size, &relative_path, metadata, cancel)
.await?;
Ok(relative_path)
}

View File

@@ -11,7 +11,7 @@ use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::SystemTime,
time::{Duration, SystemTime},
};
use anyhow::{anyhow, Context as _};
@@ -46,9 +46,9 @@ use utils::backoff;
use super::StorageMetadata;
use crate::{
support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode,
RemotePath, RemoteStorage, S3Config, TimeTravelError, MAX_KEYS_PER_DELETE,
REMOTE_STORAGE_PREFIX_SEPARATOR,
error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError,
Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel,
MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
pub(super) mod metrics;
@@ -63,6 +63,8 @@ pub struct S3Bucket {
prefix_in_bucket: Option<String>,
max_keys_per_list_response: Option<i32>,
concurrency_limiter: ConcurrencyLimiter,
// Per-request timeout. Accessible for tests.
pub timeout: Duration,
}
struct GetObjectRequest {
@@ -72,7 +74,7 @@ struct GetObjectRequest {
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
pub fn new(aws_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
tracing::debug!(
"Creating s3 remote storage for S3 bucket {}",
aws_config.bucket_name
@@ -152,6 +154,7 @@ impl S3Bucket {
max_keys_per_list_response: aws_config.max_keys_per_list_response,
prefix_in_bucket,
concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
timeout,
})
}
@@ -185,40 +188,55 @@ impl S3Bucket {
}
}
async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
async fn permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
let started_at = start_counting_cancelled_wait(kind);
let permit = self
.concurrency_limiter
.acquire(kind)
.await
.expect("semaphore is never closed");
let acquire = self.concurrency_limiter.acquire(kind);
let permit = tokio::select! {
permit = acquire => permit.expect("semaphore is never closed"),
_ = cancel.cancelled() => return Err(Cancelled),
};
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.wait_seconds
.observe_elapsed(kind, started_at);
permit
Ok(permit)
}
async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
async fn owned_permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
let started_at = start_counting_cancelled_wait(kind);
let permit = self
.concurrency_limiter
.acquire_owned(kind)
.await
.expect("semaphore is never closed");
let acquire = self.concurrency_limiter.acquire_owned(kind);
let permit = tokio::select! {
permit = acquire => permit.expect("semaphore is never closed"),
_ = cancel.cancelled() => return Err(Cancelled),
};
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.wait_seconds
.observe_elapsed(kind, started_at);
permit
Ok(permit)
}
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
async fn download_object(
&self,
request: GetObjectRequest,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
let kind = RequestKind::Get;
let permit = self.owned_permit(kind).await;
let permit = self.owned_permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);
@@ -228,8 +246,13 @@ impl S3Bucket {
.bucket(request.bucket)
.key(request.key)
.set_range(request.range)
.send()
.await;
.send();
let get_object = tokio::select! {
res = get_object => res,
_ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
_ = cancel.cancelled() => return Err(DownloadError::Cancelled),
};
let started_at = ScopeGuard::into_inner(started_at);
@@ -259,6 +282,10 @@ impl S3Bucket {
}
};
// even if we would have no timeout left, continue anyways. the caller can decide to ignore
// the errors considering timeouts and cancellation.
let remaining = self.timeout.saturating_sub(started_at.elapsed());
let metadata = object_output.metadata().cloned().map(StorageMetadata);
let etag = object_output.e_tag;
let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok());
@@ -268,6 +295,9 @@ impl S3Bucket {
let body = PermitCarrying::new(permit, body);
let body = TimedDownload::new(started_at, body);
let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
Ok(Download {
metadata,
etag,
@@ -278,33 +308,44 @@ impl S3Bucket {
async fn delete_oids(
&self,
kind: RequestKind,
_permit: &tokio::sync::SemaphorePermit<'_>,
delete_objects: &[ObjectIdentifier],
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Delete;
let mut cancel = std::pin::pin!(cancel.cancelled());
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
let started_at = start_measuring_requests(kind);
let resp = self
let req = self
.client
.delete_objects()
.bucket(self.bucket_name.clone())
.delete(
Delete::builder()
.set_objects(Some(chunk.to_vec()))
.build()?,
.build()
.context("build request")?,
)
.send()
.await;
.send();
let resp = tokio::select! {
resp = req => resp,
_ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
_ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
};
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &resp, started_at);
let resp = resp?;
let resp = resp.context("request deletion")?;
metrics::BUCKET_METRICS
.deleted_objects_total
.inc_by(chunk.len() as u64);
if let Some(errors) = resp.errors {
// Log a bounded number of the errors within the response:
// these requests can carry 1000 keys so logging each one
@@ -320,9 +361,10 @@ impl S3Bucket {
);
}
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()
return Err(anyhow::anyhow!(
"Failed to delete {}/{} objects",
errors.len(),
chunk.len(),
));
}
}
@@ -410,6 +452,7 @@ impl RemoteStorage for S3Bucket {
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Listing, DownloadError> {
let kind = RequestKind::List;
// s3 sdk wants i32
@@ -431,10 +474,11 @@ impl RemoteStorage for S3Bucket {
p
});
let _permit = self.permit(kind, cancel).await?;
let mut continuation_token = None;
loop {
let _guard = self.permit(kind).await;
let started_at = start_measuring_requests(kind);
// min of two Options, returning Some if one is value and another is
@@ -456,9 +500,15 @@ impl RemoteStorage for S3Bucket {
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let response = request
.send()
.await
let request = request.send();
let response = tokio::select! {
res = request => res,
_ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
_ = cancel.cancelled() => return Err(DownloadError::Cancelled),
};
let response = response
.context("Failed to list S3 prefixes")
.map_err(DownloadError::Other);
@@ -511,16 +561,17 @@ impl RemoteStorage for S3Bucket {
from_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Put;
let _guard = self.permit(kind).await;
let _permit = self.permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);
let body = Body::wrap_stream(from);
let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
let res = self
let upload = self
.client
.put_object()
.bucket(self.bucket_name.clone())
@@ -528,22 +579,40 @@ impl RemoteStorage for S3Bucket {
.set_metadata(metadata.map(|m| m.0))
.content_length(from_size_bytes.try_into()?)
.body(bytes_stream)
.send()
.await;
.send();
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &res, started_at);
let upload = tokio::time::timeout(self.timeout, upload);
res?;
let res = tokio::select! {
res = upload => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
Ok(())
if let Ok(inner) = &res {
// do not incl. timeouts as errors in metrics but cancellations
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, inner, started_at);
}
match res {
Ok(Ok(_put)) => Ok(()),
Ok(Err(sdk)) => Err(sdk.into()),
Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
}
}
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
async fn copy(
&self,
from: &RemotePath,
to: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Copy;
let _guard = self.permit(kind).await;
let _permit = self.permit(kind, cancel).await?;
let timeout = tokio::time::sleep(self.timeout);
let started_at = start_measuring_requests(kind);
@@ -554,14 +623,19 @@ impl RemoteStorage for S3Bucket {
self.relative_path_to_s3_object(from)
);
let res = self
let op = self
.client
.copy_object()
.bucket(self.bucket_name.clone())
.key(self.relative_path_to_s3_object(to))
.copy_source(copy_source)
.send()
.await;
.send();
let res = tokio::select! {
res = op => res,
_ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
@@ -573,14 +647,21 @@ impl RemoteStorage for S3Bucket {
Ok(())
}
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
async fn download(
&self,
from: &RemotePath,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
// if prefix is not none then download file `prefix/from`
// if prefix is none then download file `from`
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: self.relative_path_to_s3_object(from),
range: None,
})
self.download_object(
GetObjectRequest {
bucket: self.bucket_name.clone(),
key: self.relative_path_to_s3_object(from),
range: None,
},
cancel,
)
.await
}
@@ -589,6 +670,7 @@ impl RemoteStorage for S3Bucket {
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// and needs both ends to be exclusive
@@ -598,31 +680,39 @@ impl RemoteStorage for S3Bucket {
None => format!("bytes={start_inclusive}-"),
});
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: self.relative_path_to_s3_object(from),
range,
})
self.download_object(
GetObjectRequest {
bucket: self.bucket_name.clone(),
key: self.relative_path_to_s3_object(from),
range,
},
cancel,
)
.await
}
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
let kind = RequestKind::Delete;
let _guard = self.permit(kind).await;
async fn delete_objects<'a>(
&self,
paths: &'a [RemotePath],
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Delete;
let permit = self.permit(kind, cancel).await?;
let mut delete_objects = Vec::with_capacity(paths.len());
for path in paths {
let obj_id = ObjectIdentifier::builder()
.set_key(Some(self.relative_path_to_s3_object(path)))
.build()?;
.build()
.context("convert path to oid")?;
delete_objects.push(obj_id);
}
self.delete_oids(kind, &delete_objects).await
self.delete_oids(&permit, &delete_objects, cancel).await
}
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
let paths = std::array::from_ref(path);
self.delete_objects(paths).await
self.delete_objects(paths, cancel).await
}
async fn time_travel_recover(
@@ -633,7 +723,7 @@ impl RemoteStorage for S3Bucket {
cancel: &CancellationToken,
) -> Result<(), TimeTravelError> {
let kind = RequestKind::TimeTravel;
let _guard = self.permit(kind).await;
let permit = self.permit(kind, cancel).await?;
let timestamp = DateTime::from(timestamp);
let done_if_after = DateTime::from(done_if_after);
@@ -647,7 +737,7 @@ impl RemoteStorage for S3Bucket {
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |_e: &_| false;
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
let mut key_marker = None;
let mut version_id_marker = None;
@@ -656,15 +746,19 @@ impl RemoteStorage for S3Bucket {
loop {
let response = backoff::retry(
|| async {
self.client
let op = self
.client
.list_object_versions()
.bucket(self.bucket_name.clone())
.set_prefix(prefix.clone())
.set_key_marker(key_marker.clone())
.set_version_id_marker(version_id_marker.clone())
.send()
.await
.map_err(|e| TimeTravelError::Other(e.into()))
.send();
tokio::select! {
res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
_ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
}
},
is_permanent,
warn_threshold,
@@ -786,14 +880,18 @@ impl RemoteStorage for S3Bucket {
backoff::retry(
|| async {
self.client
let op = self
.client
.copy_object()
.bucket(self.bucket_name.clone())
.key(key)
.copy_source(&source_id)
.send()
.await
.map_err(|e| TimeTravelError::Other(e.into()))
.send();
tokio::select! {
res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
_ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
}
},
is_permanent,
warn_threshold,
@@ -824,10 +922,18 @@ impl RemoteStorage for S3Bucket {
let oid = ObjectIdentifier::builder()
.key(key.to_owned())
.build()
.map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
self.delete_oids(kind, &[oid])
.map_err(|e| TimeTravelError::Other(e.into()))?;
self.delete_oids(&permit, &[oid], cancel)
.await
.map_err(TimeTravelError::Other)?;
.map_err(|e| {
// delete_oid0 will use TimeoutOrCancel
if TimeoutOrCancel::caused_by_cancel(&e) {
TimeTravelError::Cancelled
} else {
TimeTravelError::Other(e)
}
})?;
}
}
}
@@ -963,7 +1069,8 @@ mod tests {
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response: Some(5),
};
let storage = S3Bucket::new(&config).expect("remote storage init");
let storage =
S3Bucket::new(&config, std::time::Duration::ZERO).expect("remote storage init");
for (test_path_idx, test_path) in all_paths.iter().enumerate() {
let result = storage.relative_path_to_s3_object(test_path);
let expected = expected_outputs[prefix_idx][test_path_idx];

View File

@@ -90,11 +90,16 @@ impl UnreliableWrapper {
}
}
async fn delete_inner(&self, path: &RemotePath, attempt: bool) -> anyhow::Result<()> {
async fn delete_inner(
&self,
path: &RemotePath,
attempt: bool,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
if attempt {
self.attempt(RemoteOp::Delete(path.clone()))?;
}
self.inner.delete(path).await
self.inner.delete(path, cancel).await
}
}
@@ -105,20 +110,22 @@ impl RemoteStorage for UnreliableWrapper {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
cancel: &CancellationToken,
) -> Result<Vec<RemotePath>, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
self.inner.list_prefixes(prefix).await
self.inner.list_prefixes(prefix, cancel).await
}
async fn list_files(
&self,
folder: Option<&RemotePath>,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Vec<RemotePath>, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(folder.cloned()))
.map_err(DownloadError::Other)?;
self.inner.list_files(folder, max_keys).await
self.inner.list_files(folder, max_keys, cancel).await
}
async fn list(
@@ -126,10 +133,11 @@ impl RemoteStorage for UnreliableWrapper {
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Listing, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
self.inner.list(prefix, mode, max_keys).await
self.inner.list(prefix, mode, max_keys, cancel).await
}
async fn upload(
@@ -140,15 +148,22 @@ impl RemoteStorage for UnreliableWrapper {
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.attempt(RemoteOp::Upload(to.clone()))?;
self.inner.upload(data, data_size_bytes, to, metadata).await
self.inner
.upload(data, data_size_bytes, to, metadata, cancel)
.await
}
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
async fn download(
&self,
from: &RemotePath,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
self.attempt(RemoteOp::Download(from.clone()))
.map_err(DownloadError::Other)?;
self.inner.download(from).await
self.inner.download(from, cancel).await
}
async fn download_byte_range(
@@ -156,6 +171,7 @@ impl RemoteStorage for UnreliableWrapper {
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
// Note: We treat any download_byte_range as an "attempt" of the same
// operation. We don't pay attention to the ranges. That's good enough
@@ -163,20 +179,24 @@ impl RemoteStorage for UnreliableWrapper {
self.attempt(RemoteOp::Download(from.clone()))
.map_err(DownloadError::Other)?;
self.inner
.download_byte_range(from, start_inclusive, end_exclusive)
.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
self.delete_inner(path, true).await
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_inner(path, true, cancel).await
}
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
async fn delete_objects<'a>(
&self,
paths: &'a [RemotePath],
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
let mut error_counter = 0;
for path in paths {
// Dont record attempt because it was already recorded above
if (self.delete_inner(path, false).await).is_err() {
if (self.delete_inner(path, false, cancel).await).is_err() {
error_counter += 1;
}
}
@@ -189,11 +209,16 @@ impl RemoteStorage for UnreliableWrapper {
Ok(())
}
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
async fn copy(
&self,
from: &RemotePath,
to: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
// copy is equivalent to download + upload
self.attempt(RemoteOp::Download(from.clone()))?;
self.attempt(RemoteOp::Upload(to.clone()))?;
self.inner.copy_object(from, to).await
self.inner.copy_object(from, to, cancel).await
}
async fn time_travel_recover(

View File

@@ -1,9 +1,15 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use bytes::Bytes;
use futures_util::Stream;
use tokio_util::sync::CancellationToken;
use crate::TimeoutOrCancel;
pin_project_lite::pin_project! {
/// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
@@ -31,3 +37,133 @@ impl<S: Stream> Stream for PermitCarrying<S> {
self.inner.size_hint()
}
}
pin_project_lite::pin_project! {
pub(crate) struct DownloadStream<F, S> {
hit: bool,
#[pin]
cancellation: F,
#[pin]
inner: S,
}
}
impl<F, S> DownloadStream<F, S> {
pub(crate) fn new(cancellation: F, inner: S) -> Self {
Self {
cancellation,
hit: false,
inner,
}
}
}
/// See documentation on [`crate::DownloadStream`] on rationale why `std::io::Error` is used.
impl<E, F, S> Stream for DownloadStream<F, S>
where
std::io::Error: From<E>,
F: Future<Output = E>,
S: Stream<Item = std::io::Result<Bytes>>,
{
type Item = <S as Stream>::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if !*this.hit {
if let Poll::Ready(e) = this.cancellation.poll(cx) {
*this.hit = true;
let e = Err(std::io::Error::from(e));
return Poll::Ready(Some(e));
}
}
this.inner.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
/// Fires only on the first cancel or timeout, not on both.
pub(crate) async fn cancel_or_timeout(
timeout: Duration,
cancel: CancellationToken,
) -> TimeoutOrCancel {
tokio::select! {
_ = tokio::time::sleep(timeout) => TimeoutOrCancel::Timeout,
_ = cancel.cancelled() => TimeoutOrCancel::Cancel,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::DownloadError;
use futures::stream::StreamExt;
#[tokio::test(start_paused = true)]
async fn cancelled_download_stream() {
let inner = futures::stream::pending();
let timeout = Duration::from_secs(120);
let cancel = CancellationToken::new();
let stream = DownloadStream::new(cancel_or_timeout(timeout, cancel.clone()), inner);
let mut stream = std::pin::pin!(stream);
let mut first = stream.next();
tokio::select! {
_ = &mut first => unreachable!("we haven't yet cancelled nor is timeout passed"),
_ = tokio::time::sleep(Duration::from_secs(1)) => {},
}
cancel.cancel();
let e = first.await.expect("there must be some").unwrap_err();
assert!(matches!(e.kind(), std::io::ErrorKind::Other), "{e:?}");
let inner = e.get_ref().expect("inner should be set");
assert!(
inner
.downcast_ref::<DownloadError>()
.is_some_and(|e| matches!(e, DownloadError::Cancelled)),
"{inner:?}"
);
tokio::select! {
_ = stream.next() => unreachable!("no timeout ever happens as we were already cancelled"),
_ = tokio::time::sleep(Duration::from_secs(121)) => {},
}
}
#[tokio::test(start_paused = true)]
async fn timeouted_download_stream() {
let inner = futures::stream::pending();
let timeout = Duration::from_secs(120);
let cancel = CancellationToken::new();
let stream = DownloadStream::new(cancel_or_timeout(timeout, cancel.clone()), inner);
let mut stream = std::pin::pin!(stream);
// because the stream uses 120s timeout we are paused, we advance to 120s right away.
let first = stream.next();
let e = first.await.expect("there must be some").unwrap_err();
assert!(matches!(e.kind(), std::io::ErrorKind::Other), "{e:?}");
let inner = e.get_ref().expect("inner should be set");
assert!(
inner
.downcast_ref::<DownloadError>()
.is_some_and(|e| matches!(e, DownloadError::Timeout)),
"{inner:?}"
);
cancel.cancel();
tokio::select! {
_ = stream.next() => unreachable!("no cancellation ever happens because we already timed out"),
_ = tokio::time::sleep(Duration::from_secs(121)) => {},
}
}
}

View File

@@ -10,6 +10,7 @@ use futures::stream::Stream;
use once_cell::sync::OnceCell;
use remote_storage::{Download, GenericRemoteStorage, RemotePath};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
static LOGGING_DONE: OnceCell<()> = OnceCell::new();
@@ -58,8 +59,12 @@ pub(crate) async fn upload_simple_remote_data(
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
info!("Creating {upload_tasks_count} remote files");
let mut upload_tasks = JoinSet::new();
let cancel = CancellationToken::new();
for i in 1..upload_tasks_count + 1 {
let task_client = Arc::clone(client);
let cancel = cancel.clone();
upload_tasks.spawn(async move {
let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
let blob_path = RemotePath::new(
@@ -69,7 +74,9 @@ pub(crate) async fn upload_simple_remote_data(
debug!("Creating remote item {i} at path {blob_path:?}");
let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
task_client.upload(data, len, &blob_path, None).await?;
task_client
.upload(data, len, &blob_path, None, &cancel)
.await?;
Ok::<_, anyhow::Error>(blob_path)
});
@@ -107,13 +114,15 @@ pub(crate) async fn cleanup(
"Removing {} objects from the remote storage during cleanup",
objects_to_delete.len()
);
let cancel = CancellationToken::new();
let mut delete_tasks = JoinSet::new();
for object_to_delete in objects_to_delete {
let task_client = Arc::clone(client);
let cancel = cancel.clone();
delete_tasks.spawn(async move {
debug!("Deleting remote item at path {object_to_delete:?}");
task_client
.delete(&object_to_delete)
.delete(&object_to_delete, &cancel)
.await
.with_context(|| format!("{object_to_delete:?} removal"))
});
@@ -141,8 +150,12 @@ pub(crate) async fn upload_remote_data(
) -> ControlFlow<Uploads, Uploads> {
info!("Creating {upload_tasks_count} remote files");
let mut upload_tasks = JoinSet::new();
let cancel = CancellationToken::new();
for i in 1..upload_tasks_count + 1 {
let task_client = Arc::clone(client);
let cancel = cancel.clone();
upload_tasks.spawn(async move {
let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
@@ -152,7 +165,9 @@ pub(crate) async fn upload_remote_data(
let (data, data_len) =
upload_stream(format!("remote blob data {i}").into_bytes().into());
task_client.upload(data, data_len, &blob_path, None).await?;
task_client
.upload(data, data_len, &blob_path, None, &cancel)
.await?;
Ok::<_, anyhow::Error>((blob_prefix, blob_path))
});

View File

@@ -4,6 +4,7 @@ use remote_storage::RemotePath;
use std::sync::Arc;
use std::{collections::HashSet, num::NonZeroU32};
use test_context::test_context;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use crate::common::{download_to_vec, upload_stream, wrap_stream};
@@ -45,13 +46,15 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a
}
};
let cancel = CancellationToken::new();
let test_client = Arc::clone(&ctx.enabled.client);
let expected_remote_prefixes = ctx.remote_prefixes.clone();
let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
.context("common_prefix construction")?;
let root_remote_prefixes = test_client
.list_prefixes(None)
.list_prefixes(None, &cancel)
.await
.context("client list root prefixes failure")?
.into_iter()
@@ -62,7 +65,7 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a
);
let nested_remote_prefixes = test_client
.list_prefixes(Some(&base_prefix))
.list_prefixes(Some(&base_prefix), &cancel)
.await
.context("client list nested prefixes failure")?
.into_iter()
@@ -99,11 +102,12 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a
anyhow::bail!("S3 init failed: {e:?}")
}
};
let cancel = CancellationToken::new();
let test_client = Arc::clone(&ctx.enabled.client);
let base_prefix =
RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
let root_files = test_client
.list_files(None, None)
.list_files(None, None, &cancel)
.await
.context("client list root files failure")?
.into_iter()
@@ -117,13 +121,13 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a
// Test that max_keys limit works. In total there are about 21 files (see
// upload_simple_remote_data call in test_real_s3.rs).
let limited_root_files = test_client
.list_files(None, Some(NonZeroU32::new(2).unwrap()))
.list_files(None, Some(NonZeroU32::new(2).unwrap()), &cancel)
.await
.context("client list root files failure")?;
assert_eq!(limited_root_files.len(), 2);
let nested_remote_files = test_client
.list_files(Some(&base_prefix), None)
.list_files(Some(&base_prefix), None, &cancel)
.await
.context("client list nested files failure")?
.into_iter()
@@ -150,12 +154,17 @@ async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Resu
MaybeEnabledStorage::Disabled => return Ok(()),
};
let cancel = CancellationToken::new();
let path = RemotePath::new(Utf8Path::new(
format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
))
.with_context(|| "RemotePath conversion")?;
ctx.client.delete(&path).await.expect("should succeed");
ctx.client
.delete(&path, &cancel)
.await
.expect("should succeed");
Ok(())
}
@@ -168,6 +177,8 @@ async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<(
MaybeEnabledStorage::Disabled => return Ok(()),
};
let cancel = CancellationToken::new();
let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
@@ -178,21 +189,21 @@ async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<(
.with_context(|| "RemotePath conversion")?;
let (data, len) = upload_stream("remote blob data1".as_bytes().into());
ctx.client.upload(data, len, &path1, None).await?;
ctx.client.upload(data, len, &path1, None, &cancel).await?;
let (data, len) = upload_stream("remote blob data2".as_bytes().into());
ctx.client.upload(data, len, &path2, None).await?;
ctx.client.upload(data, len, &path2, None, &cancel).await?;
let (data, len) = upload_stream("remote blob data3".as_bytes().into());
ctx.client.upload(data, len, &path3, None).await?;
ctx.client.upload(data, len, &path3, None, &cancel).await?;
ctx.client.delete_objects(&[path1, path2]).await?;
ctx.client.delete_objects(&[path1, path2], &cancel).await?;
let prefixes = ctx.client.list_prefixes(None).await?;
let prefixes = ctx.client.list_prefixes(None, &cancel).await?;
assert_eq!(prefixes.len(), 1);
ctx.client.delete_objects(&[path3]).await?;
ctx.client.delete_objects(&[path3], &cancel).await?;
Ok(())
}
@@ -204,6 +215,8 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
return Ok(());
};
let cancel = CancellationToken::new();
let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
@@ -211,47 +224,56 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
let (data, len) = wrap_stream(orig.clone());
ctx.client.upload(data, len, &path, None).await?;
ctx.client.upload(data, len, &path, None, &cancel).await?;
// Normal download request
let dl = ctx.client.download(&path).await?;
let dl = ctx.client.download(&path, &cancel).await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig);
// Full range (end specified)
let dl = ctx
.client
.download_byte_range(&path, 0, Some(len as u64))
.download_byte_range(&path, 0, Some(len as u64), &cancel)
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig);
// partial range (end specified)
let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?;
let dl = ctx
.client
.download_byte_range(&path, 4, Some(10), &cancel)
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig[4..10]);
// partial range (end beyond real end)
let dl = ctx
.client
.download_byte_range(&path, 8, Some(len as u64 * 100))
.download_byte_range(&path, 8, Some(len as u64 * 100), &cancel)
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig[8..]);
// Partial range (end unspecified)
let dl = ctx.client.download_byte_range(&path, 4, None).await?;
let dl = ctx
.client
.download_byte_range(&path, 4, None, &cancel)
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig[4..]);
// Full range (end unspecified)
let dl = ctx.client.download_byte_range(&path, 0, None).await?;
let dl = ctx
.client
.download_byte_range(&path, 0, None, &cancel)
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig);
debug!("Cleanup: deleting file at path {path:?}");
ctx.client
.delete(&path)
.delete(&path, &cancel)
.await
.with_context(|| format!("{path:?} removal"))?;
@@ -265,6 +287,8 @@ async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
return Ok(());
};
let cancel = CancellationToken::new();
let path = RemotePath::new(Utf8Path::new(
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
))
@@ -278,18 +302,18 @@ async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
let (data, len) = wrap_stream(orig.clone());
ctx.client.upload(data, len, &path, None).await?;
ctx.client.upload(data, len, &path, None, &cancel).await?;
// Normal download request
ctx.client.copy_object(&path, &path_dest).await?;
ctx.client.copy_object(&path, &path_dest, &cancel).await?;
let dl = ctx.client.download(&path_dest).await?;
let dl = ctx.client.download(&path_dest, &cancel).await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig);
debug!("Cleanup: deleting file at path {path:?}");
ctx.client
.delete_objects(&[path.clone(), path_dest.clone()])
.delete_objects(&[path.clone(), path_dest.clone()], &cancel)
.await
.with_context(|| format!("{path:?} removal"))?;

View File

@@ -1,9 +1,9 @@
use std::collections::HashSet;
use std::env;
use std::num::NonZeroUsize;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
use std::{collections::HashSet, time::Duration};
use anyhow::Context;
use remote_storage::{
@@ -39,6 +39,17 @@ impl EnabledAzure {
base_prefix: BASE_PREFIX,
}
}
#[allow(unused)] // this will be needed when moving the timeout integration tests back
fn configure_request_timeout(&mut self, timeout: Duration) {
match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
GenericRemoteStorage::AzureBlob(azure) => {
let azure = Arc::get_mut(azure).expect("inner Arc::get_mut");
azure.timeout = timeout;
}
_ => unreachable!(),
}
}
}
enum MaybeEnabledStorage {
@@ -213,6 +224,7 @@ fn create_azure_client(
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response,
}),
timeout: Duration::from_secs(120),
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,

View File

@@ -1,5 +1,6 @@
use std::env;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::num::NonZeroUsize;
use std::ops::ControlFlow;
use std::sync::Arc;
@@ -9,9 +10,10 @@ use std::{collections::HashSet, time::SystemTime};
use crate::common::{download_to_vec, upload_stream};
use anyhow::Context;
use camino::Utf8Path;
use futures_util::Future;
use futures_util::StreamExt;
use remote_storage::{
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
DownloadError, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
S3Config,
};
use test_context::test_context;
use test_context::AsyncTestContext;
@@ -27,7 +29,6 @@ use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_re
use utils::backoff;
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
const BASE_PREFIX: &str = "test";
#[test_context(MaybeEnabledStorage)]
@@ -69,8 +70,11 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
ret
}
async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> {
Ok(retry(|| client.list_files(None, None))
async fn list_files(
client: &Arc<GenericRemoteStorage>,
cancel: &CancellationToken,
) -> anyhow::Result<HashSet<RemotePath>> {
Ok(retry(|| client.list_files(None, None, cancel))
.await
.context("list root files failure")?
.into_iter()
@@ -90,11 +94,11 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
retry(|| {
let (data, len) = upload_stream("remote blob data1".as_bytes().into());
ctx.client.upload(data, len, &path1, None)
ctx.client.upload(data, len, &path1, None, &cancel)
})
.await?;
let t0_files = list_files(&ctx.client).await?;
let t0_files = list_files(&ctx.client, &cancel).await?;
let t0 = time_point().await;
println!("at t0: {t0_files:?}");
@@ -102,17 +106,17 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
retry(|| {
let (data, len) = upload_stream(old_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None)
ctx.client.upload(data, len, &path2, None, &cancel)
})
.await?;
let t1_files = list_files(&ctx.client).await?;
let t1_files = list_files(&ctx.client, &cancel).await?;
let t1 = time_point().await;
println!("at t1: {t1_files:?}");
// A little check to ensure that our clock is not too far off from the S3 clock
{
let dl = retry(|| ctx.client.download(&path2)).await?;
let dl = retry(|| ctx.client.download(&path2, &cancel)).await?;
let last_modified = dl.last_modified.unwrap();
let half_wt = WAIT_TIME.mul_f32(0.5);
let t0_hwt = t0 + half_wt;
@@ -125,7 +129,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
retry(|| {
let (data, len) = upload_stream("remote blob data3".as_bytes().into());
ctx.client.upload(data, len, &path3, None)
ctx.client.upload(data, len, &path3, None, &cancel)
})
.await?;
@@ -133,12 +137,12 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
retry(|| {
let (data, len) = upload_stream(new_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None)
ctx.client.upload(data, len, &path2, None, &cancel)
})
.await?;
retry(|| ctx.client.delete(&path1)).await?;
let t2_files = list_files(&ctx.client).await?;
retry(|| ctx.client.delete(&path1, &cancel)).await?;
let t2_files = list_files(&ctx.client, &cancel).await?;
let t2 = time_point().await;
println!("at t2: {t2_files:?}");
@@ -147,10 +151,10 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
ctx.client
.time_travel_recover(None, t2, t_final, &cancel)
.await?;
let t2_files_recovered = list_files(&ctx.client).await?;
let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t2: {t2_files_recovered:?}");
assert_eq!(t2_files, t2_files_recovered);
let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2).await?).await?;
let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
assert_eq!(path2_recovered_t2, new_data.as_bytes());
// after recovery to t1: path1 is back, path2 has the old content
@@ -158,10 +162,10 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
ctx.client
.time_travel_recover(None, t1, t_final, &cancel)
.await?;
let t1_files_recovered = list_files(&ctx.client).await?;
let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t1: {t1_files_recovered:?}");
assert_eq!(t1_files, t1_files_recovered);
let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2).await?).await?;
let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
assert_eq!(path2_recovered_t1, old_data.as_bytes());
// after recovery to t0: everything is gone except for path1
@@ -169,14 +173,14 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
ctx.client
.time_travel_recover(None, t0, t_final, &cancel)
.await?;
let t0_files_recovered = list_files(&ctx.client).await?;
let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t0: {t0_files_recovered:?}");
assert_eq!(t0_files, t0_files_recovered);
// cleanup
let paths = &[path1, path2, path3];
retry(|| ctx.client.delete_objects(paths)).await?;
retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
Ok(())
}
@@ -197,6 +201,16 @@ impl EnabledS3 {
base_prefix: BASE_PREFIX,
}
}
fn configure_request_timeout(&mut self, timeout: Duration) {
match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
GenericRemoteStorage::AwsS3(s3) => {
let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
s3.timeout = timeout;
}
_ => unreachable!(),
}
}
}
enum MaybeEnabledStorage {
@@ -370,8 +384,169 @@ fn create_s3_client(
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response,
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
))
}
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
let MaybeEnabledStorage::Enabled(ctx) = ctx else {
return;
};
let cancel = CancellationToken::new();
let path = RemotePath::new(Utf8Path::new(
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
))
.unwrap();
let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
let timeout = std::time::Duration::from_secs(5);
ctx.configure_request_timeout(timeout);
let started_at = std::time::Instant::now();
let mut stream = ctx
.client
.download(&path, &cancel)
.await
.expect("download succeeds")
.download_stream;
if started_at.elapsed().mul_f32(0.9) >= timeout {
tracing::warn!(
elapsed_ms = started_at.elapsed().as_millis(),
"timeout might be too low, consumed most of it during headers"
);
}
let first = stream
.next()
.await
.expect("should have the first blob")
.expect("should have succeeded");
tracing::info!(len = first.len(), "downloaded first chunk");
assert!(
first.len() < len,
"uploaded file is too small, we downloaded all on first chunk"
);
tokio::time::sleep(timeout).await;
{
let started_at = std::time::Instant::now();
let next = stream
.next()
.await
.expect("stream should not have ended yet");
tracing::info!(
next.is_err = next.is_err(),
elapsed_ms = started_at.elapsed().as_millis(),
"received item after timeout"
);
let e = next.expect_err("expected an error, but got a chunk?");
let inner = e.get_ref().expect("std::io::Error::inner should be set");
assert!(
inner
.downcast_ref::<DownloadError>()
.is_some_and(|e| matches!(e, DownloadError::Timeout)),
"{inner:?}"
);
}
ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);
ctx.client.delete_objects(&[path], &cancel).await.unwrap()
}
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
let MaybeEnabledStorage::Enabled(ctx) = ctx else {
return;
};
let cancel = CancellationToken::new();
let path = RemotePath::new(Utf8Path::new(
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
))
.unwrap();
let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
{
let mut stream = ctx
.client
.download(&path, &cancel)
.await
.expect("download succeeds")
.download_stream;
let first = stream
.next()
.await
.expect("should have the first blob")
.expect("should have succeeded");
tracing::info!(len = first.len(), "downloaded first chunk");
assert!(
first.len() < len,
"uploaded file is too small, we downloaded all on first chunk"
);
cancel.cancel();
let next = stream.next().await.expect("stream should have more");
let e = next.expect_err("expected an error, but got a chunk?");
let inner = e.get_ref().expect("std::io::Error::inner should be set");
assert!(
inner
.downcast_ref::<DownloadError>()
.is_some_and(|e| matches!(e, DownloadError::Cancelled)),
"{inner:?}"
);
}
let cancel = CancellationToken::new();
ctx.client.delete_objects(&[path], &cancel).await.unwrap();
}
/// Upload a long enough file so that we cannot download it in single chunk
///
/// For s3 the first chunk seems to be less than 10kB, so this has a bit of a safety margin
async fn upload_large_enough_file(
client: &GenericRemoteStorage,
path: &RemotePath,
cancel: &CancellationToken,
) -> usize {
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
let body = bytes::Bytes::from(vec![0u8; 1024]);
let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
let len = contents.clone().fold(0, |acc, next| acc + next.len());
let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
client
.upload(contents, len, path, None, cancel)
.await
.expect("upload succeeds");
len
}

View File

@@ -1359,6 +1359,7 @@ broker_endpoint = '{broker_endpoint}'
parsed_remote_storage_config,
RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
},
"Remote storage config should correctly parse the local FS config and fill other storage defaults"
);
@@ -1426,6 +1427,7 @@ broker_endpoint = '{broker_endpoint}'
concurrency_limit: s3_concurrency_limit,
max_keys_per_list_response: None,
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
},
"Remote storage config should correctly parse the S3 config"
);

View File

@@ -867,6 +867,7 @@ mod test {
let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
let storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
@@ -1170,6 +1171,7 @@ pub(crate) mod mock {
pub struct ConsumerState {
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
cancel: CancellationToken,
}
impl ConsumerState {
@@ -1183,7 +1185,7 @@ pub(crate) mod mock {
match msg {
DeleterMessage::Delete(objects) => {
for path in objects {
match remote_storage.delete(&path).await {
match remote_storage.delete(&path, &self.cancel).await {
Ok(_) => {
debug!("Deleted {path}");
}
@@ -1216,7 +1218,7 @@ pub(crate) mod mock {
for path in objects {
info!("Executing deletion {path}");
match remote_storage.delete(&path).await {
match remote_storage.delete(&path, &self.cancel).await {
Ok(_) => {
debug!("Deleted {path}");
}
@@ -1266,7 +1268,11 @@ pub(crate) mod mock {
executor_tx,
executed,
remote_storage,
consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }),
consumer: std::sync::Mutex::new(ConsumerState {
rx,
executor_rx,
cancel: CancellationToken::new(),
}),
lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
}
}

View File

@@ -8,6 +8,7 @@
use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use remote_storage::TimeoutOrCancel;
use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
@@ -71,9 +72,11 @@ impl Deleter {
Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
});
self.remote_storage.delete_objects(&self.accumulator).await
self.remote_storage
.delete_objects(&self.accumulator, &self.cancel)
.await
},
|_| false,
TimeoutOrCancel::caused_by_cancel,
3,
10,
"executing deletion batch",

View File

@@ -25,6 +25,7 @@ use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::fmt;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
@@ -3339,7 +3340,7 @@ impl Tenant {
&self.cancel,
)
.await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
}
@@ -3389,8 +3390,10 @@ impl Tenant {
);
let dest_path =
&remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id);
// if this fails, it will get retried by retried control plane requests
storage
.copy_object(source_path, dest_path)
.copy_object(source_path, dest_path, &self.cancel)
.await
.context("copy initdb tar")?;
}
@@ -4031,6 +4034,7 @@ pub(crate) mod harness {
std::fs::create_dir_all(&remote_fs_dir).unwrap();
let config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::{models::TenantState, shard::TenantShardId};
use remote_storage::{GenericRemoteStorage, RemotePath};
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, Instrument};
@@ -84,17 +84,17 @@ async fn create_remote_delete_mark(
let data = bytes::Bytes::from_static(data);
let stream = futures::stream::once(futures::future::ready(Ok(data)));
remote_storage
.upload(stream, 0, &remote_mark_path, None)
.upload(stream, 0, &remote_mark_path, None, cancel)
.await
},
|_e| false,
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"mark_upload",
cancel,
)
.await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("mark_upload")?;
@@ -184,15 +184,15 @@ async fn remove_tenant_remote_delete_mark(
if let Some(remote_storage) = remote_storage {
let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
backoff::retry(
|| async { remote_storage.delete(&path).await },
|_e| false,
|| async { remote_storage.delete(&path, cancel).await },
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"remove_tenant_remote_delete_mark",
cancel,
)
.await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("remove_tenant_remote_delete_mark")?;
}

View File

@@ -196,14 +196,12 @@ pub(crate) use upload::upload_initdb_dir;
use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use std::ops::DerefMut;
use tracing::{debug, error, info, instrument, warn};
use tracing::{info_span, Instrument};
@@ -263,11 +261,6 @@ pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
/// Default buffer size when interfacing with [`tokio::fs::File`].
pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
/// This timeout is intended to deal with hangs in lower layers, e.g. stuck TCP flows. It is not
/// intended to be snappy enough for prompt shutdown, as we have a CancellationToken for that.
pub(crate) const UPLOAD_TIMEOUT: Duration = Duration::from_secs(120);
pub(crate) const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(120);
pub enum MaybeDeletedIndexPart {
IndexPart(IndexPart),
Deleted(IndexPart),
@@ -331,40 +324,6 @@ pub struct RemoteTimelineClient {
cancel: CancellationToken,
}
/// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to anyhow.
///
/// This is a convenience for the various upload functions. In future
/// the anyhow::Error result should be replaced with a more structured type that
/// enables callers to avoid handling shutdown as an error.
async fn upload_cancellable<F>(cancel: &CancellationToken, future: F) -> anyhow::Result<()>
where
F: std::future::Future<Output = anyhow::Result<()>>,
{
match timeout_cancellable(UPLOAD_TIMEOUT, cancel, future).await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(e),
Err(TimeoutCancellableError::Timeout) => Err(anyhow::anyhow!("Timeout")),
Err(TimeoutCancellableError::Cancelled) => Err(anyhow::anyhow!("Shutting down")),
}
}
/// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to DownloaDError.
async fn download_cancellable<F, R>(
cancel: &CancellationToken,
future: F,
) -> Result<R, DownloadError>
where
F: std::future::Future<Output = Result<R, DownloadError>>,
{
match timeout_cancellable(DOWNLOAD_TIMEOUT, cancel, future).await {
Ok(Ok(r)) => Ok(r),
Ok(Err(e)) => Err(e),
Err(TimeoutCancellableError::Timeout) => {
Err(DownloadError::Other(anyhow::anyhow!("Timed out")))
}
Err(TimeoutCancellableError::Cancelled) => Err(DownloadError::Cancelled),
}
}
impl RemoteTimelineClient {
///
/// Create a remote storage client for given timeline
@@ -1050,7 +1009,7 @@ impl RemoteTimelineClient {
&self.cancel,
)
.await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)?;
// all good, disarm the guard and mark as success
@@ -1082,14 +1041,14 @@ impl RemoteTimelineClient {
upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
.await
},
|_e| false,
TimeoutOrCancel::caused_by_cancel,
FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"preserve_initdb_tar_zst",
&cancel.clone(),
)
.await
.ok_or_else(|| anyhow::anyhow!("Cancellled"))
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("backing up initdb archive")?;
Ok(())
@@ -1151,7 +1110,7 @@ impl RemoteTimelineClient {
let remaining = download_retry(
|| async {
self.storage_impl
.list_files(Some(&timeline_storage_path), None)
.list_files(Some(&timeline_storage_path), None, &cancel)
.await
},
"list remaining files",
@@ -1445,6 +1404,10 @@ impl RemoteTimelineClient {
Ok(()) => {
break;
}
Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
// loop around to do the proper stopping
continue;
}
Err(e) => {
let retries = task.retries.fetch_add(1, Ordering::SeqCst);

View File

@@ -11,16 +11,14 @@ use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::timeout::timeout_cancellable;
use utils::{backoff, crashsafe};
use crate::config::PageServerConf;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::{
download_cancellable, remote_layer_path, remote_timelines_path, DOWNLOAD_TIMEOUT,
};
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::Generation;
use crate::virtual_file::on_fatal_io_error;
@@ -83,15 +81,13 @@ pub async fn download_layer_file<'a>(
.with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
.map_err(DownloadError::Other)?;
// Cancellation safety: it is safe to cancel this future, because it isn't writing to a local
// file: the write to local file doesn't start until after the request header is returned
// and we start draining the body stream below
let download = download_cancellable(cancel, storage.download(&remote_path))
let download = storage
.download(&remote_path, cancel)
.await
.with_context(|| {
format!(
"open a download stream for layer with remote storage path '{remote_path:?}'"
)
"open a download stream for layer with remote storage path '{remote_path:?}'"
)
})
.map_err(DownloadError::Other)?;
@@ -100,43 +96,26 @@ pub async fn download_layer_file<'a>(
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
// Cancellation safety: it is safe to cancel this future because it is writing into a temporary file,
// and we will unlink the temporary file if there is an error. This unlink is important because we
// are in a retry loop, and we wouldn't want to leave behind a rogue write I/O to a file that
// we will imminiently try and write to again.
let bytes_amount: u64 = match timeout_cancellable(
DOWNLOAD_TIMEOUT,
cancel,
tokio::io::copy_buf(&mut reader, &mut destination_file),
)
.await
.with_context(|| {
format!(
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file)
.await
.with_context(|| format!(
"download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
)
})
.map_err(DownloadError::Other)?
{
Ok(b) => Ok(b),
))
.map_err(DownloadError::Other);
match bytes_amount {
Ok(bytes_amount) => {
let destination_file = destination_file.into_inner();
Ok((destination_file, bytes_amount))
}
Err(e) => {
// Remove incomplete files: on restart Timeline would do this anyway, but we must
// do it here for the retry case.
if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
}
Err(e)
}
}
.with_context(|| {
format!(
"download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
)
})
.map_err(DownloadError::Other)?;
let destination_file = destination_file.into_inner();
Ok((destination_file, bytes_amount))
},
&format!("download {remote_path:?}"),
cancel,
@@ -218,9 +197,11 @@ pub async fn list_remote_timelines(
let listing = download_retry_forever(
|| {
download_cancellable(
storage.list(
Some(&remote_path),
ListingMode::WithDelimiter,
None,
&cancel,
storage.list(Some(&remote_path), ListingMode::WithDelimiter, None),
)
},
&format!("list timelines for {tenant_shard_id}"),
@@ -259,26 +240,23 @@ async fn do_download_index_part(
index_generation: Generation,
cancel: &CancellationToken,
) -> Result<IndexPart, DownloadError> {
use futures::stream::StreamExt;
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
let index_part_bytes = download_retry_forever(
|| async {
// Cancellation: if is safe to cancel this future because we're just downloading into
// a memory buffer, not touching local disk.
let index_part_download =
download_cancellable(cancel, storage.download(&remote_path)).await?;
let download = storage.download(&remote_path, cancel).await?;
let mut index_part_bytes = Vec::new();
let mut stream = std::pin::pin!(index_part_download.download_stream);
while let Some(chunk) = stream.next().await {
let chunk = chunk
.with_context(|| format!("download index part at {remote_path:?}"))
.map_err(DownloadError::Other)?;
index_part_bytes.extend_from_slice(&chunk[..]);
}
Ok(index_part_bytes)
let mut bytes = Vec::new();
let stream = download.download_stream;
let mut stream = StreamReader::new(stream);
tokio::io::copy_buf(&mut stream, &mut bytes)
.await
.with_context(|| format!("download index part at {remote_path:?}"))
.map_err(DownloadError::Other)?;
Ok(bytes)
},
&format!("download {remote_path:?}"),
cancel,
@@ -373,7 +351,7 @@ pub(super) async fn download_index_part(
let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
let indices = download_retry(
|| async { storage.list_files(Some(&index_prefix), None).await },
|| async { storage.list_files(Some(&index_prefix), None, cancel).await },
"list index_part files",
cancel,
)
@@ -446,11 +424,10 @@ pub(crate) async fn download_initdb_tar_zst(
.with_context(|| format!("tempfile creation {temp_path}"))
.map_err(DownloadError::Other)?;
let download = match download_cancellable(cancel, storage.download(&remote_path)).await
{
let download = match storage.download(&remote_path, cancel).await {
Ok(dl) => dl,
Err(DownloadError::NotFound) => {
download_cancellable(cancel, storage.download(&remote_preserved_path)).await?
storage.download(&remote_preserved_path, cancel).await?
}
Err(other) => Err(other)?,
};
@@ -460,6 +437,7 @@ pub(crate) async fn download_initdb_tar_zst(
// TODO: this consumption of the response body should be subject to timeout + cancellation, but
// not without thinking carefully about how to recover safely from cancelling a write to
// local storage (e.g. by writing into a temp file as we do in download_layer)
// FIXME: flip the weird error wrapping
tokio::io::copy_buf(&mut download, &mut writer)
.await
.with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))

View File

@@ -16,7 +16,7 @@ use crate::{
config::PageServerConf,
tenant::remote_timeline_client::{
index::IndexPart, remote_index_path, remote_initdb_archive_path,
remote_initdb_preserved_archive_path, remote_path, upload_cancellable,
remote_initdb_preserved_archive_path, remote_path,
},
};
use remote_storage::{GenericRemoteStorage, TimeTravelError};
@@ -49,16 +49,15 @@ pub(crate) async fn upload_index_part<'a>(
let index_part_bytes = bytes::Bytes::from(index_part_bytes);
let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
upload_cancellable(
cancel,
storage.upload_storage_object(
storage
.upload_storage_object(
futures::stream::once(futures::future::ready(Ok(index_part_bytes))),
index_part_size,
&remote_path,
),
)
.await
.with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
cancel,
)
.await
.with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
}
/// Attempts to upload given layer files.
@@ -115,11 +114,10 @@ pub(super) async fn upload_timeline_layer<'a>(
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
upload_cancellable(cancel, storage.upload(reader, fs_size, &storage_path, None))
storage
.upload(reader, fs_size, &storage_path, None, cancel)
.await
.with_context(|| format!("upload layer from local path '{source_path}'"))?;
Ok(())
.with_context(|| format!("upload layer from local path '{source_path}'"))
}
/// Uploads the given `initdb` data to the remote storage.
@@ -139,12 +137,10 @@ pub(crate) async fn upload_initdb_dir(
let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
upload_cancellable(
cancel,
storage.upload_storage_object(file, size as usize, &remote_path),
)
.await
.with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
storage
.upload_storage_object(file, size as usize, &remote_path, cancel)
.await
.with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
}
pub(crate) async fn preserve_initdb_archive(
@@ -155,7 +151,8 @@ pub(crate) async fn preserve_initdb_archive(
) -> anyhow::Result<()> {
let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
upload_cancellable(cancel, storage.copy_object(&source_path, &dest_path))
storage
.copy_object(&source_path, &dest_path, cancel)
.await
.with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
}

View File

@@ -523,12 +523,13 @@ impl<'a> TenantDownloader<'a> {
tracing::debug!("Downloading heatmap for secondary tenant",);
let heatmap_path = remote_heatmap_path(tenant_shard_id);
let cancel = &self.secondary_state.cancel;
let heatmap_bytes = backoff::retry(
|| async {
let download = self
.remote_storage
.download(&heatmap_path)
.download(&heatmap_path, cancel)
.await
.map_err(UpdateError::from)?;
let mut heatmap_bytes = Vec::new();
@@ -540,7 +541,7 @@ impl<'a> TenantDownloader<'a> {
FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"download heatmap",
&self.secondary_state.cancel,
cancel,
)
.await
.ok_or_else(|| UpdateError::Cancelled)

View File

@@ -21,18 +21,17 @@ use futures::Future;
use md5;
use pageserver_api::shard::TenantShardId;
use rand::Rng;
use remote_storage::GenericRemoteStorage;
use remote_storage::{GenericRemoteStorage, TimeoutOrCancel};
use super::{
heatmap::HeatMapTenant,
scheduler::{self, JobGenerator, RunningJob, SchedulingResult, TenantBackgroundJobs},
CommandRequest,
CommandRequest, UploadCommand,
};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, Instrument};
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
use super::{heatmap::HeatMapTenant, UploadCommand};
pub(super) async fn heatmap_uploader_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
@@ -417,10 +416,10 @@ async fn upload_tenant_heatmap(
|| async {
let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
remote_storage
.upload_storage_object(bytes, size, &path)
.upload_storage_object(bytes, size, &path, cancel)
.await
},
|_| false,
TimeoutOrCancel::caused_by_cancel,
3,
u32::MAX,
"Uploading heatmap",

View File

@@ -13,7 +13,7 @@ use parquet::{
},
record::RecordWriter,
};
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig};
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
use tokio::{sync::mpsc, time};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, Span};
@@ -314,20 +314,23 @@ async fn upload_parquet(
let path = RemotePath::from_string(&format!(
"{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
))?;
let cancel = CancellationToken::new();
backoff::retry(
|| async {
let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
storage.upload(stream, data.len(), &path, None).await
storage
.upload(stream, data.len(), &path, None, &cancel)
.await
},
|_e| false,
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_UPLOAD_MAX_RETRIES,
"request_data_upload",
// we don't want cancellation to interrupt here, so we make a dummy cancel token
&CancellationToken::new(),
&cancel,
)
.await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("request_data_upload")?;
@@ -413,7 +416,8 @@ mod tests {
)
.unwrap(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
})
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
})
);
assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
@@ -466,6 +470,7 @@ mod tests {
) -> Vec<(u64, usize, i64)> {
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
timeout: std::time::Duration::from_secs(120),
};
let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();

View File

@@ -511,7 +511,11 @@ async fn backup_object(
let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);
storage.upload_storage_object(file, size, target_file).await
let cancel = CancellationToken::new();
storage
.upload_storage_object(file, size, target_file, &cancel)
.await
}
pub async fn read_object(
@@ -526,8 +530,10 @@ pub async fn read_object(
info!("segment download about to start from remote path {file_path:?} at offset {offset}");
let cancel = CancellationToken::new();
let download = storage
.download_storage_object(Some((offset, None)), file_path)
.download_storage_object(Some((offset, None)), file_path, &cancel)
.await
.with_context(|| {
format!("Failed to open WAL segment download stream for remote path {file_path:?}")
@@ -559,7 +565,8 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
// Note: listing segments might take a long time if there are many of them.
// We don't currently have http requests timeout cancellation, but if/once
// we have listing should get streaming interface to make progress.
let token = CancellationToken::new(); // not really used
let cancel = CancellationToken::new(); // not really used
backoff::retry(
|| async {
// Do list-delete in batch_size batches to make progress even if there a lot of files.
@@ -567,7 +574,7 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
// I'm not sure deleting while iterating is expected in s3.
loop {
let files = storage
.list_files(Some(&remote_path), Some(batch_size))
.list_files(Some(&remote_path), Some(batch_size), &cancel)
.await?;
if files.is_empty() {
return Ok(()); // done
@@ -580,14 +587,15 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
files.first().unwrap().object_name().unwrap_or(""),
files.last().unwrap().object_name().unwrap_or("")
);
storage.delete_objects(&files).await?;
storage.delete_objects(&files, &cancel).await?;
}
},
// consider TimeoutOrCancel::caused_by_cancel when using cancellation
|_| false,
3,
10,
"executing WAL segments deletion batch",
&token,
&cancel,
)
.await
.ok_or_else(|| anyhow::anyhow!("canceled"))
@@ -617,7 +625,12 @@ pub async fn copy_s3_segments(
let remote_path = RemotePath::new(&relative_dst_path)?;
let files = storage.list_files(Some(&remote_path), None).await?;
let cancel = CancellationToken::new();
let files = storage
.list_files(Some(&remote_path), None, &cancel)
.await?;
let uploaded_segments = &files
.iter()
.filter_map(|file| file.object_name().map(ToOwned::to_owned))
@@ -645,7 +658,7 @@ pub async fn copy_s3_segments(
let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;
storage.copy_object(&from, &to).await?;
storage.copy_object(&from, &to, &cancel).await?;
}
info!(