From e27527f7c1cb7e536c729d417403154a6ced83ee Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 18 Apr 2024 10:36:39 +0100 Subject: [PATCH] remote_storage: fix local_fs listing --- libs/remote_storage/src/local_fs.rs | 304 ++++++++++++++++++---------- 1 file changed, 200 insertions(+), 104 deletions(-) diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 8cad863731..4974dba9cc 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -5,11 +5,9 @@ //! volume is mounted to the local FS. use std::{ - borrow::Cow, - future::Future, + collections::HashSet, io::ErrorKind, num::NonZeroU32, - pin::Pin, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -22,11 +20,11 @@ use tokio::{ io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, }; use tokio_util::{io::ReaderStream, sync::CancellationToken}; -use tracing::*; -use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty}; +use utils::crashsafe::path_with_suffix_extension; use crate::{ Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel, + REMOTE_STORAGE_PREFIX_SEPARATOR, }; use super::{RemoteStorage, StorageMetadata}; @@ -93,7 +91,47 @@ impl LocalFs { #[cfg(test)] async fn list_all(&self) -> anyhow::Result> { - Ok(get_all_files(&self.storage_root, true) + use std::{future::Future, pin::Pin}; + fn get_all_files<'a, P>( + directory_path: P, + ) -> Pin>> + Send + Sync + 'a>> + where + P: AsRef + Send + Sync + 'a, + { + Box::pin(async move { + let directory_path = directory_path.as_ref(); + if directory_path.exists() { + if directory_path.is_dir() { + let mut paths = Vec::new(); + let mut dir_contents = fs::read_dir(directory_path).await?; + while let Some(dir_entry) = dir_contents.next_entry().await? { + let file_type = dir_entry.file_type().await?; + let entry_path = + Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| { + anyhow::Error::msg(format!( + "non-Unicode path: {}", + pb.to_string_lossy() + )) + })?; + if file_type.is_symlink() { + tracing::debug!("{entry_path:?} is a symlink, skipping") + } else if file_type.is_dir() { + paths.extend(get_all_files(&entry_path).await?.into_iter()) + } else { + paths.push(entry_path); + } + } + Ok(paths) + } else { + bail!("Path {directory_path:?} is not a directory") + } + } else { + Ok(Vec::new()) + } + }) + } + + Ok(get_all_files(&self.storage_root) .await? .into_iter() .map(|path| { @@ -115,11 +153,20 @@ impl LocalFs { None => self.storage_root.clone(), }; + eprintln!("local_fs list: searching from {full_path} for initial_dir"); // If we were given a directory, we may use it as our starting point. // Otherwise, we must go up to the first ancestor dir that exists. This is because // S3 object list prefixes can be arbitrary strings, but when reading // the local filesystem we need a directory to start calling read_dir on. let mut initial_dir = full_path.clone(); + + // If there's no trailing slash, we have to start looking from one above: even if + // `initial_dir` is a directory, we should still list any prefixes in the parent + // that start with the same string. + if !full_path.to_string().ends_with('/') { + initial_dir.pop(); + } + loop { // Did we make it to the root? if initial_dir.parent().is_none() { @@ -150,6 +197,8 @@ impl LocalFs { // starts_with later. let prefix = full_path.as_str(); + eprintln!("local_fs list: initial_dir={initial_dir}"); + let mut files = vec![]; let mut directory_queue = vec![initial_dir]; while let Some(cur_folder) = directory_queue.pop() { @@ -163,6 +212,8 @@ impl LocalFs { if full_file_name.is_dir() { directory_queue.push(full_file_name); } + } else { + eprintln!("Drop {full_file_name}, not in prefix"); } } } @@ -292,64 +343,76 @@ impl RemoteStorage for LocalFs { max_keys: Option, cancel: &CancellationToken, ) -> Result { + if let Some(prefix) = prefix { + eprintln!("local_fs list: prefix={}", prefix); + } let op = async { let mut result = Listing::default(); - if let ListingMode::NoDelimiter = mode { - let keys = self - .list_recursive(prefix) - .await - .map_err(DownloadError::Other)?; - - result.keys = keys - .into_iter() - .filter(|k| { - let path = k.with_base(&self.storage_root); - !path.is_dir() - }) - .collect(); - - if let Some(max_keys) = max_keys { - result.keys.truncate(max_keys.get() as usize); - } - - return Ok(result); - } - - let path = match prefix { - Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)), - None => Cow::Borrowed(&self.storage_root), - }; - - let prefixes_to_filter = get_all_files(path.as_ref(), false) + // Filter out directories: in S3 directories don't exist, only the keys within them do. + let keys = self + .list_recursive(prefix) .await .map_err(DownloadError::Other)?; + let keys = keys + .into_iter() + .filter(|k| { + let path = k.with_base(&self.storage_root); + !path.is_dir() + }) + .collect(); - // filter out empty directories to mirror s3 behavior. - for prefix in prefixes_to_filter { - if prefix.is_dir() - && is_directory_empty(&prefix) - .await - .map_err(DownloadError::Other)? - { - continue; - } - - let stripped = prefix - .strip_prefix(&self.storage_root) - .context("Failed to strip prefix") - .and_then(RemotePath::new) - .expect( - "We list files for storage root, hence should be able to remote the prefix", - ); - - if prefix.is_dir() { - result.prefixes.push(stripped); - } else { - result.keys.push(stripped); + if let ListingMode::NoDelimiter = mode { + result.keys = keys; + } else { + let mut prefixes = HashSet::new(); + for key in keys { + eprintln!("key: {key}"); + // If the part after the prefix includes a "/", take only the first part and put it in `prefixes`. + let relative_key = if let Some(prefix) = prefix { + let mut prefix = prefix.clone(); + // We only strip the dirname of the prefix, so that when we strip it from the start of keys we + // end up with full file/dir names. + let prefix_full_local_path = prefix.with_base(&self.storage_root); + let has_slash = prefix.0.to_string().ends_with('/'); + let strip_prefix = if prefix_full_local_path.is_dir() && has_slash { + prefix + } else { + prefix.0.pop(); + prefix + }; + eprintln!("strip_prefix={strip_prefix}"); + + RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap() + } else { + key + }; + + eprintln!("relative_key: {relative_key}"); + + let relative_key = format!("{}", relative_key); + if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) { + let first_part = relative_key + .split(REMOTE_STORAGE_PREFIX_SEPARATOR) + .next() + .unwrap() + .to_owned(); + prefixes.insert(first_part); + } else { + result + .keys + .push(RemotePath::from_string(&relative_key).unwrap()); + } } + result.prefixes = prefixes + .into_iter() + .map(|s| RemotePath::from_string(&s).unwrap()) + .collect(); } + if let Some(max_keys) = max_keys { + result.keys.truncate(max_keys.get() as usize); + } Ok(result) }; @@ -560,50 +623,6 @@ fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf { path_with_suffix_extension(original_path, "metadata") } -fn get_all_files<'a, P>( - directory_path: P, - recursive: bool, -) -> Pin>> + Send + Sync + 'a>> -where - P: AsRef + Send + Sync + 'a, -{ - Box::pin(async move { - let directory_path = directory_path.as_ref(); - if directory_path.exists() { - if directory_path.is_dir() { - let mut paths = Vec::new(); - let mut dir_contents = fs::read_dir(directory_path).await?; - while let Some(dir_entry) = dir_contents.next_entry().await? { - let file_type = dir_entry.file_type().await?; - let entry_path = - Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| { - anyhow::Error::msg(format!( - "non-Unicode path: {}", - pb.to_string_lossy() - )) - })?; - if file_type.is_symlink() { - debug!("{entry_path:?} is a symlink, skipping") - } else if file_type.is_dir() { - if recursive { - paths.extend(get_all_files(&entry_path, true).await?.into_iter()) - } else { - paths.push(entry_path) - } - } else { - paths.push(entry_path); - } - } - Ok(paths) - } else { - bail!("Path {directory_path:?} is not a directory") - } - } else { - Ok(Vec::new()) - } - }) -} - async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> { let target_dir = match target_file_path.parent() { Some(parent_dir) => parent_dir, @@ -923,13 +942,18 @@ mod fs_tests { // No delimiter: should recursively list everything let (storage, cancel) = create_storage()?; let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?; + let child_sibling = + upload_dummy_file(&storage, "grandparent/parent/child_sibling", None, &cancel).await?; let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?; let listing = storage .list(None, ListingMode::NoDelimiter, None, &cancel) .await?; assert!(listing.prefixes.is_empty()); - assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec()); + assert_eq!( + listing.keys, + [uncle.clone(), child.clone(), child_sibling.clone()].to_vec() + ); // Delimiter: should only go one deep let listing = storage @@ -942,7 +966,25 @@ mod fs_tests { ); assert!(listing.keys.is_empty()); - // Delimiter & prefix + // Delimiter & prefix with a trailing slash + let listing = storage + .list( + Some(&RemotePath::from_string("timelines/some_timeline/grandparent/").unwrap()), + ListingMode::WithDelimiter, + None, + &cancel, + ) + .await?; + assert_eq!( + listing.keys, + [RemotePath::from_string("uncle").unwrap()].to_vec() + ); + assert_eq!( + listing.prefixes, + [RemotePath::from_string("parent").unwrap()].to_vec() + ); + + // Delimiter and prefix without a trailing slash let listing = storage .list( Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()), @@ -951,12 +993,66 @@ mod fs_tests { &cancel, ) .await?; + assert_eq!(listing.keys, [].to_vec()); assert_eq!( listing.prefixes, - [RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()] - .to_vec() + [RemotePath::from_string("grandparent").unwrap()].to_vec() + ); + + // Delimiter and prefix that's partway through a path component + let listing = storage + .list( + Some(&RemotePath::from_string("timelines/some_timeline/grandp").unwrap()), + ListingMode::WithDelimiter, + None, + &cancel, + ) + .await?; + assert_eq!(listing.keys, [].to_vec()); + assert_eq!( + listing.prefixes, + [RemotePath::from_string("grandparent").unwrap()].to_vec() + ); + + Ok(()) + } + + #[tokio::test] + async fn list_part_component() -> anyhow::Result<()> { + // No delimiter: should recursively list everything + let (storage, cancel) = create_storage()?; + + // Imitates what happens in a tenant path when we have an unsharded path and a sharded path, and do a listing + // of the unsharded path: although there is a "directory" at the unsharded path, it should be handled as + // a freeform prefix. + let _child_a = + upload_dummy_file(&storage, "grandparent/tenant-01/child", None, &cancel).await?; + let _child_b = + upload_dummy_file(&storage, "grandparent/tenant/child", None, &cancel).await?; + + // Delimiter and prefix that's partway through a path component + let listing = storage + .list( + Some( + &RemotePath::from_string("timelines/some_timeline/grandparent/tenant").unwrap(), + ), + ListingMode::WithDelimiter, + None, + &cancel, + ) + .await?; + assert_eq!(listing.keys, [].to_vec()); + + let mut found_prefixes = listing.prefixes.clone(); + found_prefixes.sort(); + assert_eq!( + found_prefixes, + [ + RemotePath::from_string("tenant").unwrap(), + RemotePath::from_string("tenant-01").unwrap(), + ] + .to_vec() ); - assert_eq!(listing.keys, [uncle.clone()].to_vec()); Ok(()) }