diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs
index 18146c5464..a5cddb840f 100644
--- a/libs/remote_storage/src/azure_blob.rs
+++ b/libs/remote_storage/src/azure_blob.rs
@@ -14,8 +14,9 @@ use anyhow::{Context, Result};
 use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
 use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
 use azure_storage::StorageCredentials;
-use azure_storage_blobs::blob::CopyStatus;
 use azure_storage_blobs::blob::operations::GetBlobBuilder;
+use azure_storage_blobs::blob::{Blob, CopyStatus};
+use azure_storage_blobs::container::operations::ListBlobsBuilder;
 use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
 use bytes::Bytes;
 use futures::FutureExt;
@@ -253,53 +254,15 @@ impl AzureBlobStorage {
         download
     }
 
-    async fn permit(
-        &self,
-        kind: RequestKind,
-        cancel: &CancellationToken,
-    ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
-        let acquire = self.concurrency_limiter.acquire(kind);
-
-        tokio::select! {
-            permit = acquire => Ok(permit.expect("never closed")),
-            _ = cancel.cancelled() => Err(Cancelled),
-        }
-    }
-
-    pub fn container_name(&self) -> &str {
-        &self.container_name
-    }
-}
-
-fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
-    let mut res = Metadata::new();
-    for (k, v) in metadata.0.into_iter() {
-        res.insert(k, v);
-    }
-    res
-}
-
-fn to_download_error(error: azure_core::Error) -> DownloadError {
-    if let Some(http_err) = error.as_http_error() {
-        match http_err.status() {
-            StatusCode::NotFound => DownloadError::NotFound,
-            StatusCode::NotModified => DownloadError::Unmodified,
-            StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
-            _ => DownloadError::Other(anyhow::Error::new(error)),
-        }
-    } else {
-        DownloadError::Other(error.into())
-    }
-}
-
-impl RemoteStorage for AzureBlobStorage {
-    fn list_streaming(
+    fn list_streaming_for_fn<T: Default + ListingCollector>(
         &self,
         prefix: Option<&RemotePath>,
         mode: ListingMode,
         max_keys: Option<NonZeroU32>,
         cancel: &CancellationToken,
-    ) -> impl Stream<Item = Result<Listing, DownloadError>> {
+        request_kind: RequestKind,
+        customize_builder: impl Fn(ListBlobsBuilder) -> ListBlobsBuilder,
+    ) -> impl Stream<Item = Result<T, DownloadError>> {
         // get the passed prefix or if it is not set use prefix_in_bucket value
         let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
             self.prefix_in_container.clone().map(|mut s| {
@@ -311,7 +274,7 @@ impl RemoteStorage for AzureBlobStorage {
         });
 
         async_stream::stream! {
-            let _permit = self.permit(RequestKind::List, cancel).await?;
+            let _permit = self.permit(request_kind, cancel).await?;
 
             let mut builder = self.client.list_blobs();
 
@@ -327,6 +290,8 @@ impl RemoteStorage for AzureBlobStorage {
                 builder = builder.max_results(MaxResults::new(limit));
             }
 
+            builder = customize_builder(builder);
+
             let mut next_marker = None;
 
             let mut timeout_try_cnt = 1;
@@ -382,26 +347,20 @@
                     break;
                 };
 
-                let mut res = Listing::default();
+                let mut res = T::default();
                 next_marker = entry.continuation();
                 let prefix_iter = entry
                     .blobs
                     .prefixes()
                     .map(|prefix| self.name_to_relative_path(&prefix.name));
-                res.prefixes.extend(prefix_iter);
+                res.add_prefixes(self, prefix_iter);
 
                 let blob_iter = entry
                     .blobs
-                    .blobs()
-                    .map(|k| ListingObject{
-                        key: self.name_to_relative_path(&k.name),
-                        last_modified: k.properties.last_modified.into(),
-                        size: k.properties.content_length,
-                    }
-                );
+                    .blobs();
 
                 for key in blob_iter {
-                    res.keys.push(key);
+                    res.add_blob(self, key);
 
                     if let Some(mut mk) = max_keys {
                         assert!(mk > 0);
@@ -423,6 +382,128 @@
             }
         }
     }
 
+    async fn permit(
+        &self,
+        kind: RequestKind,
+        cancel: &CancellationToken,
+    ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
+        let acquire = self.concurrency_limiter.acquire(kind);
+
+        tokio::select! {
+            permit = acquire => Ok(permit.expect("never closed")),
+            _ = cancel.cancelled() => Err(Cancelled),
+        }
+    }
+
+    pub fn container_name(&self) -> &str {
+        &self.container_name
+    }
+}
+
+trait ListingCollector {
+    fn add_prefixes(&mut self, abs: &AzureBlobStorage, prefix_it: impl Iterator<Item = RemotePath>);
+    fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob);
+}
+
+impl ListingCollector for Listing {
+    fn add_prefixes(
+        &mut self,
+        _abs: &AzureBlobStorage,
+        prefix_it: impl Iterator<Item = RemotePath>,
+    ) {
+        self.prefixes.extend(prefix_it);
+    }
+    fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
+        self.keys.push(ListingObject {
+            key: abs.name_to_relative_path(&blob.name),
+            last_modified: blob.properties.last_modified.into(),
+            size: blob.properties.content_length,
+        });
+    }
+}
+
+impl ListingCollector for crate::VersionListing {
+    fn add_prefixes(
+        &mut self,
+        _abs: &AzureBlobStorage,
+        _prefix_it: impl Iterator<Item = RemotePath>,
+    ) {
+        // nothing
+    }
+    fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
+        let id = crate::VersionId(blob.version_id.clone().expect("didn't find version ID"));
+        self.versions.push(crate::Version {
+            key: abs.name_to_relative_path(&blob.name),
+            last_modified: blob.properties.last_modified.into(),
+            kind: crate::VersionKind::Version(id),
+        });
+    }
+}
+
+fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
+    let mut res = Metadata::new();
+    for (k, v) in metadata.0.into_iter() {
+        res.insert(k, v);
+    }
+    res
+}
+
+fn to_download_error(error: azure_core::Error) -> DownloadError {
+    if let Some(http_err) = error.as_http_error() {
+        match http_err.status() {
+            StatusCode::NotFound => DownloadError::NotFound,
+            StatusCode::NotModified => DownloadError::Unmodified,
+            StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
+            _ => DownloadError::Other(anyhow::Error::new(error)),
+        }
+    } else {
+        DownloadError::Other(error.into())
+    }
+}
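
Annotation (not part of the patch): the `ListingCollector` trait above is what lets `list_streaming_for_fn` serve both plain listings and version listings from a single pagination loop. A stripped-down, hypothetical model of the pattern:

```rust
// Illustrative model (hypothetical names): one generic drain loop, two
// collectors. `drain` plays the role of `list_streaming_for_fn`; it never
// knows which result type it is filling.
trait Collector: Default {
    fn add(&mut self, item: &str);
}

/// Collects the items themselves, like `Listing`.
#[derive(Default)]
struct Keys(Vec<String>);

impl Collector for Keys {
    fn add(&mut self, item: &str) {
        self.0.push(item.to_owned());
    }
}

/// Collects a different shape entirely, standing in for `VersionListing`.
#[derive(Default)]
struct Count(usize);

impl Collector for Count {
    fn add(&mut self, _item: &str) {
        self.0 += 1;
    }
}

fn drain<T: Collector>(pages: &[Vec<&str>]) -> T {
    let mut out = T::default();
    for page in pages {
        for item in page {
            out.add(item);
        }
    }
    out
}

fn main() {
    let pages = vec![vec!["a", "b"], vec!["c"]];
    let keys: Keys = drain(&pages);
    let count: Count = drain(&pages);
    assert_eq!(keys.0.len(), count.0);
}
```
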
+
+impl RemoteStorage for AzureBlobStorage {
+    fn list_streaming(
+        &self,
+        prefix: Option<&RemotePath>,
+        mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
+        cancel: &CancellationToken,
+    ) -> impl Stream<Item = Result<Listing, DownloadError>> {
+        let customize_builder = |builder| builder;
+        let kind = RequestKind::List;
+        self.list_streaming_for_fn(prefix, mode, max_keys, cancel, kind, customize_builder)
+    }
+
+    async fn list_versions(
+        &self,
+        prefix: Option<&RemotePath>,
+        mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
+        cancel: &CancellationToken,
+    ) -> std::result::Result<crate::VersionListing, DownloadError> {
+        let customize_builder = |mut builder: ListBlobsBuilder| {
+            builder = builder.include_versions(true);
+            builder
+        };
+        let kind = RequestKind::ListVersions;
+
+        let mut stream = std::pin::pin!(self.list_streaming_for_fn(
+            prefix,
+            mode,
+            max_keys,
+            cancel,
+            kind,
+            customize_builder
+        ));
+        let mut combined: crate::VersionListing =
+            stream.next().await.expect("At least one item required")?;
+        while let Some(list) = stream.next().await {
+            let list = list?;
+            combined.versions.extend(list.versions.into_iter());
+        }
+        Ok(combined)
+    }
+
     async fn head_object(
         &self,
         key: &RemotePath,
@@ -532,7 +613,12 @@
         let mut builder = blob_client.get();
 
         if let Some(ref etag) = opts.etag {
-            builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
+            builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()));
+        }
+
+        if let Some(ref version_id) = opts.version_id {
+            let version_id = azure_storage_blobs::prelude::VersionId::new(version_id.0.clone());
+            builder = builder.blob_versioning(version_id);
         }
 
         if let Some((start, end)) = opts.byte_range() {
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 6eb5570d9b..b265d37a62 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -176,6 +176,32 @@ pub struct Listing {
     pub keys: Vec<ListingObject>,
 }
 
+#[derive(Default)]
+pub struct VersionListing {
+    pub versions: Vec<Version>,
+}
+
+pub struct Version {
+    pub key: RemotePath,
+    pub last_modified: SystemTime,
+    pub kind: VersionKind,
+}
+
+impl Version {
+    pub fn version_id(&self) -> Option<&VersionId> {
+        match &self.kind {
+            VersionKind::Version(id) => Some(id),
+            VersionKind::DeletionMarker => None,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum VersionKind {
+    DeletionMarker,
+    Version(VersionId),
+}
+
 /// Options for downloads. The default value is a plain GET.
 pub struct DownloadOpts {
     /// If given, returns [`DownloadError::Unmodified`] if the object still has
@@ -186,6 +212,8 @@ pub struct DownloadOpts {
     /// The end of the byte range to download, or unbounded. Must be after the
     /// start bound.
     pub byte_end: Bound<u64>,
+    /// Optionally request a specific version of a key
+    pub version_id: Option<VersionId>,
     /// Indicate whether we're downloading something small or large: this indirectly controls
     /// timeouts: for something like an index/manifest/heatmap, we should time out faster than
     /// for layer files
@@ -197,12 +225,16 @@ pub enum DownloadKind {
     Small,
 }
 
+#[derive(Debug, Clone)]
+pub struct VersionId(pub String);
+
 impl Default for DownloadOpts {
     fn default() -> Self {
         Self {
             etag: Default::default(),
             byte_start: Bound::Unbounded,
             byte_end: Bound::Unbounded,
+            version_id: None,
             kind: DownloadKind::Large,
         }
     }
@@ -295,6 +327,14 @@ pub trait RemoteStorage: Send + Sync + 'static {
         Ok(combined)
     }
 
+    async fn list_versions(
+        &self,
+        prefix: Option<&RemotePath>,
+        mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
+        cancel: &CancellationToken,
+    ) -> Result<VersionListing, DownloadError>;
+
     /// Obtain metadata information about an object.
     async fn head_object(
         &self,
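
Annotation (not part of the patch): the pieces added to `lib.rs` above compose as follows. A hedged caller-side sketch — the function and variable names are hypothetical, and it assumes only the `list_versions`/`version_id` APIs this patch introduces:

```rust
use remote_storage::{DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
use tokio_util::sync::CancellationToken;

/// Hypothetical caller: pin a download to a concrete recorded version of `path`.
async fn download_pinned(
    storage: &GenericRemoteStorage,
    path: &RemotePath,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    let listing = storage
        .list_versions(Some(path), ListingMode::NoDelimiter, None, cancel)
        .await?;
    // Deletion markers carry no version id, so `version_id()` skips them.
    let Some(id) = listing
        .versions
        .iter()
        .filter(|v| &v.key == path)
        .find_map(|v| v.version_id())
    else {
        anyhow::bail!("no concrete version found for {path}");
    };
    let opts = DownloadOpts {
        version_id: Some(id.clone()),
        ..Default::default()
    };
    let _download = storage.download(path, &opts, cancel).await?;
    Ok(())
}
```
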
@@ -475,6 +515,22 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
         }
     }
 
+    // See [`RemoteStorage::list_versions`].
+    pub async fn list_versions<'a>(
+        &'a self,
+        prefix: Option<&'a RemotePath>,
+        mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
+        cancel: &'a CancellationToken,
+    ) -> Result<VersionListing, DownloadError> {
+        match self {
+            Self::LocalFs(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
+            Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
+            Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
+            Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
+        }
+    }
+
     // See [`RemoteStorage::head_object`].
     pub async fn head_object(
         &self,
@@ -727,6 +783,7 @@ impl ConcurrencyLimiter {
             RequestKind::Copy => &self.write,
             RequestKind::TimeTravel => &self.write,
             RequestKind::Head => &self.read,
+            RequestKind::ListVersions => &self.read,
         }
     }
 
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index f03d6ac8ee..6607b55f1a 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -445,6 +445,16 @@ impl RemoteStorage for LocalFs {
         }
     }
 
+    async fn list_versions(
+        &self,
+        _prefix: Option<&RemotePath>,
+        _mode: ListingMode,
+        _max_keys: Option<NonZeroU32>,
+        _cancel: &CancellationToken,
+    ) -> Result<VersionListing, DownloadError> {
+        unimplemented!()
+    }
+
     async fn head_object(
         &self,
         key: &RemotePath,
diff --git a/libs/remote_storage/src/metrics.rs b/libs/remote_storage/src/metrics.rs
index 81e68e9a29..50d9823a8e 100644
--- a/libs/remote_storage/src/metrics.rs
+++ b/libs/remote_storage/src/metrics.rs
@@ -14,6 +14,7 @@ pub(crate) enum RequestKind {
     Copy = 4,
     TimeTravel = 5,
     Head = 6,
+    ListVersions = 7,
 }
 
 use RequestKind::*;
@@ -29,6 +30,7 @@ impl RequestKind {
             Copy => "copy_object",
             TimeTravel => "time_travel_recover",
             Head => "head_object",
+            ListVersions => "list_versions",
         }
     }
     const fn as_index(&self) -> usize {
@@ -36,7 +38,10 @@ impl RequestKind {
     }
 }
 
-const REQUEST_KIND_COUNT: usize = 7;
+const REQUEST_KIND_LIST: &[RequestKind] =
+    &[Get, Put, Delete, List, Copy, TimeTravel, Head, ListVersions];
+
+const REQUEST_KIND_COUNT: usize = REQUEST_KIND_LIST.len();
 pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
 
 impl<C> RequestTyped<C> {
@@ -45,12 +50,11 @@ impl<C> RequestTyped<C> {
     fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
-        use RequestKind::*;
-        let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
+        let mut it = REQUEST_KIND_LIST.iter();
         let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
             let next = it.next().unwrap();
             assert_eq!(index, next.as_index());
-            f(next)
+            f(*next)
         });
 
         if let Some(next) = it.next() {
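
Annotation (not part of the patch): `build_with` asserts that `REQUEST_KIND_LIST` stays ordered by discriminant, so each kind indexes its own slot of `RequestTyped`. A hypothetical in-module test that pins the invariant down directly:

```rust
#[cfg(test)]
mod request_kind_invariant {
    use super::*;

    /// REQUEST_KIND_LIST must enumerate every variant in discriminant order,
    /// because `RequestTyped` uses `as_index()` as the array index.
    #[test]
    fn list_is_ordered_by_index() {
        for (index, kind) in REQUEST_KIND_LIST.iter().enumerate() {
            assert_eq!(index, kind.as_index());
        }
    }
}
```
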
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index ba7ce9e1e7..918d9d5a6b 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -21,9 +21,8 @@ use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
 use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::operation::get_object::GetObjectError;
 use aws_sdk_s3::operation::head_object::HeadObjectError;
-use aws_sdk_s3::types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass};
+use aws_sdk_s3::types::{Delete, ObjectIdentifier, StorageClass};
 use aws_smithy_async::rt::sleep::TokioSleep;
-use aws_smithy_types::DateTime;
 use aws_smithy_types::body::SdkBody;
 use aws_smithy_types::byte_stream::ByteStream;
 use aws_smithy_types::date_time::ConversionError;
@@ -46,7 +45,7 @@ use crate::support::PermitCarrying;
 use crate::{
     ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode,
     ListingObject, MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
-    TimeTravelError, TimeoutOrCancel,
+    TimeTravelError, TimeoutOrCancel, Version, VersionId, VersionKind, VersionListing,
 };
 
 /// AWS S3 storage.
@@ -66,6 +65,7 @@ struct GetObjectRequest {
     key: String,
     etag: Option<String>,
     range: Option<String>,
+    version_id: Option<String>,
 }
 
 impl S3Bucket {
     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
@@ -251,6 +251,7 @@ impl S3Bucket {
             .get_object()
             .bucket(request.bucket)
             .key(request.key)
+            .set_version_id(request.version_id)
             .set_range(request.range);
 
         if let Some(etag) = request.etag {
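
Annotation (not part of the patch): `set_version_id` above takes an `Option`, so `None` degrades to a plain GET and existing unversioned downloads are untouched. A minimal standalone sketch of the same SDK call — all names (`client`, `bucket`, `key`, `version`) are assumptions, not from this patch:

```rust
/// Hypothetical sketch: pinning a GetObject to a specific object version.
async fn get_pinned(
    client: &aws_sdk_s3::Client,
    bucket: &str,
    key: &str,
    version: Option<String>,
) -> anyhow::Result<Vec<u8>> {
    let out = client
        .get_object()
        .bucket(bucket)
        .key(key)
        // `set_*` accepts an Option, so `None` leaves the request unversioned
        .set_version_id(version)
        .send()
        .await?;
    let bytes = out.body.collect().await?.into_bytes();
    Ok(bytes.to_vec())
}
```
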
@@ -405,6 +406,124 @@
         Ok(())
     }
 
+    async fn list_versions_with_permit(
+        &self,
+        _permit: &tokio::sync::SemaphorePermit<'_>,
+        prefix: Option<&RemotePath>,
+        mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
+        cancel: &CancellationToken,
+    ) -> Result<VersionListing, DownloadError> {
+        // get the passed prefix or if it is not set use prefix_in_bucket value
+        let prefix = prefix
+            .map(|p| self.relative_path_to_s3_object(p))
+            .or_else(|| self.prefix_in_bucket.clone());
+
+        let warn_threshold = 3;
+        let max_retries = 10;
+        let is_permanent = |e: &_| matches!(e, DownloadError::Cancelled);
+
+        let mut key_marker = None;
+        let mut version_id_marker = None;
+        let mut versions_and_deletes = Vec::new();
+
+        loop {
+            let response = backoff::retry(
+                || async {
+                    let mut request = self
+                        .client
+                        .list_object_versions()
+                        .bucket(self.bucket_name.clone())
+                        .set_prefix(prefix.clone())
+                        .set_key_marker(key_marker.clone())
+                        .set_version_id_marker(version_id_marker.clone());
+
+                    if let ListingMode::WithDelimiter = mode {
+                        request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
+                    }
+
+                    let op = request.send();
+
+                    tokio::select! {
+                        res = op => res.map_err(|e| DownloadError::Other(e.into())),
+                        _ = cancel.cancelled() => Err(DownloadError::Cancelled),
+                    }
+                },
+                is_permanent,
+                warn_threshold,
+                max_retries,
+                "listing object versions",
+                cancel,
+            )
+            .await
+            .ok_or_else(|| DownloadError::Cancelled)
+            .and_then(|x| x)?;
+
+            tracing::trace!(
+                "  Got List response version_id_marker={:?}, key_marker={:?}",
+                response.version_id_marker,
+                response.key_marker
+            );
+            let versions = response
+                .versions
+                .unwrap_or_default()
+                .into_iter()
+                .map(|version| {
+                    let key = version.key.expect("response does not contain a key");
+                    let key = self.s3_object_to_relative_path(&key);
+                    let version_id = VersionId(version.version_id.expect("needing version id"));
+                    let last_modified =
+                        SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
+                    Ok(Version {
+                        key,
+                        last_modified,
+                        kind: crate::VersionKind::Version(version_id),
+                    })
+                });
+            let deletes = response
+                .delete_markers
+                .unwrap_or_default()
+                .into_iter()
+                .map(|version| {
+                    let key = version.key.expect("response does not contain a key");
+                    let key = self.s3_object_to_relative_path(&key);
+                    let last_modified =
+                        SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
+                    Ok(Version {
+                        key,
+                        last_modified,
+                        kind: crate::VersionKind::DeletionMarker,
+                    })
+                });
+            itertools::process_results(versions.chain(deletes), |n_vds| {
+                versions_and_deletes.extend(n_vds)
+            })
+            .map_err(DownloadError::Other)?;
+            fn none_if_empty(v: Option<String>) -> Option<String> {
+                v.filter(|v| !v.is_empty())
+            }
+            version_id_marker = none_if_empty(response.next_version_id_marker);
+            key_marker = none_if_empty(response.next_key_marker);
+            if version_id_marker.is_none() {
+                // The final response is not supposed to be truncated
+                if response.is_truncated.unwrap_or_default() {
+                    return Err(DownloadError::Other(anyhow::anyhow!(
+                        "Received truncated ListObjectVersions response for prefix={prefix:?}"
+                    )));
+                }
+                break;
+            }
+            if let Some(max_keys) = max_keys {
+                if versions_and_deletes.len() >= max_keys.get().try_into().unwrap() {
+                    return Err(DownloadError::Other(anyhow::anyhow!("too many versions")));
+                }
+            }
+        }
+        Ok(VersionListing {
+            versions: versions_and_deletes,
+        })
+    }
+
     pub fn bucket_name(&self) -> &str {
         &self.bucket_name
     }
@@ -621,6 +740,19 @@ impl RemoteStorage for S3Bucket {
         }
     }
 
+    async fn list_versions(
+        &self,
+        prefix: Option<&RemotePath>,
+        mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
+        cancel: &CancellationToken,
+    ) -> Result<VersionListing, DownloadError> {
+        let kind = RequestKind::ListVersions;
+        let permit = self.permit(kind, cancel).await?;
+        self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
+            .await
+    }
+
     async fn head_object(
         &self,
         key: &RemotePath,
@@ -801,6 +933,7 @@
                 key: self.relative_path_to_s3_object(from),
                 etag: opts.etag.as_ref().map(|e| e.to_string()),
                 range: opts.byte_range_header(),
+                version_id: opts.version_id.as_ref().map(|v| v.0.to_owned()),
             },
             cancel,
         )
@@ -845,94 +978,25 @@
         let kind = RequestKind::TimeTravel;
         let permit = self.permit(kind, cancel).await?;
 
-        let timestamp = DateTime::from(timestamp);
-        let done_if_after = DateTime::from(done_if_after);
-
-        tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
-
-        // get the passed prefix or if it is not set use prefix_in_bucket value
-        let prefix = prefix
-            .map(|p| self.relative_path_to_s3_object(p))
-            .or_else(|| self.prefix_in_bucket.clone());
+        // Limit the number of versions deletions, mostly so that we don't
+        // keep requesting forever if the list is too long, as we'd put the
+        // list in RAM.
+        // Building a list of 100k entries that reaches the limit roughly takes
+        // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
+        const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
 
-        let warn_threshold = 3;
-        let max_retries = 10;
-        let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
-
-        let mut key_marker = None;
-        let mut version_id_marker = None;
-        let mut versions_and_deletes = Vec::new();
-
-        loop {
-            let response = backoff::retry(
-                || async {
-                    let op = self
-                        .client
-                        .list_object_versions()
-                        .bucket(self.bucket_name.clone())
-                        .set_prefix(prefix.clone())
-                        .set_key_marker(key_marker.clone())
-                        .set_version_id_marker(version_id_marker.clone())
-                        .send();
-
-                    tokio::select! {
-                        res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
-                        _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
-                    }
-                },
-                is_permanent,
-                warn_threshold,
-                max_retries,
-                "listing object versions for time_travel_recover",
-                cancel,
-            )
+        let mode = ListingMode::NoDelimiter;
+        let version_listing = self
+            .list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
             .await
-            .ok_or_else(|| TimeTravelError::Cancelled)
-            .and_then(|x| x)?;
-
-            tracing::trace!(
-                "  Got List response version_id_marker={:?}, key_marker={:?}",
-                response.version_id_marker,
-                response.key_marker
-            );
-            let versions = response
-                .versions
-                .unwrap_or_default()
-                .into_iter()
-                .map(VerOrDelete::from_version);
-            let deletes = response
-                .delete_markers
-                .unwrap_or_default()
-                .into_iter()
-                .map(VerOrDelete::from_delete_marker);
-            itertools::process_results(versions.chain(deletes), |n_vds| {
-                versions_and_deletes.extend(n_vds)
-            })
-            .map_err(TimeTravelError::Other)?;
-            fn none_if_empty(v: Option<String>) -> Option<String> {
-                v.filter(|v| !v.is_empty())
-            }
-            version_id_marker = none_if_empty(response.next_version_id_marker);
-            key_marker = none_if_empty(response.next_key_marker);
-            if version_id_marker.is_none() {
-                // The final response is not supposed to be truncated
-                if response.is_truncated.unwrap_or_default() {
-                    return Err(TimeTravelError::Other(anyhow::anyhow!(
-                        "Received truncated ListObjectVersions response for prefix={prefix:?}"
-                    )));
-                }
-                break;
-            }
-            // Limit the number of versions deletions, mostly so that we don't
-            // keep requesting forever if the list is too long, as we'd put the
-            // list in RAM.
-            // Building a list of 100k entries that reaches the limit roughly takes
-            // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
-            const COMPLEXITY_LIMIT: usize = 100_000;
-            if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
-                return Err(TimeTravelError::TooManyVersions);
-            }
-        }
+            .map_err(|err| match err {
+                DownloadError::Other(e) => TimeTravelError::Other(e),
+                DownloadError::Cancelled => TimeTravelError::Cancelled,
+                other => TimeTravelError::Other(other.into()),
+            })?;
+        let versions_and_deletes = version_listing.versions;
 
         tracing::info!(
             "Built list for time travel with {} versions and deletions",
@@ -948,24 +1012,26 @@
         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
 
         for vd in &versions_and_deletes {
-            let VerOrDelete {
-                version_id, key, ..
-            } = &vd;
-            if version_id == "null" {
+            let Version { key, .. } = &vd;
+            let version_id = vd.version_id().map(|v| v.0.as_str());
+            if version_id == Some("null") {
                 return Err(TimeTravelError::Other(anyhow!(
                     "Received ListVersions response for key={key} with version_id='null', \
                     indicating either disabled versioning, or legacy objects with null version id values"
                 )));
             }
-            tracing::trace!(
-                "Parsing version key={key} version_id={version_id} kind={:?}",
-                vd.kind
-            );
+            tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
 
             vds_for_key.entry(key).or_default().push(vd);
         }
+
+        let warn_threshold = 3;
+        let max_retries = 10;
+        let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
+
         for (key, versions) in vds_for_key {
             let last_vd = versions.last().unwrap();
+            let key = self.relative_path_to_s3_object(key);
             if last_vd.last_modified > done_if_after {
                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
                 continue;
             }
@@ -990,11 +1056,11 @@
                 do_delete = true;
             } else {
                 match &versions[version_to_restore_to - 1] {
-                    VerOrDelete {
-                        kind: VerOrDeleteKind::Version,
-                        version_id,
+                    Version {
+                        kind: VersionKind::Version(version_id),
                         ..
                     } => {
+                        let version_id = &version_id.0;
                         tracing::trace!("Copying old version {version_id} for {key}...");
                         // Restore the state to the last version by copying
                         let source_id =
                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
 
                         let op = self
                             .client
                             .copy_object()
                             .bucket(self.bucket_name.clone())
-                            .key(key)
+                            .key(&key)
                             .set_storage_class(self.upload_storage_class.clone())
                             .copy_source(&source_id)
                             .send();
@@ -1027,8 +1093,8 @@
                         .and_then(|x| x)?;
                         tracing::info!(%version_id, %key, "Copied old version in S3");
                     }
-                    VerOrDelete {
-                        kind: VerOrDeleteKind::DeleteMarker,
+                    Version {
+                        kind: VersionKind::DeletionMarker,
                         ..
                     } => {
                         do_delete = true;
@@ -1036,7 +1102,7 @@
             };
             if do_delete {
-                if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
+                if matches!(last_vd.kind, VersionKind::DeletionMarker) {
                     // Key has since been deleted (but there was some history), no need to do anything
                     tracing::trace!("Key {key} already deleted, skipping.");
                 } else {
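
Annotation (not part of the patch): the rewritten loop above keeps the old restore decision. Simplified to its core — a hypothetical model that ignores retries, `done_if_after`, and deletion batching — the per-key logic looks like this:

```rust
use std::time::SystemTime;

use remote_storage::{Version, VersionId, VersionKind};

enum Action<'a> {
    Nothing,
    Delete,
    CopyBack(&'a VersionId),
}

/// `history` is one key's versions and delete markers, oldest first.
fn restore_action(history: &[Version], target: SystemTime) -> Action<'_> {
    // Index of the first entry strictly newer than the target time.
    let first_after = history.partition_point(|v| v.last_modified <= target);
    if first_after == history.len() {
        return Action::Nothing; // key unchanged since the target time
    }
    if first_after == 0 {
        return Action::Delete; // key did not exist at the target time
    }
    match &history[first_after - 1].kind {
        VersionKind::Version(id) => Action::CopyBack(id),
        VersionKind::DeletionMarker => Action::Delete,
    }
}
```
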
last_modified={:?}, version_id={:?}, key={:?}", - lvk.0, - lvk.1, - lvk.2, - ); - }; - Ok(Self { - kind, - last_modified, - version_id, - key, - }) - } - fn from_version(v: ObjectVersion) -> anyhow::Result { - Self::with_kind( - VerOrDeleteKind::Version, - v.last_modified, - v.version_id, - v.key, - ) - } - fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result { - Self::with_kind( - VerOrDeleteKind::DeleteMarker, - v.last_modified, - v.version_id, - v.key, - ) - } -} - #[cfg(test)] mod tests { use std::num::NonZeroUsize; diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index f56be873c4..894cf600be 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -139,6 +139,20 @@ impl RemoteStorage for UnreliableWrapper { self.inner.list(prefix, mode, max_keys, cancel).await } + async fn list_versions( + &self, + prefix: Option<&RemotePath>, + mode: ListingMode, + max_keys: Option, + cancel: &CancellationToken, + ) -> Result { + self.attempt(RemoteOp::ListPrefixes(prefix.cloned())) + .map_err(DownloadError::Other)?; + self.inner + .list_versions(prefix, mode, max_keys, cancel) + .await + } + async fn head_object( &self, key: &RemotePath, diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs index 2929f30dce..e7aa8f6038 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs @@ -238,7 +238,8 @@ impl RemoteStorageWrapper { kind: DownloadKind::Large, etag: None, byte_start: Bound::Included(start_inclusive), - byte_end: Bound::Excluded(end_exclusive) + byte_end: Bound::Excluded(end_exclusive), + version_id: None, }, &self.cancel) .await?;