mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 15:41:15 +00:00
remove usage of list_files
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use remote_storage::ListingMode;
|
||||
use remote_storage::RemotePath;
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashSet, num::NonZeroU32};
|
||||
@@ -54,12 +55,7 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a
|
||||
let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
|
||||
.context("common_prefix construction")?;
|
||||
let root_remote_prefixes = test_client
|
||||
.list(
|
||||
None,
|
||||
remote_storage::ListingMode::WithDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
.list(None, ListingMode::WithDelimiter, None, &cancel)
|
||||
.await?
|
||||
.prefixes
|
||||
.into_iter()
|
||||
@@ -72,7 +68,7 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a
|
||||
let nested_remote_prefixes = test_client
|
||||
.list(
|
||||
Some(&base_prefix.add_trailing_slash()),
|
||||
remote_storage::ListingMode::WithDelimiter,
|
||||
ListingMode::WithDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
@@ -100,11 +96,13 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a
|
||||
///
|
||||
/// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`]
|
||||
/// Then performs the following queries:
|
||||
/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
|
||||
/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
|
||||
/// 1. `list(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
|
||||
/// 2. `list("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
|
||||
#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
|
||||
#[tokio::test]
|
||||
async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
|
||||
async fn list_no_delimiter_works(
|
||||
ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
|
||||
) -> anyhow::Result<()> {
|
||||
let ctx = match ctx {
|
||||
MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
|
||||
MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
|
||||
@@ -117,29 +115,36 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a
|
||||
let base_prefix =
|
||||
RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
|
||||
let root_files = test_client
|
||||
.list_files(None, None, &cancel)
|
||||
.list(None, ListingMode::NoDelimiter, None, &cancel)
|
||||
.await
|
||||
.context("client list root files failure")?
|
||||
.keys
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>();
|
||||
assert_eq!(
|
||||
root_files,
|
||||
ctx.remote_blobs.clone(),
|
||||
"remote storage list_files on root mismatches with the uploads."
|
||||
"remote storage list on root mismatches with the uploads."
|
||||
);
|
||||
|
||||
// Test that max_keys limit works. In total there are about 21 files (see
|
||||
// upload_simple_remote_data call in test_real_s3.rs).
|
||||
let limited_root_files = test_client
|
||||
.list_files(None, Some(NonZeroU32::new(2).unwrap()), &cancel)
|
||||
.list(
|
||||
None,
|
||||
ListingMode::NoDelimiter,
|
||||
Some(NonZeroU32::new(2).unwrap()),
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.context("client list root files failure")?;
|
||||
assert_eq!(limited_root_files.len(), 2);
|
||||
assert_eq!(limited_root_files.keys.len(), 2);
|
||||
|
||||
let nested_remote_files = test_client
|
||||
.list_files(Some(&base_prefix), None, &cancel)
|
||||
.list(Some(&base_prefix), ListingMode::NoDelimiter, None, &cancel)
|
||||
.await
|
||||
.context("client list nested files failure")?
|
||||
.keys
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>();
|
||||
let trim_remote_blobs: HashSet<_> = ctx
|
||||
@@ -151,7 +156,7 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a
|
||||
.collect();
|
||||
assert_eq!(
|
||||
nested_remote_files, trim_remote_blobs,
|
||||
"remote storage list_files on subdirrectory mismatches with the uploads."
|
||||
"remote storage list on subdirrectory mismatches with the uploads."
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -211,12 +216,7 @@ async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<(
|
||||
|
||||
let prefixes = ctx
|
||||
.client
|
||||
.list(
|
||||
None,
|
||||
remote_storage::ListingMode::WithDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
.list(None, ListingMode::WithDelimiter, None, &cancel)
|
||||
.await?
|
||||
.prefixes;
|
||||
|
||||
|
||||
@@ -12,8 +12,8 @@ use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use futures_util::StreamExt;
|
||||
use remote_storage::{
|
||||
DownloadError, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
|
||||
S3Config,
|
||||
DownloadError, GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig,
|
||||
RemoteStorageKind, S3Config,
|
||||
};
|
||||
use test_context::test_context;
|
||||
use test_context::AsyncTestContext;
|
||||
@@ -75,11 +75,14 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
|
||||
client: &Arc<GenericRemoteStorage>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<HashSet<RemotePath>> {
|
||||
Ok(retry(|| client.list_files(None, None, cancel))
|
||||
.await
|
||||
.context("list root files failure")?
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>())
|
||||
Ok(
|
||||
retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
|
||||
.await
|
||||
.context("list root files failure")?
|
||||
.keys
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>(),
|
||||
)
|
||||
}
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
@@ -202,7 +202,9 @@ use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath, TimeoutOrCancel};
|
||||
use remote_storage::{
|
||||
DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
|
||||
};
|
||||
use std::ops::DerefMut;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
use tracing::{info_span, Instrument};
|
||||
@@ -1131,14 +1133,20 @@ impl RemoteTimelineClient {
|
||||
let remaining = download_retry(
|
||||
|| async {
|
||||
self.storage_impl
|
||||
.list_files(Some(&timeline_storage_path), None, &cancel)
|
||||
.list(
|
||||
Some(&timeline_storage_path),
|
||||
ListingMode::NoDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
},
|
||||
"list remaining files",
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.context("list files remaining files")?;
|
||||
.context("list files remaining files")?
|
||||
.keys;
|
||||
|
||||
// We will delete the current index_part object last, since it acts as a deletion
|
||||
// marker via its deleted_at attribute
|
||||
|
||||
@@ -417,11 +417,16 @@ pub(super) async fn download_index_part(
|
||||
let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
|
||||
|
||||
let indices = download_retry(
|
||||
|| async { storage.list_files(Some(&index_prefix), None, cancel).await },
|
||||
|| async {
|
||||
storage
|
||||
.list(Some(&index_prefix), ListingMode::NoDelimiter, None, cancel)
|
||||
.await
|
||||
},
|
||||
"list index_part files",
|
||||
cancel,
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
.keys;
|
||||
|
||||
// General case logic for which index to use: the latest index whose generation
|
||||
// is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::time::Duration;
|
||||
use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::{XLogSegNo, PG_TLI};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, StorageMetadata};
|
||||
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata};
|
||||
use tokio::fs::File;
|
||||
|
||||
use tokio::select;
|
||||
@@ -601,12 +601,18 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
|
||||
backoff::retry(
|
||||
|| async {
|
||||
// Do list-delete in batch_size batches to make progress even if there a lot of files.
|
||||
// Alternatively we could make list_files return iterator, but it is more complicated and
|
||||
// Alternatively we could make remote storage list return iterator, but it is more complicated and
|
||||
// I'm not sure deleting while iterating is expected in s3.
|
||||
loop {
|
||||
let files = storage
|
||||
.list_files(Some(&remote_path), Some(batch_size), &cancel)
|
||||
.await?;
|
||||
.list(
|
||||
Some(&remote_path),
|
||||
ListingMode::NoDelimiter,
|
||||
Some(batch_size),
|
||||
&cancel,
|
||||
)
|
||||
.await?
|
||||
.keys;
|
||||
if files.is_empty() {
|
||||
return Ok(()); // done
|
||||
}
|
||||
@@ -666,8 +672,9 @@ pub async fn copy_s3_segments(
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let files = storage
|
||||
.list_files(Some(&remote_path), None, &cancel)
|
||||
.await?;
|
||||
.list(Some(&remote_path), ListingMode::NoDelimiter, None, &cancel)
|
||||
.await?
|
||||
.keys;
|
||||
|
||||
let uploaded_segments = &files
|
||||
.iter()
|
||||
|
||||
Reference in New Issue
Block a user