mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
Do pagination in list_object_versions call (#6500)
## Problem The tenants we want to recover might have tens of thousands of keys, or more. At that point, the AWS API returns a paginated response. ## Summary of changes Support paginated responses for `list_object_versions` requests. Follow-up of #6155, part of https://github.com/neondatabase/cloud/issues/8233
This commit is contained in:
@@ -646,7 +646,7 @@ impl RemoteStorage for S3Bucket {
|
||||
let timestamp = DateTime::from(timestamp);
|
||||
let done_if_after = DateTime::from(done_if_after);
|
||||
|
||||
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
|
||||
tracing::info!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
|
||||
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let prefix = prefix
|
||||
@@ -657,40 +657,67 @@ impl RemoteStorage for S3Bucket {
|
||||
let max_retries = 10;
|
||||
let is_permanent = |_e: &_| false;
|
||||
|
||||
let list = backoff::retry(
|
||||
|| async {
|
||||
Ok(self
|
||||
.client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.set_prefix(prefix.clone())
|
||||
.send()
|
||||
.await?)
|
||||
},
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
"listing object versions for time_travel_recover",
|
||||
backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
|
||||
)
|
||||
.await?;
|
||||
let mut key_marker = None;
|
||||
let mut version_id_marker = None;
|
||||
let mut versions_and_deletes = Vec::new();
|
||||
|
||||
if list.is_truncated().unwrap_or_default() {
|
||||
anyhow::bail!("Received truncated ListObjectVersions response for prefix={prefix:?}");
|
||||
loop {
|
||||
let response = backoff::retry(
|
||||
|| async {
|
||||
Ok(self
|
||||
.client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.set_prefix(prefix.clone())
|
||||
.set_key_marker(key_marker.clone())
|
||||
.set_version_id_marker(version_id_marker.clone())
|
||||
.send()
|
||||
.await?)
|
||||
},
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
"listing object versions for time_travel_recover",
|
||||
backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::trace!(
|
||||
" Got List response version_id_marker={:?}, key_marker={:?}",
|
||||
response.version_id_marker,
|
||||
response.key_marker
|
||||
);
|
||||
let versions = response.versions.unwrap_or_default();
|
||||
let delete_markers = response.delete_markers.unwrap_or_default();
|
||||
let new_versions = versions.into_iter().map(VerOrDelete::Version);
|
||||
let new_deletes = delete_markers.into_iter().map(VerOrDelete::DeleteMarker);
|
||||
let new_versions_and_deletes = new_versions.chain(new_deletes);
|
||||
versions_and_deletes.extend(new_versions_and_deletes);
|
||||
fn none_if_empty(v: Option<String>) -> Option<String> {
|
||||
v.filter(|v| !v.is_empty())
|
||||
}
|
||||
version_id_marker = none_if_empty(response.next_version_id_marker);
|
||||
key_marker = none_if_empty(response.next_key_marker);
|
||||
if version_id_marker.is_none() {
|
||||
// The final response is not supposed to be truncated
|
||||
if response.is_truncated.unwrap_or_default() {
|
||||
anyhow::bail!(
|
||||
"Received truncated ListObjectVersions response for prefix={prefix:?}"
|
||||
);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let mut versions_deletes = list
|
||||
.versions()
|
||||
.iter()
|
||||
.map(VerOrDelete::Version)
|
||||
.chain(list.delete_markers().iter().map(VerOrDelete::DeleteMarker))
|
||||
.collect::<Vec<_>>();
|
||||
// Work on the list of references instead of the objects directly,
|
||||
// otherwise we get lifetime errors in the sort_by_key call below.
|
||||
let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
|
||||
|
||||
versions_deletes.sort_by_key(|vd| (vd.key(), vd.last_modified()));
|
||||
versions_and_deletes.sort_by_key(|vd| (vd.key(), vd.last_modified()));
|
||||
|
||||
let mut vds_for_key = HashMap::<_, Vec<_>>::new();
|
||||
|
||||
for vd in versions_deletes {
|
||||
for vd in &versions_and_deletes {
|
||||
let last_modified = vd.last_modified();
|
||||
let version_id = vd.version_id();
|
||||
let key = vd.key();
|
||||
@@ -811,25 +838,25 @@ fn start_measuring_requests(
|
||||
})
|
||||
}
|
||||
|
||||
enum VerOrDelete<'a> {
|
||||
Version(&'a ObjectVersion),
|
||||
DeleteMarker(&'a DeleteMarkerEntry),
|
||||
enum VerOrDelete {
|
||||
Version(ObjectVersion),
|
||||
DeleteMarker(DeleteMarkerEntry),
|
||||
}
|
||||
|
||||
impl<'a> VerOrDelete<'a> {
|
||||
fn last_modified(&self) -> Option<&'a DateTime> {
|
||||
impl VerOrDelete {
|
||||
fn last_modified(&self) -> Option<&DateTime> {
|
||||
match self {
|
||||
VerOrDelete::Version(v) => v.last_modified(),
|
||||
VerOrDelete::DeleteMarker(v) => v.last_modified(),
|
||||
}
|
||||
}
|
||||
fn version_id(&self) -> Option<&'a str> {
|
||||
fn version_id(&self) -> Option<&str> {
|
||||
match self {
|
||||
VerOrDelete::Version(v) => v.version_id(),
|
||||
VerOrDelete::DeleteMarker(v) => v.version_id(),
|
||||
}
|
||||
}
|
||||
fn key(&self) -> Option<&'a str> {
|
||||
fn key(&self) -> Option<&str> {
|
||||
match self {
|
||||
VerOrDelete::Version(v) => v.key(),
|
||||
VerOrDelete::DeleteMarker(v) => v.key(),
|
||||
|
||||
Reference in New Issue
Block a user