diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 83f3015eab..e615a1ce7e 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -687,12 +687,19 @@ impl RemoteStorage for S3Bucket { response.version_id_marker, response.key_marker ); - let versions = response.versions.unwrap_or_default(); - let delete_markers = response.delete_markers.unwrap_or_default(); - let new_versions = versions.into_iter().map(VerOrDelete::Version); - let new_deletes = delete_markers.into_iter().map(VerOrDelete::DeleteMarker); - let new_versions_and_deletes = new_versions.chain(new_deletes); - versions_and_deletes.extend(new_versions_and_deletes); + let versions = response + .versions + .unwrap_or_default() + .into_iter() + .map(VerOrDelete::from_version); + let deletes = response + .delete_markers + .unwrap_or_default() + .into_iter() + .map(VerOrDelete::from_delete_marker); + itertools::process_results(versions.chain(deletes), |n_vds| { + versions_and_deletes.extend(n_vds) + })?; fn none_if_empty(v: Option) -> Option { v.filter(|v| !v.is_empty()) } @@ -707,52 +714,51 @@ impl RemoteStorage for S3Bucket { } break; } + // Limit the number of versions deletions, mostly so that we don't + // keep requesting forever if the list is too long, as we'd put the + // list in RAM. + // Building a list of 100k entries that reaches the limit roughly takes + // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size. + const COMPLEXITY_LIMIT: usize = 100_000; + if versions_and_deletes.len() >= COMPLEXITY_LIMIT { + anyhow::bail!( + "Limit for number of versions/deletions exceeded for prefix={prefix:?}" + ); + } } // Work on the list of references instead of the objects directly, // otherwise we get lifetime errors in the sort_by_key call below. let mut versions_and_deletes = versions_and_deletes.iter().collect::>(); - versions_and_deletes.sort_by_key(|vd| (vd.key(), vd.last_modified())); + versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified)); let mut vds_for_key = HashMap::<_, Vec<_>>::new(); for vd in &versions_and_deletes { - let last_modified = vd.last_modified(); - let version_id = vd.version_id(); - let key = vd.key(); - let (Some(last_modified), Some(version_id), Some(key)) = - (last_modified, version_id, key) - else { - anyhow::bail!( - "One (or more) of last_modified, key, and id is None. \ - Is versioning enabled in the bucket? last_modified={:?} key={:?} version_id={:?}", - last_modified, key, version_id, - ); - }; + let VerOrDelete { + version_id, key, .. + } = &vd; if version_id == "null" { anyhow::bail!("Received ListVersions response for key={key} with version_id='null', \ indicating either disabled versioning, or legacy objects with null version id values"); } tracing::trace!( - "Parsing version key={key} version_id={version_id} is_delete={}", - matches!(vd, VerOrDelete::DeleteMarker(_)) + "Parsing version key={key} version_id={version_id} kind={:?}", + vd.kind ); - vds_for_key - .entry(key) - .or_default() - .push((vd, last_modified, version_id)); + vds_for_key.entry(key).or_default().push(vd); } for (key, versions) in vds_for_key { - let (last_vd, last_last_modified, _version_id) = versions.last().unwrap(); - if last_last_modified > &&done_if_after { + let last_vd = versions.last().unwrap(); + if last_vd.last_modified > done_if_after { tracing::trace!("Key {key} has version later than done_if_after, skipping"); continue; } // the version we want to restore to. let version_to_restore_to = - match versions.binary_search_by_key(×tamp, |tpl| *tpl.1) { + match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) { Ok(v) => v, Err(e) => e, }; @@ -770,7 +776,11 @@ impl RemoteStorage for S3Bucket { do_delete = true; } else { match &versions[version_to_restore_to - 1] { - (VerOrDelete::Version(_), _last_modified, version_id) => { + VerOrDelete { + kind: VerOrDeleteKind::Version, + version_id, + .. + } => { tracing::trace!("Copying old version {version_id} for {key}..."); // Restore the state to the last version by copying let source_id = @@ -795,13 +805,16 @@ impl RemoteStorage for S3Bucket { ) .await?; } - (VerOrDelete::DeleteMarker(_), _last_modified, _version_id) => { + VerOrDelete { + kind: VerOrDeleteKind::DeleteMarker, + .. + } => { do_delete = true; } } }; if do_delete { - if matches!(last_vd, VerOrDelete::DeleteMarker(_)) { + if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) { // Key has since been deleted (but there was some history), no need to do anything tracing::trace!("Key {key} already deleted, skipping."); } else { @@ -838,29 +851,59 @@ fn start_measuring_requests( }) } -enum VerOrDelete { - Version(ObjectVersion), - DeleteMarker(DeleteMarkerEntry), +// Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry +struct VerOrDelete { + kind: VerOrDeleteKind, + last_modified: DateTime, + version_id: String, + key: String, +} + +#[derive(Debug)] +enum VerOrDeleteKind { + Version, + DeleteMarker, } impl VerOrDelete { - fn last_modified(&self) -> Option<&DateTime> { - match self { - VerOrDelete::Version(v) => v.last_modified(), - VerOrDelete::DeleteMarker(v) => v.last_modified(), - } + fn with_kind( + kind: VerOrDeleteKind, + last_modified: Option, + version_id: Option, + key: Option, + ) -> anyhow::Result { + let lvk = (last_modified, version_id, key); + let (Some(last_modified), Some(version_id), Some(key)) = lvk else { + anyhow::bail!( + "One (or more) of last_modified, key, and id is None. \ + Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}", + lvk.0, + lvk.1, + lvk.2, + ); + }; + Ok(Self { + kind, + last_modified, + version_id, + key, + }) } - fn version_id(&self) -> Option<&str> { - match self { - VerOrDelete::Version(v) => v.version_id(), - VerOrDelete::DeleteMarker(v) => v.version_id(), - } + fn from_version(v: ObjectVersion) -> anyhow::Result { + Self::with_kind( + VerOrDeleteKind::Version, + v.last_modified, + v.version_id, + v.key, + ) } - fn key(&self) -> Option<&str> { - match self { - VerOrDelete::Version(v) => v.key(), - VerOrDelete::DeleteMarker(v) => v.key(), - } + fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result { + Self::with_kind( + VerOrDeleteKind::DeleteMarker, + v.last_modified, + v.version_id, + v.key, + ) } }