diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 5363e935e3..f64cd9e206 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; use std::{env, io}; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, anyhow}; use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range}; use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions}; use azure_storage::StorageCredentials; @@ -37,6 +37,7 @@ use crate::metrics::{AttemptOutcome, RequestKind, start_measuring_requests}; use crate::{ ConcurrencyLimiter, Download, DownloadError, DownloadKind, DownloadOpts, Listing, ListingMode, ListingObject, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel, + Version, VersionKind, }; pub struct AzureBlobStorage { @@ -405,6 +406,39 @@ impl AzureBlobStorage { pub fn container_name(&self) -> &str { &self.container_name } + + async fn list_versions_with_permit( + &self, + _permit: &tokio::sync::SemaphorePermit<'_>, + prefix: Option<&RemotePath>, + mode: ListingMode, + max_keys: Option<NonZeroU32>, + cancel: &CancellationToken, + ) -> Result<crate::VersionListing, DownloadError> { + let customize_builder = |mut builder: ListBlobsBuilder| { + builder = builder.include_versions(true); + // We do not return this info back to `VersionListing` yet. 
+ builder = builder.include_deleted(true); + builder + }; + let kind = RequestKind::ListVersions; + + let mut stream = std::pin::pin!(self.list_streaming_for_fn( + prefix, + mode, + max_keys, + cancel, + kind, + customize_builder + )); + let mut combined: crate::VersionListing = + stream.next().await.expect("At least one item required")?; + while let Some(list) = stream.next().await { + let list = list?; + combined.versions.extend(list.versions.into_iter()); + } + Ok(combined) + } } trait ListingCollector { @@ -488,27 +522,10 @@ impl RemoteStorage for AzureBlobStorage { max_keys: Option<NonZeroU32>, cancel: &CancellationToken, ) -> std::result::Result<crate::VersionListing, DownloadError> { - let customize_builder = |mut builder: ListBlobsBuilder| { - builder = builder.include_versions(true); - builder - }; let kind = RequestKind::ListVersions; - - let mut stream = std::pin::pin!(self.list_streaming_for_fn( - prefix, - mode, - max_keys, - cancel, - kind, - customize_builder - )); - let mut combined: crate::VersionListing = - stream.next().await.expect("At least one item required")?; - while let Some(list) = stream.next().await { - let list = list?; - combined.versions.extend(list.versions.into_iter()); - } - Ok(combined) + let permit = self.permit(kind, cancel).await?; + self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel) + .await } async fn head_object( @@ -803,14 +820,158 @@ impl RemoteStorage for AzureBlobStorage { async fn time_travel_recover( &self, - _prefix: Option<&RemotePath>, - _timestamp: SystemTime, - _done_if_after: SystemTime, - _cancel: &CancellationToken, + prefix: Option<&RemotePath>, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: &CancellationToken, ) -> Result<(), TimeTravelError> { - // TODO use Azure point in time recovery feature for this - // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview - Err(TimeTravelError::Unimplemented) + let msg = "PLEASE NOTE: Azure Blob storage time-travel recovery may not work as expected " 
+ .to_string() + + "for some specific files. If a file gets deleted but then overwritten and we want to recover " + + "to the time during the file was not present, this functionality will recover the file. Only " + + "use the functionality for services that can tolerate this. For example, recovering a state of the " + + "pageserver tenants."; + tracing::error!("{}", msg); + + let kind = RequestKind::TimeTravel; + let permit = self.permit(kind, cancel).await?; + + let mode = ListingMode::NoDelimiter; + let version_listing = self + .list_versions_with_permit(&permit, prefix, mode, None, cancel) + .await + .map_err(|err| match err { + DownloadError::Other(e) => TimeTravelError::Other(e), + DownloadError::Cancelled => TimeTravelError::Cancelled, + other => TimeTravelError::Other(other.into()), + })?; + let versions_and_deletes = version_listing.versions; + + tracing::info!( + "Built list for time travel with {} versions and deletions", + versions_and_deletes.len() + ); + + // Work on the list of references instead of the objects directly, + // otherwise we get lifetime errors in the sort_by_key call below. + let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>(); + + versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified)); + + let mut vds_for_key = HashMap::<_, Vec<_>>::new(); + + for vd in &versions_and_deletes { + let Version { key, .. 
} = &vd; + let version_id = vd.version_id().map(|v| v.0.as_str()); + if version_id == Some("null") { + return Err(TimeTravelError::Other(anyhow!( + "Received ListVersions response for key={key} with version_id='null', \ + indicating either disabled versioning, or legacy objects with null version id values" + ))); + } + tracing::trace!("Parsing version key={key} kind={:?}", vd.kind); + + vds_for_key.entry(key).or_default().push(vd); + } + + let warn_threshold = 3; + let max_retries = 10; + let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled); + + for (key, versions) in vds_for_key { + let last_vd = versions.last().unwrap(); + let key = self.relative_path_to_name(key); + if last_vd.last_modified > done_if_after { + tracing::debug!("Key {key} has version later than done_if_after, skipping"); + continue; + } + // the version we want to restore to. + let version_to_restore_to = + match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) { + Ok(v) => v, + Err(e) => e, + }; + if version_to_restore_to == versions.len() { + tracing::debug!("Key {key} has no changes since timestamp, skipping"); + continue; + } + let mut do_delete = false; + if version_to_restore_to == 0 { + // All versions more recent, so the key didn't exist at the specified time point. + tracing::debug!( + "All {} versions more recent for {key}, deleting", + versions.len() + ); + do_delete = true; + } else { + match &versions[version_to_restore_to - 1] { + Version { + kind: VersionKind::Version(version_id), + .. + } => { + let source_url = format!( + "{}/{}?versionid={}", + self.client + .url() + .map_err(|e| TimeTravelError::Other(anyhow!("{e}")))?, + key, + version_id.0 + ); + tracing::debug!( + "Promoting old version {} for {key} at {}...", + version_id.0, + source_url + ); + backoff::retry( + || async { + let blob_client = self.client.blob_client(key.clone()); + let op = blob_client.copy(Url::from_str(&source_url).unwrap()); + tokio::select! 
{ + res = op => res.map_err(|e| TimeTravelError::Other(e.into())), + _ = cancel.cancelled() => Err(TimeTravelError::Cancelled), + } + }, + is_permanent, + warn_threshold, + max_retries, + "copying object version for time_travel_recover", + cancel, + ) + .await + .ok_or_else(|| TimeTravelError::Cancelled) + .and_then(|x| x)?; + tracing::info!(?version_id, %key, "Copied old version in Azure blob storage"); + } + Version { + kind: VersionKind::DeletionMarker, + .. + } => { + do_delete = true; + } + } + }; + if do_delete { + if matches!(last_vd.kind, VersionKind::DeletionMarker) { + // Key has since been deleted (but there was some history), no need to do anything + tracing::debug!("Key {key} already deleted, skipping."); + } else { + tracing::debug!("Deleting {key}..."); + + self.delete(&RemotePath::from_string(&key).unwrap(), cancel) + .await + .map_err(|e| { + // delete_oid0 will use TimeoutOrCancel + if TimeoutOrCancel::caused_by_cancel(&e) { + TimeTravelError::Cancelled + } else { + TimeTravelError::Other(e) + } + })?; + } + } + } + + Ok(()) } }