diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index e708854be2..14c391ca53 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -134,6 +134,11 @@ impl RemotePath {
     pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
         self.0.strip_prefix(&p.0)
     }
+
+    pub fn add_trailing_slash(&self) -> Self {
+        // Unwrap safety: inputs are guaranteed to be valid UTF-8
+        Self(format!("{}/", self.0).try_into().unwrap())
+    }
 }
 
 /// We don't need callers to be able to pass arbitrary delimiters: just control
@@ -157,47 +162,21 @@ pub struct Listing {
 /// providing basic CRUD operations for storage files.
 #[allow(async_fn_in_trait)]
 pub trait RemoteStorage: Send + Sync + 'static {
-    /// Lists all top level subdirectories for a given prefix
-    /// Note: here we assume that if the prefix is passed it was obtained via remote_object_id
-    /// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS)
-    /// so this method doesnt need to.
-    async fn list_prefixes(
-        &self,
-        prefix: Option<&RemotePath>,
-        cancel: &CancellationToken,
-    ) -> Result<Vec<RemotePath>, DownloadError> {
-        let result = self
-            .list(prefix, ListingMode::WithDelimiter, None, cancel)
-            .await?
-            .prefixes;
-        Ok(result)
-    }
-    /// Lists all files in directory "recursively"
-    /// (not really recursively, because AWS has a flat namespace)
-    /// Note: This is subtely different than list_prefixes,
-    /// because it is for listing files instead of listing
-    /// names sharing common prefixes.
-    /// For example,
-    /// list_files("foo/bar") = ["foo/bar/cat123.txt",
-    /// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"]
-    /// whereas,
-    /// list_prefixes("foo/bar/") = ["cat", "dog"]
-    /// See `test_real_s3.rs` for more details.
+    /// List objects in remote storage, with semantics matching AWS S3's ListObjectsV2
+    /// (see `<https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>`).
+    ///
+    /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
+    /// from the absolute root of the bucket.
+    ///
+    /// `mode` configures whether to use a delimiter. Without a delimiter, all keys
+    /// within the prefix are listed in the `keys` of the result. With a delimiter, any "directories" at the top level of
+    /// the prefix are returned in the `prefixes` of the result, and keys in the top level of the prefix are
+    /// returned in `keys`.
+    ///
+    /// `max_keys` controls the maximum number of keys that will be returned. If this is None, this function
+    /// will iteratively call ListObjectsV2 until it runs out of keys. Note that this is not safe to use on
+    /// unlimited size buckets, as the full list of objects is allocated into a monolithic data structure.
     ///
-    /// max_keys limits max number of keys returned; None means unlimited.
-    async fn list_files(
-        &self,
-        prefix: Option<&RemotePath>,
-        max_keys: Option<NonZeroU32>,
-        cancel: &CancellationToken,
-    ) -> Result<Vec<RemotePath>, DownloadError> {
-        let result = self
-            .list(prefix, ListingMode::NoDelimiter, max_keys, cancel)
-            .await?
-            .keys;
-        Ok(result)
-    }
-
     async fn list(
         &self,
         prefix: Option<&RemotePath>,
@@ -336,41 +315,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
         }
     }
 
-    // A function for listing all the files in a "directory"
-    // Example:
-    // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
-    //
-    // max_keys limits max number of keys returned; None means unlimited.
-    pub async fn list_files(
-        &self,
-        folder: Option<&RemotePath>,
-        max_keys: Option<NonZeroU32>,
-        cancel: &CancellationToken,
-    ) -> Result<Vec<RemotePath>, DownloadError> {
-        match self {
-            Self::LocalFs(s) => s.list_files(folder, max_keys, cancel).await,
-            Self::AwsS3(s) => s.list_files(folder, max_keys, cancel).await,
-            Self::AzureBlob(s) => s.list_files(folder, max_keys, cancel).await,
-            Self::Unreliable(s) => s.list_files(folder, max_keys, cancel).await,
-        }
-    }
-
-    // lists common *prefixes*, if any of files
-    // Example:
-    // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
-    pub async fn list_prefixes(
-        &self,
-        prefix: Option<&RemotePath>,
-        cancel: &CancellationToken,
-    ) -> Result<Vec<RemotePath>, DownloadError> {
-        match self {
-            Self::LocalFs(s) => s.list_prefixes(prefix, cancel).await,
-            Self::AwsS3(s) => s.list_prefixes(prefix, cancel).await,
-            Self::AzureBlob(s) => s.list_prefixes(prefix, cancel).await,
-            Self::Unreliable(s) => s.list_prefixes(prefix, cancel).await,
-        }
-    }
-
     /// See [`RemoteStorage::upload`]
     pub async fn upload(
         &self,
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index 8cad863731..1f7bcfc982 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -5,11 +5,9 @@
 //! volume is mounted to the local FS.
 
 use std::{
-    borrow::Cow,
-    future::Future,
+    collections::HashSet,
     io::ErrorKind,
     num::NonZeroU32,
-    pin::Pin,
     time::{Duration, SystemTime, UNIX_EPOCH},
 };
 
@@ -22,11 +20,11 @@ use tokio::{
     io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
 };
 use tokio_util::{io::ReaderStream, sync::CancellationToken};
-use tracing::*;
-use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
+use utils::crashsafe::path_with_suffix_extension;
 
 use crate::{
     Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
+    REMOTE_STORAGE_PREFIX_SEPARATOR,
 };
 
 use super::{RemoteStorage, StorageMetadata};
@@ -93,7 +91,47 @@ impl LocalFs {
 
     #[cfg(test)]
     async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
-        Ok(get_all_files(&self.storage_root, true)
+        use std::{future::Future, pin::Pin};
+        fn get_all_files<'a, P>(
+            directory_path: P,
+        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
+        where
+            P: AsRef<Utf8Path> + Send + Sync + 'a,
+        {
+            Box::pin(async move {
+                let directory_path = directory_path.as_ref();
+                if directory_path.exists() {
+                    if directory_path.is_dir() {
+                        let mut paths = Vec::new();
+                        let mut dir_contents = fs::read_dir(directory_path).await?;
+                        while let Some(dir_entry) = dir_contents.next_entry().await? {
+                            let file_type = dir_entry.file_type().await?;
+                            let entry_path =
+                                Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
+                                    anyhow::Error::msg(format!(
+                                        "non-Unicode path: {}",
+                                        pb.to_string_lossy()
+                                    ))
+                                })?;
+                            if file_type.is_symlink() {
+                                tracing::debug!("{entry_path:?} is a symlink, skipping")
+                            } else if file_type.is_dir() {
+                                paths.extend(get_all_files(&entry_path).await?.into_iter())
+                            } else {
+                                paths.push(entry_path);
+                            }
+                        }
+                        Ok(paths)
+                    } else {
+                        bail!("Path {directory_path:?} is not a directory")
+                    }
+                } else {
+                    Ok(Vec::new())
+                }
+            })
+        }
+
+        Ok(get_all_files(&self.storage_root)
             .await?
             .into_iter()
             .map(|path| {
@@ -120,6 +158,14 @@ impl LocalFs {
         // S3 object list prefixes can be arbitrary strings, but when reading
         // the local filesystem we need a directory to start calling read_dir on.
let mut initial_dir = full_path.clone(); + + // If there's no trailing slash, we have to start looking from one above: even if + // `initial_dir` is a directory, we should still list any prefixes in the parent + // that start with the same string. + if !full_path.to_string().ends_with('/') { + initial_dir.pop(); + } + loop { // Did we make it to the root? if initial_dir.parent().is_none() { @@ -295,61 +341,66 @@ impl RemoteStorage for LocalFs { let op = async { let mut result = Listing::default(); - if let ListingMode::NoDelimiter = mode { - let keys = self - .list_recursive(prefix) - .await - .map_err(DownloadError::Other)?; - - result.keys = keys - .into_iter() - .filter(|k| { - let path = k.with_base(&self.storage_root); - !path.is_dir() - }) - .collect(); - - if let Some(max_keys) = max_keys { - result.keys.truncate(max_keys.get() as usize); - } - - return Ok(result); - } - - let path = match prefix { - Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)), - None => Cow::Borrowed(&self.storage_root), - }; - - let prefixes_to_filter = get_all_files(path.as_ref(), false) + // Filter out directories: in S3 directories don't exist, only the keys within them do. + let keys = self + .list_recursive(prefix) .await .map_err(DownloadError::Other)?; + let keys = keys + .into_iter() + .filter(|k| { + let path = k.with_base(&self.storage_root); + !path.is_dir() + }) + .collect(); - // filter out empty directories to mirror s3 behavior. - for prefix in prefixes_to_filter { - if prefix.is_dir() - && is_directory_empty(&prefix) - .await - .map_err(DownloadError::Other)? - { - continue; - } - - let stripped = prefix - .strip_prefix(&self.storage_root) - .context("Failed to strip prefix") - .and_then(RemotePath::new) - .expect( - "We list files for storage root, hence should be able to remote the prefix", - ); - - if prefix.is_dir() { - result.prefixes.push(stripped); - } else { - result.keys.push(stripped); + if let ListingMode::NoDelimiter = mode { + result.keys = keys; + } else { + let mut prefixes = HashSet::new(); + for key in keys { + // If the part after the prefix includes a "/", take only the first part and put it in `prefixes`. + let relative_key = if let Some(prefix) = prefix { + let mut prefix = prefix.clone(); + // We only strip the dirname of the prefix, so that when we strip it from the start of keys we + // end up with full file/dir names. 
+ let prefix_full_local_path = prefix.with_base(&self.storage_root); + let has_slash = prefix.0.to_string().ends_with('/'); + let strip_prefix = if prefix_full_local_path.is_dir() && has_slash { + prefix + } else { + prefix.0.pop(); + prefix + }; + + RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap() + } else { + key + }; + + let relative_key = format!("{}", relative_key); + if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) { + let first_part = relative_key + .split(REMOTE_STORAGE_PREFIX_SEPARATOR) + .next() + .unwrap() + .to_owned(); + prefixes.insert(first_part); + } else { + result + .keys + .push(RemotePath::from_string(&relative_key).unwrap()); + } } + result.prefixes = prefixes + .into_iter() + .map(|s| RemotePath::from_string(&s).unwrap()) + .collect(); } + if let Some(max_keys) = max_keys { + result.keys.truncate(max_keys.get() as usize); + } Ok(result) }; @@ -560,50 +611,6 @@ fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf { path_with_suffix_extension(original_path, "metadata") } -fn get_all_files<'a, P>( - directory_path: P, - recursive: bool, -) -> Pin>> + Send + Sync + 'a>> -where - P: AsRef + Send + Sync + 'a, -{ - Box::pin(async move { - let directory_path = directory_path.as_ref(); - if directory_path.exists() { - if directory_path.is_dir() { - let mut paths = Vec::new(); - let mut dir_contents = fs::read_dir(directory_path).await?; - while let Some(dir_entry) = dir_contents.next_entry().await? { - let file_type = dir_entry.file_type().await?; - let entry_path = - Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| { - anyhow::Error::msg(format!( - "non-Unicode path: {}", - pb.to_string_lossy() - )) - })?; - if file_type.is_symlink() { - debug!("{entry_path:?} is a symlink, skipping") - } else if file_type.is_dir() { - if recursive { - paths.extend(get_all_files(&entry_path, true).await?.into_iter()) - } else { - paths.push(entry_path) - } - } else { - paths.push(entry_path); - } - } - Ok(paths) - } else { - bail!("Path {directory_path:?} is not a directory") - } - } else { - Ok(Vec::new()) - } - }) -} - async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> { let target_dir = match target_file_path.parent() { Some(parent_dir) => parent_dir, @@ -923,13 +930,18 @@ mod fs_tests { // No delimiter: should recursively list everything let (storage, cancel) = create_storage()?; let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?; + let child_sibling = + upload_dummy_file(&storage, "grandparent/parent/child_sibling", None, &cancel).await?; let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?; let listing = storage .list(None, ListingMode::NoDelimiter, None, &cancel) .await?; assert!(listing.prefixes.is_empty()); - assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec()); + assert_eq!( + listing.keys.into_iter().collect::>(), + HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()]) + ); // Delimiter: should only go one deep let listing = storage @@ -942,7 +954,25 @@ mod fs_tests { ); assert!(listing.keys.is_empty()); - // Delimiter & prefix + // Delimiter & prefix with a trailing slash + let listing = storage + .list( + Some(&RemotePath::from_string("timelines/some_timeline/grandparent/").unwrap()), + ListingMode::WithDelimiter, + None, + &cancel, + ) + .await?; + assert_eq!( + listing.keys, + [RemotePath::from_string("uncle").unwrap()].to_vec() + ); + assert_eq!( + listing.prefixes, + 
[RemotePath::from_string("parent").unwrap()].to_vec() + ); + + // Delimiter and prefix without a trailing slash let listing = storage .list( Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()), @@ -951,12 +981,66 @@ mod fs_tests { &cancel, ) .await?; + assert_eq!(listing.keys, [].to_vec()); assert_eq!( listing.prefixes, - [RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()] - .to_vec() + [RemotePath::from_string("grandparent").unwrap()].to_vec() + ); + + // Delimiter and prefix that's partway through a path component + let listing = storage + .list( + Some(&RemotePath::from_string("timelines/some_timeline/grandp").unwrap()), + ListingMode::WithDelimiter, + None, + &cancel, + ) + .await?; + assert_eq!(listing.keys, [].to_vec()); + assert_eq!( + listing.prefixes, + [RemotePath::from_string("grandparent").unwrap()].to_vec() + ); + + Ok(()) + } + + #[tokio::test] + async fn list_part_component() -> anyhow::Result<()> { + // No delimiter: should recursively list everything + let (storage, cancel) = create_storage()?; + + // Imitates what happens in a tenant path when we have an unsharded path and a sharded path, and do a listing + // of the unsharded path: although there is a "directory" at the unsharded path, it should be handled as + // a freeform prefix. + let _child_a = + upload_dummy_file(&storage, "grandparent/tenant-01/child", None, &cancel).await?; + let _child_b = + upload_dummy_file(&storage, "grandparent/tenant/child", None, &cancel).await?; + + // Delimiter and prefix that's partway through a path component + let listing = storage + .list( + Some( + &RemotePath::from_string("timelines/some_timeline/grandparent/tenant").unwrap(), + ), + ListingMode::WithDelimiter, + None, + &cancel, + ) + .await?; + assert_eq!(listing.keys, [].to_vec()); + + let mut found_prefixes = listing.prefixes.clone(); + found_prefixes.sort(); + assert_eq!( + found_prefixes, + [ + RemotePath::from_string("tenant").unwrap(), + RemotePath::from_string("tenant-01").unwrap(), + ] + .to_vec() ); - assert_eq!(listing.keys, [uncle.clone()].to_vec()); Ok(()) } diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 1cb85cfb1b..8091681221 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -178,10 +178,7 @@ impl S3Bucket { pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String { assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR); - let path_string = path - .get_path() - .as_str() - .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR); + let path_string = path.get_path().as_str(); match &self.prefix_in_bucket { Some(prefix) => prefix.clone() + "/" + path_string, None => path_string.to_string(), @@ -471,16 +468,11 @@ impl RemoteStorage for S3Bucket { // get the passed prefix or if it is not set use prefix_in_bucket value let list_prefix = prefix .map(|p| self.relative_path_to_s3_object(p)) - .or_else(|| self.prefix_in_bucket.clone()) - .map(|mut p| { - // required to end with a separator - // otherwise request will return only the entry of a prefix - if matches!(mode, ListingMode::WithDelimiter) - && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) - { - p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); - } - p + .or_else(|| { + self.prefix_in_bucket.clone().map(|mut s| { + s.push(REMOTE_STORAGE_PREFIX_SEPARATOR); + s + }) }); let _permit = self.permit(kind, cancel).await?; @@ -549,11 +541,15 @@ impl RemoteStorage for S3Bucket { } } - result.prefixes.extend( - 
prefixes - .iter() - .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))), - ); + // S3 gives us prefixes like "foo/", we return them like "foo" + result.prefixes.extend(prefixes.iter().filter_map(|o| { + Some( + self.s3_object_to_relative_path( + o.prefix()? + .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR), + ), + ) + })); continuation_token = match response.next_continuation_token { Some(new_token) => Some(new_token), @@ -1050,22 +1046,22 @@ mod tests { Some("/test/prefix/"), ]; let expected_outputs = [ - vec!["", "some/path", "some/path"], - vec!["/", "/some/path", "/some/path"], + vec!["", "some/path", "some/path/"], + vec!["/", "/some/path", "/some/path/"], vec![ "test/prefix/", "test/prefix/some/path", - "test/prefix/some/path", + "test/prefix/some/path/", ], vec![ "test/prefix/", "test/prefix/some/path", - "test/prefix/some/path", + "test/prefix/some/path/", ], vec![ "test/prefix/", "test/prefix/some/path", - "test/prefix/some/path", + "test/prefix/some/path/", ], ]; diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index f5344d3ae2..c467a2d196 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -107,27 +107,6 @@ impl UnreliableWrapper { type VoidStorage = crate::LocalFs; impl RemoteStorage for UnreliableWrapper { - async fn list_prefixes( - &self, - prefix: Option<&RemotePath>, - cancel: &CancellationToken, - ) -> Result, DownloadError> { - self.attempt(RemoteOp::ListPrefixes(prefix.cloned())) - .map_err(DownloadError::Other)?; - self.inner.list_prefixes(prefix, cancel).await - } - - async fn list_files( - &self, - folder: Option<&RemotePath>, - max_keys: Option, - cancel: &CancellationToken, - ) -> Result, DownloadError> { - self.attempt(RemoteOp::ListPrefixes(folder.cloned())) - .map_err(DownloadError::Other)?; - self.inner.list_files(folder, max_keys, cancel).await - } - async fn list( &self, prefix: Option<&RemotePath>, diff --git a/libs/remote_storage/tests/common/tests.rs b/libs/remote_storage/tests/common/tests.rs index 72f6f956e0..673151c8ef 100644 --- a/libs/remote_storage/tests/common/tests.rs +++ b/libs/remote_storage/tests/common/tests.rs @@ -1,5 +1,6 @@ use anyhow::Context; use camino::Utf8Path; +use remote_storage::ListingMode; use remote_storage::RemotePath; use std::sync::Arc; use std::{collections::HashSet, num::NonZeroU32}; @@ -54,9 +55,9 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix)) .context("common_prefix construction")?; let root_remote_prefixes = test_client - .list_prefixes(None, &cancel) - .await - .context("client list root prefixes failure")? + .list(None, ListingMode::WithDelimiter, None, &cancel) + .await? + .prefixes .into_iter() .collect::>(); assert_eq!( @@ -65,9 +66,14 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a ); let nested_remote_prefixes = test_client - .list_prefixes(Some(&base_prefix), &cancel) - .await - .context("client list nested prefixes failure")? + .list( + Some(&base_prefix.add_trailing_slash()), + ListingMode::WithDelimiter, + None, + &cancel, + ) + .await? 
+ .prefixes .into_iter() .collect::>(); let remote_only_prefixes = nested_remote_prefixes @@ -90,11 +96,13 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a /// /// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`] /// Then performs the following queries: -/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt` -/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt` +/// 1. `list(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt` +/// 2. `list("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt` #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)] #[tokio::test] -async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> { +async fn list_no_delimiter_works( + ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs, +) -> anyhow::Result<()> { let ctx = match ctx { MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx, MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()), @@ -107,29 +115,36 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a let base_prefix = RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?; let root_files = test_client - .list_files(None, None, &cancel) + .list(None, ListingMode::NoDelimiter, None, &cancel) .await .context("client list root files failure")? + .keys .into_iter() .collect::>(); assert_eq!( root_files, ctx.remote_blobs.clone(), - "remote storage list_files on root mismatches with the uploads." + "remote storage list on root mismatches with the uploads." ); // Test that max_keys limit works. In total there are about 21 files (see // upload_simple_remote_data call in test_real_s3.rs). let limited_root_files = test_client - .list_files(None, Some(NonZeroU32::new(2).unwrap()), &cancel) + .list( + None, + ListingMode::NoDelimiter, + Some(NonZeroU32::new(2).unwrap()), + &cancel, + ) .await .context("client list root files failure")?; - assert_eq!(limited_root_files.len(), 2); + assert_eq!(limited_root_files.keys.len(), 2); let nested_remote_files = test_client - .list_files(Some(&base_prefix), None, &cancel) + .list(Some(&base_prefix), ListingMode::NoDelimiter, None, &cancel) .await .context("client list nested files failure")? + .keys .into_iter() .collect::>(); let trim_remote_blobs: HashSet<_> = ctx @@ -141,7 +156,7 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a .collect(); assert_eq!( nested_remote_files, trim_remote_blobs, - "remote storage list_files on subdirrectory mismatches with the uploads." + "remote storage list on subdirrectory mismatches with the uploads." ); Ok(()) } @@ -199,7 +214,11 @@ async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<( ctx.client.delete_objects(&[path1, path2], &cancel).await?; - let prefixes = ctx.client.list_prefixes(None, &cancel).await?; + let prefixes = ctx + .client + .list(None, ListingMode::WithDelimiter, None, &cancel) + .await? 
+        .prefixes;
 
     assert_eq!(prefixes.len(), 1);
 
diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs
index 6aa02868e6..cd0b2be4b5 100644
--- a/libs/remote_storage/tests/test_real_azure.rs
+++ b/libs/remote_storage/tests/test_real_azure.rs
@@ -132,10 +132,6 @@ impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
     }
 }
 
-// NOTE: the setups for the list_prefixes test and the list_files test are very similar
-// However, they are not idential. The list_prefixes function is concerned with listing prefixes,
-// whereas the list_files function is concerned with listing files.
-// See `RemoteStorage::list_files` documentation for more details
 enum MaybeEnabledStorageWithSimpleTestBlobs {
     Enabled(AzureWithSimpleTestBlobs),
     Disabled,
diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs
index c5d5216f00..01f6a532d6 100644
--- a/libs/remote_storage/tests/test_real_s3.rs
+++ b/libs/remote_storage/tests/test_real_s3.rs
@@ -12,8 +12,8 @@ use anyhow::Context;
 use camino::Utf8Path;
 use futures_util::StreamExt;
 use remote_storage::{
-    DownloadError, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
-    S3Config,
+    DownloadError, GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig,
+    RemoteStorageKind, S3Config,
 };
 use test_context::test_context;
 use test_context::AsyncTestContext;
@@ -75,11 +75,14 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
         client: &Arc<GenericRemoteStorage>,
         cancel: &CancellationToken,
    ) -> anyhow::Result<HashSet<RemotePath>> {
-        Ok(retry(|| client.list_files(None, None, cancel))
-            .await
-            .context("list root files failure")?
-            .into_iter()
-            .collect::<HashSet<_>>())
+        Ok(
+            retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
+                .await
+                .context("list root files failure")?
+                .keys
+                .into_iter()
+                .collect::<HashSet<_>>(),
+        )
     }
 
     let cancel = CancellationToken::new();
@@ -294,10 +297,6 @@ impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
     }
 }
 
-// NOTE: the setups for the list_prefixes test and the list_files test are very similar
-// However, they are not idential. The list_prefixes function is concerned with listing prefixes,
-// whereas the list_files function is concerned with listing files.
-// See `RemoteStorage::list_files` documentation for more details enum MaybeEnabledStorageWithSimpleTestBlobs { Enabled(S3WithSimpleTestBlobs), Disabled, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 1fa3badefb..d02f00adad 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -202,7 +202,9 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath, TimeoutOrCancel}; +use remote_storage::{ + DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel, +}; use std::ops::DerefMut; use tracing::{debug, error, info, instrument, warn}; use tracing::{info_span, Instrument}; @@ -1145,7 +1147,7 @@ impl RemoteTimelineClient { // and retry will arrive to different pageserver there wont be any traces of it on remote storage let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id); - // Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't + // Execute all pending deletions, so that when we proceed to do a listing below, we aren't // taking the burden of listing all the layers that we already know we should delete. self.flush_deletion_queue().await?; @@ -1154,14 +1156,20 @@ impl RemoteTimelineClient { let remaining = download_retry( || async { self.storage_impl - .list_files(Some(&timeline_storage_path), None, &cancel) + .list( + Some(&timeline_storage_path), + ListingMode::NoDelimiter, + None, + &cancel, + ) .await }, "list remaining files", &cancel, ) .await - .context("list files remaining files")?; + .context("list files remaining files")? + .keys; // We will delete the current index_part object last, since it acts as a deletion // marker via its deleted_at attribute diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 6ee8ad7155..84692aa577 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -258,7 +258,7 @@ pub async fn list_remote_timelines( tenant_shard_id: TenantShardId, cancel: CancellationToken, ) -> anyhow::Result<(HashSet, HashSet)> { - let remote_path = remote_timelines_path(&tenant_shard_id); + let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash(); fail::fail_point!("storage-sync-list-remote-timelines", |_| { anyhow::bail!("storage-sync-list-remote-timelines"); @@ -417,11 +417,16 @@ pub(super) async fn download_index_part( let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none()); let indices = download_retry( - || async { storage.list_files(Some(&index_prefix), None, cancel).await }, + || async { + storage + .list(Some(&index_prefix), ListingMode::NoDelimiter, None, cancel) + .await + }, "list index_part files", cancel, ) - .await?; + .await? + .keys; // General case logic for which index to use: the latest index whose generation // is <= our own. 
See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index e3f6a606a0..e496f07114 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -18,7 +18,7 @@ use std::time::Duration;
 use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
 use postgres_ffi::XLogFileName;
 use postgres_ffi::{XLogSegNo, PG_TLI};
-use remote_storage::{GenericRemoteStorage, RemotePath, StorageMetadata};
+use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata};
 use tokio::fs::File;
 use tokio::select;
 
@@ -601,12 +601,18 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
     backoff::retry(
         || async {
            // Do list-delete in batch_size batches to make progress even if there a lot of files.
-            // Alternatively we could make list_files return iterator, but it is more complicated and
+            // Alternatively we could make the remote storage list method return an iterator, but it is more complicated and
            // I'm not sure deleting while iterating is expected in s3.
            loop {
                let files = storage
-                    .list_files(Some(&remote_path), Some(batch_size), &cancel)
-                    .await?;
+                    .list(
+                        Some(&remote_path),
+                        ListingMode::NoDelimiter,
+                        Some(batch_size),
+                        &cancel,
+                    )
+                    .await?
+                    .keys;
                if files.is_empty() {
                    return Ok(()); // done
                }
@@ -666,8 +672,9 @@ pub async fn copy_s3_segments(
     let cancel = CancellationToken::new();
 
     let files = storage
-        .list_files(Some(&remote_path), None, &cancel)
-        .await?;
+        .list(Some(&remote_path), ListingMode::NoDelimiter, None, &cancel)
+        .await?
+        .keys;
 
     let uploaded_segments = &files
         .iter()
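
Taken together, the call-site changes above all follow one pattern: replace `list_files`/`list_prefixes` with the single `list` method, pick a `ListingMode`, and read `keys` or `prefixes` off the returned `Listing`. The sketch below illustrates that calling convention as it appears in this patch; it is not part of the diff itself, and the helper name plus the `storage`, `prefix`, and `cancel` bindings are hypothetical stand-ins.

```rust
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
use tokio_util::sync::CancellationToken;

/// Hypothetical helper (not in the patch) showing the post-patch calling
/// convention: one `list` entry point, with the old `list_files` /
/// `list_prefixes` behaviours selected via `ListingMode` and the fields of
/// the returned `Listing`.
async fn list_both_ways(
    storage: &GenericRemoteStorage,
    prefix: &RemotePath,
    cancel: &CancellationToken,
) -> Result<(Vec<RemotePath>, Vec<RemotePath>), DownloadError> {
    // Equivalent of the removed `list_files(Some(prefix), None)`: a flat,
    // recursive listing of every key under the prefix.
    let keys = storage
        .list(Some(prefix), ListingMode::NoDelimiter, None, cancel)
        .await?
        .keys;

    // Equivalent of the removed `list_prefixes(Some(prefix))`: a delimited
    // listing. Call sites in this patch now append the trailing slash
    // explicitly via `add_trailing_slash` to get directory-like semantics.
    let prefixes = storage
        .list(
            Some(&prefix.add_trailing_slash()),
            ListingMode::WithDelimiter,
            None,
            cancel,
        )
        .await?
        .prefixes;

    Ok((keys, prefixes))
}
```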