Retry remote downloads.

Remote operations sometimes fail due to network issues or other external
reasons. Add retry logic to all remote downloads, so that a transient
failure at pageserver startup or tenant attach doesn't cause the whole
tenant to be marked as Broken.

As with the upload retry logic, we log the failure at WARN level after
three retries, but keep retrying. Downloads are now retried up to 10
times before the error is returned to the caller.
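
For illustration, here is a minimal, self-contained sketch of that retry
policy (the real implementation is the download_retry helper in download.rs,
shown in the diff below; the backoff numbers here are stand-ins for the
pageserver's DEFAULT_BASE_BACKOFF_SECONDS / DEFAULT_MAX_BACKOFF_SECONDS):

    use std::{future::Future, time::Duration};

    const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
    const FAILED_DOWNLOAD_RETRIES: u32 = 10;

    async fn retry<T, E, O, F>(mut op: O, desc: &str) -> Result<T, E>
    where
        E: std::fmt::Display,
        O: FnMut() -> F,
        F: Future<Output = Result<T, E>>,
    {
        let mut attempts: u32 = 0;
        loop {
            match op().await {
                Ok(v) => return Ok(v),
                // the first few failures are only logged at INFO level
                Err(e) if attempts < FAILED_DOWNLOAD_WARN_THRESHOLD => {
                    println!("INFO: {desc} failed, will retry (attempt {attempts}): {e}")
                }
                // repeated failures are more suspicious, escalate to WARN
                Err(e) if attempts < FAILED_DOWNLOAD_RETRIES => {
                    println!("WARN: {desc} failed, will retry (attempt {attempts}): {e}")
                }
                // give up and return the error to the caller
                Err(e) => return Err(e),
            }
            // exponential backoff with a cap (stand-in values)
            let backoff = Duration::from_secs_f64((0.1 * 2f64.powi(attempts as i32)).min(3.0));
            tokio::time::sleep(backoff).await;
            attempts += 1;
        }
    }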

To test the retries, I created a new RemoteStorage wrapper that simulates
failures by returning an error for the first N times that a remote
operation is performed. It can be enabled by setting the new
"test_remote_failures" option in the pageserver config file.

Fixes #3112
Heikki Linnakangas
2022-12-20 13:34:21 +02:00
committed by Heikki Linnakangas
parent 4cda9919bf
commit 8e2edfcf39
12 changed files with 480 additions and 83 deletions

View File

@@ -7,6 +7,7 @@
//!
mod local_fs;
mod s3_bucket;
mod simulate_failures;
use std::{
collections::HashMap,
@@ -24,7 +25,7 @@ use tokio::io;
use toml_edit::Item;
use tracing::info;
pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket};
pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket, simulate_failures::UnreliableWrapper};
/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
@@ -77,7 +78,10 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// Note: here we assume that if the prefix is passed it was obtained via remote_object_id
/// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS)
/// so this method doesn't need to.
async fn list_prefixes(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>;
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError>;
/// Streams the local file contents into remote into the remote storage entry.
async fn upload(
@@ -150,6 +154,7 @@ impl std::error::Error for DownloadError {}
pub enum GenericRemoteStorage {
LocalFs(LocalFs),
AwsS3(Arc<S3Bucket>),
Unreliable(Arc<UnreliableWrapper>),
}
impl Deref for GenericRemoteStorage {
@@ -159,6 +164,7 @@ impl Deref for GenericRemoteStorage {
match self {
GenericRemoteStorage::LocalFs(local_fs) => local_fs,
GenericRemoteStorage::AwsS3(s3_bucket) => s3_bucket.as_ref(),
GenericRemoteStorage::Unreliable(s) => s.as_ref(),
}
}
}
@@ -178,6 +184,10 @@ impl GenericRemoteStorage {
})
}
pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
}
/// Takes storage object contents and its size and uploads to remote storage,
/// mapping `from_path` to the corresponding remote object id in the storage.
///

View File

@@ -92,13 +92,17 @@ impl RemoteStorage for LocalFs {
.collect())
}
async fn list_prefixes(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
let path = match prefix {
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
Ok(get_all_files(path.as_ref(), false)
.await?
.await
.map_err(DownloadError::Other)?
.into_iter()
.map(|path| {
path.strip_prefix(&self.storage_root)

View File

@@ -286,7 +286,10 @@ impl RemoteStorage for S3Bucket {
/// See the doc for `RemoteStorage::list_prefixes`
/// Note: it won't include empty "directories"
async fn list_prefixes(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
.map(|p| self.relative_path_to_s3_object(p))
@@ -308,7 +311,8 @@ impl RemoteStorage for S3Bucket {
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 list")?;
.context("Concurrency limiter semaphore got closed during S3 list")
.map_err(DownloadError::Other)?;
metrics::inc_list_objects();
@@ -324,7 +328,9 @@ impl RemoteStorage for S3Bucket {
.map_err(|e| {
metrics::inc_list_objects_fail();
e
})?;
})
.context("Failed to list S3 prefixes")
.map_err(DownloadError::Other)?;
document_keys.extend(
fetch_response

View File

@@ -0,0 +1,129 @@
//! This module provides a wrapper around a real RemoteStorage implementation that
//! causes the first N attempts at each upload or download operation to fail. For
//! testing purposes.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Mutex;
use crate::{Download, DownloadError, RemotePath, RemoteStorage, StorageMetadata};
pub struct UnreliableWrapper {
inner: crate::GenericRemoteStorage,
// This many attempts of each operation will fail, then we let it succeed.
attempts_to_fail: u64,
// Tracks how many failed attempts of each operation have been made.
attempts: Mutex<HashMap<RemoteOp, u64>>,
}
/// Used to identify retries of each distinct operation.
#[derive(Debug, Hash, Eq, PartialEq)]
enum RemoteOp {
List,
ListPrefixes(Option<RemotePath>),
Upload(RemotePath),
Download(RemotePath),
Delete(RemotePath),
}
impl UnreliableWrapper {
pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
assert!(attempts_to_fail > 0);
UnreliableWrapper {
inner,
attempts_to_fail,
attempts: Mutex::new(HashMap::new()),
}
}
///
/// Common functionality for all operations.
///
/// On the first 'attempts_to_fail' attempts of this operation, return an error.
/// After that, let the operation go ahead, and clear the counter.
///
fn attempt(&self, op: RemoteOp) -> Result<u64, DownloadError> {
let mut attempts = self.attempts.lock().unwrap();
match attempts.entry(op) {
Entry::Occupied(mut e) => {
let attempts_before_this = {
let p = e.get_mut();
*p += 1;
*p
};
if attempts_before_this >= self.attempts_to_fail {
// let it succeed
e.remove();
Ok(attempts_before_this)
} else {
let error =
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
Err(DownloadError::Other(error))
}
}
Entry::Vacant(e) => {
let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
e.insert(1);
Err(DownloadError::Other(error))
}
}
}
}
#[async_trait::async_trait]
impl RemoteStorage for UnreliableWrapper {
/// Lists all items the storage has right now.
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
self.attempt(RemoteOp::List)?;
self.inner.list().await
}
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
self.inner.list_prefixes(prefix).await
}
async fn upload(
&self,
data: Box<(dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static)>,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
self.attempt(RemoteOp::Upload(to.clone()))?;
self.inner.upload(data, data_size_bytes, to, metadata).await
}
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
self.attempt(RemoteOp::Download(from.clone()))?;
self.inner.download(from).await
}
async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError> {
// Note: We treat any download_byte_range as an "attempt" of the same
// operation. We don't pay attention to the ranges. That's good enough
// for now.
self.attempt(RemoteOp::Download(from.clone()))?;
self.inner
.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
self.attempt(RemoteOp::Delete(path.clone()))?;
self.inner.delete(path).await
}
}

View File

@@ -12,14 +12,13 @@ use tracing::*;
use metrics::set_build_info_metric;
use pageserver::{
config::{defaults::*, PageServerConf},
http, page_cache, page_service, profiling, task_mgr,
http, page_cache, page_service, profiling, storage_sync2, task_mgr,
task_mgr::TaskKind,
task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
},
tenant_mgr, virtual_file,
};
use remote_storage::GenericRemoteStorage;
use utils::{
auth::JwtAuth,
logging,
@@ -281,12 +280,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
};
// Set up remote storage client
let remote_storage = conf
.remote_storage_config
.as_ref()
.map(GenericRemoteStorage::from_config)
.transpose()
.context("Failed to init generic remote storage")?;
let remote_storage = storage_sync2::create_remote_storage_client(conf)?;
// Scan the local 'tenants/' directory and start loading the tenants
BACKGROUND_RUNTIME.block_on(tenant_mgr::init_tenant_mgr(conf, remote_storage.clone()))?;

View File

@@ -143,6 +143,8 @@ pub struct PageServerConf {
/// Number of concurrent [`Tenant::gather_size_inputs`] allowed.
pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
pub test_remote_failures: u64,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -221,6 +223,8 @@ struct PageServerConfigBuilder {
log_format: BuilderValue<LogFormat>,
concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>,
test_remote_failures: BuilderValue<u64>,
}
impl Default for PageServerConfigBuilder {
@@ -256,6 +260,8 @@ impl Default for PageServerConfigBuilder {
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()),
test_remote_failures: Set(0),
}
}
}
@@ -336,6 +342,10 @@ impl PageServerConfigBuilder {
self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u);
}
pub fn test_remote_failures(&mut self, fail_first: u64) {
self.test_remote_failures = BuilderValue::Set(fail_first);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
Ok(PageServerConf {
listen_pg_addr: self
@@ -384,6 +394,9 @@ impl PageServerConfigBuilder {
.ok_or(anyhow!(
"missing concurrent_tenant_size_logical_size_queries"
))?,
test_remote_failures: self
.test_remote_failures
.ok_or(anyhow!("missing test_remote_failuers"))?,
})
}
}
@@ -555,6 +568,7 @@ impl PageServerConf {
let permits = NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?;
ConfigurableSemaphore::new(permits)
}),
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -676,6 +690,7 @@ impl PageServerConf {
broker_keepalive_interval: Duration::from_secs(5000),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
test_remote_failures: 0,
}
}
}
@@ -849,6 +864,7 @@ log_format = 'json'
)?,
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
test_remote_failures: 0,
},
"Correct defaults should be used when no config values are provided"
);
@@ -893,6 +909,7 @@ log_format = 'json'
broker_keepalive_interval: Duration::from_secs(5),
log_format: LogFormat::Json,
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
test_remote_failures: 0,
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -227,6 +227,18 @@ use crate::{
use utils::id::{TenantId, TimelineId};
// Occasional network issues and such can cause remote operations to fail, and
// that's expected. If a download fails, we log it at info-level, and retry.
// But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
// level instead, as repeated failures can mean a more serious problem. If it
// fails more than FAILED_DOWNLOAD_RETRIES times, we give up.
const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
const FAILED_DOWNLOAD_RETRIES: u32 = 10;
// Similarly log failed uploads and deletions at WARN level, after this many
// retries. Uploads and deletions are retried forever, though.
const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
/// A client for accessing a timeline's data in remote storage.
///
/// This takes care of managing the number of connections, and balancing them
@@ -977,12 +989,14 @@ impl RemoteTimelineClient {
Err(e) => {
let retries = task.retries.fetch_add(1, Ordering::SeqCst);
// uploads may fail due to rate limts (IAM, S3) or spurious network and external errors
// such issues are relatively regular, so don't use WARN or ERROR to avoid alerting
// people and tests until the retries are definitely causing delays.
if retries < 3 {
// Uploads can fail due to rate limits (IAM, S3), spurious network problems,
// or other external reasons. Such issues are relatively regular, so log them
// at info level at first, and only WARN if the operation fails repeatedly.
//
// (See similar logic for downloads in `download::download_retry`)
if retries < FAILED_UPLOAD_WARN_THRESHOLD {
info!(
"failed to perform remote task {}, will retry (attempt {}): {:?}",
"failed to perform remote task {}, will retry (attempt {}): {:#}",
task.op, retries, e
);
} else {
@@ -1148,6 +1162,39 @@ pub fn create_remote_timeline_client(
})
}
///
/// Create GenericRemoteStorage client from the pageserver config
///
pub fn create_remote_storage_client(
conf: &'static PageServerConf,
) -> anyhow::Result<Option<GenericRemoteStorage>> {
let config = if let Some(config) = &conf.remote_storage_config {
config
} else {
// No remote storage configured.
return Ok(None);
};
// Create the client
let mut remote_storage = GenericRemoteStorage::from_config(config)?;
// If `test_remote_failures` is non-zero, wrap the client with a
// wrapper that simulates failures.
if conf.test_remote_failures > 0 {
if !cfg!(feature = "testing") {
anyhow::bail!("test_remote_failures option is not available because pageserver was compiled without the 'testing' feature");
}
info!(
"Simulating remote failures for first {} attempts of each op",
conf.test_remote_failures
);
remote_storage =
GenericRemoteStorage::unreliable_wrapper(remote_storage, conf.test_remote_failures);
}
Ok(Some(remote_storage))
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,21 +1,28 @@
//! Helper functions to download files from remote storage with a RemoteStorage
//!
//! The functions in this module retry failed operations automatically, according
//! to the FAILED_DOWNLOAD_RETRIES constant.
use std::collections::HashSet;
use std::future::Future;
use std::path::Path;
use anyhow::{bail, Context};
use anyhow::{anyhow, Context};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tracing::{debug, info_span, Instrument};
use tracing::{debug, error, info, info_span, warn, Instrument};
use crate::config::PageServerConf;
use crate::storage_sync::index::LayerFileMetadata;
use crate::tenant::filename::LayerFileName;
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use super::index::{IndexPart, IndexPartUnclean};
use super::{FAILED_DOWNLOAD_RETRIES, FAILED_DOWNLOAD_WARN_THRESHOLD};
async fn fsync_path(path: impl AsRef<std::path::Path>) -> Result<(), std::io::Error> {
fs::File::open(path).await?.sync_all().await
@@ -33,12 +40,14 @@ pub async fn download_layer_file<'a>(
timeline_id: TimelineId,
layer_file_name: &'a LayerFileName,
layer_metadata: &'a LayerFileMetadata,
) -> anyhow::Result<u64> {
) -> Result<u64, DownloadError> {
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
let local_path = timeline_path.join(layer_file_name.file_name());
let remote_path = conf.remote_path(&local_path)?;
let remote_path = conf
.remote_path(&local_path)
.map_err(DownloadError::Other)?;
// Perform a rename inspired by durable_rename from file_utils.c.
// The sequence:
@@ -52,21 +61,30 @@ pub async fn download_layer_file<'a>(
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
// TODO: this doesn't use the cached fd for some reason?
let mut destination_file = fs::File::create(&temp_file_path).await.with_context(|| {
format!(
"Failed to create a destination file for layer '{}'",
temp_file_path.display()
)
})?;
let mut download = storage.download(&remote_path).await.with_context(|| {
format!(
"Failed to open a download stream for layer with remote storage path '{remote_path:?}'"
)
})?;
let bytes_amount = tokio::io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
format!("Failed to download layer with remote storage path '{remote_path:?}' into file {temp_file_path:?}")
})?;
let (mut destination_file, bytes_amount) = download_retry(
|| async {
// TODO: this doesn't use the cached fd for some reason?
let mut destination_file = fs::File::create(&temp_file_path).await.with_context(|| {
format!(
"Failed to create a destination file for layer '{}'",
temp_file_path.display()
)
})
.map_err(DownloadError::Other)?;
let mut download = storage.download(&remote_path).await.with_context(|| {
format!(
"Failed to open a download stream for layer with remote storage path '{remote_path:?}'"
)
})
.map_err(DownloadError::Other)?;
let bytes_amount = tokio::io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
format!("Failed to download layer with remote storage path '{remote_path:?}' into file {temp_file_path:?}")
})
.map_err(DownloadError::Other)?;
Ok((destination_file, bytes_amount))
},
&format!("download {remote_path:?}"),
).await?;
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
@@ -76,19 +94,23 @@ pub async fn download_layer_file<'a>(
// From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
// we assume that the `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
// But for additional safety let's check/wait for any pending operations.
destination_file.flush().await.with_context(|| {
format!(
"failed to flush source file at {}",
temp_file_path.display()
)
})?;
destination_file
.flush()
.await
.with_context(|| {
format!(
"failed to flush source file at {}",
temp_file_path.display()
)
})
.map_err(DownloadError::Other)?;
match layer_metadata.file_size() {
Some(expected) if expected != bytes_amount => {
anyhow::bail!(
"According to layer file metadata should had downloaded {expected} bytes but downloaded {bytes_amount} bytes into file '{}'",
return Err(DownloadError::Other(anyhow!(
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file '{}'",
temp_file_path.display()
);
)));
}
Some(_) | None => {
// matches, or upgrading from an earlier IndexPart version
@@ -96,23 +118,38 @@ pub async fn download_layer_file<'a>(
}
// not using sync_data because it can lose file size update
destination_file.sync_all().await.with_context(|| {
format!(
"failed to fsync source file at {}",
temp_file_path.display()
)
})?;
destination_file
.sync_all()
.await
.with_context(|| {
format!(
"failed to fsync source file at {}",
temp_file_path.display()
)
})
.map_err(DownloadError::Other)?;
drop(destination_file);
fail::fail_point!("remote-storage-download-pre-rename", |_| {
bail!("remote-storage-download-pre-rename failpoint triggered")
Err(DownloadError::Other(anyhow!(
"remote-storage-download-pre-rename failpoint triggered"
)))
});
fs::rename(&temp_file_path, &local_path).await?;
fs::rename(&temp_file_path, &local_path)
.await
.with_context(|| {
format!(
"Could not rename download layer file to {}",
local_path.display(),
)
})
.map_err(DownloadError::Other)?;
fsync_path(&local_path)
.await
.with_context(|| format!("Could not fsync layer file {}", local_path.display(),))?;
.with_context(|| format!("Could not fsync layer file {}", local_path.display(),))
.map_err(DownloadError::Other)?;
tracing::info!("download complete: {}", local_path.display());
@@ -143,14 +180,11 @@ pub async fn list_remote_timelines<'a>(
let tenant_path = conf.timelines_path(&tenant_id);
let tenant_storage_path = conf.remote_path(&tenant_path)?;
let timelines = storage
.list_prefixes(Some(&tenant_storage_path))
.await
.with_context(|| {
format!(
"Failed to list tenant storage path {tenant_storage_path:?} to get remote timelines to download"
)
})?;
let timelines = download_retry(
|| storage.list_prefixes(Some(&tenant_storage_path)),
&format!("list prefixes for {tenant_path:?}"),
)
.await?;
if timelines.is_empty() {
anyhow::bail!("no timelines found on the remote storage")
@@ -209,16 +243,25 @@ pub async fn download_index_part(
.remote_path(&index_part_path)
.map_err(DownloadError::BadInput)?;
let mut index_part_download = storage.download(&part_storage_path).await?;
let index_part_bytes = download_retry(
|| async {
let mut index_part_download = storage.download(&part_storage_path).await?;
let mut index_part_bytes = Vec::new();
tokio::io::copy(
&mut index_part_download.download_stream,
&mut index_part_bytes,
let mut index_part_bytes = Vec::new();
tokio::io::copy(
&mut index_part_download.download_stream,
&mut index_part_bytes,
)
.await
.with_context(|| {
format!("Failed to download an index part into file {index_part_path:?}")
})
.map_err(DownloadError::Other)?;
Ok(index_part_bytes)
},
&format!("download {part_storage_path:?}"),
)
.await
.with_context(|| format!("Failed to download an index part into file {index_part_path:?}"))
.map_err(DownloadError::Other)?;
.await?;
let index_part: IndexPartUnclean = serde_json::from_slice(&index_part_bytes)
.with_context(|| {
@@ -230,3 +273,56 @@ pub async fn download_index_part(
Ok(index_part)
}
///
/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (IAM, S3), spurious network
/// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
/// with backoff.
///
/// (See similar logic for uploads in `perform_upload_task`)
async fn download_retry<T, O, F>(mut op: O, description: &str) -> Result<T, DownloadError>
where
O: FnMut() -> F,
F: Future<Output = Result<T, DownloadError>>,
{
let mut attempts = 0;
loop {
let result = op().await;
match result {
Ok(_) => {
if attempts > 0 {
info!("{description} succeeded after {attempts} retries");
}
return result;
}
// These are "permanent" errors that should not be retried.
Err(DownloadError::BadInput(_)) | Err(DownloadError::NotFound) => {
return result;
}
// Assume that any other failure might be transient, and the operation might
// succeed if we just keep trying.
Err(DownloadError::Other(err)) if attempts < FAILED_DOWNLOAD_WARN_THRESHOLD => {
info!("{description} failed, will retry (attempt {attempts}): {err:#}");
}
Err(DownloadError::Other(err)) if attempts < FAILED_DOWNLOAD_RETRIES => {
warn!("{description} failed, will retry (attempt {attempts}): {err:#}");
}
Err(DownloadError::Other(ref err)) => {
// Operation failed FAILED_DOWNLOAD_RETRIES times. Time to give up.
error!("{description} still failed after {attempts} retries, giving up: {err:?}");
return result;
}
}
// sleep and retry
exponential_backoff(
attempts,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
)
.await;
attempts += 1;
}
}

View File

@@ -1904,6 +1904,28 @@ class NeonPageserver(PgProtocol):
assert not errors
def log_contains(self, pattern: str) -> Optional[str]:
"""Check that the pageserver log contains a line that matches the given regex"""
logfile = open(os.path.join(self.env.repo_dir, "pageserver.log"), "r")
contains_re = re.compile(pattern)
# XXX: Our rust logging machinery buffers the messages, so if you
# call this function immediately after it's been logged, there is
# no guarantee it is already present in the log file. This hasn't
# been a problem in practice; our python tests are not fast enough
# to hit that race condition.
while True:
line = logfile.readline()
if not line:
break
if contains_re.search(line):
# found it!
return line
return None
def append_pageserver_param_overrides(
params_to_update: List[str],

View File

@@ -56,6 +56,11 @@ def test_remote_storage_backup_and_restore(
test_name="test_remote_storage_backup_and_restore",
)
# Exercise the retry code path by making every upload and download fail on its
# first attempt. The retries print INFO messages to the log; we check that
# they are present after the test.
neon_env_builder.pageserver_config_override = "test_remote_failures=1"
data_id = 1
data_secret = "very secret secret"
@@ -76,6 +81,7 @@ def test_remote_storage_backup_and_restore(
env.pageserver.allowed_errors.append(
".*Cannot attach tenant .*?, local tenant directory already exists.*"
)
env.pageserver.allowed_errors.append(".*simulated failure of remote operation.*")
pageserver_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
@@ -87,16 +93,6 @@ def test_remote_storage_backup_and_restore(
checkpoint_numbers = range(1, 3)
# On the first iteration, exercise retry code path by making the uploads
# fail for the first 3 times
action = "3*return->off"
pageserver_http.configure_failpoints(
[
("before-upload-layer", action),
("before-upload-index", action),
]
)
for checkpoint_number in checkpoint_numbers:
with pg.cursor() as cur:
cur.execute(
@@ -118,6 +114,14 @@ def test_remote_storage_backup_and_restore(
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
log.info(f"upload of checkpoint {checkpoint_number} is done")
# Check that we had to retry the uploads
assert env.pageserver.log_contains(
".*failed to perform remote task UploadLayer.*, will retry.*"
)
assert env.pageserver.log_contains(
".*failed to perform remote task UploadMetadata.*, will retry.*"
)
##### Stop the first pageserver instance, erase all its data
env.postgres.stop_all()
env.pageserver.stop()

View File

@@ -32,6 +32,58 @@ def do_gc_target(
log.info("gc http thread returning")
# Basic detach and re-attach test
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_tenant_reattach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_tenant_reattach",
)
# Exercise the retry code path by making every upload and download fail on its
# first attempt. The retries print INFO messages to the log; we check that
# they are present after the test.
neon_env_builder.pageserver_config_override = "test_remote_failures=1"
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
# create a new tenant
tenant_id, timeline_id = env.neon_cli.create_tenant()
pg = env.postgres.create_start("main", tenant_id=tenant_id)
with pg.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# Wait for all the data to be processed by the pageserver and uploaded to remote storage
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
# Check that we had to retry the uploads
assert env.pageserver.log_contains(
".*failed to perform remote task UploadLayer.*, will retry.*"
)
assert env.pageserver.log_contains(
".*failed to perform remote task UploadMetadata.*, will retry.*"
)
pageserver_http.tenant_detach(tenant_id)
pageserver_http.tenant_attach(tenant_id)
with pg.cursor() as cur:
assert query_scalar(cur, "SELECT count(*) FROM t") == 100000
# Check that we had to retry the downloads
assert env.pageserver.log_contains(".*list prefixes.*failed, will retry.*")
assert env.pageserver.log_contains(".*download.*failed, will retry.*")
def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

View File

@@ -121,6 +121,11 @@ def test_tenants_attached_after_download(
data_id = 1
data_secret = "very secret secret"
# Exercise the retry code path by making every upload and download fail on its
# first attempt. The retries print INFO messages to the log; we check that
# they are present after the test.
neon_env_builder.pageserver_config_override = "test_remote_failures=1"
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
@@ -159,6 +164,14 @@ def test_tenants_attached_after_download(
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
log.info(f"upload of checkpoint {checkpoint_number} is done")
# Check that we had to retry the uploads
assert env.pageserver.log_contains(
".*failed to perform remote task UploadLayer.*, will retry.*"
)
assert env.pageserver.log_contains(
".*failed to perform remote task UploadMetadata.*, will retry.*"
)
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
env.postgres.stop_all()
@@ -211,6 +224,9 @@ def test_tenants_attached_after_download(
)
assert detail_before["current_physical_size"] == detail_after["current_physical_size"]
# Check that we had to retry the downloads
assert env.pageserver.log_contains(".*download .* succeeded after 1 retries.*")
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_tenant_upgrades_index_json_from_v0(