mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 20:12:54 +00:00
pageserver: basic cancel/timeout for remote storage operations (#6097)
## Problem Various places in remote storage were not subject to a timeout (thereby stuck TCP connections could hold things up), and did not respect a cancellation token (so things like timeline deletion or tenant detach would have to wait arbitrarily long). ## Summary of changes - Add download_cancellable and upload_cancellable helpers, and use them in all the places we wait for remote storage operations (with the exception of initdb downloads, where it would not have been safe). - Add a cancellation token arg to `download_retry`. - Use cancellation token args in various places that were missing one per #5066 Closes: #5066 Why is this only "basic" handling? - Doesn't express difference between shutdown and errors in return types, to avoid refactoring all the places that use an anyhow::Error (these should all eventually return a more structured error type) - Implements timeouts on top of remote storage, rather than within it: this means that operations hitting their timeout will lose their semaphore permit and thereby go to the back of the queue for their retry. - Doing a nicer job is tracked in https://github.com/neondatabase/neon/issues/6096
This commit is contained in:
@@ -2,8 +2,11 @@ use std::time::Duration;
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum TimeoutCancellableError {
|
||||
#[error("Timed out")]
|
||||
Timeout,
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
|
||||
@@ -3059,6 +3059,7 @@ impl Tenant {
|
||||
storage,
|
||||
&self.tenant_shard_id,
|
||||
&existing_initdb_timeline_id,
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
.context("download initdb tar")?;
|
||||
@@ -3099,6 +3100,7 @@ impl Tenant {
|
||||
&timeline_id,
|
||||
pgdata_zstd.try_clone().await?,
|
||||
tar_zst_size,
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
},
|
||||
@@ -3106,9 +3108,7 @@ impl Tenant {
|
||||
3,
|
||||
u32::MAX,
|
||||
"persist_initdb_tar_zst",
|
||||
backoff::Cancel::new(self.cancel.clone(), || {
|
||||
anyhow::anyhow!("initdb upload cancelled")
|
||||
}),
|
||||
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -71,6 +71,7 @@ async fn create_remote_delete_mark(
|
||||
conf: &PageServerConf,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
|
||||
|
||||
@@ -87,8 +88,7 @@ async fn create_remote_delete_mark(
|
||||
FAILED_UPLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
"mark_upload",
|
||||
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
|
||||
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
|
||||
backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")),
|
||||
)
|
||||
.await
|
||||
.context("mark_upload")?;
|
||||
@@ -170,6 +170,7 @@ async fn remove_tenant_remote_delete_mark(
|
||||
conf: &PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
if let Some(remote_storage) = remote_storage {
|
||||
let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
|
||||
@@ -179,8 +180,7 @@ async fn remove_tenant_remote_delete_mark(
|
||||
FAILED_UPLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
"remove_tenant_remote_delete_mark",
|
||||
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
|
||||
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
|
||||
backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")),
|
||||
)
|
||||
.await
|
||||
.context("remove_tenant_remote_delete_mark")?;
|
||||
@@ -322,9 +322,15 @@ impl DeleteTenantFlow {
|
||||
// Though sounds scary, different mark name?
|
||||
// Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state.
|
||||
if let Some(remote_storage) = &remote_storage {
|
||||
create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id)
|
||||
.await
|
||||
.context("remote_mark")?
|
||||
create_remote_delete_mark(
|
||||
conf,
|
||||
remote_storage,
|
||||
&tenant.tenant_shard_id,
|
||||
// Can't use tenant.cancel, it's already shut down. TODO: wire in an appropriate token
|
||||
&CancellationToken::new(),
|
||||
)
|
||||
.await
|
||||
.context("remote_mark")?
|
||||
}
|
||||
|
||||
fail::fail_point!("tenant-delete-before-create-local-mark", |_| {
|
||||
@@ -524,8 +530,14 @@ impl DeleteTenantFlow {
|
||||
.context("timelines dir not empty")?;
|
||||
}
|
||||
|
||||
remove_tenant_remote_delete_mark(conf, remote_storage.as_ref(), &tenant.tenant_shard_id)
|
||||
.await?;
|
||||
remove_tenant_remote_delete_mark(
|
||||
conf,
|
||||
remote_storage.as_ref(),
|
||||
&tenant.tenant_shard_id,
|
||||
// Can't use tenant.cancel, it's already shut down. TODO: wire in an appropriate token
|
||||
&CancellationToken::new(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-cleanup-remaining-fs-traces", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
|
||||
@@ -196,10 +196,12 @@ pub(crate) use upload::upload_initdb_dir;
|
||||
use utils::backoff::{
|
||||
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
|
||||
use std::ops::DerefMut;
|
||||
@@ -316,6 +318,47 @@ pub struct RemoteTimelineClient {
|
||||
storage_impl: GenericRemoteStorage,
|
||||
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
/// This timeout is intended to deal with hangs in lower layers, e.g. stuck TCP flows. It is not
|
||||
/// intended to be snappy enough for prompt shutdown, as we have a CancellationToken for that.
|
||||
const UPLOAD_TIMEOUT: Duration = Duration::from_secs(120);
|
||||
const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(120);
|
||||
|
||||
/// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to anyhow.
|
||||
///
|
||||
/// This is a convenience for the various upload functions. In future
|
||||
/// the anyhow::Error result should be replaced with a more structured type that
|
||||
/// enables callers to avoid handling shutdown as an error.
|
||||
async fn upload_cancellable<F>(cancel: &CancellationToken, future: F) -> anyhow::Result<()>
|
||||
where
|
||||
F: std::future::Future<Output = anyhow::Result<()>>,
|
||||
{
|
||||
match timeout_cancellable(UPLOAD_TIMEOUT, cancel, future).await {
|
||||
Ok(Ok(())) => Ok(()),
|
||||
Ok(Err(e)) => Err(e),
|
||||
Err(TimeoutCancellableError::Timeout) => Err(anyhow::anyhow!("Timeout")),
|
||||
Err(TimeoutCancellableError::Cancelled) => Err(anyhow::anyhow!("Shutting down")),
|
||||
}
|
||||
}
|
||||
/// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to DownloaDError.
|
||||
async fn download_cancellable<F, R>(
|
||||
cancel: &CancellationToken,
|
||||
future: F,
|
||||
) -> Result<R, DownloadError>
|
||||
where
|
||||
F: std::future::Future<Output = Result<R, DownloadError>>,
|
||||
{
|
||||
match timeout_cancellable(DOWNLOAD_TIMEOUT, cancel, future).await {
|
||||
Ok(Ok(r)) => Ok(r),
|
||||
Ok(Err(e)) => Err(e),
|
||||
Err(TimeoutCancellableError::Timeout) => {
|
||||
Err(DownloadError::Other(anyhow::anyhow!("Timed out")))
|
||||
}
|
||||
Err(TimeoutCancellableError::Cancelled) => Err(DownloadError::Cancelled),
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteTimelineClient {
|
||||
@@ -351,6 +394,7 @@ impl RemoteTimelineClient {
|
||||
&tenant_shard_id,
|
||||
&timeline_id,
|
||||
)),
|
||||
cancel: CancellationToken::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -501,6 +545,7 @@ impl RemoteTimelineClient {
|
||||
&self,
|
||||
layer_file_name: &LayerFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<u64> {
|
||||
let downloaded_size = {
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
@@ -517,6 +562,7 @@ impl RemoteTimelineClient {
|
||||
self.timeline_id,
|
||||
layer_file_name,
|
||||
layer_metadata,
|
||||
cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
self.tenant_shard_id.tenant_id,
|
||||
@@ -971,6 +1017,7 @@ impl RemoteTimelineClient {
|
||||
&self.timeline_id,
|
||||
self.generation,
|
||||
&index_part_with_deleted_at,
|
||||
&self.cancel,
|
||||
)
|
||||
},
|
||||
|_e| false,
|
||||
@@ -980,8 +1027,7 @@ impl RemoteTimelineClient {
|
||||
// when executed as part of tenant deletion this happens in the background
|
||||
2,
|
||||
"persist_index_part_with_deleted_flag",
|
||||
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
|
||||
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
|
||||
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1281,6 +1327,7 @@ impl RemoteTimelineClient {
|
||||
path,
|
||||
layer_metadata,
|
||||
self.generation,
|
||||
&self.cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
self.tenant_shard_id.tenant_id,
|
||||
@@ -1307,6 +1354,7 @@ impl RemoteTimelineClient {
|
||||
&self.timeline_id,
|
||||
self.generation,
|
||||
index_part,
|
||||
&self.cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
self.tenant_shard_id.tenant_id,
|
||||
@@ -1828,6 +1876,7 @@ mod tests {
|
||||
&self.harness.tenant_shard_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
cancel: CancellationToken::new(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -14,13 +13,17 @@ use tokio::fs::{self, File, OpenOptions};
|
||||
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
use utils::timeout::timeout_cancellable;
|
||||
use utils::{backoff, crashsafe};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::remote_timeline_client::{
|
||||
download_cancellable, remote_layer_path, remote_timelines_path, DOWNLOAD_TIMEOUT,
|
||||
};
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::Generation;
|
||||
use crate::virtual_file::on_fatal_io_error;
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -32,8 +35,6 @@ use super::{
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
|
||||
};
|
||||
|
||||
static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120);
|
||||
|
||||
///
|
||||
/// If 'metadata' is given, we will validate that the downloaded file's size matches that
|
||||
/// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
|
||||
@@ -46,6 +47,7 @@ pub async fn download_layer_file<'a>(
|
||||
timeline_id: TimelineId,
|
||||
layer_file_name: &'a LayerFileName,
|
||||
layer_metadata: &'a LayerFileMetadata,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<u64, DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
@@ -73,14 +75,18 @@ pub async fn download_layer_file<'a>(
|
||||
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
|
||||
let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
|
||||
|
||||
let cancel_inner = cancel.clone();
|
||||
let (mut destination_file, bytes_amount) = download_retry(
|
||||
|| async {
|
||||
let destination_file = tokio::fs::File::create(&temp_file_path)
|
||||
.await
|
||||
.with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
let download = storage
|
||||
.download(&remote_path)
|
||||
|
||||
// Cancellation safety: it is safe to cancel this future, because it isn't writing to a local
|
||||
// file: the write to local file doesn't start until after the request header is returned
|
||||
// and we start draining the body stream below
|
||||
let download = download_cancellable(&cancel_inner, storage.download(&remote_path))
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
@@ -94,12 +100,33 @@ pub async fn download_layer_file<'a>(
|
||||
|
||||
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
|
||||
let bytes_amount = tokio::time::timeout(
|
||||
MAX_DOWNLOAD_DURATION,
|
||||
// Cancellation safety: it is safe to cancel this future because it is writing into a temporary file,
|
||||
// and we will unlink the temporary file if there is an error. This unlink is important because we
|
||||
// are in a retry loop, and we wouldn't want to leave behind a rogue write I/O to a file that
|
||||
// we will imminiently try and write to again.
|
||||
let bytes_amount: u64 = match timeout_cancellable(
|
||||
DOWNLOAD_TIMEOUT,
|
||||
&cancel_inner,
|
||||
tokio::io::copy_buf(&mut reader, &mut destination_file),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))?
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
|
||||
)
|
||||
})
|
||||
.map_err(DownloadError::Other)?
|
||||
{
|
||||
Ok(b) => Ok(b),
|
||||
Err(e) => {
|
||||
// Remove incomplete files: on restart Timeline would do this anyway, but we must
|
||||
// do it here for the retry case.
|
||||
if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
|
||||
on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
|
||||
}
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
|
||||
@@ -112,6 +139,7 @@ pub async fn download_layer_file<'a>(
|
||||
Ok((destination_file, bytes_amount))
|
||||
},
|
||||
&format!("download {remote_path:?}"),
|
||||
cancel,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -188,8 +216,14 @@ pub async fn list_remote_timelines(
|
||||
anyhow::bail!("storage-sync-list-remote-timelines");
|
||||
});
|
||||
|
||||
let cancel_inner = cancel.clone();
|
||||
let listing = download_retry_forever(
|
||||
|| storage.list(Some(&remote_path), ListingMode::WithDelimiter),
|
||||
|| {
|
||||
download_cancellable(
|
||||
&cancel_inner,
|
||||
storage.list(Some(&remote_path), ListingMode::WithDelimiter),
|
||||
)
|
||||
},
|
||||
&format!("list timelines for {tenant_shard_id}"),
|
||||
cancel,
|
||||
)
|
||||
@@ -230,9 +264,13 @@ async fn do_download_index_part(
|
||||
|
||||
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
|
||||
|
||||
let cancel_inner = cancel.clone();
|
||||
let index_part_bytes = download_retry_forever(
|
||||
|| async {
|
||||
let index_part_download = storage.download(&remote_path).await?;
|
||||
// Cancellation: if is safe to cancel this future because we're just downloading into
|
||||
// a memory buffer, not touching local disk.
|
||||
let index_part_download =
|
||||
download_cancellable(&cancel_inner, storage.download(&remote_path)).await?;
|
||||
|
||||
let mut index_part_bytes = Vec::new();
|
||||
let mut stream = std::pin::pin!(index_part_download.download_stream);
|
||||
@@ -347,10 +385,7 @@ pub(super) async fn download_index_part(
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
"listing index_part files",
|
||||
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
|
||||
backoff::Cancel::new(CancellationToken::new(), || -> anyhow::Error {
|
||||
unreachable!()
|
||||
}),
|
||||
backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")),
|
||||
)
|
||||
.await
|
||||
.map_err(DownloadError::Other)?;
|
||||
@@ -389,6 +424,7 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(Utf8PathBuf, File), DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
@@ -406,6 +442,8 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
"{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
|
||||
));
|
||||
|
||||
let cancel_inner = cancel.clone();
|
||||
|
||||
let file = download_retry(
|
||||
|| async {
|
||||
let file = OpenOptions::new()
|
||||
@@ -418,10 +456,14 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
.with_context(|| format!("tempfile creation {temp_path}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let download = storage.download(&remote_path).await?;
|
||||
let download =
|
||||
download_cancellable(&cancel_inner, storage.download(&remote_path)).await?;
|
||||
let mut download = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let mut writer = tokio::io::BufWriter::with_capacity(8 * 1024, file);
|
||||
|
||||
// TODO: this consumption of the response body should be subject to timeout + cancellation, but
|
||||
// not without thinking carefully about how to recover safely from cancelling a write to
|
||||
// local storage (e.g. by writing into a temp file as we do in download_layer)
|
||||
tokio::io::copy_buf(&mut download, &mut writer)
|
||||
.await
|
||||
.with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))
|
||||
@@ -437,6 +479,7 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
Ok(file)
|
||||
},
|
||||
&format!("download {remote_path}"),
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
@@ -460,7 +503,11 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
/// with backoff.
|
||||
///
|
||||
/// (See similar logic for uploads in `perform_upload_task`)
|
||||
async fn download_retry<T, O, F>(op: O, description: &str) -> Result<T, DownloadError>
|
||||
async fn download_retry<T, O, F>(
|
||||
op: O,
|
||||
description: &str,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<T, DownloadError>
|
||||
where
|
||||
O: FnMut() -> F,
|
||||
F: Future<Output = Result<T, DownloadError>>,
|
||||
@@ -471,10 +518,7 @@ where
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
description,
|
||||
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
|
||||
backoff::Cancel::new(CancellationToken::new(), || -> DownloadError {
|
||||
unreachable!()
|
||||
}),
|
||||
backoff::Cancel::new(cancel.clone(), || DownloadError::Cancelled),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -7,12 +7,14 @@ use pageserver_api::shard::TenantShardId;
|
||||
use std::io::{ErrorKind, SeekFrom};
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::AsyncSeekExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use super::Generation;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
tenant::remote_timeline_client::{
|
||||
index::IndexPart, remote_index_path, remote_initdb_archive_path, remote_path,
|
||||
upload_cancellable,
|
||||
},
|
||||
};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
@@ -29,6 +31,7 @@ pub(super) async fn upload_index_part<'a>(
|
||||
timeline_id: &TimelineId,
|
||||
generation: Generation,
|
||||
index_part: &'a IndexPart,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::trace!("uploading new index part");
|
||||
|
||||
@@ -44,14 +47,16 @@ pub(super) async fn upload_index_part<'a>(
|
||||
let index_part_bytes = bytes::Bytes::from(index_part_bytes);
|
||||
|
||||
let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
|
||||
storage
|
||||
.upload_storage_object(
|
||||
upload_cancellable(
|
||||
cancel,
|
||||
storage.upload_storage_object(
|
||||
futures::stream::once(futures::future::ready(Ok(index_part_bytes))),
|
||||
index_part_size,
|
||||
&remote_path,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
|
||||
),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
|
||||
}
|
||||
|
||||
/// Attempts to upload given layer files.
|
||||
@@ -64,6 +69,7 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
source_path: &'a Utf8Path,
|
||||
known_metadata: &'a LayerFileMetadata,
|
||||
generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
fail_point!("before-upload-layer", |_| {
|
||||
bail!("failpoint before-upload-layer")
|
||||
@@ -107,8 +113,7 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
|
||||
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
|
||||
|
||||
storage
|
||||
.upload(reader, fs_size, &storage_path, None)
|
||||
upload_cancellable(cancel, storage.upload(reader, fs_size, &storage_path, None))
|
||||
.await
|
||||
.with_context(|| format!("upload layer from local path '{source_path}'"))?;
|
||||
|
||||
@@ -122,6 +127,7 @@ pub(crate) async fn upload_initdb_dir(
|
||||
timeline_id: &TimelineId,
|
||||
mut initdb_tar_zst: File,
|
||||
size: u64,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::trace!("uploading initdb dir");
|
||||
|
||||
@@ -131,8 +137,10 @@ pub(crate) async fn upload_initdb_dir(
|
||||
let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
|
||||
|
||||
let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
|
||||
storage
|
||||
.upload_storage_object(file, size as usize, &remote_path)
|
||||
.await
|
||||
.with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
|
||||
upload_cancellable(
|
||||
cancel,
|
||||
storage.upload_storage_object(file, size as usize, &remote_path),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
|
||||
}
|
||||
|
||||
@@ -862,6 +862,7 @@ impl LayerInner {
|
||||
let result = client.download_layer_file(
|
||||
&this.desc.filename(),
|
||||
&this.metadata(),
|
||||
&crate::task_mgr::shutdown_token()
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -79,6 +79,9 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
|
||||
# AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
|
||||
# and it is not a failure of our code when it happens.
|
||||
".*DeleteObjects.*We encountered an internal error. Please try again.*",
|
||||
# During shutdown, DownloadError::Cancelled may be logged as an error. Cleaning this
|
||||
# up is tracked in https://github.com/neondatabase/neon/issues/6096
|
||||
".*Cancelled, shutting down.*",
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user