diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index ac1f8a357e..0e9c237e1e 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -70,6 +70,14 @@ impl RemotePath {
     pub fn join(&self, segment: &Path) -> Self {
         Self(self.0.join(segment))
     }
+
+    pub fn get_path(&self) -> &PathBuf {
+        &self.0
+    }
+
+    pub fn extension(&self) -> Option<&str> {
+        self.0.extension()?.to_str()
+    }
 }
 
 /// Storage (potentially remote) API to manage its state.
@@ -86,6 +94,19 @@ pub trait RemoteStorage: Send + Sync + 'static {
         prefix: Option<&RemotePath>,
     ) -> Result<Vec<RemotePath>, DownloadError>;
 
+    /// Lists all files in a directory "recursively"
+    /// (not really recursively, because AWS has a flat namespace).
+    /// Note: this is subtly different from `list_prefixes`,
+    /// because it lists files instead of listing
+    /// names that share common prefixes.
+    /// For example,
+    /// list_files("foo/bar") = ["foo/bar/cat123.txt",
+    /// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"]
+    /// whereas
+    /// list_prefixes("foo/bar/") = ["cat", "dog"]
+    /// See `test_real_s3.rs` for more details.
+    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>;
+
     /// Streams the local file contents into the remote storage entry.
     async fn upload(
         &self,
@@ -174,6 +195,14 @@ impl GenericRemoteStorage {
         }
     }
 
+    pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
+        match self {
+            Self::LocalFs(s) => s.list_files(folder).await,
+            Self::AwsS3(s) => s.list_files(folder).await,
+            Self::Unreliable(s) => s.list_files(folder).await,
+        }
+    }
+
     pub async fn upload(
         &self,
         from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index 59304c2481..ca5fbd5de5 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -48,6 +48,14 @@ impl LocalFs {
         Ok(Self { storage_root })
     }
 
+    // mirrors S3Bucket::s3_object_to_relative_path
+    fn local_file_to_relative_path(&self, key: PathBuf) -> RemotePath {
+        let relative_path = key
+            .strip_prefix(&self.storage_root)
+            .expect("relative path must contain storage_root as prefix");
+        RemotePath(relative_path.into())
+    }
+
     async fn read_storage_metadata(
         &self,
         file_path: &Path,
@@ -132,6 +140,34 @@ impl RemoteStorage for LocalFs {
         Ok(prefixes)
     }
 
+    // Recursively lists all files in a directory,
+    // mirroring `list_files` for `s3_bucket`.
+    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
+        let full_path = match folder {
+            Some(folder) => folder.with_base(&self.storage_root),
+            None => self.storage_root.clone(),
+        };
+        let mut files = vec![];
+        let mut directory_queue = vec![full_path.clone()];
+
+        while !directory_queue.is_empty() {
+            let cur_folder = directory_queue
+                .pop()
+                .expect("queue cannot be empty: we just checked");
+            let mut entries = fs::read_dir(cur_folder.clone()).await?;
+            while let Some(entry) = entries.next_entry().await? {
+                let file_name: PathBuf = entry.file_name().into();
+                let full_file_name = cur_folder.clone().join(&file_name);
+                let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
+                files.push(file_remote_path.clone());
+                if full_file_name.is_dir() {
+                    directory_queue.push(full_file_name);
+                }
+            }
+        }
+        Ok(files)
+    }
+
     async fn upload(
         &self,
         data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index dafb6dcb45..43d818dfb9 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -347,6 +347,51 @@ impl RemoteStorage for S3Bucket {
         Ok(document_keys)
     }
 
+    /// See the doc for `RemoteStorage::list_files`
+    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
+        let folder_name = folder
+            .map(|p| self.relative_path_to_s3_object(p))
+            .or_else(|| self.prefix_in_bucket.clone());
+
+        // AWS may need to break the response into several parts
+        let mut continuation_token = None;
+        let mut all_files = vec![];
+        loop {
+            let _guard = self
+                .concurrency_limiter
+                .acquire()
+                .await
+                .context("Concurrency limiter semaphore got closed during S3 list_files")?;
+            metrics::inc_list_objects();
+
+            let response = self
+                .client
+                .list_objects_v2()
+                .bucket(self.bucket_name.clone())
+                .set_prefix(folder_name.clone())
+                .set_continuation_token(continuation_token)
+                .set_max_keys(self.max_keys_per_list_response)
+                .send()
+                .await
+                .map_err(|e| {
+                    metrics::inc_list_objects_fail();
+                    e
+                })
+                .context("Failed to list files in S3 bucket")?;
+
+            for object in response.contents().unwrap_or_default() {
+                let object_path = object.key().expect("response does not contain a key");
+                let remote_path = self.s3_object_to_relative_path(object_path);
+                all_files.push(remote_path);
+            }
+            match response.next_continuation_token {
+                Some(new_token) => continuation_token = Some(new_token),
+                None => break,
+            }
+        }
+        Ok(all_files)
+    }
+
     async fn upload(
         &self,
         from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs
index 741c18bf6f..c46ca14ace 100644
--- a/libs/remote_storage/src/simulate_failures.rs
+++ b/libs/remote_storage/src/simulate_failures.rs
@@ -83,6 +83,11 @@ impl RemoteStorage for UnreliableWrapper {
         self.inner.list_prefixes(prefix).await
     }
 
+    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
+        self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
+        self.inner.list_files(folder).await
+    }
+
     async fn upload(
         &self,
         data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs
index 5f52b0754c..6fe65a0362 100644
--- a/libs/remote_storage/tests/test_real_s3.rs
+++ b/libs/remote_storage/tests/test_real_s3.rs
@@ -88,6 +88,58 @@ async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> any
     Ok(())
 }
 
+/// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
+/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 credential env vars to be specified. The test skips the real code and passes if the env vars are not set.
+/// See `s3_pagination_should_work` for more information.
+///
+/// First, it creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_simple_s3_data`].
+/// Then it performs the following queries:
+///    1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
+///    2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
+#[test_context(MaybeEnabledS3WithSimpleTestBlobs)]
+#[tokio::test]
+async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> anyhow::Result<()> {
+    let ctx = match ctx {
+        MaybeEnabledS3WithSimpleTestBlobs::Enabled(ctx) => ctx,
+        MaybeEnabledS3WithSimpleTestBlobs::Disabled => return Ok(()),
+        MaybeEnabledS3WithSimpleTestBlobs::UploadsFailed(e, _) => {
+            anyhow::bail!("S3 init failed: {e:?}")
+        }
+    };
+    let test_client = Arc::clone(&ctx.enabled.client);
+    let base_prefix =
+        RemotePath::new(Path::new("folder1")).context("common_prefix construction")?;
+    let root_files = test_client
+        .list_files(None)
+        .await
+        .context("client list root files failure")?
+        .into_iter()
+        .collect::<HashSet<_>>();
+    assert_eq!(
+        root_files,
+        ctx.remote_blobs.clone(),
+        "remote storage list_files on root does not match the uploads"
+    );
+    let nested_remote_files = test_client
+        .list_files(Some(&base_prefix))
+        .await
+        .context("client list nested files failure")?
+        .into_iter()
+        .collect::<HashSet<_>>();
+    let trim_remote_blobs: HashSet<_> = ctx
+        .remote_blobs
+        .iter()
+        .map(|x| x.get_path().to_str().expect("must be valid name"))
+        .filter(|x| x.starts_with("folder1"))
+        .map(|x| RemotePath::new(Path::new(x)).expect("must be valid name"))
+        .collect();
+    assert_eq!(
+        nested_remote_files, trim_remote_blobs,
+        "remote storage list_files on a subdirectory does not match the uploads"
+    );
+    Ok(())
+}
+
 #[test_context(MaybeEnabledS3)]
 #[tokio::test]
 async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
@@ -248,6 +300,66 @@ impl AsyncTestContext for MaybeEnabledS3WithTestBlobs {
     }
 }
+
+// NOTE: the setups for the list_prefixes test and the list_files test are very similar.
+// However, they are not identical. The list_prefixes function is concerned with listing prefixes,
+// whereas the list_files function is concerned with listing files.
+// See the `RemoteStorage::list_files` documentation for more details.
+enum MaybeEnabledS3WithSimpleTestBlobs {
+    Enabled(S3WithSimpleTestBlobs),
+    Disabled,
+    UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
+}
+struct S3WithSimpleTestBlobs {
+    enabled: EnabledS3,
+    remote_blobs: HashSet<RemotePath>,
+}
+
+#[async_trait::async_trait]
+impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs {
+    async fn setup() -> Self {
+        ensure_logging_ready();
+        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
+            info!(
+                "`{}` env variable is not set, skipping the test",
+                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
+            );
+            return Self::Disabled;
+        }
+
+        let max_keys_in_list_response = 10;
+        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
+
+        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
+
+        match upload_simple_s3_data(&enabled.client, upload_tasks_count).await {
+            ControlFlow::Continue(uploads) => {
+                info!("Remote objects created successfully");
+
+                Self::Enabled(S3WithSimpleTestBlobs {
+                    enabled,
+                    remote_blobs: uploads,
+                })
+            }
+            ControlFlow::Break(uploads) => Self::UploadsFailed(
+                anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
+                S3WithSimpleTestBlobs {
+                    enabled,
+                    remote_blobs: uploads,
+                },
+            ),
+        }
+    }
+
+    async fn teardown(self) {
+        match self {
+            Self::Disabled => {}
+            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
+                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
+            }
+        }
+    }
+}
+
 fn create_s3_client(
     max_keys_per_list_response: Option<i32>,
 ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
@@ -258,7 +370,7 @@ fn create_s3_client(
     let random_prefix_part = std::time::SystemTime::now()
         .duration_since(UNIX_EPOCH)
         .context("random s3 test prefix part calculation")?
-        .as_millis();
+        .as_nanos();
     let remote_storage_config = RemoteStorageConfig {
         max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
         max_sync_errors: NonZeroU32::new(5).unwrap(),
@@ -364,3 +476,52 @@ async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<
         }
     }
 }
+
+// Uploads files `folder{j}/blob_{i}.txt`. See the test description for more details.
+async fn upload_simple_s3_data(
+    client: &Arc<GenericRemoteStorage>,
+    upload_tasks_count: usize,
+) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
+    info!("Creating {upload_tasks_count} S3 files");
+    let mut upload_tasks = JoinSet::new();
+    for i in 1..upload_tasks_count + 1 {
+        let task_client = Arc::clone(client);
+        upload_tasks.spawn(async move {
+            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
+            let blob_path = RemotePath::new(&blob_path)
+                .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
+            debug!("Creating remote item {i} at path {blob_path:?}");
+
+            let data = format!("remote blob data {i}").into_bytes();
+            let data_len = data.len();
+            task_client
+                .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
+                .await?;
+
+            Ok::<_, anyhow::Error>(blob_path)
+        });
+    }
+
+    let mut upload_tasks_failed = false;
+    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
+    while let Some(task_run_result) = upload_tasks.join_next().await {
+        match task_run_result
+            .context("task join failed")
+            .and_then(|task_result| task_result.context("upload task failed"))
+        {
+            Ok(upload_path) => {
+                uploaded_blobs.insert(upload_path);
+            }
+            Err(e) => {
+                error!("Upload task failed: {e:?}");
+                upload_tasks_failed = true;
+            }
+        }
+    }
+
+    if upload_tasks_failed {
+        ControlFlow::Break(uploaded_blobs)
+    } else {
+        ControlFlow::Continue(uploaded_blobs)
+    }
+}
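Not part of the diff: a minimal usage sketch of the new API, assuming an already-configured `GenericRemoteStorage` handle. It combines the `list_files` method and the `RemotePath::extension` helper added above; the function name `list_txt_files` and the `"folder1"` prefix are illustrative only, not taken from the change itself.

use std::path::Path;

use remote_storage::{GenericRemoteStorage, RemotePath};

// Hypothetical helper: collect every ".txt" object under the "folder1" prefix.
// `storage` is assumed to already be constructed from a RemoteStorageConfig.
async fn list_txt_files(storage: &GenericRemoteStorage) -> anyhow::Result<Vec<RemotePath>> {
    let prefix = RemotePath::new(Path::new("folder1"))?;
    // list_files returns full relative paths (not just common prefixes).
    let all_files = storage.list_files(Some(&prefix)).await?;
    Ok(all_files
        .into_iter()
        .filter(|path| path.extension() == Some("txt"))
        .collect())
}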