mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
versioning API for remote_storage (#11671)
Adds a versioning API to remote_storage. We want to use it in the scrubber, both for tenant snapshot as well as for metadata checks. for #8830 and for #11588
This commit is contained in:
@@ -14,8 +14,9 @@ use anyhow::{Context, Result};
|
||||
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
|
||||
use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
|
||||
use azure_storage::StorageCredentials;
|
||||
use azure_storage_blobs::blob::CopyStatus;
|
||||
use azure_storage_blobs::blob::operations::GetBlobBuilder;
|
||||
use azure_storage_blobs::blob::{Blob, CopyStatus};
|
||||
use azure_storage_blobs::container::operations::ListBlobsBuilder;
|
||||
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
|
||||
use bytes::Bytes;
|
||||
use futures::FutureExt;
|
||||
@@ -253,53 +254,15 @@ impl AzureBlobStorage {
|
||||
download
|
||||
}
|
||||
|
||||
async fn permit(
|
||||
&self,
|
||||
kind: RequestKind,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
|
||||
let acquire = self.concurrency_limiter.acquire(kind);
|
||||
|
||||
tokio::select! {
|
||||
permit = acquire => Ok(permit.expect("never closed")),
|
||||
_ = cancel.cancelled() => Err(Cancelled),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn container_name(&self) -> &str {
|
||||
&self.container_name
|
||||
}
|
||||
}
|
||||
|
||||
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
|
||||
let mut res = Metadata::new();
|
||||
for (k, v) in metadata.0.into_iter() {
|
||||
res.insert(k, v);
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
fn to_download_error(error: azure_core::Error) -> DownloadError {
|
||||
if let Some(http_err) = error.as_http_error() {
|
||||
match http_err.status() {
|
||||
StatusCode::NotFound => DownloadError::NotFound,
|
||||
StatusCode::NotModified => DownloadError::Unmodified,
|
||||
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
|
||||
_ => DownloadError::Other(anyhow::Error::new(error)),
|
||||
}
|
||||
} else {
|
||||
DownloadError::Other(error.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteStorage for AzureBlobStorage {
|
||||
fn list_streaming(
|
||||
fn list_streaming_for_fn<T: Default + ListingCollector>(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>> {
|
||||
request_kind: RequestKind,
|
||||
customize_builder: impl Fn(ListBlobsBuilder) -> ListBlobsBuilder,
|
||||
) -> impl Stream<Item = Result<T, DownloadError>> {
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
|
||||
self.prefix_in_container.clone().map(|mut s| {
|
||||
@@ -311,7 +274,7 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
});
|
||||
|
||||
async_stream::stream! {
|
||||
let _permit = self.permit(RequestKind::List, cancel).await?;
|
||||
let _permit = self.permit(request_kind, cancel).await?;
|
||||
|
||||
let mut builder = self.client.list_blobs();
|
||||
|
||||
@@ -327,6 +290,8 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
builder = builder.max_results(MaxResults::new(limit));
|
||||
}
|
||||
|
||||
builder = customize_builder(builder);
|
||||
|
||||
let mut next_marker = None;
|
||||
|
||||
let mut timeout_try_cnt = 1;
|
||||
@@ -382,26 +347,20 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
break;
|
||||
};
|
||||
|
||||
let mut res = Listing::default();
|
||||
let mut res = T::default();
|
||||
next_marker = entry.continuation();
|
||||
let prefix_iter = entry
|
||||
.blobs
|
||||
.prefixes()
|
||||
.map(|prefix| self.name_to_relative_path(&prefix.name));
|
||||
res.prefixes.extend(prefix_iter);
|
||||
res.add_prefixes(self, prefix_iter);
|
||||
|
||||
let blob_iter = entry
|
||||
.blobs
|
||||
.blobs()
|
||||
.map(|k| ListingObject{
|
||||
key: self.name_to_relative_path(&k.name),
|
||||
last_modified: k.properties.last_modified.into(),
|
||||
size: k.properties.content_length,
|
||||
}
|
||||
);
|
||||
.blobs();
|
||||
|
||||
for key in blob_iter {
|
||||
res.keys.push(key);
|
||||
res.add_blob(self, key);
|
||||
|
||||
if let Some(mut mk) = max_keys {
|
||||
assert!(mk > 0);
|
||||
@@ -423,6 +382,128 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
}
|
||||
}
|
||||
|
||||
async fn permit(
|
||||
&self,
|
||||
kind: RequestKind,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
|
||||
let acquire = self.concurrency_limiter.acquire(kind);
|
||||
|
||||
tokio::select! {
|
||||
permit = acquire => Ok(permit.expect("never closed")),
|
||||
_ = cancel.cancelled() => Err(Cancelled),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn container_name(&self) -> &str {
|
||||
&self.container_name
|
||||
}
|
||||
}
|
||||
|
||||
trait ListingCollector {
|
||||
fn add_prefixes(&mut self, abs: &AzureBlobStorage, prefix_it: impl Iterator<Item = RemotePath>);
|
||||
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob);
|
||||
}
|
||||
|
||||
impl ListingCollector for Listing {
|
||||
fn add_prefixes(
|
||||
&mut self,
|
||||
_abs: &AzureBlobStorage,
|
||||
prefix_it: impl Iterator<Item = RemotePath>,
|
||||
) {
|
||||
self.prefixes.extend(prefix_it);
|
||||
}
|
||||
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
|
||||
self.keys.push(ListingObject {
|
||||
key: abs.name_to_relative_path(&blob.name),
|
||||
last_modified: blob.properties.last_modified.into(),
|
||||
size: blob.properties.content_length,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl ListingCollector for crate::VersionListing {
|
||||
fn add_prefixes(
|
||||
&mut self,
|
||||
_abs: &AzureBlobStorage,
|
||||
_prefix_it: impl Iterator<Item = RemotePath>,
|
||||
) {
|
||||
// nothing
|
||||
}
|
||||
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
|
||||
let id = crate::VersionId(blob.version_id.clone().expect("didn't find version ID"));
|
||||
self.versions.push(crate::Version {
|
||||
key: abs.name_to_relative_path(&blob.name),
|
||||
last_modified: blob.properties.last_modified.into(),
|
||||
kind: crate::VersionKind::Version(id),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
|
||||
let mut res = Metadata::new();
|
||||
for (k, v) in metadata.0.into_iter() {
|
||||
res.insert(k, v);
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
fn to_download_error(error: azure_core::Error) -> DownloadError {
|
||||
if let Some(http_err) = error.as_http_error() {
|
||||
match http_err.status() {
|
||||
StatusCode::NotFound => DownloadError::NotFound,
|
||||
StatusCode::NotModified => DownloadError::Unmodified,
|
||||
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
|
||||
_ => DownloadError::Other(anyhow::Error::new(error)),
|
||||
}
|
||||
} else {
|
||||
DownloadError::Other(error.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteStorage for AzureBlobStorage {
|
||||
fn list_streaming(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>> {
|
||||
let customize_builder = |builder| builder;
|
||||
let kind = RequestKind::ListVersions;
|
||||
self.list_streaming_for_fn(prefix, mode, max_keys, cancel, kind, customize_builder)
|
||||
}
|
||||
|
||||
async fn list_versions(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> std::result::Result<crate::VersionListing, DownloadError> {
|
||||
let customize_builder = |mut builder: ListBlobsBuilder| {
|
||||
builder = builder.include_versions(true);
|
||||
builder
|
||||
};
|
||||
let kind = RequestKind::ListVersions;
|
||||
|
||||
let mut stream = std::pin::pin!(self.list_streaming_for_fn(
|
||||
prefix,
|
||||
mode,
|
||||
max_keys,
|
||||
cancel,
|
||||
kind,
|
||||
customize_builder
|
||||
));
|
||||
let mut combined: crate::VersionListing =
|
||||
stream.next().await.expect("At least one item required")?;
|
||||
while let Some(list) = stream.next().await {
|
||||
let list = list?;
|
||||
combined.versions.extend(list.versions.into_iter());
|
||||
}
|
||||
Ok(combined)
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
@@ -532,7 +613,12 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
let mut builder = blob_client.get();
|
||||
|
||||
if let Some(ref etag) = opts.etag {
|
||||
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
|
||||
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()));
|
||||
}
|
||||
|
||||
if let Some(ref version_id) = opts.version_id {
|
||||
let version_id = azure_storage_blobs::prelude::VersionId::new(version_id.0.clone());
|
||||
builder = builder.blob_versioning(version_id);
|
||||
}
|
||||
|
||||
if let Some((start, end)) = opts.byte_range() {
|
||||
|
||||
@@ -176,6 +176,32 @@ pub struct Listing {
|
||||
pub keys: Vec<ListingObject>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct VersionListing {
|
||||
pub versions: Vec<Version>,
|
||||
}
|
||||
|
||||
pub struct Version {
|
||||
pub key: RemotePath,
|
||||
pub last_modified: SystemTime,
|
||||
pub kind: VersionKind,
|
||||
}
|
||||
|
||||
impl Version {
|
||||
pub fn version_id(&self) -> Option<&VersionId> {
|
||||
match &self.kind {
|
||||
VersionKind::Version(id) => Some(id),
|
||||
VersionKind::DeletionMarker => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum VersionKind {
|
||||
DeletionMarker,
|
||||
Version(VersionId),
|
||||
}
|
||||
|
||||
/// Options for downloads. The default value is a plain GET.
|
||||
pub struct DownloadOpts {
|
||||
/// If given, returns [`DownloadError::Unmodified`] if the object still has
|
||||
@@ -186,6 +212,8 @@ pub struct DownloadOpts {
|
||||
/// The end of the byte range to download, or unbounded. Must be after the
|
||||
/// start bound.
|
||||
pub byte_end: Bound<u64>,
|
||||
/// Optionally request a specific version of a key
|
||||
pub version_id: Option<VersionId>,
|
||||
/// Indicate whether we're downloading something small or large: this indirectly controls
|
||||
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
|
||||
/// for layer files
|
||||
@@ -197,12 +225,16 @@ pub enum DownloadKind {
|
||||
Small,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct VersionId(pub String);
|
||||
|
||||
impl Default for DownloadOpts {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
etag: Default::default(),
|
||||
byte_start: Bound::Unbounded,
|
||||
byte_end: Bound::Unbounded,
|
||||
version_id: None,
|
||||
kind: DownloadKind::Large,
|
||||
}
|
||||
}
|
||||
@@ -295,6 +327,14 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
Ok(combined)
|
||||
}
|
||||
|
||||
async fn list_versions(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<VersionListing, DownloadError>;
|
||||
|
||||
/// Obtain metadata information about an object.
|
||||
async fn head_object(
|
||||
&self,
|
||||
@@ -475,6 +515,22 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
|
||||
// See [`RemoteStorage::list_versions`].
|
||||
pub async fn list_versions<'a>(
|
||||
&'a self,
|
||||
prefix: Option<&'a RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &'a CancellationToken,
|
||||
) -> Result<VersionListing, DownloadError> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
|
||||
Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
|
||||
Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
|
||||
Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
|
||||
}
|
||||
}
|
||||
|
||||
// See [`RemoteStorage::head_object`].
|
||||
pub async fn head_object(
|
||||
&self,
|
||||
@@ -727,6 +783,7 @@ impl ConcurrencyLimiter {
|
||||
RequestKind::Copy => &self.write,
|
||||
RequestKind::TimeTravel => &self.write,
|
||||
RequestKind::Head => &self.read,
|
||||
RequestKind::ListVersions => &self.read,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -445,6 +445,16 @@ impl RemoteStorage for LocalFs {
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_versions(
|
||||
&self,
|
||||
_prefix: Option<&RemotePath>,
|
||||
_mode: ListingMode,
|
||||
_max_keys: Option<NonZeroU32>,
|
||||
_cancel: &CancellationToken,
|
||||
) -> Result<crate::VersionListing, DownloadError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
|
||||
@@ -14,6 +14,7 @@ pub(crate) enum RequestKind {
|
||||
Copy = 4,
|
||||
TimeTravel = 5,
|
||||
Head = 6,
|
||||
ListVersions = 7,
|
||||
}
|
||||
|
||||
use RequestKind::*;
|
||||
@@ -29,6 +30,7 @@ impl RequestKind {
|
||||
Copy => "copy_object",
|
||||
TimeTravel => "time_travel_recover",
|
||||
Head => "head_object",
|
||||
ListVersions => "list_versions",
|
||||
}
|
||||
}
|
||||
const fn as_index(&self) -> usize {
|
||||
@@ -36,7 +38,10 @@ impl RequestKind {
|
||||
}
|
||||
}
|
||||
|
||||
const REQUEST_KIND_COUNT: usize = 7;
|
||||
const REQUEST_KIND_LIST: &[RequestKind] =
|
||||
&[Get, Put, Delete, List, Copy, TimeTravel, Head, ListVersions];
|
||||
|
||||
const REQUEST_KIND_COUNT: usize = REQUEST_KIND_LIST.len();
|
||||
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
|
||||
|
||||
impl<C> RequestTyped<C> {
|
||||
@@ -45,12 +50,11 @@ impl<C> RequestTyped<C> {
|
||||
}
|
||||
|
||||
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
|
||||
use RequestKind::*;
|
||||
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
|
||||
let mut it = REQUEST_KIND_LIST.iter();
|
||||
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
|
||||
let next = it.next().unwrap();
|
||||
assert_eq!(index, next.as_index());
|
||||
f(next)
|
||||
f(*next)
|
||||
});
|
||||
|
||||
if let Some(next) = it.next() {
|
||||
|
||||
@@ -21,9 +21,8 @@ use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
|
||||
use aws_sdk_s3::error::SdkError;
|
||||
use aws_sdk_s3::operation::get_object::GetObjectError;
|
||||
use aws_sdk_s3::operation::head_object::HeadObjectError;
|
||||
use aws_sdk_s3::types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass};
|
||||
use aws_sdk_s3::types::{Delete, ObjectIdentifier, StorageClass};
|
||||
use aws_smithy_async::rt::sleep::TokioSleep;
|
||||
use aws_smithy_types::DateTime;
|
||||
use aws_smithy_types::body::SdkBody;
|
||||
use aws_smithy_types::byte_stream::ByteStream;
|
||||
use aws_smithy_types::date_time::ConversionError;
|
||||
@@ -46,7 +45,7 @@ use crate::support::PermitCarrying;
|
||||
use crate::{
|
||||
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
|
||||
MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
|
||||
TimeTravelError, TimeoutOrCancel,
|
||||
TimeTravelError, TimeoutOrCancel, Version, VersionId, VersionKind, VersionListing,
|
||||
};
|
||||
|
||||
/// AWS S3 storage.
|
||||
@@ -66,6 +65,7 @@ struct GetObjectRequest {
|
||||
key: String,
|
||||
etag: Option<String>,
|
||||
range: Option<String>,
|
||||
version_id: Option<String>,
|
||||
}
|
||||
impl S3Bucket {
|
||||
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
|
||||
@@ -251,6 +251,7 @@ impl S3Bucket {
|
||||
.get_object()
|
||||
.bucket(request.bucket)
|
||||
.key(request.key)
|
||||
.set_version_id(request.version_id)
|
||||
.set_range(request.range);
|
||||
|
||||
if let Some(etag) = request.etag {
|
||||
@@ -405,6 +406,124 @@ impl S3Bucket {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn list_versions_with_permit(
|
||||
&self,
|
||||
_permit: &tokio::sync::SemaphorePermit<'_>,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<crate::VersionListing, DownloadError> {
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let prefix = prefix
|
||||
.map(|p| self.relative_path_to_s3_object(p))
|
||||
.or_else(|| self.prefix_in_bucket.clone());
|
||||
|
||||
let warn_threshold = 3;
|
||||
let max_retries = 10;
|
||||
let is_permanent = |e: &_| matches!(e, DownloadError::Cancelled);
|
||||
|
||||
let mut key_marker = None;
|
||||
let mut version_id_marker = None;
|
||||
let mut versions_and_deletes = Vec::new();
|
||||
|
||||
loop {
|
||||
let response = backoff::retry(
|
||||
|| async {
|
||||
let mut request = self
|
||||
.client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.set_prefix(prefix.clone())
|
||||
.set_key_marker(key_marker.clone())
|
||||
.set_version_id_marker(version_id_marker.clone());
|
||||
|
||||
if let ListingMode::WithDelimiter = mode {
|
||||
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
|
||||
}
|
||||
|
||||
let op = request.send();
|
||||
|
||||
tokio::select! {
|
||||
res = op => res.map_err(|e| DownloadError::Other(e.into())),
|
||||
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
|
||||
}
|
||||
},
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
"listing object versions",
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.ok_or_else(|| DownloadError::Cancelled)
|
||||
.and_then(|x| x)?;
|
||||
|
||||
tracing::trace!(
|
||||
" Got List response version_id_marker={:?}, key_marker={:?}",
|
||||
response.version_id_marker,
|
||||
response.key_marker
|
||||
);
|
||||
let versions = response
|
||||
.versions
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|version| {
|
||||
let key = version.key.expect("response does not contain a key");
|
||||
let key = self.s3_object_to_relative_path(&key);
|
||||
let version_id = VersionId(version.version_id.expect("needing version id"));
|
||||
let last_modified =
|
||||
SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
|
||||
Ok(Version {
|
||||
key,
|
||||
last_modified,
|
||||
kind: crate::VersionKind::Version(version_id),
|
||||
})
|
||||
});
|
||||
let deletes = response
|
||||
.delete_markers
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|version| {
|
||||
let key = version.key.expect("response does not contain a key");
|
||||
let key = self.s3_object_to_relative_path(&key);
|
||||
let last_modified =
|
||||
SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
|
||||
Ok(Version {
|
||||
key,
|
||||
last_modified,
|
||||
kind: crate::VersionKind::DeletionMarker,
|
||||
})
|
||||
});
|
||||
itertools::process_results(versions.chain(deletes), |n_vds| {
|
||||
versions_and_deletes.extend(n_vds)
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
fn none_if_empty(v: Option<String>) -> Option<String> {
|
||||
v.filter(|v| !v.is_empty())
|
||||
}
|
||||
version_id_marker = none_if_empty(response.next_version_id_marker);
|
||||
key_marker = none_if_empty(response.next_key_marker);
|
||||
if version_id_marker.is_none() {
|
||||
// The final response is not supposed to be truncated
|
||||
if response.is_truncated.unwrap_or_default() {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!(
|
||||
"Received truncated ListObjectVersions response for prefix={prefix:?}"
|
||||
)));
|
||||
}
|
||||
break;
|
||||
}
|
||||
if let Some(max_keys) = max_keys {
|
||||
if versions_and_deletes.len() >= max_keys.get().try_into().unwrap() {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!("too many versions")));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(VersionListing {
|
||||
versions: versions_and_deletes,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn bucket_name(&self) -> &str {
|
||||
&self.bucket_name
|
||||
}
|
||||
@@ -621,6 +740,19 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_versions(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<crate::VersionListing, DownloadError> {
|
||||
let kind = RequestKind::ListVersions;
|
||||
let permit = self.permit(kind, cancel).await?;
|
||||
self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
@@ -801,6 +933,7 @@ impl RemoteStorage for S3Bucket {
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: opts.etag.as_ref().map(|e| e.to_string()),
|
||||
range: opts.byte_range_header(),
|
||||
version_id: opts.version_id.as_ref().map(|v| v.0.to_owned()),
|
||||
},
|
||||
cancel,
|
||||
)
|
||||
@@ -845,94 +978,25 @@ impl RemoteStorage for S3Bucket {
|
||||
let kind = RequestKind::TimeTravel;
|
||||
let permit = self.permit(kind, cancel).await?;
|
||||
|
||||
let timestamp = DateTime::from(timestamp);
|
||||
let done_if_after = DateTime::from(done_if_after);
|
||||
|
||||
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
|
||||
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let prefix = prefix
|
||||
.map(|p| self.relative_path_to_s3_object(p))
|
||||
.or_else(|| self.prefix_in_bucket.clone());
|
||||
// Limit the number of versions deletions, mostly so that we don't
|
||||
// keep requesting forever if the list is too long, as we'd put the
|
||||
// list in RAM.
|
||||
// Building a list of 100k entries that reaches the limit roughly takes
|
||||
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
|
||||
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
|
||||
|
||||
let warn_threshold = 3;
|
||||
let max_retries = 10;
|
||||
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
|
||||
|
||||
let mut key_marker = None;
|
||||
let mut version_id_marker = None;
|
||||
let mut versions_and_deletes = Vec::new();
|
||||
|
||||
loop {
|
||||
let response = backoff::retry(
|
||||
|| async {
|
||||
let op = self
|
||||
.client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.set_prefix(prefix.clone())
|
||||
.set_key_marker(key_marker.clone())
|
||||
.set_version_id_marker(version_id_marker.clone())
|
||||
.send();
|
||||
|
||||
tokio::select! {
|
||||
res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
|
||||
_ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
|
||||
}
|
||||
},
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
"listing object versions for time_travel_recover",
|
||||
cancel,
|
||||
)
|
||||
let mode = ListingMode::NoDelimiter;
|
||||
let version_listing = self
|
||||
.list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
|
||||
.await
|
||||
.ok_or_else(|| TimeTravelError::Cancelled)
|
||||
.and_then(|x| x)?;
|
||||
|
||||
tracing::trace!(
|
||||
" Got List response version_id_marker={:?}, key_marker={:?}",
|
||||
response.version_id_marker,
|
||||
response.key_marker
|
||||
);
|
||||
let versions = response
|
||||
.versions
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(VerOrDelete::from_version);
|
||||
let deletes = response
|
||||
.delete_markers
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(VerOrDelete::from_delete_marker);
|
||||
itertools::process_results(versions.chain(deletes), |n_vds| {
|
||||
versions_and_deletes.extend(n_vds)
|
||||
})
|
||||
.map_err(TimeTravelError::Other)?;
|
||||
fn none_if_empty(v: Option<String>) -> Option<String> {
|
||||
v.filter(|v| !v.is_empty())
|
||||
}
|
||||
version_id_marker = none_if_empty(response.next_version_id_marker);
|
||||
key_marker = none_if_empty(response.next_key_marker);
|
||||
if version_id_marker.is_none() {
|
||||
// The final response is not supposed to be truncated
|
||||
if response.is_truncated.unwrap_or_default() {
|
||||
return Err(TimeTravelError::Other(anyhow::anyhow!(
|
||||
"Received truncated ListObjectVersions response for prefix={prefix:?}"
|
||||
)));
|
||||
}
|
||||
break;
|
||||
}
|
||||
// Limit the number of versions deletions, mostly so that we don't
|
||||
// keep requesting forever if the list is too long, as we'd put the
|
||||
// list in RAM.
|
||||
// Building a list of 100k entries that reaches the limit roughly takes
|
||||
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
|
||||
const COMPLEXITY_LIMIT: usize = 100_000;
|
||||
if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
|
||||
return Err(TimeTravelError::TooManyVersions);
|
||||
}
|
||||
}
|
||||
.map_err(|err| match err {
|
||||
DownloadError::Other(e) => TimeTravelError::Other(e),
|
||||
DownloadError::Cancelled => TimeTravelError::Cancelled,
|
||||
other => TimeTravelError::Other(other.into()),
|
||||
})?;
|
||||
let versions_and_deletes = version_listing.versions;
|
||||
|
||||
tracing::info!(
|
||||
"Built list for time travel with {} versions and deletions",
|
||||
@@ -948,24 +1012,26 @@ impl RemoteStorage for S3Bucket {
|
||||
let mut vds_for_key = HashMap::<_, Vec<_>>::new();
|
||||
|
||||
for vd in &versions_and_deletes {
|
||||
let VerOrDelete {
|
||||
version_id, key, ..
|
||||
} = &vd;
|
||||
if version_id == "null" {
|
||||
let Version { key, .. } = &vd;
|
||||
let version_id = vd.version_id().map(|v| v.0.as_str());
|
||||
if version_id == Some("null") {
|
||||
return Err(TimeTravelError::Other(anyhow!(
|
||||
"Received ListVersions response for key={key} with version_id='null', \
|
||||
indicating either disabled versioning, or legacy objects with null version id values"
|
||||
)));
|
||||
}
|
||||
tracing::trace!(
|
||||
"Parsing version key={key} version_id={version_id} kind={:?}",
|
||||
vd.kind
|
||||
);
|
||||
tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
|
||||
|
||||
vds_for_key.entry(key).or_default().push(vd);
|
||||
}
|
||||
|
||||
let warn_threshold = 3;
|
||||
let max_retries = 10;
|
||||
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
|
||||
|
||||
for (key, versions) in vds_for_key {
|
||||
let last_vd = versions.last().unwrap();
|
||||
let key = self.relative_path_to_s3_object(key);
|
||||
if last_vd.last_modified > done_if_after {
|
||||
tracing::trace!("Key {key} has version later than done_if_after, skipping");
|
||||
continue;
|
||||
@@ -990,11 +1056,11 @@ impl RemoteStorage for S3Bucket {
|
||||
do_delete = true;
|
||||
} else {
|
||||
match &versions[version_to_restore_to - 1] {
|
||||
VerOrDelete {
|
||||
kind: VerOrDeleteKind::Version,
|
||||
version_id,
|
||||
Version {
|
||||
kind: VersionKind::Version(version_id),
|
||||
..
|
||||
} => {
|
||||
let version_id = &version_id.0;
|
||||
tracing::trace!("Copying old version {version_id} for {key}...");
|
||||
// Restore the state to the last version by copying
|
||||
let source_id =
|
||||
@@ -1006,7 +1072,7 @@ impl RemoteStorage for S3Bucket {
|
||||
.client
|
||||
.copy_object()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.key(key)
|
||||
.key(&key)
|
||||
.set_storage_class(self.upload_storage_class.clone())
|
||||
.copy_source(&source_id)
|
||||
.send();
|
||||
@@ -1027,8 +1093,8 @@ impl RemoteStorage for S3Bucket {
|
||||
.and_then(|x| x)?;
|
||||
tracing::info!(%version_id, %key, "Copied old version in S3");
|
||||
}
|
||||
VerOrDelete {
|
||||
kind: VerOrDeleteKind::DeleteMarker,
|
||||
Version {
|
||||
kind: VersionKind::DeletionMarker,
|
||||
..
|
||||
} => {
|
||||
do_delete = true;
|
||||
@@ -1036,7 +1102,7 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
};
|
||||
if do_delete {
|
||||
if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
|
||||
if matches!(last_vd.kind, VersionKind::DeletionMarker) {
|
||||
// Key has since been deleted (but there was some history), no need to do anything
|
||||
tracing::trace!("Key {key} already deleted, skipping.");
|
||||
} else {
|
||||
@@ -1064,62 +1130,6 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
// Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
|
||||
struct VerOrDelete {
|
||||
kind: VerOrDeleteKind,
|
||||
last_modified: DateTime,
|
||||
version_id: String,
|
||||
key: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum VerOrDeleteKind {
|
||||
Version,
|
||||
DeleteMarker,
|
||||
}
|
||||
|
||||
impl VerOrDelete {
|
||||
fn with_kind(
|
||||
kind: VerOrDeleteKind,
|
||||
last_modified: Option<DateTime>,
|
||||
version_id: Option<String>,
|
||||
key: Option<String>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let lvk = (last_modified, version_id, key);
|
||||
let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
|
||||
anyhow::bail!(
|
||||
"One (or more) of last_modified, key, and id is None. \
|
||||
Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
|
||||
lvk.0,
|
||||
lvk.1,
|
||||
lvk.2,
|
||||
);
|
||||
};
|
||||
Ok(Self {
|
||||
kind,
|
||||
last_modified,
|
||||
version_id,
|
||||
key,
|
||||
})
|
||||
}
|
||||
fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
|
||||
Self::with_kind(
|
||||
VerOrDeleteKind::Version,
|
||||
v.last_modified,
|
||||
v.version_id,
|
||||
v.key,
|
||||
)
|
||||
}
|
||||
fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
|
||||
Self::with_kind(
|
||||
VerOrDeleteKind::DeleteMarker,
|
||||
v.last_modified,
|
||||
v.version_id,
|
||||
v.key,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
@@ -139,6 +139,20 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
self.inner.list(prefix, mode, max_keys, cancel).await
|
||||
}
|
||||
|
||||
async fn list_versions(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<crate::VersionListing, DownloadError> {
|
||||
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
|
||||
.map_err(DownloadError::Other)?;
|
||||
self.inner
|
||||
.list_versions(prefix, mode, max_keys, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
|
||||
@@ -238,7 +238,8 @@ impl RemoteStorageWrapper {
|
||||
kind: DownloadKind::Large,
|
||||
etag: None,
|
||||
byte_start: Bound::Included(start_inclusive),
|
||||
byte_end: Bound::Excluded(end_exclusive)
|
||||
byte_end: Bound::Excluded(end_exclusive),
|
||||
version_id: None,
|
||||
},
|
||||
&self.cancel)
|
||||
.await?;
|
||||
|
||||
Reference in New Issue
Block a user