Compare commits

...

9 Commits

Author SHA1 Message Date
Christian Schwarz
2b7bae3da9 arrrgh, what a mess this is. and turns out list() semantics wrt max_keys are also misunderstood / different for s3&azure than for localfs 2024-10-25 11:01:00 +02:00
Christian Schwarz
ed119b6eaf start fixing localfs tests 2024-10-25 11:00:52 +02:00
Christian Schwarz
7d0e931bcf most of the common tests pass now 2024-10-25 10:42:43 +02:00
Christian Schwarz
1b4b7369f6 Revert "Revert "remote_storage(local_fs): return correct file sizes""
This reverts commit 52c90aef74.
2024-10-24 20:15:04 +02:00
Christian Schwarz
593b1674ba run common tests for LocalFs
This now fails because of sizes in listings being 0, yay.
2024-10-24 20:14:43 +02:00
Christian Schwarz
593797a5f3 sigh, the generic tests don't cover it 2024-10-24 19:55:15 +02:00
Christian Schwarz
52c90aef74 Revert "remote_storage(local_fs): return correct file sizes"
This reverts commit fedea43652.
2024-10-24 19:54:26 +02:00
Christian Schwarz
5f9165f8e6 tests: check sizes in listing tests 2024-10-24 19:54:11 +02:00
Christian Schwarz
fedea43652 remote_storage(local_fs): return correct file sizes 2024-10-24 19:18:13 +02:00
11 changed files with 380 additions and 69 deletions

View File

@@ -1,4 +1,9 @@
use std::{fmt::Debug, num::NonZeroUsize, str::FromStr, time::Duration};
use std::{
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
str::FromStr,
time::Duration,
};
use aws_sdk_s3::types::StorageClass;
use camino::Utf8PathBuf;
@@ -40,7 +45,10 @@ fn is_default_timeout(d: &Duration) -> bool {
pub enum RemoteStorageKind {
/// Storage based on local file system.
/// Specify a root folder to place all stored files into.
LocalFs { local_path: Utf8PathBuf },
LocalFs {
local_path: Utf8PathBuf,
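/// For testing: cap how many keys a single list response may return, to emulate
/// the paginated listings of real object stores such as S3 and Azure.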
max_keys_per_list_response: Option<NonZeroU32>,
},
/// AWS S3 based storage, storing all files in the S3 bucket
/// specified by the config
AwsS3(S3Config),
@@ -200,7 +208,8 @@ timeout = '5s'";
config,
RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: Utf8PathBuf::from(".")
local_path: Utf8PathBuf::from("."),
max_keys_per_list_response: None,
},
timeout: Duration::from_secs(5)
}

View File

@@ -21,7 +21,7 @@ mod support;
use std::{
collections::HashMap,
fmt::Debug,
num::NonZeroU32,
num::{NonZeroU32, NonZeroUsize},
ops::Bound,
pin::{pin, Pin},
sync::Arc,
@@ -33,10 +33,10 @@ use camino::{Utf8Path, Utf8PathBuf};
use bytes::Bytes;
use futures::{stream::Stream, StreamExt};
use itertools::Itertools as _;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tokio_util::{either, sync::CancellationToken};
use tracing::info;
pub use self::{
@@ -162,7 +162,7 @@ pub struct ListingObject {
pub size: u64,
}
#[derive(Default)]
#[derive(Debug, Default)]
pub struct Listing {
pub prefixes: Vec<RemotePath>,
pub keys: Vec<ListingObject>,
@@ -585,9 +585,16 @@ impl GenericRemoteStorage {
pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
let timeout = storage_config.timeout;
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs { local_path: path } => {
RemoteStorageKind::LocalFs {
local_path: path,
max_keys_per_list_response: max_keys_in_list_response,
} => {
info!("Using fs root '{path}' as a remote storage");
Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
Self::LocalFs(LocalFs::new(
path.clone(),
timeout,
*max_keys_in_list_response,
)?)
}
RemoteStorageKind::AwsS3(s3_config) => {
// The profile and access key id are only printed here for debugging purposes,
@@ -699,6 +706,60 @@ impl ConcurrencyLimiter {
}
}
impl Listing {
/// For testing, slice up [`Self`] into pages with `prefixes.len() + keys.len() <= max_keys`.
///
/// Also, when paginating, anecdotally, S3 does {prefixes ∪ keys} | sort | paginate.
/// So, do the same here.
pub fn paginate_like_s3(self, max_keys: Option<NonZeroU32>) -> impl Iterator<Item = Self> {
let Some(max_keys) = max_keys else {
return itertools::Either::Left(std::iter::once(self));
};
enum PrefixOrKey {
Prefix(RemotePath),
Key(ListingObject),
}
impl PrefixOrKey {
fn key(&self) -> impl Ord {
match self {
Self::Prefix(prefix) => prefix.0.clone(),
Self::Key(o) => o.key.0.clone(),
}
}
}
let mut combined: Vec<_> = self
.prefixes
.into_iter()
.map(PrefixOrKey::Prefix)
.chain(self.keys.into_iter().map(PrefixOrKey::Key))
.collect();
combined.sort_by_cached_key(|combined| combined.key());
itertools::Either::Right(
combined
.into_iter()
.chunks(
usize::try_from(max_keys.get())
.expect("true on the architectures we care about"),
)
.into_iter()
.map(|chunk| {
let mut prefixes = Vec::new();
let mut keys = Vec::new();
for item in chunk {
match item {
PrefixOrKey::Prefix(prefix) => prefixes.push(prefix),
PrefixOrKey::Key(key) => keys.push(key),
}
}
Listing { prefixes, keys }
})
.collect::<Vec<_>>()
.into_iter(),
)
}
}
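As a rough illustration (the values are made up; it only assumes `RemotePath::from_string` and the `Listing`/`ListingObject` fields shown above), `paginate_like_s3` could be exercised like this:
// Three prefixes and one key, paginated into pages of at most two entries:
// everything is merged, sorted by key, then re-split into pages.
let listing = Listing {
    prefixes: vec![
        RemotePath::from_string("a").unwrap(),
        RemotePath::from_string("b").unwrap(),
        RemotePath::from_string("c").unwrap(),
    ],
    keys: vec![ListingObject {
        key: RemotePath::from_string("a/obj").unwrap(),
        last_modified: std::time::SystemTime::now(),
        size: 1,
    }],
};
let pages: Vec<Listing> = listing.paginate_like_s3(NonZeroU32::new(2)).collect();
assert_eq!(pages.len(), 2); // ["a", "a/obj"] and ["b", "c"]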
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -36,12 +36,17 @@ const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
pub struct LocalFs {
storage_root: Utf8PathBuf,
timeout: Duration,
max_keys_per_list_response: Option<NonZeroU32>,
}
impl LocalFs {
/// Attempts to create local FS storage, along with its root directory.
/// Storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
pub fn new(
mut storage_root: Utf8PathBuf,
timeout: Duration,
max_keys_per_list_response: Option<NonZeroU32>,
) -> anyhow::Result<Self> {
if !storage_root.exists() {
std::fs::create_dir_all(&storage_root).with_context(|| {
format!("Failed to create all directories in the given root path {storage_root:?}")
@@ -56,6 +61,7 @@ impl LocalFs {
Ok(Self {
storage_root,
timeout,
max_keys_per_list_response,
})
}
@@ -148,10 +154,12 @@ impl LocalFs {
// recursively lists all files in a directory,
// mirroring the `list_files` for `s3_bucket`
async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
dbg!(&folder);
let full_path = match folder {
Some(folder) => folder.with_base(&self.storage_root),
None => self.storage_root.clone(),
};
dbg!(&full_path);
// If we were given a directory, we may use it as our starting point.
// Otherwise, we must go up to the first ancestor dir that exists. This is because
@@ -205,6 +213,7 @@ impl LocalFs {
let full_file_name = cur_folder.join(file_name);
if full_file_name.as_str().starts_with(prefix) {
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
dbg!(&file_remote_path);
files.push(file_remote_path);
if full_file_name.is_dir() {
directory_queue.push(full_file_name);
@@ -328,6 +337,13 @@ impl LocalFs {
Ok(())
}
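/// Returns the smaller of the configured per-response key limit and the caller-supplied
/// `max_keys` (or whichever one is set); `None` means the listing will not be paginated.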
fn effective_max_keys(&self, max_keys: Option<NonZeroU32>) -> Option<NonZeroU32> {
self.max_keys_per_list_response
.into_iter()
.chain(max_keys.into_iter())
.min()
}
}
impl RemoteStorage for LocalFs {
@@ -338,8 +354,12 @@ impl RemoteStorage for LocalFs {
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
let listing = self.list(prefix, mode, max_keys, cancel);
futures::stream::once(listing)
async_stream::try_stream! {
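// Build the full listing once, then emulate server-side pagination by
// yielding it back to the caller one page at a time.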
let listing = self.list(prefix, mode, max_keys, cancel).await?;
for page in listing.paginate_like_s3(self.effective_max_keys(max_keys)) {
yield dbg!(page);
}
}
}
async fn list(
@@ -357,35 +377,34 @@ impl RemoteStorage for LocalFs {
.list_recursive(prefix)
.await
.map_err(DownloadError::Other)?;
let objects = keys
.into_iter()
.filter_map(|k| {
let path = k.with_base(&self.storage_root);
if path.is_dir() {
None
} else {
Some(ListingObject {
key: k.clone(),
// LocalFs is just for testing, so just specify a dummy time
last_modified: SystemTime::now(),
size: 0,
})
}
})
.collect();
let mut objects = Vec::with_capacity(keys.len());
for key in keys {
let path = key.with_base(&self.storage_root);
let metadata = file_metadata(&path).await?;
if metadata.is_dir() {
continue;
}
objects.push(ListingObject {
key: key.clone(),
last_modified: metadata.modified()?,
size: metadata.len(),
});
}
let objects = objects;
if let ListingMode::NoDelimiter = mode {
result.keys = objects;
} else {
let mut prefixes = HashSet::new();
for object in objects {
let key = object.key;
let key = dbg!(object.key);
// If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
let relative_key = if let Some(prefix) = prefix {
let mut prefix = prefix.clone();
// We only strip the dirname of the prefix, so that when we strip it from the start of keys we
// end up with full file/dir names.
let prefix_full_local_path = prefix.with_base(&self.storage_root);
dbg!(&prefix_full_local_path);
let has_slash = prefix.0.to_string().ends_with('/');
let strip_prefix = if prefix_full_local_path.is_dir() && has_slash {
prefix
@@ -393,6 +412,7 @@ impl RemoteStorage for LocalFs {
prefix.0.pop();
prefix
};
dbg!(&strip_prefix);
RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap()
} else {
@@ -400,19 +420,25 @@ impl RemoteStorage for LocalFs {
};
let relative_key = format!("{}", relative_key);
dbg!(&relative_key);
if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
let first_part = relative_key
.split(REMOTE_STORAGE_PREFIX_SEPARATOR)
.next()
.unwrap()
.to_owned();
prefixes.insert(first_part);
dbg!(&first_part);
dbg!(&prefix);
prefixes.insert(if let Some(prefix) = prefix {
prefix.join(first_part).to_string()
} else {
first_part
});
} else {
result.keys.push(ListingObject {
key: RemotePath::from_string(&relative_key).unwrap(),
// LocalFs is just for testing
last_modified: SystemTime::now(),
size: 0,
last_modified: object.last_modified,
size: object.size,
});
}
}
@@ -500,6 +526,9 @@ impl RemoteStorage for LocalFs {
let target_path = from.with_base(&self.storage_root);
let file_metadata = file_metadata(&target_path).await?;
if let Some(etag) = &opts.etag {
}
let etag = mock_etag(&file_metadata);
if opts.etag.as_ref() == Some(&etag) {
@@ -711,9 +740,10 @@ mod fs_tests {
Ok(())
}
fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
pub(crate) fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
let storage_root = tempdir()?.path().to_path_buf();
LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
LocalFs::new(storage_root, Duration::from_secs(120), None)
.map(|s| (s, CancellationToken::new()))
}
#[tokio::test]
@@ -961,7 +991,7 @@ mod fs_tests {
);
assert_eq!(
listing.prefixes,
[RemotePath::from_string("parent").unwrap()].to_vec()
[RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()].to_vec()
);
// Delimiter and prefix without a trailing slash
@@ -976,7 +1006,7 @@ mod fs_tests {
assert_eq!(listing.keys, vec![]);
assert_eq!(
listing.prefixes,
[RemotePath::from_string("grandparent").unwrap()].to_vec()
[RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()].to_vec()
);
// Delimiter and prefix that's partway through a path component
@@ -991,7 +1021,7 @@ mod fs_tests {
assert_eq!(listing.keys, vec![]);
assert_eq!(
listing.prefixes,
[RemotePath::from_string("grandparent").unwrap()].to_vec()
[RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()].to_vec()
);
Ok(())

View File

@@ -8,7 +8,7 @@ use bytes::Bytes;
use camino::Utf8Path;
use futures::stream::Stream;
use once_cell::sync::OnceCell;
use remote_storage::{Download, GenericRemoteStorage, RemotePath};
use remote_storage::{Download, GenericRemoteStorage, ListingObject, RemotePath};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
@@ -52,11 +52,26 @@ pub(crate) async fn download_to_vec(dl: Download) -> anyhow::Result<Vec<u8>> {
Ok(buf)
}
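/// What the tests remember about each uploaded object: its remote path plus its size
/// in bytes, so listings can be checked for correct sizes as well as for keys.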
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct RemoteBlobInfo {
pub(crate) path: RemotePath,
pub(crate) size: u64,
}
impl From<ListingObject> for RemoteBlobInfo {
fn from(listing: ListingObject) -> Self {
RemoteBlobInfo {
path: listing.key,
size: listing.size,
}
}
}
// Uploads files `folder{j}/blob{i}.txt`. See test description for more details.
pub(crate) async fn upload_simple_remote_data(
client: &Arc<GenericRemoteStorage>,
upload_tasks_count: usize,
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
) -> ControlFlow<HashSet<RemoteBlobInfo>, HashSet<RemoteBlobInfo>> {
info!("Creating {upload_tasks_count} remote files");
let mut upload_tasks = JoinSet::new();
let cancel = CancellationToken::new();
@@ -73,12 +88,16 @@ pub(crate) async fn upload_simple_remote_data(
.with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
debug!("Creating remote item {i} at path {blob_path:?}");
let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
let data = format!("remote blob data {i}").into_bytes();
let (data, len) = upload_stream(data.into());
task_client
.upload(data, len, &blob_path, None, &cancel)
.await?;
Ok::<_, anyhow::Error>(blob_path)
Ok::<_, anyhow::Error>(RemoteBlobInfo {
path: blob_path,
size: len as u64,
})
});
}
@@ -89,8 +108,8 @@ pub(crate) async fn upload_simple_remote_data(
.context("task join failed")
.and_then(|task_result| task_result.context("upload task failed"))
{
Ok(upload_path) => {
uploaded_blobs.insert(upload_path);
Ok(remote_blob_info) => {
uploaded_blobs.insert(remote_blob_info);
}
Err(e) => {
error!("Upload task failed: {e:?}");
@@ -108,7 +127,7 @@ pub(crate) async fn upload_simple_remote_data(
pub(crate) async fn cleanup(
client: &Arc<GenericRemoteStorage>,
objects_to_delete: HashSet<RemotePath>,
objects_to_delete: HashSet<RemoteBlobInfo>,
) {
info!(
"Removing {} objects from the remote storage during cleanup",
@@ -116,7 +135,11 @@ pub(crate) async fn cleanup(
);
let cancel = CancellationToken::new();
let mut delete_tasks = JoinSet::new();
for object_to_delete in objects_to_delete {
for RemoteBlobInfo {
path: object_to_delete,
..
} in objects_to_delete
{
let task_client = Arc::clone(client);
let cancel = cancel.clone();
delete_tasks.spawn(async move {
@@ -140,7 +163,7 @@ pub(crate) async fn cleanup(
}
pub(crate) struct Uploads {
pub(crate) prefixes: HashSet<RemotePath>,
pub(crate) blobs: HashSet<RemotePath>,
pub(crate) blobs: HashSet<RemoteBlobInfo>,
}
pub(crate) async fn upload_remote_data(
@@ -169,7 +192,13 @@ pub(crate) async fn upload_remote_data(
.upload(data, data_len, &blob_path, None, &cancel)
.await?;
Ok::<_, anyhow::Error>((blob_prefix, blob_path))
anyhow::Ok((
blob_prefix,
RemoteBlobInfo {
path: blob_path,
size: data_len as u64,
},
))
});
}

View File

@@ -9,7 +9,7 @@ use test_context::test_context;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use crate::common::{download_to_vec, upload_stream, wrap_stream};
use crate::common::{download_to_vec, upload_stream, wrap_stream, RemoteBlobInfo};
use super::{
MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
@@ -85,7 +85,7 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a
.collect::<HashSet<_>>();
assert_eq!(
remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
"remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
"remote storage nested prefixes list mismatches with the uploads.\n\nRemote only prefixes: {remote_only_prefixes:?}\n\nmissing uploaded prefixes: {missing_uploaded_prefixes:?}",
);
// list_streaming
@@ -102,6 +102,7 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a
let mut segment_max_size = 0;
while let Some(st) = nested_remote_prefixes_st.next().await {
let st = st?;
dbg!(&st.prefixes);
segment_max_size = segment_max_size.max(st.prefixes.len());
nested_remote_prefixes_combined.extend(st.prefixes.into_iter());
segments += 1;
@@ -156,7 +157,7 @@ async fn list_no_delimiter_works(
.context("client list root files failure")?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect::<HashSet<_>>();
assert_eq!(
root_files,
@@ -183,14 +184,14 @@ async fn list_no_delimiter_works(
.context("client list nested files failure")?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect::<HashSet<_>>();
let trim_remote_blobs: HashSet<_> = ctx
.remote_blobs
.iter()
.map(|x| x.get_path())
.filter(|x| x.starts_with("folder1"))
.map(|x| RemotePath::new(x).expect("must be valid path"))
.filter(|x| x.path.get_path().starts_with("folder1"))
.cloned()
.map(RemoteBlobInfo::from)
.collect();
assert_eq!(
nested_remote_files, trim_remote_blobs,
@@ -228,7 +229,7 @@ async fn list_partial_prefix(
.await?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect();
assert_eq!(&objects, &ctx.remote_blobs);
@@ -258,7 +259,7 @@ async fn list_partial_prefix(
.await?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect();
assert_eq!(&objects, &ctx.remote_blobs);
@@ -303,13 +304,14 @@ async fn list_partial_prefix(
.await?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect();
let expect: HashSet<_> = ctx
.remote_blobs
.iter()
.filter(|o| o.get_path().starts_with("folder2"))
.filter(|o| o.path.get_path().starts_with("folder2"))
.cloned()
.map(RemoteBlobInfo::from)
.collect();
assert_eq!(&objects, &expect);
@@ -421,7 +423,7 @@ async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyh
.await?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect();
assert_eq!($expect, listing);
}};
@@ -444,13 +446,19 @@ async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyh
// Deleting a path which overlaps with an existing object should do nothing. We pick the first
// path in the set as our common prefix.
let path = expect.iter().next().expect("empty set").clone().join("xyz");
let path = expect
.iter()
.next()
.expect("empty set")
.path
.clone()
.join("xyz");
test_client.delete_prefix(&path, &cancel).await?;
assert_list!(expect);
// Deleting an exact path should work. We pick the first path in the set.
let path = expect.iter().next().expect("empty set").clone();
test_client.delete_prefix(&path, &cancel).await?;
test_client.delete_prefix(&path.path, &cancel).await?;
expect.remove(&path);
assert_list!(expect);
@@ -458,7 +466,7 @@ async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyh
test_client
.delete_prefix(&RemotePath::from_string("folder0/blob_")?, &cancel)
.await?;
expect.retain(|p| !p.get_path().as_str().starts_with("folder0/"));
expect.retain(|RemoteBlobInfo { path: p, .. }| !p.get_path().as_str().starts_with("folder0/"));
assert_list!(expect);
// Deleting a common prefix should delete all objects.

View File

@@ -0,0 +1,167 @@
use std::num::NonZeroU32;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::{collections::HashSet, time::Duration};
use anyhow::Context;
use remote_storage::{
GenericRemoteStorage, LocalFs, RemotePath, RemoteStorageConfig, RemoteStorageKind,
};
use test_context::AsyncTestContext;
use tokio_util::sync::CancellationToken;
use tracing::info;
mod common;
#[path = "common/tests.rs"]
mod tests_localfs;
use common::{
cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data, RemoteBlobInfo,
};
const BASE_PREFIX: &str = "test";
struct EnabledLocalFs {
client: Arc<GenericRemoteStorage>,
base_prefix: &'static str,
}
impl EnabledLocalFs {
async fn setup(max_keys_per_list_response: Option<u32>) -> Self {
let storage_root = camino_tempfile::tempdir()
.expect("create tempdir")
.path()
.to_path_buf();
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: storage_root,
max_keys_per_list_response: max_keys_per_list_response.and_then(NonZeroU32::new),
},
timeout: Duration::from_secs(120),
};
let client = Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")
.unwrap(),
);
EnabledLocalFs {
client,
base_prefix: &BASE_PREFIX,
}
}
}
enum MaybeEnabledStorage {
Enabled(EnabledLocalFs),
Disabled,
}
impl AsyncTestContext for MaybeEnabledStorage {
async fn setup() -> Self {
ensure_logging_ready();
Self::Enabled(EnabledLocalFs::setup(None).await)
}
}
enum MaybeEnabledStorageWithTestBlobs {
Enabled(LocalFsWithTestBlobs),
Disabled,
UploadsFailed(anyhow::Error, LocalFsWithTestBlobs),
}
struct LocalFsWithTestBlobs {
enabled: EnabledLocalFs,
remote_prefixes: HashSet<RemotePath>,
remote_blobs: HashSet<RemoteBlobInfo>,
}
impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
async fn setup() -> Self {
ensure_logging_ready();
let max_keys_in_list_response = 10;
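// Upload more objects than fit into two list responses, so that pagination is actually exercised.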
let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
let enabled = EnabledLocalFs::setup(Some(max_keys_in_list_response)).await;
match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
ControlFlow::Continue(uploads) => {
info!("Remote objects created successfully");
Self::Enabled(LocalFsWithTestBlobs {
enabled,
remote_prefixes: uploads.prefixes,
remote_blobs: uploads.blobs,
})
}
ControlFlow::Break(uploads) => Self::UploadsFailed(
anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
LocalFsWithTestBlobs {
enabled,
remote_prefixes: uploads.prefixes,
remote_blobs: uploads.blobs,
},
),
}
}
async fn teardown(self) {
match self {
Self::Disabled => {}
Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
}
}
}
}
enum MaybeEnabledStorageWithSimpleTestBlobs {
Enabled(LocalFsWithSimpleTestBlobs),
Disabled,
UploadsFailed(anyhow::Error, LocalFsWithSimpleTestBlobs),
}
struct LocalFsWithSimpleTestBlobs {
enabled: EnabledLocalFs,
remote_blobs: HashSet<RemoteBlobInfo>,
}
impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
async fn setup() -> Self {
ensure_logging_ready();
let max_keys_in_list_response = 10;
let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
let enabled = EnabledLocalFs::setup(Some(max_keys_in_list_response)).await;
match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
ControlFlow::Continue(uploads) => {
info!("Remote objects created successfully");
Self::Enabled(LocalFsWithSimpleTestBlobs {
enabled,
remote_blobs: uploads,
})
}
ControlFlow::Break(uploads) => Self::UploadsFailed(
anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
LocalFsWithSimpleTestBlobs {
enabled,
remote_blobs: uploads,
},
),
}
}
async fn teardown(self) {
match self {
Self::Disabled => {}
Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
}
}
}
}

View File

@@ -17,7 +17,9 @@ mod common;
#[path = "common/tests.rs"]
mod tests_azure;
use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
use common::{
cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data, RemoteBlobInfo,
};
const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE";
@@ -83,7 +85,7 @@ enum MaybeEnabledStorageWithTestBlobs {
struct AzureWithTestBlobs {
enabled: EnabledAzure,
remote_prefixes: HashSet<RemotePath>,
remote_blobs: HashSet<RemotePath>,
remote_blobs: HashSet<RemoteBlobInfo>,
}
impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
@@ -140,7 +142,7 @@ enum MaybeEnabledStorageWithSimpleTestBlobs {
}
struct AzureWithSimpleTestBlobs {
enabled: EnabledAzure,
remote_blobs: HashSet<RemotePath>,
remote_blobs: HashSet<RemoteBlobInfo>,
}
impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {

View File

@@ -26,7 +26,9 @@ mod common;
#[path = "common/tests.rs"]
mod tests_s3;
use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
use common::{
cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data, RemoteBlobInfo,
};
use utils::backoff;
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
@@ -260,7 +262,7 @@ enum MaybeEnabledStorageWithTestBlobs {
struct S3WithTestBlobs {
enabled: EnabledS3,
remote_prefixes: HashSet<RemotePath>,
remote_blobs: HashSet<RemotePath>,
remote_blobs: HashSet<RemoteBlobInfo>,
}
impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
@@ -317,7 +319,7 @@ enum MaybeEnabledStorageWithSimpleTestBlobs {
}
struct S3WithSimpleTestBlobs {
enabled: EnabledS3,
remote_blobs: HashSet<RemotePath>,
remote_blobs: HashSet<RemoteBlobInfo>,
}
impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {

View File

@@ -841,6 +841,7 @@ mod test {
let storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: remote_fs_dir.clone(),
max_keys_per_list_response: None,
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};

View File

@@ -4642,6 +4642,7 @@ pub(crate) mod harness {
let config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: remote_fs_dir.clone(),
max_keys_per_list_response: None,
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};

View File

@@ -543,6 +543,7 @@ mod tests {
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: tmpdir.to_path_buf(),
max_keys_per_list_response: None,
},
timeout: std::time::Duration::from_secs(120),
};