mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 09:30:37 +00:00
remote_storage: expose last_modified in listings (#8497)
## Problem
The scrubber would like to check the highest mtime in a tenant's objects
as a safety check during purges. It recently switched to use
GenericRemoteStorage, so we need to expose that in the listing methods.
## Summary of changes
- In Listing.keys, return a ListingObject{} including a last_modified
field, instead of a RemotePath
---------
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
This commit is contained in:
@@ -33,6 +33,7 @@ use tracing::debug;
|
||||
use utils::backoff;
|
||||
|
||||
use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
|
||||
use crate::ListingObject;
|
||||
use crate::{
|
||||
config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError, Listing,
|
||||
ListingMode, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
|
||||
@@ -352,7 +353,11 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
let blob_iter = entry
|
||||
.blobs
|
||||
.blobs()
|
||||
.map(|k| self.name_to_relative_path(&k.name));
|
||||
.map(|k| ListingObject{
|
||||
key: self.name_to_relative_path(&k.name),
|
||||
last_modified: k.properties.last_modified.into()
|
||||
}
|
||||
);
|
||||
|
||||
for key in blob_iter {
|
||||
res.keys.push(key);
|
||||
|
||||
@@ -149,10 +149,16 @@ pub enum ListingMode {
|
||||
NoDelimiter,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub struct ListingObject {
|
||||
pub key: RemotePath,
|
||||
pub last_modified: SystemTime,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Listing {
|
||||
pub prefixes: Vec<RemotePath>,
|
||||
pub keys: Vec<RemotePath>,
|
||||
pub keys: Vec<ListingObject>,
|
||||
}
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
@@ -201,7 +207,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
let mut combined = stream.next().await.expect("At least one item required")?;
|
||||
while let Some(list) = stream.next().await {
|
||||
let list = list?;
|
||||
combined.keys.extend_from_slice(&list.keys);
|
||||
combined.keys.extend(list.keys.into_iter());
|
||||
combined.prefixes.extend_from_slice(&list.prefixes);
|
||||
}
|
||||
Ok(combined)
|
||||
|
||||
@@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
|
||||
use crate::{
|
||||
Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
|
||||
REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath, TimeTravelError,
|
||||
TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
use super::{RemoteStorage, StorageMetadata};
|
||||
@@ -357,19 +357,28 @@ impl RemoteStorage for LocalFs {
|
||||
.list_recursive(prefix)
|
||||
.await
|
||||
.map_err(DownloadError::Other)?;
|
||||
let keys = keys
|
||||
let objects = keys
|
||||
.into_iter()
|
||||
.filter(|k| {
|
||||
.filter_map(|k| {
|
||||
let path = k.with_base(&self.storage_root);
|
||||
!path.is_dir()
|
||||
if path.is_dir() {
|
||||
None
|
||||
} else {
|
||||
Some(ListingObject {
|
||||
key: k.clone(),
|
||||
// LocalFs is just for testing, so just specify a dummy time
|
||||
last_modified: SystemTime::now(),
|
||||
})
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if let ListingMode::NoDelimiter = mode {
|
||||
result.keys = keys;
|
||||
result.keys = objects;
|
||||
} else {
|
||||
let mut prefixes = HashSet::new();
|
||||
for key in keys {
|
||||
for object in objects {
|
||||
let key = object.key;
|
||||
// If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
|
||||
let relative_key = if let Some(prefix) = prefix {
|
||||
let mut prefix = prefix.clone();
|
||||
@@ -398,9 +407,11 @@ impl RemoteStorage for LocalFs {
|
||||
.to_owned();
|
||||
prefixes.insert(first_part);
|
||||
} else {
|
||||
result
|
||||
.keys
|
||||
.push(RemotePath::from_string(&relative_key).unwrap());
|
||||
result.keys.push(ListingObject {
|
||||
key: RemotePath::from_string(&relative_key).unwrap(),
|
||||
// LocalFs is just for testing
|
||||
last_modified: SystemTime::now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
result.prefixes = prefixes
|
||||
@@ -950,7 +961,11 @@ mod fs_tests {
|
||||
.await?;
|
||||
assert!(listing.prefixes.is_empty());
|
||||
assert_eq!(
|
||||
listing.keys.into_iter().collect::<HashSet<_>>(),
|
||||
listing
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect::<HashSet<_>>(),
|
||||
HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()])
|
||||
);
|
||||
|
||||
@@ -975,7 +990,7 @@ mod fs_tests {
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(
|
||||
listing.keys,
|
||||
listing.keys.into_iter().map(|o| o.key).collect::<Vec<_>>(),
|
||||
[RemotePath::from_string("uncle").unwrap()].to_vec()
|
||||
);
|
||||
assert_eq!(
|
||||
@@ -992,7 +1007,7 @@ mod fs_tests {
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(listing.keys, [].to_vec());
|
||||
assert_eq!(listing.keys, vec![]);
|
||||
assert_eq!(
|
||||
listing.prefixes,
|
||||
[RemotePath::from_string("grandparent").unwrap()].to_vec()
|
||||
@@ -1007,7 +1022,7 @@ mod fs_tests {
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(listing.keys, [].to_vec());
|
||||
assert_eq!(listing.keys, vec![]);
|
||||
assert_eq!(
|
||||
listing.prefixes,
|
||||
[RemotePath::from_string("grandparent").unwrap()].to_vec()
|
||||
@@ -1040,7 +1055,7 @@ mod fs_tests {
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(listing.keys, [].to_vec());
|
||||
assert_eq!(listing.keys, vec![]);
|
||||
|
||||
let mut found_prefixes = listing.prefixes.clone();
|
||||
found_prefixes.sort();
|
||||
|
||||
@@ -44,8 +44,9 @@ use crate::{
|
||||
error::Cancelled,
|
||||
metrics::{start_counting_cancelled_wait, start_measuring_requests},
|
||||
support::PermitCarrying,
|
||||
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
|
||||
TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath,
|
||||
RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
|
||||
REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
use crate::metrics::AttemptOutcome;
|
||||
@@ -548,9 +549,26 @@ impl RemoteStorage for S3Bucket {
|
||||
let mut result = Listing::default();
|
||||
|
||||
for object in keys {
|
||||
let object_path = object.key().expect("response does not contain a key");
|
||||
let remote_path = self.s3_object_to_relative_path(object_path);
|
||||
result.keys.push(remote_path);
|
||||
let key = object.key().expect("response does not contain a key");
|
||||
let key = self.s3_object_to_relative_path(key);
|
||||
|
||||
let last_modified = match object.last_modified.map(SystemTime::try_from) {
|
||||
Some(Ok(t)) => t,
|
||||
Some(Err(_)) => {
|
||||
tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
|
||||
object.last_modified, key
|
||||
);
|
||||
SystemTime::now()
|
||||
},
|
||||
None => {
|
||||
SystemTime::now()
|
||||
}
|
||||
};
|
||||
|
||||
result.keys.push(ListingObject{
|
||||
key,
|
||||
last_modified
|
||||
});
|
||||
if let Some(mut mk) = max_keys {
|
||||
assert!(mk > 0);
|
||||
mk -= 1;
|
||||
|
||||
@@ -156,6 +156,7 @@ async fn list_no_delimiter_works(
|
||||
.context("client list root files failure")?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect::<HashSet<_>>();
|
||||
assert_eq!(
|
||||
root_files,
|
||||
@@ -182,6 +183,7 @@ async fn list_no_delimiter_works(
|
||||
.context("client list nested files failure")?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect::<HashSet<_>>();
|
||||
let trim_remote_blobs: HashSet<_> = ctx
|
||||
.remote_blobs
|
||||
|
||||
@@ -81,6 +81,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
|
||||
.context("list root files failure")?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect::<HashSet<_>>(),
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user