Do backoff::retry in s3 timetravel test (#6493)

The top-level retries weren't enough, probably because we do so many
network requests. Fine-grained retries give the test as a whole a much
higher chance of succeeding.

To demonstrate this, consider the following example: assume that each
request has a 5% chance of failing and we do 10 requests. Then the
chance of success without any retries is 0.95^10 ≈ 0.6. With 3 top-level
retries of the whole test it is 1 - 0.4^3 ≈ 0.936. With 3 fine-grained
retries per request it is (1 - 0.05^3)^10 ≈ 0.9988 (values rounded). So
the chance of failure is 6.4% with top-level retries versus 0.12% with
fine-grained retries.
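
For a quick sanity check of those numbers, here is a small standalone
snippet (not part of this commit; the 5% failure rate and attempt counts
are the assumptions from the paragraph above):

// Sanity check for the retry math above. "attempts" counts total tries,
// matching how the message uses "3 retries".
fn main() {
    let p_fail: f64 = 0.05; // assumed chance that one request fails
    let requests = 10; // number of network requests in the test
    let attempts = 3; // retry budget per unit (test run or request)

    // All 10 requests succeed on the first try.
    let no_retries = (1.0 - p_fail).powi(requests);
    // At least one of 3 whole-test runs succeeds.
    let top_level = 1.0 - (1.0 - no_retries).powi(attempts);
    // Every request succeeds within its own 3 attempts.
    let fine_grained = (1.0 - p_fail.powi(attempts)).powi(requests);

    println!("no retries:   {no_retries:.4}"); // ~0.5987
    println!("top level:    {top_level:.4}"); // ~0.9354 (0.936 above rounds 0.6 first)
    println!("fine grained: {fine_grained:.4}"); // ~0.9988
}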

Follow-up of #6155
Author: Arpad Müller
Date: 2024-01-26 17:43:56 +01:00
Committed by: GitHub
Parent: 4c245b0f5a
Commit: dcc7610ad6

@@ -1,4 +1,5 @@
 use std::env;
+use std::fmt::{Debug, Display};
 use std::num::NonZeroUsize;
 use std::ops::ControlFlow;
 use std::sync::Arc;
@@ -8,6 +9,7 @@ use std::{collections::HashSet, time::SystemTime};
 use crate::common::{download_to_vec, upload_stream};
 use anyhow::Context;
 use camino::Utf8Path;
+use futures_util::Future;
 use remote_storage::{
     GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
 };
@@ -22,6 +24,7 @@ mod common;
 mod tests_s3;

 use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
+use utils::backoff;

 const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
@@ -39,6 +42,25 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
 // to take the time from S3 response headers.
 const WAIT_TIME: Duration = Duration::from_millis(3_000);

+async fn retry<T, O, F, E>(op: O) -> Result<T, E>
+where
+    E: Display + Debug + 'static,
+    O: FnMut() -> F,
+    F: Future<Output = Result<T, E>>,
+{
+    let warn_threshold = 3;
+    let max_retries = 10;
+    backoff::retry(
+        op,
+        |_e| false,
+        warn_threshold,
+        max_retries,
+        "test retry",
+        backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
+    )
+    .await
+}
+
 async fn time_point() -> SystemTime {
     tokio::time::sleep(WAIT_TIME).await;
     let ret = SystemTime::now();
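
(Aside, not part of the diff: utils::backoff is Neon's in-tree helper,
and the pattern the new retry wrapper delegates to is roughly the
following self-contained sketch. The names, constants, and backoff curve
here are illustrative assumptions, not the real utils::backoff
implementation.)

// Minimal sketch of retry-with-exponential-backoff (illustrative only).
use std::fmt::Display;
use std::future::Future;
use std::time::Duration;

async fn retry_sketch<T, E, O, F>(mut op: O, max_attempts: u32) -> Result<T, E>
where
    E: Display,
    O: FnMut() -> F,
    F: Future<Output = Result<T, E>>,
{
    let mut attempt = 0;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) if attempt + 1 < max_attempts => {
                attempt += 1;
                eprintln!("attempt {attempt} failed: {e}, retrying");
                // Exponential backoff with a cap (made-up constants).
                tokio::time::sleep(Duration::from_millis(100 * 2u64.pow(attempt.min(6)))).await;
            }
            Err(e) => return Err(e),
        }
    }
}

#[tokio::main]
async fn main() {
    let mut calls = 0;
    // Fails twice, then succeeds: retry_sketch absorbs the transient errors.
    let res: Result<&str, String> = retry_sketch(
        || {
            calls += 1;
            let fail = calls < 3;
            async move {
                if fail {
                    Err(format!("transient error #{calls}"))
                } else {
                    Ok("done")
                }
            }
        },
        5,
    )
    .await;
    println!("{res:?}"); // Ok("done")
}
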
@@ -47,8 +69,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
 }

 async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> {
-    Ok(client
-        .list_files(None)
+    Ok(retry(|| client.list_files(None))
         .await
         .context("list root files failure")?
         .into_iter()
@@ -64,16 +85,23 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
     let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
         .with_context(|| "RemotePath conversion")?;

-    let (data, len) = upload_stream("remote blob data1".as_bytes().into());
-    ctx.client.upload(data, len, &path1, None).await?;
+    retry(|| {
+        let (data, len) = upload_stream("remote blob data1".as_bytes().into());
+        ctx.client.upload(data, len, &path1, None)
+    })
+    .await?;

     let t0_files = list_files(&ctx.client).await?;
     let t0 = time_point().await;
     println!("at t0: {t0_files:?}");

     let old_data = "remote blob data2";
-    let (data, len) = upload_stream(old_data.as_bytes().into());
-    ctx.client.upload(data, len, &path2, None).await?;
+
+    retry(|| {
+        let (data, len) = upload_stream(old_data.as_bytes().into());
+        ctx.client.upload(data, len, &path2, None)
+    })
+    .await?;

     let t1_files = list_files(&ctx.client).await?;
     let t1 = time_point().await;
@@ -81,7 +109,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
     // A little check to ensure that our clock is not too far off from the S3 clock
     {
-        let dl = ctx.client.download(&path2).await?;
+        let dl = retry(|| ctx.client.download(&path2)).await?;
         let last_modified = dl.last_modified.unwrap();
         let half_wt = WAIT_TIME.mul_f32(0.5);
         let t0_hwt = t0 + half_wt;
@@ -92,15 +120,21 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
         }
     }

-    let (data, len) = upload_stream("remote blob data3".as_bytes().into());
-    ctx.client.upload(data, len, &path3, None).await?;
+    retry(|| {
+        let (data, len) = upload_stream("remote blob data3".as_bytes().into());
+        ctx.client.upload(data, len, &path3, None)
+    })
+    .await?;

     let new_data = "new remote blob data2";

-    let (data, len) = upload_stream(new_data.as_bytes().into());
-    ctx.client.upload(data, len, &path2, None).await?;
-    ctx.client.delete(&path1).await?;
+    retry(|| {
+        let (data, len) = upload_stream(new_data.as_bytes().into());
+        ctx.client.upload(data, len, &path2, None)
+    })
+    .await?;
+    retry(|| ctx.client.delete(&path1)).await?;

     let t2_files = list_files(&ctx.client).await?;
     let t2 = time_point().await;
     println!("at t2: {t2_files:?}");
@@ -137,7 +171,9 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
     assert_eq!(t0_files, t0_files_recovered);

     // cleanup
-    ctx.client.delete_objects(&[path1, path2, path3]).await?;
+    let paths = &[path1, path2, path3];
+    retry(|| ctx.client.delete_objects(paths)).await?;

     Ok(())
 }