From 6711087ddf3f4f3bcbe2a89e026f436d5fe415d3 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 26 Jul 2024 08:57:52 +0100 Subject: [PATCH] remote_storage: expose last_modified in listings (#8497) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem The scrubber would like to check the highest mtime in a tenant's objects as a safety check during purges. It recently switched to use GenericRemoteStorage, so we need to expose that in the listing methods. ## Summary of changes - In Listing.keys, return a ListingObject{} including a last_modified field, instead of a RemotePath --------- Co-authored-by: Arpad Müller --- libs/remote_storage/src/azure_blob.rs | 7 ++- libs/remote_storage/src/lib.rs | 10 ++++- libs/remote_storage/src/local_fs.rs | 45 ++++++++++++------- libs/remote_storage/src/s3_bucket.rs | 28 +++++++++--- libs/remote_storage/tests/common/tests.rs | 2 + libs/remote_storage/tests/test_real_s3.rs | 1 + pageserver/src/tenant/mgr.rs | 1 + .../src/tenant/remote_timeline_client.rs | 19 ++++---- .../tenant/remote_timeline_client/download.rs | 9 ++-- safekeeper/src/wal_backup.rs | 7 ++- storage_scrubber/src/garbage.rs | 10 +++-- 11 files changed, 96 insertions(+), 43 deletions(-) diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index acd95a5255..6ca4ae43f2 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -33,6 +33,7 @@ use tracing::debug; use utils::backoff; use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind}; +use crate::ListingObject; use crate::{ config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel, @@ -352,7 +353,11 @@ impl RemoteStorage for AzureBlobStorage { let blob_iter = entry .blobs .blobs() - .map(|k| self.name_to_relative_path(&k.name)); + .map(|k| ListingObject{ + key: self.name_to_relative_path(&k.name), + last_modified: k.properties.last_modified.into() + } + ); for key in blob_iter { res.keys.push(key); diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 0fed86f4b8..75aa28233b 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -149,10 +149,16 @@ pub enum ListingMode { NoDelimiter, } +#[derive(PartialEq, Eq, Debug)] +pub struct ListingObject { + pub key: RemotePath, + pub last_modified: SystemTime, +} + #[derive(Default)] pub struct Listing { pub prefixes: Vec, - pub keys: Vec, + pub keys: Vec, } /// Storage (potentially remote) API to manage its state. @@ -201,7 +207,7 @@ pub trait RemoteStorage: Send + Sync + 'static { let mut combined = stream.next().await.expect("At least one item required")?; while let Some(list) = stream.next().await { let list = list?; - combined.keys.extend_from_slice(&list.keys); + combined.keys.extend(list.keys.into_iter()); combined.prefixes.extend_from_slice(&list.prefixes); } Ok(combined) diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index a4857b0bba..bc6b10aa51 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken}; use utils::crashsafe::path_with_suffix_extension; use crate::{ - Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel, - REMOTE_STORAGE_PREFIX_SEPARATOR, + Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath, TimeTravelError, + TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR, }; use super::{RemoteStorage, StorageMetadata}; @@ -357,19 +357,28 @@ impl RemoteStorage for LocalFs { .list_recursive(prefix) .await .map_err(DownloadError::Other)?; - let keys = keys + let objects = keys .into_iter() - .filter(|k| { + .filter_map(|k| { let path = k.with_base(&self.storage_root); - !path.is_dir() + if path.is_dir() { + None + } else { + Some(ListingObject { + key: k.clone(), + // LocalFs is just for testing, so just specify a dummy time + last_modified: SystemTime::now(), + }) + } }) .collect(); if let ListingMode::NoDelimiter = mode { - result.keys = keys; + result.keys = objects; } else { let mut prefixes = HashSet::new(); - for key in keys { + for object in objects { + let key = object.key; // If the part after the prefix includes a "/", take only the first part and put it in `prefixes`. let relative_key = if let Some(prefix) = prefix { let mut prefix = prefix.clone(); @@ -398,9 +407,11 @@ impl RemoteStorage for LocalFs { .to_owned(); prefixes.insert(first_part); } else { - result - .keys - .push(RemotePath::from_string(&relative_key).unwrap()); + result.keys.push(ListingObject { + key: RemotePath::from_string(&relative_key).unwrap(), + // LocalFs is just for testing + last_modified: SystemTime::now(), + }); } } result.prefixes = prefixes @@ -950,7 +961,11 @@ mod fs_tests { .await?; assert!(listing.prefixes.is_empty()); assert_eq!( - listing.keys.into_iter().collect::>(), + listing + .keys + .into_iter() + .map(|o| o.key) + .collect::>(), HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()]) ); @@ -975,7 +990,7 @@ mod fs_tests { ) .await?; assert_eq!( - listing.keys, + listing.keys.into_iter().map(|o| o.key).collect::>(), [RemotePath::from_string("uncle").unwrap()].to_vec() ); assert_eq!( @@ -992,7 +1007,7 @@ mod fs_tests { &cancel, ) .await?; - assert_eq!(listing.keys, [].to_vec()); + assert_eq!(listing.keys, vec![]); assert_eq!( listing.prefixes, [RemotePath::from_string("grandparent").unwrap()].to_vec() @@ -1007,7 +1022,7 @@ mod fs_tests { &cancel, ) .await?; - assert_eq!(listing.keys, [].to_vec()); + assert_eq!(listing.keys, vec![]); assert_eq!( listing.prefixes, [RemotePath::from_string("grandparent").unwrap()].to_vec() @@ -1040,7 +1055,7 @@ mod fs_tests { &cancel, ) .await?; - assert_eq!(listing.keys, [].to_vec()); + assert_eq!(listing.keys, vec![]); let mut found_prefixes = listing.prefixes.clone(); found_prefixes.sort(); diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 90ed48e06c..412f307445 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -44,8 +44,9 @@ use crate::{ error::Cancelled, metrics::{start_counting_cancelled_wait, start_measuring_requests}, support::PermitCarrying, - ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, - TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, + ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath, + RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, + REMOTE_STORAGE_PREFIX_SEPARATOR, }; use crate::metrics::AttemptOutcome; @@ -548,9 +549,26 @@ impl RemoteStorage for S3Bucket { let mut result = Listing::default(); for object in keys { - let object_path = object.key().expect("response does not contain a key"); - let remote_path = self.s3_object_to_relative_path(object_path); - result.keys.push(remote_path); + let key = object.key().expect("response does not contain a key"); + let key = self.s3_object_to_relative_path(key); + + let last_modified = match object.last_modified.map(SystemTime::try_from) { + Some(Ok(t)) => t, + Some(Err(_)) => { + tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds", + object.last_modified, key + ); + SystemTime::now() + }, + None => { + SystemTime::now() + } + }; + + result.keys.push(ListingObject{ + key, + last_modified + }); if let Some(mut mk) = max_keys { assert!(mk > 0); mk -= 1; diff --git a/libs/remote_storage/tests/common/tests.rs b/libs/remote_storage/tests/common/tests.rs index 38c316397a..86c55872c1 100644 --- a/libs/remote_storage/tests/common/tests.rs +++ b/libs/remote_storage/tests/common/tests.rs @@ -156,6 +156,7 @@ async fn list_no_delimiter_works( .context("client list root files failure")? .keys .into_iter() + .map(|o| o.key) .collect::>(); assert_eq!( root_files, @@ -182,6 +183,7 @@ async fn list_no_delimiter_works( .context("client list nested files failure")? .keys .into_iter() + .map(|o| o.key) .collect::>(); let trim_remote_blobs: HashSet<_> = ctx .remote_blobs diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 342bc6da0b..b893beeebd 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -81,6 +81,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: .context("list root files failure")? .keys .into_iter() + .map(|o| o.key) .collect::>(), ) } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index f23e6ff9d6..75c8682c97 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1407,6 +1407,7 @@ impl TenantManager { tracing::info!("Remote storage already deleted"); } else { tracing::info!("Deleting {} keys from remote storage", keys.len()); + let keys = keys.into_iter().map(|o| o.key).collect::>(); self.resources .remote_storage .delete_objects(&keys, &self.cancel) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 8b26f122cf..2f3c6c188b 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1380,12 +1380,13 @@ impl RemoteTimelineClient { // marker via its deleted_at attribute let latest_index = remaining .iter() - .filter(|p| { - p.object_name() + .filter(|o| { + o.key + .object_name() .map(|n| n.starts_with(IndexPart::FILE_NAME)) .unwrap_or(false) }) - .filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen))) + .filter_map(|o| parse_remote_index_path(o.key.clone()).map(|gen| (o.key.clone(), gen))) .max_by_key(|i| i.1) .map(|i| i.0.clone()) .unwrap_or( @@ -1396,14 +1397,12 @@ impl RemoteTimelineClient { let remaining_layers: Vec = remaining .into_iter() - .filter(|p| { - if p == &latest_index { - return false; + .filter_map(|o| { + if o.key == latest_index || o.key.object_name() == Some(INITDB_PRESERVED_PATH) { + None + } else { + Some(o.key) } - if p.object_name() == Some(INITDB_PRESERVED_PATH) { - return false; - } - true }) .inspect(|path| { if let Some(name) = path.object_name() { diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index d0385e4aee..a17b32c983 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -295,10 +295,11 @@ where }; } - for key in listing.keys { - let object_name = key + for object in listing.keys { + let object_name = object + .key .object_name() - .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?; + .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?; other_prefixes.insert(object_name.to_string()); } @@ -459,7 +460,7 @@ pub(crate) async fn download_index_part( // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md let max_previous_generation = indices .into_iter() - .filter_map(parse_remote_index_path) + .filter_map(|o| parse_remote_index_path(o.key)) .filter(|g| g <= &my_generation) .max(); diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 7ecee178f3..234273e133 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -545,7 +545,10 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> { &cancel, ) .await? - .keys; + .keys + .into_iter() + .map(|o| o.key) + .collect::>(); if files.is_empty() { return Ok(()); // done } @@ -613,7 +616,7 @@ pub async fn copy_s3_segments( let uploaded_segments = &files .iter() - .filter_map(|file| file.object_name().map(ToOwned::to_owned)) + .filter_map(|o| o.key.object_name().map(ToOwned::to_owned)) .collect::>(); debug!( diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs index c7e21d7e26..333269ec7e 100644 --- a/storage_scrubber/src/garbage.rs +++ b/storage_scrubber/src/garbage.rs @@ -10,7 +10,7 @@ use std::{ use anyhow::Context; use futures_util::TryStreamExt; use pageserver_api::shard::TenantShardId; -use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath}; +use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath}; use serde::{Deserialize, Serialize}; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; @@ -324,7 +324,7 @@ impl std::fmt::Display for PurgeMode { pub async fn get_tenant_objects( s3_client: &GenericRemoteStorage, tenant_shard_id: TenantShardId, -) -> anyhow::Result> { +) -> anyhow::Result> { tracing::debug!("Listing objects in tenant {tenant_shard_id}"); let tenant_root = super::remote_tenant_path(&tenant_shard_id); @@ -345,7 +345,7 @@ pub async fn get_tenant_objects( pub async fn get_timeline_objects( s3_client: &GenericRemoteStorage, ttid: TenantShardTimelineId, -) -> anyhow::Result> { +) -> anyhow::Result> { tracing::debug!("Listing objects in timeline {ttid}"); let timeline_root = super::remote_timeline_path_id(&ttid); @@ -372,7 +372,7 @@ const MAX_KEYS_PER_DELETE: usize = 1000; /// `num_deleted` returns number of deleted keys. async fn do_delete( remote_client: &GenericRemoteStorage, - keys: &mut Vec, + keys: &mut Vec, dry_run: bool, drain: bool, progress_tracker: &mut DeletionProgressTracker, @@ -382,6 +382,8 @@ async fn do_delete( let request_keys = keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len()))); + let request_keys: Vec = request_keys.into_iter().map(|o| o.key).collect(); + let num_deleted = request_keys.len(); if dry_run { tracing::info!("Dry-run deletion of objects: ");