mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
made the s3bucket list_files function much better
This commit is contained in:
@@ -1,11 +1,5 @@
|
||||
/*
|
||||
* This is a MWE of using our RemoteStorage API to call the aws stuff and download multiple files
|
||||
*
|
||||
* STATUS:
|
||||
* The s3bucket listing thing is, miracuously, working.
|
||||
* However, it is not really robust; please fix it to have
|
||||
* the features that the other functions have; pagination; permit; limit.
|
||||
*
|
||||
*/
|
||||
macro_rules! alek { ($expression:expr) => { println!("{:?}", $expression); }; }
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
//! their bucket prefixes are both specified and different.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_config::{
|
||||
@@ -337,33 +336,45 @@ impl RemoteStorage for S3Bucket {
|
||||
&self,
|
||||
folder: Option<&RemotePath>
|
||||
) -> anyhow::Result<Vec<RemotePath>>{
|
||||
// THIS IS A PROTOTYPE; WIP
|
||||
// TODO: should we use DownloadError error type instead of anyhow::Error?
|
||||
let folder_name = folder.map(|x|
|
||||
String::from(
|
||||
x.object_name().expect("something went wrong computing folder name")
|
||||
)
|
||||
String::from(x.object_name().expect("invalid folder name"))
|
||||
);
|
||||
|
||||
// TODO: worry about continuation character and limits and permit and stuff
|
||||
let resp = self
|
||||
.client
|
||||
.list_objects_v2()
|
||||
.bucket("neon-dev-extensions")
|
||||
.set_prefix(folder_name)
|
||||
.send().await
|
||||
.unwrap();
|
||||
|
||||
let mut continuation_token = None;
|
||||
let mut all_files = vec![];
|
||||
for object in resp.contents().unwrap() {
|
||||
let mut object_path = Path::new(object.key().unwrap());
|
||||
if let Some(prefix_in_bucket) = &self.prefix_in_bucket {
|
||||
object_path = object_path.strip_prefix(prefix_in_bucket)?;
|
||||
if object_path == Path::new("") {
|
||||
continue;
|
||||
}
|
||||
loop {
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 list_files")?;
|
||||
metrics::inc_list_objects();
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.list_objects_v2()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.set_prefix(folder_name.clone())
|
||||
.set_continuation_token(continuation_token)
|
||||
.set_max_keys(self.max_keys_per_list_response)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_list_objects_fail();
|
||||
e
|
||||
})
|
||||
.context("Failed to list files in S3 bucket")?;
|
||||
|
||||
for object in response.contents().unwrap_or_default() {
|
||||
let object_path = object.key().unwrap();
|
||||
println!("{:?}", object_path);
|
||||
let remote_path = self.s3_object_to_relative_path(object_path);
|
||||
all_files.push(remote_path);
|
||||
}
|
||||
match response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
let remote_path = RemotePath::new(object_path)?;
|
||||
all_files.push(remote_path);
|
||||
}
|
||||
Ok(all_files)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user