diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 2b88f09b3d..fb88d4da96 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -21,6 +21,7 @@ env: COPT: '-Werror' AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }} + NEXTEST_RETRIES: 3 jobs: check-permissions: diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index e3edfd08a5..abab32470b 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -8,6 +8,7 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use std::time::SystemTime; use super::REMOTE_STORAGE_PREFIX_SEPARATOR; use anyhow::Result; @@ -23,6 +24,7 @@ use futures::stream::Stream; use futures_util::StreamExt; use http_types::{StatusCode, Url}; use tokio::time::Instant; +use tokio_util::sync::CancellationToken; use tracing::debug; use crate::s3_bucket::RequestKind; @@ -370,6 +372,20 @@ impl RemoteStorage for AzureBlobStorage { copy_status = status; } } + + async fn time_travel_recover( + &self, + _prefix: Option<&RemotePath>, + _timestamp: SystemTime, + _done_if_after: SystemTime, + _cancel: CancellationToken, + ) -> anyhow::Result<()> { + // TODO use Azure point in time recovery feature for this + // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview + Err(anyhow::anyhow!( + "time travel recovery for azure blob storage is not implemented" + )) + } } pin_project_lite::pin_project! { diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index f247fcdc38..bf9c51ad1a 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -25,6 +25,7 @@ use bytes::Bytes; use futures::stream::Stream; use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; +use tokio_util::sync::CancellationToken; use toml_edit::Item; use tracing::info; @@ -210,6 +211,15 @@ pub trait RemoteStorage: Send + Sync + 'static { /// Copy a remote object inside a bucket from one path to another. 
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>; + + /// Resets the content of everything with the given prefix to its state at the given timestamp + async fn time_travel_recover( + &self, + prefix: Option<&RemotePath>, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: CancellationToken, + ) -> anyhow::Result<()>; } pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>; @@ -387,6 +397,33 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> { Self::Unreliable(s) => s.copy(from, to).await, } } + + pub async fn time_travel_recover( + &self, + prefix: Option<&RemotePath>, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: CancellationToken, + ) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => { + s.time_travel_recover(prefix, timestamp, done_if_after, cancel) + .await + } + Self::AwsS3(s) => { + s.time_travel_recover(prefix, timestamp, done_if_after, cancel) + .await + } + Self::AzureBlob(s) => { + s.time_travel_recover(prefix, timestamp, done_if_after, cancel) + .await + } + Self::Unreliable(s) => { + s.time_travel_recover(prefix, timestamp, done_if_after, cancel) + .await + } + } + } } impl GenericRemoteStorage { @@ -674,6 +711,7 @@ impl ConcurrencyLimiter { RequestKind::List => &self.read, RequestKind::Delete => &self.write, RequestKind::Copy => &self.write, + RequestKind::TimeTravel => &self.write, } } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 5f5bc1a9fa..34a6658a69 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -4,7 +4,7 @@ //! This storage used in tests, but can also be used in cases when a certain persistent //! volume is mounted to the local FS. -use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin}; +use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin, time::SystemTime}; use anyhow::{bail, ensure, Context}; use bytes::Bytes; @@ -14,7 +14,7 @@ use tokio::{ fs, io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, }; -use tokio_util::io::ReaderStream; +use tokio_util::{io::ReaderStream, sync::CancellationToken}; use tracing::*; use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty}; @@ -422,6 +422,17 @@ impl RemoteStorage for LocalFs { })?; Ok(()) } + + #[allow(clippy::diverging_sub_expression)] + async fn time_travel_recover( + &self, + _prefix: Option<&RemotePath>, + _timestamp: SystemTime, + _done_if_after: SystemTime, + _cancel: CancellationToken, + ) -> anyhow::Result<()> { + unimplemented!() + } } fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf { diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index e72cf28228..4909b8522b 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -6,12 +6,14 @@ use std::{ borrow::Cow, + collections::HashMap, pin::Pin, sync::Arc, task::{Context, Poll}, + time::SystemTime, }; -use anyhow::Context as _; +use anyhow::{anyhow, Context as _}; use aws_config::{ environment::credentials::EnvironmentVariableCredentialsProvider, imds::credentials::ImdsCredentialsProvider, @@ -27,17 +29,19 @@ use aws_sdk_s3::{ config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep}, error::SdkError, operation::get_object::GetObjectError, - types::{Delete, ObjectIdentifier}, + types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion}, Client, }; use aws_smithy_async::rt::sleep::TokioSleep; -use aws_smithy_types::body::SdkBody; use aws_smithy_types::byte_stream::ByteStream; +use 
aws_smithy_types::{body::SdkBody, DateTime}; use bytes::Bytes; use futures::stream::Stream; use hyper::Body; use scopeguard::ScopeGuard; +use tokio_util::sync::CancellationToken; +use utils::backoff; use super::StorageMetadata; use crate::{ @@ -270,6 +274,59 @@ impl S3Bucket { } } } + + async fn delete_oids( + &self, + kind: RequestKind, + delete_objects: &[ObjectIdentifier], + ) -> anyhow::Result<()> { + for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) { + let started_at = start_measuring_requests(kind); + + let resp = self + .client + .delete_objects() + .bucket(self.bucket_name.clone()) + .delete( + Delete::builder() + .set_objects(Some(chunk.to_vec())) + .build()?, + ) + .send() + .await; + + let started_at = ScopeGuard::into_inner(started_at); + metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &resp, started_at); + + let resp = resp?; + metrics::BUCKET_METRICS + .deleted_objects_total + .inc_by(chunk.len() as u64); + if let Some(errors) = resp.errors { + // Log a bounded number of the errors within the response: + // these requests can carry 1000 keys so logging each one + // would be too verbose, especially as errors may lead us + // to retry repeatedly. + const LOG_UP_TO_N_ERRORS: usize = 10; + for e in errors.iter().take(LOG_UP_TO_N_ERRORS) { + tracing::warn!( + "DeleteObjects key {} failed: {}: {}", + e.key.as_ref().map(Cow::from).unwrap_or("".into()), + e.code.as_ref().map(Cow::from).unwrap_or("".into()), + e.message.as_ref().map(Cow::from).unwrap_or("".into()) + ); + } + + return Err(anyhow::format_err!( + "Failed to delete {} objects", + errors.len() + )); + } + } + Ok(()) + } } pin_project_lite::pin_project! { @@ -568,64 +625,168 @@ impl RemoteStorage for S3Bucket { delete_objects.push(obj_id); } - for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) { - let started_at = start_measuring_requests(kind); - - let resp = self - .client - .delete_objects() - .bucket(self.bucket_name.clone()) - .delete( - Delete::builder() - .set_objects(Some(chunk.to_vec())) - .build()?, - ) - .send() - .await; - - let started_at = ScopeGuard::into_inner(started_at); - metrics::BUCKET_METRICS - .req_seconds - .observe_elapsed(kind, &resp, started_at); - - match resp { - Ok(resp) => { - metrics::BUCKET_METRICS - .deleted_objects_total - .inc_by(chunk.len() as u64); - if let Some(errors) = resp.errors { - // Log a bounded number of the errors within the response: - // these requests can carry 1000 keys so logging each one - // would be too verbose, especially as errors may lead us - // to retry repeatedly. 
- const LOG_UP_TO_N_ERRORS: usize = 10; - for e in errors.iter().take(LOG_UP_TO_N_ERRORS) { - tracing::warn!( - "DeleteObjects key {} failed: {}: {}", - e.key.as_ref().map(Cow::from).unwrap_or("".into()), - e.code.as_ref().map(Cow::from).unwrap_or("".into()), - e.message.as_ref().map(Cow::from).unwrap_or("".into()) - ); - } - - return Err(anyhow::format_err!( - "Failed to delete {} objects", - errors.len() - )); - } - } - Err(e) => { - return Err(e.into()); - } - } - } - Ok(()) + self.delete_oids(kind, &delete_objects).await } async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { let paths = std::array::from_ref(path); self.delete_objects(paths).await } + + async fn time_travel_recover( + &self, + prefix: Option<&RemotePath>, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: CancellationToken, + ) -> anyhow::Result<()> { + let kind = RequestKind::TimeTravel; + let _guard = self.permit(kind).await; + + let timestamp = DateTime::from(timestamp); + let done_if_after = DateTime::from(done_if_after); + + tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}"); + + // get the passed prefix or if it is not set use prefix_in_bucket value + let prefix = prefix + .map(|p| self.relative_path_to_s3_object(p)) + .or_else(|| self.prefix_in_bucket.clone()); + + let warn_threshold = 3; + let max_retries = 10; + let is_permanent = |_e: &_| false; + + let list = backoff::retry( + || async { + Ok(self + .client + .list_object_versions() + .bucket(self.bucket_name.clone()) + .set_prefix(prefix.clone()) + .send() + .await?) + }, + is_permanent, + warn_threshold, + max_retries, + "listing object versions for time_travel_recover", + backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), + ) + .await?; + + if list.is_truncated().unwrap_or_default() { + anyhow::bail!("Received truncated ListObjectVersions response for prefix={prefix:?}"); + } + + let mut versions_deletes = list + .versions() + .iter() + .map(VerOrDelete::Version) + .chain(list.delete_markers().iter().map(VerOrDelete::DeleteMarker)) + .collect::<Vec<_>>(); + + versions_deletes.sort_by_key(|vd| (vd.key(), vd.last_modified())); + + let mut vds_for_key = HashMap::<_, Vec<_>>::new(); + + for vd in versions_deletes { + let last_modified = vd.last_modified(); + let version_id = vd.version_id(); + let key = vd.key(); + let (Some(last_modified), Some(version_id), Some(key)) = + (last_modified, version_id, key) + else { + anyhow::bail!( + "One (or more) of last_modified, key, and id is None. \ + Is versioning enabled in the bucket? last_modified={:?} key={:?} version_id={:?}", + last_modified, key, version_id, + ); + }; + if version_id == "null" { + anyhow::bail!("Received ListVersions response for key={key} with version_id='null', \ + indicating either disabled versioning, or legacy objects with null version id values"); + } + tracing::trace!( + "Parsing version key={key} version_id={version_id} is_delete={}", + matches!(vd, VerOrDelete::DeleteMarker(_)) + ); + + vds_for_key + .entry(key) + .or_default() + .push((vd, last_modified, version_id)); + } + for (key, versions) in vds_for_key { + let (last_vd, last_last_modified, _version_id) = versions.last().unwrap(); + if last_last_modified > &&done_if_after { + tracing::trace!("Key {key} has version later than done_if_after, skipping"); + continue; + } + // the version we want to restore to. 
+ let version_to_restore_to = + match versions.binary_search_by_key(&timestamp, |tpl| *tpl.1) { + Ok(v) => v, + Err(e) => e, + }; + if version_to_restore_to == versions.len() { + tracing::trace!("Key {key} has no changes since timestamp, skipping"); + continue; + } + let mut do_delete = false; + if version_to_restore_to == 0 { + // All versions more recent, so the key didn't exist at the specified time point. + tracing::trace!( + "All {} versions more recent for {key}, deleting", + versions.len() + ); + do_delete = true; + } else { + match &versions[version_to_restore_to - 1] { + (VerOrDelete::Version(_), _last_modified, version_id) => { + tracing::trace!("Copying old version {version_id} for {key}..."); + // Restore the state to the last version by copying + let source_id = + format!("{}/{key}?versionId={version_id}", self.bucket_name); + + backoff::retry( + || async { + Ok(self + .client + .copy_object() + .bucket(self.bucket_name.clone()) + .key(key) + .copy_source(&source_id) + .send() + .await?) + }, + is_permanent, + warn_threshold, + max_retries, + "copying object version for time_travel_recover", + backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), + ) + .await?; + } + (VerOrDelete::DeleteMarker(_), _last_modified, _version_id) => { + do_delete = true; + } + } + }; + if do_delete { + if matches!(last_vd, VerOrDelete::DeleteMarker(_)) { + // Key has since been deleted (but there was some history), no need to do anything + tracing::trace!("Key {key} already deleted, skipping."); + } else { + tracing::trace!("Deleting {key}..."); + + let oid = ObjectIdentifier::builder().key(key.to_owned()).build()?; + self.delete_oids(kind, &[oid]).await?; + } + } + } + Ok(()) + } } /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`]. 
@@ -650,6 +811,32 @@ fn start_measuring_requests( }) } +enum VerOrDelete<'a> { + Version(&'a ObjectVersion), + DeleteMarker(&'a DeleteMarkerEntry), +} + +impl<'a> VerOrDelete<'a> { + fn last_modified(&self) -> Option<&'a DateTime> { + match self { + VerOrDelete::Version(v) => v.last_modified(), + VerOrDelete::DeleteMarker(v) => v.last_modified(), + } + } + fn version_id(&self) -> Option<&'a str> { + match self { + VerOrDelete::Version(v) => v.version_id(), + VerOrDelete::DeleteMarker(v) => v.version_id(), + } + } + fn key(&self) -> Option<&'a str> { + match self { + VerOrDelete::Version(v) => v.key(), + VerOrDelete::DeleteMarker(v) => v.key(), + } + } +} + #[cfg(test)] mod tests { use camino::Utf8Path; diff --git a/libs/remote_storage/src/s3_bucket/metrics.rs b/libs/remote_storage/src/s3_bucket/metrics.rs index 21dde14906..beca755920 100644 --- a/libs/remote_storage/src/s3_bucket/metrics.rs +++ b/libs/remote_storage/src/s3_bucket/metrics.rs @@ -12,6 +12,7 @@ pub(crate) enum RequestKind { Delete = 2, List = 3, Copy = 4, + TimeTravel = 5, } use RequestKind::*; @@ -24,6 +25,7 @@ impl RequestKind { Delete => "delete_object", List => "list_objects", Copy => "copy_object", + TimeTravel => "time_travel_recover", } } const fn as_index(&self) -> usize { @@ -31,7 +33,7 @@ impl RequestKind { } } -pub(super) struct RequestTyped<C>([C; 5]); +pub(super) struct RequestTyped<C>([C; 6]); impl<C> RequestTyped<C> { pub(super) fn get(&self, kind: RequestKind) -> &C { @@ -40,8 +42,8 @@ impl<C> RequestTyped<C> { fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self { use RequestKind::*; - let mut it = [Get, Put, Delete, List, Copy].into_iter(); - let arr = std::array::from_fn::<C, 5, _>(|index| { + let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter(); + let arr = std::array::from_fn::<C, 6, _>(|index| { let next = it.next().unwrap(); assert_eq!(index, next.as_index()); f(next) diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 90d90163a7..fc4c4b315b 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -5,7 +5,9 @@ use bytes::Bytes; use futures::stream::Stream; use std::collections::HashMap; use std::sync::Mutex; +use std::time::SystemTime; use std::{collections::hash_map::Entry, sync::Arc}; +use tokio_util::sync::CancellationToken; use crate::{ Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage, @@ -30,6 +32,7 @@ enum RemoteOp { Download(RemotePath), Delete(RemotePath), DeleteObjects(Vec<RemotePath>), + TimeTravelRecover(Option<RemotePath>), } impl UnreliableWrapper { @@ -181,4 +184,17 @@ impl RemoteStorage for UnreliableWrapper { self.attempt(RemoteOp::Upload(to.clone()))?; self.inner.copy_object(from, to).await } + + async fn time_travel_recover( + &self, + prefix: Option<&RemotePath>, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: CancellationToken, + ) -> anyhow::Result<()> { + self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))?; + self.inner + .time_travel_recover(prefix, timestamp, done_if_after, cancel) + .await + } } diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 4a999d115e..9e1b989e4d 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -1,15 +1,19 @@ -use std::collections::HashSet; use std::env; use std::num::NonZeroUsize; use std::ops::ControlFlow; use std::sync::Arc; -use std::time::UNIX_EPOCH; +use std::time::{Duration, 
UNIX_EPOCH}; +use std::{collections::HashSet, time::SystemTime}; +use crate::common::{download_to_vec, upload_stream}; use anyhow::Context; +use camino::Utf8Path; use remote_storage::{ GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config, }; +use test_context::test_context; use test_context::AsyncTestContext; +use tokio_util::sync::CancellationToken; use tracing::info; mod common; @@ -23,6 +27,121 @@ const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_ const BASE_PREFIX: &str = "test"; +#[test_context(MaybeEnabledStorage)] +#[tokio::test] +async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> { + let ctx = match ctx { + MaybeEnabledStorage::Enabled(ctx) => ctx, + MaybeEnabledStorage::Disabled => return Ok(()), + }; + // The test has to tolerate small discrepancies between the S3 clock and the clock of the + // environment the tests run in, so wait a little bit before and after taking each time point. + // The alternative would be to take the time from S3 response headers. + const WAIT_TIME: Duration = Duration::from_millis(3_000); + + async fn time_point() -> SystemTime { + tokio::time::sleep(WAIT_TIME).await; + let ret = SystemTime::now(); + tokio::time::sleep(WAIT_TIME).await; + ret + } + + async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> { + Ok(client + .list_files(None) + .await + .context("list root files failure")? + .into_iter() + .collect::<HashSet<_>>()) + } + + let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str())) + .with_context(|| "RemotePath conversion")?; + + let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str())) + .with_context(|| "RemotePath conversion")?; + + let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str())) + .with_context(|| "RemotePath conversion")?; + + let (data, len) = upload_stream("remote blob data1".as_bytes().into()); + ctx.client.upload(data, len, &path1, None).await?; + + let t0_files = list_files(&ctx.client).await?; + let t0 = time_point().await; + println!("at t0: {t0_files:?}"); + + let old_data = "remote blob data2"; + let (data, len) = upload_stream(old_data.as_bytes().into()); + ctx.client.upload(data, len, &path2, None).await?; + + let t1_files = list_files(&ctx.client).await?; + let t1 = time_point().await; + println!("at t1: {t1_files:?}"); + + // A little check to ensure that our clock is not too far off from the S3 clock + { + let dl = ctx.client.download(&path2).await?; + let last_modified = dl.last_modified.unwrap(); + let half_wt = WAIT_TIME.mul_f32(0.5); + let t0_hwt = t0 + half_wt; + let t1_hwt = t1 - half_wt; + if !(t0_hwt..=t1_hwt).contains(&last_modified) { + panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. 
\ This likely means a large clock discrepancy between S3 and the local clock."); + } + } + + let (data, len) = upload_stream("remote blob data3".as_bytes().into()); + ctx.client.upload(data, len, &path3, None).await?; + + let new_data = "new remote blob data2"; + let (data, len) = upload_stream(new_data.as_bytes().into()); + ctx.client.upload(data, len, &path2, None).await?; + + ctx.client.delete(&path1).await?; + + let t2_files = list_files(&ctx.client).await?; + let t2 = time_point().await; + println!("at t2: {t2_files:?}"); + + // No changes after recovery to t2 (no-op) + let t_final = time_point().await; + ctx.client + .time_travel_recover(None, t2, t_final, CancellationToken::new()) + .await?; + let t2_files_recovered = list_files(&ctx.client).await?; + println!("after recovery to t2: {t2_files_recovered:?}"); + assert_eq!(t2_files, t2_files_recovered); + let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2).await?).await?; + assert_eq!(path2_recovered_t2, new_data.as_bytes()); + + // after recovery to t1: path1 is back, path2 has the old content + let t_final = time_point().await; + ctx.client + .time_travel_recover(None, t1, t_final, CancellationToken::new()) + .await?; + let t1_files_recovered = list_files(&ctx.client).await?; + println!("after recovery to t1: {t1_files_recovered:?}"); + assert_eq!(t1_files, t1_files_recovered); + let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2).await?).await?; + assert_eq!(path2_recovered_t1, old_data.as_bytes()); + + // after recovery to t0: everything is gone except for path1 + let t_final = time_point().await; + ctx.client + .time_travel_recover(None, t0, t_final, CancellationToken::new()) + .await?; + let t0_files_recovered = list_files(&ctx.client).await?; + println!("after recovery to t0: {t0_files_recovered:?}"); + assert_eq!(t0_files, t0_files_recovered); + + // cleanup + ctx.client.delete_objects(&[path1, path2, path3]).await?; + + Ok(()) +} + struct EnabledS3 { client: Arc<GenericRemoteStorage>, base_prefix: &'static str,
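Note: a minimal caller-side sketch of the new GenericRemoteStorage::time_travel_recover API introduced by this patch. It is not part of the diff; the roll_back_prefix helper, the storage handle, and the "tenant-prefix" path are illustrative assumptions, while the method signature matches the one added in libs/remote_storage/src/lib.rs:

use std::time::{Duration, SystemTime};

use camino::Utf8Path;
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio_util::sync::CancellationToken;

// Hypothetical helper: restore everything under one prefix to its state ten minutes ago.
async fn roll_back_prefix(storage: &GenericRemoteStorage) -> anyhow::Result<()> {
    let prefix = RemotePath::new(Utf8Path::new("tenant-prefix"))?;
    let timestamp = SystemTime::now() - Duration::from_secs(10 * 60);
    // In the S3 implementation, keys whose latest version is newer than `done_if_after`
    // are skipped, so passing the current time considers every key under the prefix.
    let done_if_after = SystemTime::now();

    storage
        .time_travel_recover(Some(&prefix), timestamp, done_if_after, CancellationToken::new())
        .await
}

As the test above exercises, the S3 implementation restores each key to its newest version from before `timestamp` by copying that version over the key, and deletes keys that did not exist at that point in time.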