Files
neon/libs/remote_storage/src/simulate_failures.rs
John Spray 972470b174 pageserver: use adaptive concurrency in secondary layer downloads (#7675)
## Problem

Secondary downloads are a low priority task, and intentionally do not
try to max out download speeds. This is almost always fine when they are
used through the life of a tenant shard as a continuous "trickle" of
background downloads.

However, there are sometimes circumstances where we would like to
populate a secondary location as fast as we can, within the constraint
that we don't want to impact the activity of attached tenants:
- During node removal, where we will need to create replacements for
secondary locations on the node being removed
- After a shard split, we need new secondary locations for the new
shards to populate before the shards can be migrated to their final
location.

## Summary of changes

- Add an activity() function to the remote storage interface, enabling
callers to query how busy the remote storage backend is
- In the secondary download code, use a very modest amount of
concurrency, driven by the remote storage's state: we only use
concurrency if the remote storage semaphore is 75% free, and scale the
amount of concurrency used within that range.

This is not a super clever form of prioritization, but it should
accomplish the key goals:
- Enable secondary downloads to happen faster when the system is idle
- Make secondary downloads a much lower priority than attached tenants
when the remote storage is busy.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2024-05-13 17:38:30 +00:00

221 lines
7.3 KiB
Rust

//! This module provides a wrapper around a real RemoteStorage implementation that
//! causes the first N attempts at each upload or download operatio to fail. For
//! testing purposes.
use bytes::Bytes;
use futures::stream::Stream;
use std::collections::HashMap;
use std::num::NonZeroU32;
use std::sync::Mutex;
use std::time::SystemTime;
use std::{collections::hash_map::Entry, sync::Arc};
use tokio_util::sync::CancellationToken;
use crate::{
Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
RemoteStorageActivity, StorageMetadata, TimeTravelError,
};
pub struct UnreliableWrapper {
inner: GenericRemoteStorage<Arc<VoidStorage>>,
// This many attempts of each operation will fail, then we let it succeed.
attempts_to_fail: u64,
// Tracks how many failed attempts of each operation has been made.
attempts: Mutex<HashMap<RemoteOp, u64>>,
}
/// Used to identify retries of different unique operation.
#[derive(Debug, Hash, Eq, PartialEq)]
enum RemoteOp {
ListPrefixes(Option<RemotePath>),
Upload(RemotePath),
Download(RemotePath),
Delete(RemotePath),
DeleteObjects(Vec<RemotePath>),
TimeTravelRecover(Option<RemotePath>),
}
impl UnreliableWrapper {
pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
assert!(attempts_to_fail > 0);
let inner = match inner {
GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
GenericRemoteStorage::AzureBlob(s) => GenericRemoteStorage::AzureBlob(s),
GenericRemoteStorage::LocalFs(s) => GenericRemoteStorage::LocalFs(s),
// We could also make this a no-op, as in, extract the inner of the passed generic remote storage
GenericRemoteStorage::Unreliable(_s) => {
panic!("Can't wrap unreliable wrapper unreliably")
}
};
UnreliableWrapper {
inner,
attempts_to_fail,
attempts: Mutex::new(HashMap::new()),
}
}
///
/// Common functionality for all operations.
///
/// On the first attempts of this operation, return an error. After 'attempts_to_fail'
/// attempts, let the operation go ahead, and clear the counter.
///
fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
let mut attempts = self.attempts.lock().unwrap();
match attempts.entry(op) {
Entry::Occupied(mut e) => {
let attempts_before_this = {
let p = e.get_mut();
*p += 1;
*p
};
if attempts_before_this >= self.attempts_to_fail {
// let it succeed
e.remove();
Ok(attempts_before_this)
} else {
let error =
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
Err(error)
}
}
Entry::Vacant(e) => {
let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
e.insert(1);
Err(error)
}
}
}
async fn delete_inner(
&self,
path: &RemotePath,
attempt: bool,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
if attempt {
self.attempt(RemoteOp::Delete(path.clone()))?;
}
self.inner.delete(path, cancel).await
}
}
// We never construct this, so the type is not important, just has to not be UnreliableWrapper and impl RemoteStorage.
type VoidStorage = crate::LocalFs;
impl RemoteStorage for UnreliableWrapper {
async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Listing, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
self.inner.list(prefix, mode, max_keys, cancel).await
}
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.attempt(RemoteOp::Upload(to.clone()))?;
self.inner
.upload(data, data_size_bytes, to, metadata, cancel)
.await
}
async fn download(
&self,
from: &RemotePath,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
self.attempt(RemoteOp::Download(from.clone()))
.map_err(DownloadError::Other)?;
self.inner.download(from, cancel).await
}
async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
// Note: We treat any download_byte_range as an "attempt" of the same
// operation. We don't pay attention to the ranges. That's good enough
// for now.
self.attempt(RemoteOp::Download(from.clone()))
.map_err(DownloadError::Other)?;
self.inner
.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_inner(path, true, cancel).await
}
async fn delete_objects<'a>(
&self,
paths: &'a [RemotePath],
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
let mut error_counter = 0;
for path in paths {
// Dont record attempt because it was already recorded above
if (self.delete_inner(path, false, cancel).await).is_err() {
error_counter += 1;
}
}
if error_counter > 0 {
return Err(anyhow::anyhow!(
"failed to delete {} objects",
error_counter
));
}
Ok(())
}
async fn copy(
&self,
from: &RemotePath,
to: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
// copy is equivalent to download + upload
self.attempt(RemoteOp::Download(from.clone()))?;
self.attempt(RemoteOp::Upload(to.clone()))?;
self.inner.copy_object(from, to, cancel).await
}
async fn time_travel_recover(
&self,
prefix: Option<&RemotePath>,
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
) -> Result<(), TimeTravelError> {
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
.map_err(TimeTravelError::Other)?;
self.inner
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
fn activity(&self) -> RemoteStorageActivity {
self.inner.activity()
}
}