Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-17 10:22:56 +00:00)

Compare commits: release-pr ... problame/r (9 commits)
| SHA1 |
|---|
| 2b7bae3da9 |
| ed119b6eaf |
| 7d0e931bcf |
| 1b4b7369f6 |
| 593b1674ba |
| 593797a5f3 |
| 52c90aef74 |
| 5f9165f8e6 |
| fedea43652 |
@@ -1,4 +1,9 @@
use std::{fmt::Debug, num::NonZeroUsize, str::FromStr, time::Duration};
use std::{
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
str::FromStr,
time::Duration,
};

use aws_sdk_s3::types::StorageClass;
use camino::Utf8PathBuf;
@@ -40,7 +45,10 @@ fn is_default_timeout(d: &Duration) -> bool {
pub enum RemoteStorageKind {
/// Storage based on local file system.
/// Specify a root folder to place all stored files into.
LocalFs { local_path: Utf8PathBuf },
LocalFs {
local_path: Utf8PathBuf,
max_keys_per_list_response: Option<NonZeroU32>,
},
/// AWS S3 based storage, storing all files in the S3 bucket
/// specified by the config
AwsS3(S3Config),
@@ -200,7 +208,8 @@ timeout = '5s'";
config,
RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: Utf8PathBuf::from(".")
local_path: Utf8PathBuf::from("."),
max_keys_per_list_response: None,
},
timeout: Duration::from_secs(5)
}

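For orientation, a minimal sketch of how the extended `LocalFs` variant is constructed. This is illustrative only: it assumes the `remote_storage` types exactly as they appear in this diff, and the path and limit are made-up example values. `None` keeps the previous "everything in one page" behaviour; `Some(n)` caps every list response.

fn main() {
    use std::{num::NonZeroU32, time::Duration};
    use camino::Utf8PathBuf;
    use remote_storage::{RemoteStorageConfig, RemoteStorageKind};

    // Example values only; `max_keys_per_list_response` limits combined prefixes + keys per page.
    let _config = RemoteStorageConfig {
        storage: RemoteStorageKind::LocalFs {
            local_path: Utf8PathBuf::from("/tmp/remote_storage_root"),
            max_keys_per_list_response: NonZeroU32::new(10),
        },
        timeout: Duration::from_secs(120),
    };
}
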
@@ -21,7 +21,7 @@ mod support;
use std::{
collections::HashMap,
fmt::Debug,
num::NonZeroU32,
num::{NonZeroU32, NonZeroUsize},
ops::Bound,
pin::{pin, Pin},
sync::Arc,
@@ -33,10 +33,10 @@ use camino::{Utf8Path, Utf8PathBuf};

use bytes::Bytes;
use futures::{stream::Stream, StreamExt};
use itertools::Itertools as _;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tokio_util::{either, sync::CancellationToken};
use tracing::info;

pub use self::{
@@ -162,7 +162,7 @@ pub struct ListingObject {
pub size: u64,
}

#[derive(Default)]
#[derive(Debug, Default)]
pub struct Listing {
pub prefixes: Vec<RemotePath>,
pub keys: Vec<ListingObject>,
@@ -585,9 +585,16 @@ impl GenericRemoteStorage {
pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
let timeout = storage_config.timeout;
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs { local_path: path } => {
RemoteStorageKind::LocalFs {
local_path: path,
max_keys_per_list_response: max_keys_in_list_response,
} => {
info!("Using fs root '{path}' as a remote storage");
Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
Self::LocalFs(LocalFs::new(
path.clone(),
timeout,
*max_keys_in_list_response,
)?)
}
RemoteStorageKind::AwsS3(s3_config) => {
// The profile and access key id are only printed here for debugging purposes,
@@ -699,6 +706,60 @@ impl ConcurrencyLimiter {
}
}

impl Listing {
/// For testing, slice up [`Self`] into pages with `prefixes.len() + keys.len() <= max_keys`.
///
/// Also, when paginating, anecdotally, S3 does {prefixes \u keys} | sort | paginate.
/// So, do the same here
pub fn paginate_like_s3(self, max_keys: Option<NonZeroU32>) -> impl Iterator<Item = Self> {
let Some(max_keys) = max_keys else {
return itertools::Either::Left(std::iter::once(self));
};
enum PrefixOrKey {
Prefix(RemotePath),
Key(ListingObject),
}
impl PrefixOrKey {
fn key(&self) -> impl Ord {
match self {
Self::Prefix(prefix) => prefix.0.clone(),
Self::Key(o) => o.key.0.clone(),
}
}
}

let mut combined: Vec<_> = self
.prefixes
.into_iter()
.map(PrefixOrKey::Prefix)
.chain(self.keys.into_iter().map(PrefixOrKey::Key))
.collect();
combined.sort_by_cached_key(|combined| combined.key());
itertools::Either::Right(
combined
.into_iter()
.chunks(
usize::try_from(max_keys.get())
.expect("true on the architectures we care about"),
)
.into_iter()
.map(|chunk| {
let mut prefixes = Vec::new();
let mut keys = Vec::new();
for item in chunk {
match item {
PrefixOrKey::Prefix(prefix) => prefixes.push(prefix),
PrefixOrKey::Key(key) => keys.push(key),
}
}
Listing { prefixes, keys }
})
.collect::<Vec<_>>()
.into_iter(),
)
}
}

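The pagination helper above boils down to: merge prefixes and keys, sort the combined set, then cut it into pages of at most `max_keys` entries. A standalone sketch of that same sort-then-chunk idea on plain strings (illustrative only; it does not use the crate's types):

fn paginate(mut entries: Vec<String>, max_keys: usize) -> Vec<Vec<String>> {
    // Sort first, then cut into fixed-size pages, mirroring the
    // "{prefixes ∪ keys} | sort | paginate" behaviour described above.
    entries.sort();
    entries.chunks(max_keys).map(|page| page.to_vec()).collect()
}

fn main() {
    let entries = vec!["b/", "a/", "c.txt", "a.txt"]
        .into_iter()
        .map(String::from)
        .collect();
    // Lexicographic order, pages of at most two entries each.
    assert_eq!(
        paginate(entries, 2),
        vec![vec!["a.txt", "a/"], vec!["b/", "c.txt"]]
    );
}
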
#[cfg(test)]
mod tests {
use super::*;

@@ -36,12 +36,17 @@ const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
|
||||
pub struct LocalFs {
|
||||
storage_root: Utf8PathBuf,
|
||||
timeout: Duration,
|
||||
max_keys_per_list_response: Option<NonZeroU32>,
|
||||
}
|
||||
|
||||
impl LocalFs {
|
||||
/// Attempts to create local FS storage, along with its root directory.
|
||||
/// Storage root will be created (if does not exist) and transformed into an absolute path (if passed as relative).
|
||||
pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
|
||||
pub fn new(
|
||||
mut storage_root: Utf8PathBuf,
|
||||
timeout: Duration,
|
||||
max_keys_per_list_response: Option<NonZeroU32>,
|
||||
) -> anyhow::Result<Self> {
|
||||
if !storage_root.exists() {
|
||||
std::fs::create_dir_all(&storage_root).with_context(|| {
|
||||
format!("Failed to create all directories in the given root path {storage_root:?}")
|
||||
@@ -56,6 +61,7 @@ impl LocalFs {
|
||||
Ok(Self {
|
||||
storage_root,
|
||||
timeout,
|
||||
max_keys_per_list_response,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -148,10 +154,12 @@ impl LocalFs {
|
||||
// recursively lists all files in a directory,
|
||||
// mirroring the `list_files` for `s3_bucket`
|
||||
async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
dbg!(&folder);
|
||||
let full_path = match folder {
|
||||
Some(folder) => folder.with_base(&self.storage_root),
|
||||
None => self.storage_root.clone(),
|
||||
};
|
||||
dbg!(&full_path);
|
||||
|
||||
// If we were given a directory, we may use it as our starting point.
|
||||
// Otherwise, we must go up to the first ancestor dir that exists. This is because
|
||||
@@ -205,6 +213,7 @@ impl LocalFs {
|
||||
let full_file_name = cur_folder.join(file_name);
|
||||
if full_file_name.as_str().starts_with(prefix) {
|
||||
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
|
||||
dbg!(&file_remote_path);
|
||||
files.push(file_remote_path);
|
||||
if full_file_name.is_dir() {
|
||||
directory_queue.push(full_file_name);
|
||||
@@ -328,6 +337,13 @@ impl LocalFs {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn effective_max_keys(&self, max_keys: Option<NonZeroU32>) -> Option<NonZeroU32> {
|
||||
self.max_keys_per_list_response
|
||||
.into_iter()
|
||||
.chain(max_keys.into_iter())
|
||||
.min()
|
||||
}
|
||||
}
|
||||
|
||||
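`effective_max_keys` combines the two optional limits: the smaller of the configured cap and the caller's `max_keys` wins, and if neither is set there is no limit. A standalone sketch of that rule (illustrative only):

use std::num::NonZeroU32;

fn effective(configured: Option<NonZeroU32>, requested: Option<NonZeroU32>) -> Option<NonZeroU32> {
    // Iterate over whichever limits are present and take the minimum.
    configured.into_iter().chain(requested).min()
}

fn main() {
    let ten = NonZeroU32::new(10);
    let three = NonZeroU32::new(3);
    assert_eq!(effective(ten, three), three); // caller asks for fewer than the cap
    assert_eq!(effective(ten, None), ten); // only the configured cap applies
    assert_eq!(effective(None, None), None); // no limit at all
}
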
impl RemoteStorage for LocalFs {
@@ -338,8 +354,12 @@ impl RemoteStorage for LocalFs {
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
let listing = self.list(prefix, mode, max_keys, cancel);
futures::stream::once(listing)
async_stream::try_stream! {
let listing = self.list(prefix, mode, max_keys, cancel).await?;
for page in listing.paginate_like_s3(self.effective_max_keys(max_keys)) {
yield dbg!(page);
}
}
}

async fn list(
@@ -357,35 +377,34 @@ impl RemoteStorage for LocalFs {
.list_recursive(prefix)
.await
.map_err(DownloadError::Other)?;
let objects = keys
.into_iter()
.filter_map(|k| {
let path = k.with_base(&self.storage_root);
if path.is_dir() {
None
} else {
Some(ListingObject {
key: k.clone(),
// LocalFs is just for testing, so just specify a dummy time
last_modified: SystemTime::now(),
size: 0,
})
}
})
.collect();
let mut objects = Vec::with_capacity(keys.len());
for key in keys {
let path = key.with_base(&self.storage_root);
let metadata = file_metadata(&path).await?;
if metadata.is_dir() {
continue;
}
objects.push(ListingObject {
key: key.clone(),
last_modified: metadata.modified()?,
size: metadata.len(),
});
}
let objects = objects;

if let ListingMode::NoDelimiter = mode {
result.keys = objects;
} else {
let mut prefixes = HashSet::new();
for object in objects {
let key = object.key;
let key = dbg!(object.key);
// If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
let relative_key = if let Some(prefix) = prefix {
let mut prefix = prefix.clone();
// We only strip the dirname of the prefix, so that when we strip it from the start of keys we
// end up with full file/dir names.
let prefix_full_local_path = prefix.with_base(&self.storage_root);
dbg!(&prefix_full_local_path);
let has_slash = prefix.0.to_string().ends_with('/');
let strip_prefix = if prefix_full_local_path.is_dir() && has_slash {
prefix
@@ -393,6 +412,7 @@ impl RemoteStorage for LocalFs {
prefix.0.pop();
prefix
};
dbg!(&strip_prefix);

RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap()
} else {
@@ -400,19 +420,25 @@ impl RemoteStorage for LocalFs {
};

let relative_key = format!("{}", relative_key);
dbg!(&relative_key);
if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
let first_part = relative_key
.split(REMOTE_STORAGE_PREFIX_SEPARATOR)
.next()
.unwrap()
.to_owned();
prefixes.insert(first_part);
dbg!(&first_part);
dbg!(&prefix);
prefixes.insert(if let Some(prefix) = prefix {
prefix.join(first_part).to_string()
} else {
first_part
});
} else {
result.keys.push(ListingObject {
key: RemotePath::from_string(&relative_key).unwrap(),
// LocalFs is just for testing
last_modified: SystemTime::now(),
size: 0,
last_modified: object.last_modified,
size: object.size,
});
}
}
@@ -500,6 +526,9 @@ impl RemoteStorage for LocalFs {
let target_path = from.with_base(&self.storage_root);

let file_metadata = file_metadata(&target_path).await?;
if let Some(etag) = &opts.etag {

}
let etag = mock_etag(&file_metadata);

if opts.etag.as_ref() == Some(&etag) {
@@ -711,9 +740,10 @@ mod fs_tests {
Ok(())
}

fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
pub(crate) fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
let storage_root = tempdir()?.path().to_path_buf();
LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
LocalFs::new(storage_root, Duration::from_secs(120), None)
.map(|s| (s, CancellationToken::new()))
}

#[tokio::test]
@@ -961,7 +991,7 @@ mod fs_tests {
);
assert_eq!(
listing.prefixes,
[RemotePath::from_string("parent").unwrap()].to_vec()
[RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()].to_vec()
);

// Delimiter and prefix without a trailing slash
@@ -976,7 +1006,7 @@ mod fs_tests {
assert_eq!(listing.keys, vec![]);
assert_eq!(
listing.prefixes,
[RemotePath::from_string("grandparent").unwrap()].to_vec()
[RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()].to_vec()
);

// Delimiter and prefix that's partway through a path component
@@ -991,7 +1021,7 @@ mod fs_tests {
assert_eq!(listing.keys, vec![]);
assert_eq!(
listing.prefixes,
[RemotePath::from_string("grandparent").unwrap()].to_vec()
[RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()].to_vec()
);

Ok(())

@@ -8,7 +8,7 @@ use bytes::Bytes;
use camino::Utf8Path;
use futures::stream::Stream;
use once_cell::sync::OnceCell;
use remote_storage::{Download, GenericRemoteStorage, RemotePath};
use remote_storage::{Download, GenericRemoteStorage, ListingObject, RemotePath};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
@@ -52,11 +52,26 @@ pub(crate) async fn download_to_vec(dl: Download) -> anyhow::Result<Vec<u8>> {
Ok(buf)
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct RemoteBlobInfo {
pub(crate) path: RemotePath,
pub(crate) size: u64,
}

impl From<ListingObject> for RemoteBlobInfo {
fn from(listing: ListingObject) -> Self {
RemoteBlobInfo {
path: listing.key,
size: listing.size,
}
}
}

// Uploads files `folder{j}/blob{i}.txt`. See test description for more details.
pub(crate) async fn upload_simple_remote_data(
client: &Arc<GenericRemoteStorage>,
upload_tasks_count: usize,
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
) -> ControlFlow<HashSet<RemoteBlobInfo>, HashSet<RemoteBlobInfo>> {
info!("Creating {upload_tasks_count} remote files");
let mut upload_tasks = JoinSet::new();
let cancel = CancellationToken::new();
@@ -73,12 +88,16 @@ pub(crate) async fn upload_simple_remote_data(
.with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
debug!("Creating remote item {i} at path {blob_path:?}");

let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
let data = format!("remote blob data {i}").into_bytes();
let (data, len) = upload_stream(data.into());
task_client
.upload(data, len, &blob_path, None, &cancel)
.await?;

Ok::<_, anyhow::Error>(blob_path)
Ok::<_, anyhow::Error>(RemoteBlobInfo {
path: blob_path,
size: len as u64,
})
});
}

@@ -89,8 +108,8 @@ pub(crate) async fn upload_simple_remote_data(
.context("task join failed")
.and_then(|task_result| task_result.context("upload task failed"))
{
Ok(upload_path) => {
uploaded_blobs.insert(upload_path);
Ok(remote_blob_info) => {
uploaded_blobs.insert(remote_blob_info);
}
Err(e) => {
error!("Upload task failed: {e:?}");
@@ -108,7 +127,7 @@ pub(crate) async fn upload_simple_remote_data(

pub(crate) async fn cleanup(
client: &Arc<GenericRemoteStorage>,
objects_to_delete: HashSet<RemotePath>,
objects_to_delete: HashSet<RemoteBlobInfo>,
) {
info!(
"Removing {} objects from the remote storage during cleanup",
@@ -116,7 +135,11 @@ pub(crate) async fn cleanup(
);
let cancel = CancellationToken::new();
let mut delete_tasks = JoinSet::new();
for object_to_delete in objects_to_delete {
for RemoteBlobInfo {
path: object_to_delete,
..
} in objects_to_delete
{
let task_client = Arc::clone(client);
let cancel = cancel.clone();
delete_tasks.spawn(async move {
@@ -140,7 +163,7 @@ pub(crate) async fn cleanup(
}
pub(crate) struct Uploads {
pub(crate) prefixes: HashSet<RemotePath>,
pub(crate) blobs: HashSet<RemotePath>,
pub(crate) blobs: HashSet<RemoteBlobInfo>,
}

pub(crate) async fn upload_remote_data(
@@ -169,7 +192,13 @@ pub(crate) async fn upload_remote_data(
.upload(data, data_len, &blob_path, None, &cancel)
.await?;

Ok::<_, anyhow::Error>((blob_prefix, blob_path))
anyhow::Ok((
blob_prefix,
RemoteBlobInfo {
path: blob_path,
size: data_len as u64,
},
))
});
}

@@ -9,7 +9,7 @@ use test_context::test_context;
use tokio_util::sync::CancellationToken;
use tracing::debug;

use crate::common::{download_to_vec, upload_stream, wrap_stream};
use crate::common::{download_to_vec, upload_stream, wrap_stream, RemoteBlobInfo};

use super::{
MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
@@ -85,7 +85,7 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a
.collect::<HashSet<_>>();
assert_eq!(
remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
"remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
"remote storage nested prefixes list mismatches with the uploads.\n\nRemote only prefixes: {remote_only_prefixes:?}\n\nmissing uploaded prefixes: {missing_uploaded_prefixes:?}",
);

// list_streaming
@@ -102,6 +102,7 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a
let mut segment_max_size = 0;
while let Some(st) = nested_remote_prefixes_st.next().await {
let st = st?;
dbg!(&st.prefixes);
segment_max_size = segment_max_size.max(st.prefixes.len());
nested_remote_prefixes_combined.extend(st.prefixes.into_iter());
segments += 1;
@@ -156,7 +157,7 @@ async fn list_no_delimiter_works(
.context("client list root files failure")?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect::<HashSet<_>>();
assert_eq!(
root_files,
@@ -183,14 +184,14 @@ async fn list_no_delimiter_works(
.context("client list nested files failure")?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect::<HashSet<_>>();
let trim_remote_blobs: HashSet<_> = ctx
.remote_blobs
.iter()
.map(|x| x.get_path())
.filter(|x| x.starts_with("folder1"))
.map(|x| RemotePath::new(x).expect("must be valid path"))
.filter(|x| x.path.get_path().starts_with("folder1"))
.cloned()
.map(RemoteBlobInfo::from)
.collect();
assert_eq!(
nested_remote_files, trim_remote_blobs,
@@ -228,7 +229,7 @@ async fn list_partial_prefix(
.await?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect();
assert_eq!(&objects, &ctx.remote_blobs);

@@ -258,7 +259,7 @@ async fn list_partial_prefix(
.await?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect();
assert_eq!(&objects, &ctx.remote_blobs);

@@ -303,13 +304,14 @@ async fn list_partial_prefix(
.await?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect();
let expect: HashSet<_> = ctx
.remote_blobs
.iter()
.filter(|o| o.get_path().starts_with("folder2"))
.filter(|o| o.path.get_path().starts_with("folder2"))
.cloned()
.map(RemoteBlobInfo::from)
.collect();
assert_eq!(&objects, &expect);

@@ -421,7 +423,7 @@ async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyh
.await?
.keys
.into_iter()
.map(|o| o.key)
.map(RemoteBlobInfo::from)
.collect();
assert_eq!($expect, listing);
}};
@@ -444,13 +446,19 @@ async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyh

// Deleting a path which overlaps with an existing object should do nothing. We pick the first
// path in the set as our common prefix.
let path = expect.iter().next().expect("empty set").clone().join("xyz");
let path = expect
.iter()
.next()
.expect("empty set")
.path
.clone()
.join("xyz");
test_client.delete_prefix(&path, &cancel).await?;
assert_list!(expect);

// Deleting an exact path should work. We pick the first path in the set.
let path = expect.iter().next().expect("empty set").clone();
test_client.delete_prefix(&path, &cancel).await?;
test_client.delete_prefix(&path.path, &cancel).await?;
expect.remove(&path);
assert_list!(expect);

@@ -458,7 +466,7 @@ async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyh
test_client
.delete_prefix(&RemotePath::from_string("folder0/blob_")?, &cancel)
.await?;
expect.retain(|p| !p.get_path().as_str().starts_with("folder0/"));
expect.retain(|RemoteBlobInfo { path: p, .. }| !p.get_path().as_str().starts_with("folder0/"));
assert_list!(expect);

// Deleting a common prefix should delete all objects.

libs/remote_storage/tests/test_localfs.rs (new file, 167 lines)
@@ -0,0 +1,167 @@
use std::num::NonZeroU32;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::{collections::HashSet, time::Duration};

use anyhow::Context;
use remote_storage::{
GenericRemoteStorage, LocalFs, RemotePath, RemoteStorageConfig, RemoteStorageKind,
};
use test_context::AsyncTestContext;
use tokio_util::sync::CancellationToken;
use tracing::info;

mod common;

#[path = "common/tests.rs"]
mod tests_localfs;

use common::{
cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data, RemoteBlobInfo,
};

const BASE_PREFIX: &str = "test";

struct EnabledLocalFs {
client: Arc<GenericRemoteStorage>,
base_prefix: &'static str,
}

impl EnabledLocalFs {
async fn setup(max_keys_per_list_response: Option<u32>) -> Self {
let storage_root = camino_tempfile::tempdir()
.expect("create tempdir")
.path()
.to_path_buf();

let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: storage_root,
max_keys_per_list_response: max_keys_per_list_response.and_then(NonZeroU32::new),
},
timeout: Duration::from_secs(120),
};
let client = Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")
.unwrap(),
);

EnabledLocalFs {
client,
base_prefix: &BASE_PREFIX,
}
}
}

enum MaybeEnabledStorage {
Enabled(EnabledLocalFs),
Disabled,
}

impl AsyncTestContext for MaybeEnabledStorage {
async fn setup() -> Self {
ensure_logging_ready();
Self::Enabled(EnabledLocalFs::setup(None).await)
}
}

enum MaybeEnabledStorageWithTestBlobs {
Enabled(LocalFsWithTestBlobs),
Disabled,
UploadsFailed(anyhow::Error, LocalFsWithTestBlobs),
}

struct LocalFsWithTestBlobs {
enabled: EnabledLocalFs,
remote_prefixes: HashSet<RemotePath>,
remote_blobs: HashSet<RemoteBlobInfo>,
}

impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
async fn setup() -> Self {
ensure_logging_ready();

let max_keys_in_list_response = 10;
let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

let enabled = EnabledLocalFs::setup(Some(max_keys_in_list_response)).await;

match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
ControlFlow::Continue(uploads) => {
info!("Remote objects created successfully");

Self::Enabled(LocalFsWithTestBlobs {
enabled,
remote_prefixes: uploads.prefixes,
remote_blobs: uploads.blobs,
})
}
ControlFlow::Break(uploads) => Self::UploadsFailed(
anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
LocalFsWithTestBlobs {
enabled,
remote_prefixes: uploads.prefixes,
remote_blobs: uploads.blobs,
},
),
}
}

async fn teardown(self) {
match self {
Self::Disabled => {}
Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
}
}
}
}

enum MaybeEnabledStorageWithSimpleTestBlobs {
Enabled(LocalFsWithSimpleTestBlobs),
Disabled,
UploadsFailed(anyhow::Error, LocalFsWithSimpleTestBlobs),
}
struct LocalFsWithSimpleTestBlobs {
enabled: EnabledLocalFs,
remote_blobs: HashSet<RemoteBlobInfo>,
}

impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
async fn setup() -> Self {
ensure_logging_ready();
let max_keys_in_list_response = 10;
let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

let enabled = EnabledLocalFs::setup(Some(max_keys_in_list_response)).await;

match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
ControlFlow::Continue(uploads) => {
info!("Remote objects created successfully");

Self::Enabled(LocalFsWithSimpleTestBlobs {
enabled,
remote_blobs: uploads,
})
}
ControlFlow::Break(uploads) => Self::UploadsFailed(
anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
LocalFsWithSimpleTestBlobs {
enabled,
remote_blobs: uploads,
},
),
}
}

async fn teardown(self) {
match self {
Self::Disabled => {}
Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
}
}
}
}

@@ -17,7 +17,9 @@ mod common;
#[path = "common/tests.rs"]
mod tests_azure;

use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
use common::{
cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data, RemoteBlobInfo,
};

const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE";

@@ -83,7 +85,7 @@ enum MaybeEnabledStorageWithTestBlobs {
struct AzureWithTestBlobs {
enabled: EnabledAzure,
remote_prefixes: HashSet<RemotePath>,
remote_blobs: HashSet<RemotePath>,
remote_blobs: HashSet<RemoteBlobInfo>,
}

impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
@@ -140,7 +142,7 @@ enum MaybeEnabledStorageWithSimpleTestBlobs {
}
struct AzureWithSimpleTestBlobs {
enabled: EnabledAzure,
remote_blobs: HashSet<RemotePath>,
remote_blobs: HashSet<RemoteBlobInfo>,
}

impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {

@@ -26,7 +26,9 @@ mod common;
#[path = "common/tests.rs"]
mod tests_s3;

use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
use common::{
cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data, RemoteBlobInfo,
};
use utils::backoff;

const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
@@ -260,7 +262,7 @@ enum MaybeEnabledStorageWithTestBlobs {
struct S3WithTestBlobs {
enabled: EnabledS3,
remote_prefixes: HashSet<RemotePath>,
remote_blobs: HashSet<RemotePath>,
remote_blobs: HashSet<RemoteBlobInfo>,
}

impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
@@ -317,7 +319,7 @@ enum MaybeEnabledStorageWithSimpleTestBlobs {
}
struct S3WithSimpleTestBlobs {
enabled: EnabledS3,
remote_blobs: HashSet<RemotePath>,
remote_blobs: HashSet<RemoteBlobInfo>,
}

impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {

@@ -841,6 +841,7 @@ mod test {
let storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: remote_fs_dir.clone(),
max_keys_per_list_response: None,
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};

@@ -4642,6 +4642,7 @@ pub(crate) mod harness {
let config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: remote_fs_dir.clone(),
max_keys_per_list_response: None,
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};

@@ -543,6 +543,7 @@ mod tests {
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: tmpdir.to_path_buf(),
max_keys_per_list_response: None,
},
timeout: std::time::Duration::from_secs(120),
};