From b844c6f0c754f0994182f8c367a50a01e4b7e023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 29 Jan 2024 17:59:26 +0100 Subject: [PATCH] Do pagination in list_object_versions call (#6500) ## Problem The tenants we want to recover might have tens of thousands of keys, or more. At that point, the AWS API returns a paginated response. ## Summary of changes Support paginated responses for `list_object_versions` requests. Follow-up of #6155, part of https://github.com/neondatabase/cloud/issues/8233 --- libs/remote_storage/src/s3_bucket.rs | 97 ++++++++++++++++++---------- 1 file changed, 62 insertions(+), 35 deletions(-) diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 4909b8522b..83f3015eab 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -646,7 +646,7 @@ impl RemoteStorage for S3Bucket { let timestamp = DateTime::from(timestamp); let done_if_after = DateTime::from(done_if_after); - tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}"); + tracing::info!("Target time: {timestamp:?}, done_if_after {done_if_after:?}"); // get the passed prefix or if it is not set use prefix_in_bucket value let prefix = prefix @@ -657,40 +657,67 @@ impl RemoteStorage for S3Bucket { let max_retries = 10; let is_permanent = |_e: &_| false; - let list = backoff::retry( - || async { - Ok(self - .client - .list_object_versions() - .bucket(self.bucket_name.clone()) - .set_prefix(prefix.clone()) - .send() - .await?) - }, - is_permanent, - warn_threshold, - max_retries, - "listing object versions for time_travel_recover", - backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), - ) - .await?; + let mut key_marker = None; + let mut version_id_marker = None; + let mut versions_and_deletes = Vec::new(); - if list.is_truncated().unwrap_or_default() { - anyhow::bail!("Received truncated ListObjectVersions response for prefix={prefix:?}"); + loop { + let response = backoff::retry( + || async { + Ok(self + .client + .list_object_versions() + .bucket(self.bucket_name.clone()) + .set_prefix(prefix.clone()) + .set_key_marker(key_marker.clone()) + .set_version_id_marker(version_id_marker.clone()) + .send() + .await?) + }, + is_permanent, + warn_threshold, + max_retries, + "listing object versions for time_travel_recover", + backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), + ) + .await?; + + tracing::trace!( + " Got List response version_id_marker={:?}, key_marker={:?}", + response.version_id_marker, + response.key_marker + ); + let versions = response.versions.unwrap_or_default(); + let delete_markers = response.delete_markers.unwrap_or_default(); + let new_versions = versions.into_iter().map(VerOrDelete::Version); + let new_deletes = delete_markers.into_iter().map(VerOrDelete::DeleteMarker); + let new_versions_and_deletes = new_versions.chain(new_deletes); + versions_and_deletes.extend(new_versions_and_deletes); + fn none_if_empty(v: Option) -> Option { + v.filter(|v| !v.is_empty()) + } + version_id_marker = none_if_empty(response.next_version_id_marker); + key_marker = none_if_empty(response.next_key_marker); + if version_id_marker.is_none() { + // The final response is not supposed to be truncated + if response.is_truncated.unwrap_or_default() { + anyhow::bail!( + "Received truncated ListObjectVersions response for prefix={prefix:?}" + ); + } + break; + } } - let mut versions_deletes = list - .versions() - .iter() - .map(VerOrDelete::Version) - .chain(list.delete_markers().iter().map(VerOrDelete::DeleteMarker)) - .collect::>(); + // Work on the list of references instead of the objects directly, + // otherwise we get lifetime errors in the sort_by_key call below. + let mut versions_and_deletes = versions_and_deletes.iter().collect::>(); - versions_deletes.sort_by_key(|vd| (vd.key(), vd.last_modified())); + versions_and_deletes.sort_by_key(|vd| (vd.key(), vd.last_modified())); let mut vds_for_key = HashMap::<_, Vec<_>>::new(); - for vd in versions_deletes { + for vd in &versions_and_deletes { let last_modified = vd.last_modified(); let version_id = vd.version_id(); let key = vd.key(); @@ -811,25 +838,25 @@ fn start_measuring_requests( }) } -enum VerOrDelete<'a> { - Version(&'a ObjectVersion), - DeleteMarker(&'a DeleteMarkerEntry), +enum VerOrDelete { + Version(ObjectVersion), + DeleteMarker(DeleteMarkerEntry), } -impl<'a> VerOrDelete<'a> { - fn last_modified(&self) -> Option<&'a DateTime> { +impl VerOrDelete { + fn last_modified(&self) -> Option<&DateTime> { match self { VerOrDelete::Version(v) => v.last_modified(), VerOrDelete::DeleteMarker(v) => v.last_modified(), } } - fn version_id(&self) -> Option<&'a str> { + fn version_id(&self) -> Option<&str> { match self { VerOrDelete::Version(v) => v.version_id(), VerOrDelete::DeleteMarker(v) => v.version_id(), } } - fn key(&self) -> Option<&'a str> { + fn key(&self) -> Option<&str> { match self { VerOrDelete::Version(v) => v.key(), VerOrDelete::DeleteMarker(v) => v.key(),