From d52b81340f77f37a2cad3237cfb553d5257d634e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Thu, 25 Jan 2024 18:23:18 +0100
Subject: [PATCH] S3 based recovery (#6155)

Adds a new `time_travel_recover` function to the `RemoteStorage` trait that
allows time-travel-like functionality for S3 buckets, regardless of their
content (it is not even pageserver related). It takes a different, simpler
approach than the one described in
[this post](https://aws.amazon.com/blogs/storage/point-in-time-restore-for-amazon-s3-buckets/).

It takes as input a prefix, a target timestamp, and a limit timestamp, and then:

* executes [`ListObjectVersions`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html)
* obtains the latest version that comes before the target timestamp
* copies that latest version back to the same key
* if there are versions newer than the limit timestamp, it doesn't do anything
  for that key

The limit timestamp is meant to be some timestamp before the start of the
recovery operation and after any changes that one wants to revert. For example,
it might be the time point after a tenant was detached from all involved
pageservers. The limiting mechanism ensures that the operation is idempotent
and can be retried without causing additional writes/copies.

The approach fulfills all the requirements laid out in issue 8233, and is
itself a recoverable operation: nothing is deleted permanently, only new
entries are added to the version log.

I also enable [nextest retries](https://nexte.st/book/retries.html) to help
with some general S3 flakiness (on top of low-level retries).

Part of https://github.com/neondatabase/cloud/issues/8233
---
 .github/workflows/build_and_test.yml         |   1 +
 libs/remote_storage/src/azure_blob.rs        |  16 +
 libs/remote_storage/src/lib.rs               |  38 +++
 libs/remote_storage/src/local_fs.rs          |  15 +-
 libs/remote_storage/src/s3_bucket.rs         | 297 +++++++++++++++----
 libs/remote_storage/src/s3_bucket/metrics.rs |   8 +-
 libs/remote_storage/src/simulate_failures.rs |  16 +
 libs/remote_storage/tests/test_real_s3.rs    | 123 +++++++-
 8 files changed, 452 insertions(+), 62 deletions(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 2b88f09b3d..fb88d4da96 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -21,6 +21,7 @@ env:
   COPT: '-Werror'
   AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
   AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
+  NEXTEST_RETRIES: 3
 
 jobs:
   check-permissions:
diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs
index e3edfd08a5..abab32470b 100644
--- a/libs/remote_storage/src/azure_blob.rs
+++ b/libs/remote_storage/src/azure_blob.rs
@@ -8,6 +8,7 @@ use std::pin::Pin;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
+use std::time::SystemTime;
 
 use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
 use anyhow::Result;
@@ -23,6 +24,7 @@ use futures::stream::Stream;
 use futures_util::StreamExt;
 use http_types::{StatusCode, Url};
 use tokio::time::Instant;
+use tokio_util::sync::CancellationToken;
 use tracing::debug;
 
 use crate::s3_bucket::RequestKind;
@@ -370,6 +372,20 @@ impl RemoteStorage for AzureBlobStorage {
             copy_status = status;
         }
     }
+
+    async fn time_travel_recover(
+        &self,
+        _prefix: Option<&RemotePath>,
+        _timestamp: SystemTime,
+        _done_if_after: SystemTime,
+        _cancel: CancellationToken,
+    ) -> anyhow::Result<()> {
+        // TODO use Azure point in time recovery feature for this
+        // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
+        Err(anyhow::anyhow!(
+            "time travel recovery for azure blob storage is not implemented"
+        ))
+    }
 }
 
 pin_project_lite::pin_project! {
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index f247fcdc38..bf9c51ad1a 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -25,6 +25,7 @@ use bytes::Bytes;
 use futures::stream::Stream;
 use serde::{Deserialize, Serialize};
 use tokio::sync::Semaphore;
+use tokio_util::sync::CancellationToken;
 use toml_edit::Item;
 use tracing::info;
 
@@ -210,6 +211,15 @@ pub trait RemoteStorage: Send + Sync + 'static {
 
     /// Copy a remote object inside a bucket from one path to another.
     async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>;
+
+    /// Resets the content of everything with the given prefix to the given state
+    async fn time_travel_recover(
+        &self,
+        prefix: Option<&RemotePath>,
+        timestamp: SystemTime,
+        done_if_after: SystemTime,
+        cancel: CancellationToken,
+    ) -> anyhow::Result<()>;
 }
 
 pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
@@ -387,6 +397,33 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
             Self::Unreliable(s) => s.copy(from, to).await,
         }
     }
+
+    pub async fn time_travel_recover(
+        &self,
+        prefix: Option<&RemotePath>,
+        timestamp: SystemTime,
+        done_if_after: SystemTime,
+        cancel: CancellationToken,
+    ) -> anyhow::Result<()> {
+        match self {
+            Self::LocalFs(s) => {
+                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
+                    .await
+            }
+            Self::AwsS3(s) => {
+                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
+                    .await
+            }
+            Self::AzureBlob(s) => {
+                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
+                    .await
+            }
+            Self::Unreliable(s) => {
+                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
+                    .await
+            }
+        }
+    }
 }
 
 impl GenericRemoteStorage {
@@ -674,6 +711,7 @@ impl ConcurrencyLimiter {
             RequestKind::List => &self.read,
             RequestKind::Delete => &self.write,
             RequestKind::Copy => &self.write,
+            RequestKind::TimeTravel => &self.write,
         }
     }
 
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index 5f5bc1a9fa..34a6658a69 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -4,7 +4,7 @@
 //! This storage used in tests, but can also be used in cases when a certain persistent
 //! volume is mounted to the local FS.
 
-use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin};
+use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin, time::SystemTime};
 
 use anyhow::{bail, ensure, Context};
 use bytes::Bytes;
@@ -14,7 +14,7 @@ use tokio::{
     fs,
     io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
 };
-use tokio_util::io::ReaderStream;
+use tokio_util::{io::ReaderStream, sync::CancellationToken};
 use tracing::*;
 
 use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
@@ -422,6 +422,17 @@ impl RemoteStorage for LocalFs {
         })?;
         Ok(())
     }
+
+    #[allow(clippy::diverging_sub_expression)]
+    async fn time_travel_recover(
+        &self,
+        _prefix: Option<&RemotePath>,
+        _timestamp: SystemTime,
+        _done_if_after: SystemTime,
+        _cancel: CancellationToken,
+    ) -> anyhow::Result<()> {
+        unimplemented!()
+    }
 }
 
 fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index e72cf28228..4909b8522b 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -6,12 +6,14 @@ use std::{
     borrow::Cow,
+    collections::HashMap,
     pin::Pin,
     sync::Arc,
     task::{Context, Poll},
+    time::SystemTime,
 };
 
-use anyhow::Context as _;
+use anyhow::{anyhow, Context as _};
 use aws_config::{
     environment::credentials::EnvironmentVariableCredentialsProvider,
     imds::credentials::ImdsCredentialsProvider,
@@ -27,17 +29,19 @@ use aws_sdk_s3::{
     config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
     error::SdkError,
     operation::get_object::GetObjectError,
-    types::{Delete, ObjectIdentifier},
+    types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion},
     Client,
 };
 
 use aws_smithy_async::rt::sleep::TokioSleep;
-use aws_smithy_types::body::SdkBody;
 use aws_smithy_types::byte_stream::ByteStream;
+use aws_smithy_types::{body::SdkBody, DateTime};
 use bytes::Bytes;
 use futures::stream::Stream;
 use hyper::Body;
 use scopeguard::ScopeGuard;
+use tokio_util::sync::CancellationToken;
+use utils::backoff;
 
 use super::StorageMetadata;
 use crate::{
@@ -270,6 +274,59 @@ impl S3Bucket {
             }
         }
     }
+
+    async fn delete_oids(
+        &self,
+        kind: RequestKind,
+        delete_objects: &[ObjectIdentifier],
+    ) -> anyhow::Result<()> {
+        for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
+            let started_at = start_measuring_requests(kind);
+
+            let resp = self
+                .client
+                .delete_objects()
+                .bucket(self.bucket_name.clone())
+                .delete(
+                    Delete::builder()
+                        .set_objects(Some(chunk.to_vec()))
+                        .build()?,
+                )
+                .send()
+                .await;
+
+            let started_at = ScopeGuard::into_inner(started_at);
+            metrics::BUCKET_METRICS
+                .req_seconds
+                .observe_elapsed(kind, &resp, started_at);
+
+            let resp = resp?;
+            metrics::BUCKET_METRICS
+                .deleted_objects_total
+                .inc_by(chunk.len() as u64);
+            if let Some(errors) = resp.errors {
+                // Log a bounded number of the errors within the response:
+                // these requests can carry 1000 keys so logging each one
+                // would be too verbose, especially as errors may lead us
+                // to retry repeatedly.
+                const LOG_UP_TO_N_ERRORS: usize = 10;
+                for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
+                    tracing::warn!(
+                        "DeleteObjects key {} failed: {}: {}",
+                        e.key.as_ref().map(Cow::from).unwrap_or("".into()),
+                        e.code.as_ref().map(Cow::from).unwrap_or("".into()),
+                        e.message.as_ref().map(Cow::from).unwrap_or("".into())
+                    );
+                }
+
+                return Err(anyhow::format_err!(
+                    "Failed to delete {} objects",
+                    errors.len()
+                ));
+            }
+        }
+        Ok(())
+    }
 }
 
 pin_project_lite::pin_project! {
@@ -568,64 +625,168 @@ impl RemoteStorage for S3Bucket {
             delete_objects.push(obj_id);
         }
 
-        for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
-            let started_at = start_measuring_requests(kind);
-
-            let resp = self
-                .client
-                .delete_objects()
-                .bucket(self.bucket_name.clone())
-                .delete(
-                    Delete::builder()
-                        .set_objects(Some(chunk.to_vec()))
-                        .build()?,
-                )
-                .send()
-                .await;
-
-            let started_at = ScopeGuard::into_inner(started_at);
-            metrics::BUCKET_METRICS
-                .req_seconds
-                .observe_elapsed(kind, &resp, started_at);
-
-            match resp {
-                Ok(resp) => {
-                    metrics::BUCKET_METRICS
-                        .deleted_objects_total
-                        .inc_by(chunk.len() as u64);
-                    if let Some(errors) = resp.errors {
-                        // Log a bounded number of the errors within the response:
-                        // these requests can carry 1000 keys so logging each one
-                        // would be too verbose, especially as errors may lead us
-                        // to retry repeatedly.
-                        const LOG_UP_TO_N_ERRORS: usize = 10;
-                        for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
-                            tracing::warn!(
-                                "DeleteObjects key {} failed: {}: {}",
-                                e.key.as_ref().map(Cow::from).unwrap_or("".into()),
-                                e.code.as_ref().map(Cow::from).unwrap_or("".into()),
-                                e.message.as_ref().map(Cow::from).unwrap_or("".into())
-                            );
-                        }
-
-                        return Err(anyhow::format_err!(
-                            "Failed to delete {} objects",
-                            errors.len()
-                        ));
-                    }
-                }
-                Err(e) => {
-                    return Err(e.into());
-                }
-            }
-        }
-        Ok(())
+        self.delete_oids(kind, &delete_objects).await
     }
 
     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
         let paths = std::array::from_ref(path);
         self.delete_objects(paths).await
     }
+
+    async fn time_travel_recover(
+        &self,
+        prefix: Option<&RemotePath>,
+        timestamp: SystemTime,
+        done_if_after: SystemTime,
+        cancel: CancellationToken,
+    ) -> anyhow::Result<()> {
+        let kind = RequestKind::TimeTravel;
+        let _guard = self.permit(kind).await;
+
+        let timestamp = DateTime::from(timestamp);
+        let done_if_after = DateTime::from(done_if_after);
+
+        tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
+
+        // get the passed prefix or if it is not set use prefix_in_bucket value
+        let prefix = prefix
+            .map(|p| self.relative_path_to_s3_object(p))
+            .or_else(|| self.prefix_in_bucket.clone());
+
+        let warn_threshold = 3;
+        let max_retries = 10;
+        let is_permanent = |_e: &_| false;
+
+        let list = backoff::retry(
+            || async {
+                Ok(self
+                    .client
+                    .list_object_versions()
+                    .bucket(self.bucket_name.clone())
+                    .set_prefix(prefix.clone())
+                    .send()
+                    .await?)
+            },
+            is_permanent,
+            warn_threshold,
+            max_retries,
+            "listing object versions for time_travel_recover",
+            backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
+        )
+        .await?;
+
+        if list.is_truncated().unwrap_or_default() {
+            anyhow::bail!("Received truncated ListObjectVersions response for prefix={prefix:?}");
+        }
+
+        let mut versions_deletes = list
+            .versions()
+            .iter()
+            .map(VerOrDelete::Version)
+            .chain(list.delete_markers().iter().map(VerOrDelete::DeleteMarker))
+            .collect::<Vec<_>>();
+
+        versions_deletes.sort_by_key(|vd| (vd.key(), vd.last_modified()));
+
+        let mut vds_for_key = HashMap::<_, Vec<_>>::new();
+
+        for vd in versions_deletes {
+            let last_modified = vd.last_modified();
+            let version_id = vd.version_id();
+            let key = vd.key();
+            let (Some(last_modified), Some(version_id), Some(key)) =
+                (last_modified, version_id, key)
+            else {
+                anyhow::bail!(
+                    "One (or more) of last_modified, key, and id is None. \
last_modified={:?} key={:?} version_id={:?}", + last_modified, key, version_id, + ); + }; + if version_id == "null" { + anyhow::bail!("Received ListVersions response for key={key} with version_id='null', \ + indicating either disabled versioning, or legacy objects with null version id values"); + } + tracing::trace!( + "Parsing version key={key} version_id={version_id} is_delete={}", + matches!(vd, VerOrDelete::DeleteMarker(_)) + ); + + vds_for_key + .entry(key) + .or_default() + .push((vd, last_modified, version_id)); + } + for (key, versions) in vds_for_key { + let (last_vd, last_last_modified, _version_id) = versions.last().unwrap(); + if last_last_modified > &&done_if_after { + tracing::trace!("Key {key} has version later than done_if_after, skipping"); + continue; + } + // the version we want to restore to. + let version_to_restore_to = + match versions.binary_search_by_key(×tamp, |tpl| *tpl.1) { + Ok(v) => v, + Err(e) => e, + }; + if version_to_restore_to == versions.len() { + tracing::trace!("Key {key} has no changes since timestamp, skipping"); + continue; + } + let mut do_delete = false; + if version_to_restore_to == 0 { + // All versions more recent, so the key didn't exist at the specified time point. + tracing::trace!( + "All {} versions more recent for {key}, deleting", + versions.len() + ); + do_delete = true; + } else { + match &versions[version_to_restore_to - 1] { + (VerOrDelete::Version(_), _last_modified, version_id) => { + tracing::trace!("Copying old version {version_id} for {key}..."); + // Restore the state to the last version by copying + let source_id = + format!("{}/{key}?versionId={version_id}", self.bucket_name); + + backoff::retry( + || async { + Ok(self + .client + .copy_object() + .bucket(self.bucket_name.clone()) + .key(key) + .copy_source(&source_id) + .send() + .await?) + }, + is_permanent, + warn_threshold, + max_retries, + "listing object versions for time_travel_recover", + backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), + ) + .await?; + } + (VerOrDelete::DeleteMarker(_), _last_modified, _version_id) => { + do_delete = true; + } + } + }; + if do_delete { + if matches!(last_vd, VerOrDelete::DeleteMarker(_)) { + // Key has since been deleted (but there was some history), no need to do anything + tracing::trace!("Key {key} already deleted, skipping."); + } else { + tracing::trace!("Deleting {key}..."); + + let oid = ObjectIdentifier::builder().key(key.to_owned()).build()?; + self.delete_oids(kind, &[oid]).await?; + } + } + } + Ok(()) + } } /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`]. 
@@ -650,6 +811,32 @@ fn start_measuring_requests(
     })
 }
 
+enum VerOrDelete<'a> {
+    Version(&'a ObjectVersion),
+    DeleteMarker(&'a DeleteMarkerEntry),
+}
+
+impl<'a> VerOrDelete<'a> {
+    fn last_modified(&self) -> Option<&'a DateTime> {
+        match self {
+            VerOrDelete::Version(v) => v.last_modified(),
+            VerOrDelete::DeleteMarker(v) => v.last_modified(),
+        }
+    }
+    fn version_id(&self) -> Option<&'a str> {
+        match self {
+            VerOrDelete::Version(v) => v.version_id(),
+            VerOrDelete::DeleteMarker(v) => v.version_id(),
+        }
+    }
+    fn key(&self) -> Option<&'a str> {
+        match self {
+            VerOrDelete::Version(v) => v.key(),
+            VerOrDelete::DeleteMarker(v) => v.key(),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use camino::Utf8Path;
diff --git a/libs/remote_storage/src/s3_bucket/metrics.rs b/libs/remote_storage/src/s3_bucket/metrics.rs
index 21dde14906..beca755920 100644
--- a/libs/remote_storage/src/s3_bucket/metrics.rs
+++ b/libs/remote_storage/src/s3_bucket/metrics.rs
@@ -12,6 +12,7 @@ pub(crate) enum RequestKind {
     Delete = 2,
     List = 3,
     Copy = 4,
+    TimeTravel = 5,
 }
 
 use RequestKind::*;
@@ -24,6 +25,7 @@ impl RequestKind {
             Delete => "delete_object",
             List => "list_objects",
             Copy => "copy_object",
+            TimeTravel => "time_travel_recover",
         }
     }
     const fn as_index(&self) -> usize {
@@ -31,7 +33,7 @@ impl RequestKind {
     }
 }
 
-pub(super) struct RequestTyped<C>([C; 5]);
+pub(super) struct RequestTyped<C>([C; 6]);
 
 impl<C> RequestTyped<C> {
     pub(super) fn get(&self, kind: RequestKind) -> &C {
@@ -40,8 +42,8 @@ impl<C> RequestTyped<C> {
 
     fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
         use RequestKind::*;
-        let mut it = [Get, Put, Delete, List, Copy].into_iter();
-        let arr = std::array::from_fn::<C, 5, _>(|index| {
+        let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter();
+        let arr = std::array::from_fn::<C, 6, _>(|index| {
             let next = it.next().unwrap();
             assert_eq!(index, next.as_index());
             f(next)
diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs
index 90d90163a7..fc4c4b315b 100644
--- a/libs/remote_storage/src/simulate_failures.rs
+++ b/libs/remote_storage/src/simulate_failures.rs
@@ -5,7 +5,9 @@ use bytes::Bytes;
 use futures::stream::Stream;
 use std::collections::HashMap;
 use std::sync::Mutex;
+use std::time::SystemTime;
 use std::{collections::hash_map::Entry, sync::Arc};
+use tokio_util::sync::CancellationToken;
 
 use crate::{
     Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
@@ -30,6 +32,7 @@ enum RemoteOp {
     Download(RemotePath),
     Delete(RemotePath),
     DeleteObjects(Vec<RemotePath>),
+    TimeTravelRecover(Option<RemotePath>),
 }
 
 impl UnreliableWrapper {
@@ -181,4 +184,17 @@ impl RemoteStorage for UnreliableWrapper {
         self.attempt(RemoteOp::Upload(to.clone()))?;
         self.inner.copy_object(from, to).await
     }
+
+    async fn time_travel_recover(
+        &self,
+        prefix: Option<&RemotePath>,
+        timestamp: SystemTime,
+        done_if_after: SystemTime,
+        cancel: CancellationToken,
+    ) -> anyhow::Result<()> {
+        self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))?;
+        self.inner
+            .time_travel_recover(prefix, timestamp, done_if_after, cancel)
+            .await
+    }
 }
diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs
index 4a999d115e..9e1b989e4d 100644
--- a/libs/remote_storage/tests/test_real_s3.rs
+++ b/libs/remote_storage/tests/test_real_s3.rs
@@ -1,15 +1,19 @@
-use std::collections::HashSet;
 use std::env;
 use std::num::NonZeroUsize;
 use std::ops::ControlFlow;
 use std::sync::Arc;
-use std::time::UNIX_EPOCH;
+use std::time::{Duration, UNIX_EPOCH};
+use std::{collections::HashSet, time::SystemTime};
 
+use crate::common::{download_to_vec, upload_stream};
 use anyhow::Context;
+use camino::Utf8Path;
 use remote_storage::{
     GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
 };
+use test_context::test_context;
 use test_context::AsyncTestContext;
+use tokio_util::sync::CancellationToken;
 use tracing::info;
 
 mod common;
@@ -23,6 +27,121 @@ const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_
 
 const BASE_PREFIX: &str = "test";
 
+#[test_context(MaybeEnabledStorage)]
+#[tokio::test]
+async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
+    let ctx = match ctx {
+        MaybeEnabledStorage::Enabled(ctx) => ctx,
+        MaybeEnabledStorage::Disabled => return Ok(()),
+    };
+    // Our test is sensitive to discrepancies between the S3 clock and the clock of the
+    // environment the tests run in. Therefore, wait a little bit before and after. The
+    // alternative would be to take the time from S3 response headers.
+    const WAIT_TIME: Duration = Duration::from_millis(3_000);
+
+    async fn time_point() -> SystemTime {
+        tokio::time::sleep(WAIT_TIME).await;
+        let ret = SystemTime::now();
+        tokio::time::sleep(WAIT_TIME).await;
+        ret
+    }
+
+    async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> {
+        Ok(client
+            .list_files(None)
+            .await
+            .context("list root files failure")?
+            .into_iter()
+            .collect::<HashSet<_>>())
+    }
+
+    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    let (data, len) = upload_stream("remote blob data1".as_bytes().into());
+    ctx.client.upload(data, len, &path1, None).await?;
+
+    let t0_files = list_files(&ctx.client).await?;
+    let t0 = time_point().await;
+    println!("at t0: {t0_files:?}");
+
+    let old_data = "remote blob data2";
+    let (data, len) = upload_stream(old_data.as_bytes().into());
+    ctx.client.upload(data, len, &path2, None).await?;
+
+    let t1_files = list_files(&ctx.client).await?;
+    let t1 = time_point().await;
+    println!("at t1: {t1_files:?}");
+
+    // A little check to ensure that our clock is not too far off from the S3 clock
+    {
+        let dl = ctx.client.download(&path2).await?;
+        let last_modified = dl.last_modified.unwrap();
+        let half_wt = WAIT_TIME.mul_f32(0.5);
+        let t0_hwt = t0 + half_wt;
+        let t1_hwt = t1 - half_wt;
+        if !(t0_hwt..=t1_hwt).contains(&last_modified) {
+            panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
+                This likely means a large clock discrepancy between S3 and the local clock.");
+        }
+    }
+
+    let (data, len) = upload_stream("remote blob data3".as_bytes().into());
+    ctx.client.upload(data, len, &path3, None).await?;
+
+    let new_data = "new remote blob data2";
+    let (data, len) = upload_stream(new_data.as_bytes().into());
+    ctx.client.upload(data, len, &path2, None).await?;
+
+    ctx.client.delete(&path1).await?;
+
+    let t2_files = list_files(&ctx.client).await?;
+    let t2 = time_point().await;
+    println!("at t2: {t2_files:?}");
+
+    // No changes after recovery to t2 (no-op)
+    let t_final = time_point().await;
+    ctx.client
+        .time_travel_recover(None, t2, t_final, CancellationToken::new())
+        .await?;
+    let t2_files_recovered = list_files(&ctx.client).await?;
+    println!("after recovery to t2: {t2_files_recovered:?}");
+    assert_eq!(t2_files, t2_files_recovered);
+    let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2).await?).await?;
+    assert_eq!(path2_recovered_t2, new_data.as_bytes());
+
+    // after recovery to t1: path1 is back, path2 has the old content
+    let t_final = time_point().await;
+    ctx.client
+        .time_travel_recover(None, t1, t_final, CancellationToken::new())
+        .await?;
+    let t1_files_recovered = list_files(&ctx.client).await?;
+    println!("after recovery to t1: {t1_files_recovered:?}");
+    assert_eq!(t1_files, t1_files_recovered);
+    let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2).await?).await?;
+    assert_eq!(path2_recovered_t1, old_data.as_bytes());
+
+    // after recovery to t0: everything is gone except for path1
+    let t_final = time_point().await;
+    ctx.client
+        .time_travel_recover(None, t0, t_final, CancellationToken::new())
+        .await?;
+    let t0_files_recovered = list_files(&ctx.client).await?;
+    println!("after recovery to t0: {t0_files_recovered:?}");
+    assert_eq!(t0_files, t0_files_recovered);
+
+    // cleanup
+    ctx.client.delete_objects(&[path1, path2, path3]).await?;
+
+    Ok(())
+}
+
 struct EnabledS3 {
     client: Arc<GenericRemoteStorage>,
     base_prefix: &'static str,