mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
pageserver: use conditional GET for secondary tenant heatmaps (#9236)
## Problem Secondary tenant heatmaps were always downloaded, even when they hadn't changed. This can be avoided by using a conditional GET request passing the `ETag` of the previous heatmap. ## Summary of changes The `ETag` was already plumbed down into the heatmap downloader, and just needed further plumbing into the remote storage backends. * Add a `DownloadOpts` struct and pass it to `RemoteStorage::download()`. * Add an optional `DownloadOpts::etag` field, which uses a conditional GET and returns `DownloadError::Unmodified` on match.
This commit is contained in:
@@ -14,7 +14,7 @@ use std::time::SystemTime;
|
||||
|
||||
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
|
||||
use anyhow::Result;
|
||||
use azure_core::request_options::{MaxResults, Metadata, Range};
|
||||
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
|
||||
use azure_core::{Continuable, RetryOptions};
|
||||
use azure_identity::DefaultAzureCredential;
|
||||
use azure_storage::StorageCredentials;
|
||||
@@ -33,10 +33,10 @@ use tracing::debug;
|
||||
use utils::backoff;
|
||||
|
||||
use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
|
||||
use crate::ListingObject;
|
||||
use crate::{
|
||||
config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError, Listing,
|
||||
ListingMode, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
|
||||
config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError,
|
||||
DownloadOpts, Listing, ListingMode, ListingObject, RemotePath, RemoteStorage, StorageMetadata,
|
||||
TimeTravelError, TimeoutOrCancel,
|
||||
};
|
||||
|
||||
pub struct AzureBlobStorage {
|
||||
@@ -259,6 +259,7 @@ fn to_download_error(error: azure_core::Error) -> DownloadError {
|
||||
if let Some(http_err) = error.as_http_error() {
|
||||
match http_err.status() {
|
||||
StatusCode::NotFound => DownloadError::NotFound,
|
||||
StatusCode::NotModified => DownloadError::Unmodified,
|
||||
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
|
||||
_ => DownloadError::Other(anyhow::Error::new(error)),
|
||||
}
|
||||
@@ -484,11 +485,16 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
async fn download(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
opts: &DownloadOpts,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
let blob_client = self.client.blob_client(self.relative_path_to_name(from));
|
||||
|
||||
let builder = blob_client.get();
|
||||
let mut builder = blob_client.get();
|
||||
|
||||
if let Some(ref etag) = opts.etag {
|
||||
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
|
||||
}
|
||||
|
||||
self.download_for_builder(builder, cancel).await
|
||||
}
|
||||
|
||||
@@ -5,6 +5,8 @@ pub enum DownloadError {
|
||||
BadInput(anyhow::Error),
|
||||
/// The file was not found in the remote storage.
|
||||
NotFound,
|
||||
/// The caller provided an ETag, and the file was not modified.
|
||||
Unmodified,
|
||||
/// A cancellation token aborted the download, typically during
|
||||
/// tenant detach or process shutdown.
|
||||
Cancelled,
|
||||
@@ -24,6 +26,7 @@ impl std::fmt::Display for DownloadError {
|
||||
write!(f, "Failed to download a remote file due to user input: {e}")
|
||||
}
|
||||
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
|
||||
DownloadError::Unmodified => write!(f, "File was not modified"),
|
||||
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
|
||||
DownloadError::Timeout => write!(f, "timeout"),
|
||||
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
|
||||
@@ -38,7 +41,7 @@ impl DownloadError {
|
||||
pub fn is_permanent(&self) -> bool {
|
||||
use DownloadError::*;
|
||||
match self {
|
||||
BadInput(_) | NotFound | Cancelled => true,
|
||||
BadInput(_) | NotFound | Unmodified | Cancelled => true,
|
||||
Timeout | Other(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,6 +161,14 @@ pub struct Listing {
|
||||
pub keys: Vec<ListingObject>,
|
||||
}
|
||||
|
||||
/// Options for downloads. The default value is a plain GET.
|
||||
#[derive(Default)]
|
||||
pub struct DownloadOpts {
|
||||
/// If given, returns [`DownloadError::Unmodified`] if the object still has
|
||||
/// the same ETag (using If-None-Match).
|
||||
pub etag: Option<Etag>,
|
||||
}
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
/// This storage tries to be unaware of any layered repository context,
|
||||
/// providing basic CRUD operations for storage files.
|
||||
@@ -245,6 +253,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
async fn download(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
opts: &DownloadOpts,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError>;
|
||||
|
||||
@@ -401,16 +410,18 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteStorage::download`]
|
||||
pub async fn download(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
opts: &DownloadOpts,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.download(from, cancel).await,
|
||||
Self::AwsS3(s) => s.download(from, cancel).await,
|
||||
Self::AzureBlob(s) => s.download(from, cancel).await,
|
||||
Self::Unreliable(s) => s.download(from, cancel).await,
|
||||
Self::LocalFs(s) => s.download(from, opts, cancel).await,
|
||||
Self::AwsS3(s) => s.download(from, opts, cancel).await,
|
||||
Self::AzureBlob(s) => s.download(from, opts, cancel).await,
|
||||
Self::Unreliable(s) => s.download(from, opts, cancel).await,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -572,7 +583,7 @@ impl GenericRemoteStorage {
|
||||
) -> Result<Download, DownloadError> {
|
||||
match byte_range {
|
||||
Some((start, end)) => self.download_byte_range(from, start, end, cancel).await,
|
||||
None => self.download(from, cancel).await,
|
||||
None => self.download(from, &DownloadOpts::default(), cancel).await,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
|
||||
use crate::{
|
||||
Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath, TimeTravelError,
|
||||
TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject, RemotePath,
|
||||
TimeTravelError, TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
use super::{RemoteStorage, StorageMetadata};
|
||||
@@ -494,11 +494,17 @@ impl RemoteStorage for LocalFs {
|
||||
async fn download(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
opts: &DownloadOpts,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
let target_path = from.with_base(&self.storage_root);
|
||||
|
||||
let file_metadata = file_metadata(&target_path).await?;
|
||||
let etag = mock_etag(&file_metadata);
|
||||
|
||||
if opts.etag.as_ref() == Some(&etag) {
|
||||
return Err(DownloadError::Unmodified);
|
||||
}
|
||||
|
||||
let source = ReaderStream::new(
|
||||
fs::OpenOptions::new()
|
||||
@@ -519,7 +525,6 @@ impl RemoteStorage for LocalFs {
|
||||
let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
|
||||
let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
|
||||
|
||||
let etag = mock_etag(&file_metadata);
|
||||
Ok(Download {
|
||||
metadata,
|
||||
last_modified: file_metadata
|
||||
@@ -692,7 +697,7 @@ mod fs_tests {
|
||||
) -> anyhow::Result<String> {
|
||||
let cancel = CancellationToken::new();
|
||||
let download = storage
|
||||
.download(remote_storage_path, &cancel)
|
||||
.download(remote_storage_path, &DownloadOpts::default(), &cancel)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
|
||||
ensure!(
|
||||
@@ -773,8 +778,8 @@ mod fs_tests {
|
||||
"We should upload and download the same contents"
|
||||
);
|
||||
|
||||
let non_existing_path = "somewhere/else";
|
||||
match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await {
|
||||
let non_existing_path = RemotePath::new(Utf8Path::new("somewhere/else"))?;
|
||||
match storage.download(&non_existing_path, &DownloadOpts::default(), &cancel).await {
|
||||
Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
|
||||
other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
|
||||
}
|
||||
@@ -1101,7 +1106,13 @@ mod fs_tests {
|
||||
storage.upload(body, len, &path, None, &cancel).await?;
|
||||
}
|
||||
|
||||
let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
|
||||
let read = aggregate(
|
||||
storage
|
||||
.download(&path, &DownloadOpts::default(), &cancel)
|
||||
.await?
|
||||
.download_stream,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(body, read);
|
||||
|
||||
let shorter = Bytes::from_static(b"shorter body");
|
||||
@@ -1112,7 +1123,13 @@ mod fs_tests {
|
||||
storage.upload(body, len, &path, None, &cancel).await?;
|
||||
}
|
||||
|
||||
let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
|
||||
let read = aggregate(
|
||||
storage
|
||||
.download(&path, &DownloadOpts::default(), &cancel)
|
||||
.await?
|
||||
.download_stream,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(shorter, read);
|
||||
Ok(())
|
||||
}
|
||||
@@ -1145,7 +1162,13 @@ mod fs_tests {
|
||||
storage.upload(body, len, &path, None, &cancel).await?;
|
||||
}
|
||||
|
||||
let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
|
||||
let read = aggregate(
|
||||
storage
|
||||
.download(&path, &DownloadOpts::default(), &cancel)
|
||||
.await?
|
||||
.download_stream,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(body, read);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -28,6 +28,7 @@ use aws_sdk_s3::{
|
||||
Client,
|
||||
};
|
||||
use aws_smithy_async::rt::sleep::TokioSleep;
|
||||
use http_types::StatusCode;
|
||||
|
||||
use aws_smithy_types::{body::SdkBody, DateTime};
|
||||
use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
|
||||
@@ -44,8 +45,8 @@ use crate::{
|
||||
error::Cancelled,
|
||||
metrics::{start_counting_cancelled_wait, start_measuring_requests},
|
||||
support::PermitCarrying,
|
||||
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath,
|
||||
RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
|
||||
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
|
||||
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
|
||||
REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
@@ -67,6 +68,7 @@ pub struct S3Bucket {
|
||||
struct GetObjectRequest {
|
||||
bucket: String,
|
||||
key: String,
|
||||
etag: Option<String>,
|
||||
range: Option<String>,
|
||||
}
|
||||
impl S3Bucket {
|
||||
@@ -248,13 +250,18 @@ impl S3Bucket {
|
||||
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
let get_object = self
|
||||
let mut builder = self
|
||||
.client
|
||||
.get_object()
|
||||
.bucket(request.bucket)
|
||||
.key(request.key)
|
||||
.set_range(request.range)
|
||||
.send();
|
||||
.set_range(request.range);
|
||||
|
||||
if let Some(etag) = request.etag {
|
||||
builder = builder.if_none_match(etag);
|
||||
}
|
||||
|
||||
let get_object = builder.send();
|
||||
|
||||
let get_object = tokio::select! {
|
||||
res = get_object => res,
|
||||
@@ -277,6 +284,20 @@ impl S3Bucket {
|
||||
);
|
||||
return Err(DownloadError::NotFound);
|
||||
}
|
||||
Err(SdkError::ServiceError(e))
|
||||
// aws_smithy_runtime_api::http::response::StatusCode isn't
|
||||
// re-exported by any aws crates, so just check the numeric
|
||||
// status against http_types::StatusCode instead of pulling it.
|
||||
if e.raw().status().as_u16() == StatusCode::NotModified =>
|
||||
{
|
||||
// Count an unmodified file as a success.
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
|
||||
kind,
|
||||
AttemptOutcome::Ok,
|
||||
started_at,
|
||||
);
|
||||
return Err(DownloadError::Unmodified);
|
||||
}
|
||||
Err(e) => {
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
|
||||
kind,
|
||||
@@ -773,6 +794,7 @@ impl RemoteStorage for S3Bucket {
|
||||
async fn download(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
opts: &DownloadOpts,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
// if prefix is not none then download file `prefix/from`
|
||||
@@ -781,6 +803,7 @@ impl RemoteStorage for S3Bucket {
|
||||
GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: opts.etag.as_ref().map(|e| e.to_string()),
|
||||
range: None,
|
||||
},
|
||||
cancel,
|
||||
@@ -807,6 +830,7 @@ impl RemoteStorage for S3Bucket {
|
||||
GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: None,
|
||||
range,
|
||||
},
|
||||
cancel,
|
||||
|
||||
@@ -12,8 +12,8 @@ use std::{collections::hash_map::Entry, sync::Arc};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{
|
||||
Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
|
||||
StorageMetadata, TimeTravelError,
|
||||
Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath,
|
||||
RemoteStorage, StorageMetadata, TimeTravelError,
|
||||
};
|
||||
|
||||
pub struct UnreliableWrapper {
|
||||
@@ -167,11 +167,12 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
async fn download(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
opts: &DownloadOpts,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
self.attempt(RemoteOp::Download(from.clone()))
|
||||
.map_err(DownloadError::Other)?;
|
||||
self.inner.download(from, cancel).await
|
||||
self.inner.download(from, opts, cancel).await
|
||||
}
|
||||
|
||||
async fn download_byte_range(
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use futures::StreamExt;
|
||||
use remote_storage::ListingMode;
|
||||
use remote_storage::RemotePath;
|
||||
use remote_storage::{DownloadError, DownloadOpts, ListingMode, RemotePath};
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashSet, num::NonZeroU32};
|
||||
use test_context::test_context;
|
||||
@@ -284,7 +283,10 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
ctx.client.upload(data, len, &path, None, &cancel).await?;
|
||||
|
||||
// Normal download request
|
||||
let dl = ctx.client.download(&path, &cancel).await?;
|
||||
let dl = ctx
|
||||
.client
|
||||
.download(&path, &DownloadOpts::default(), &cancel)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig);
|
||||
|
||||
@@ -337,6 +339,54 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tests that conditional downloads work properly, by returning
|
||||
/// DownloadError::Unmodified when the object ETag matches the given ETag.
|
||||
#[test_context(MaybeEnabledStorage)]
|
||||
#[tokio::test]
|
||||
async fn download_conditional(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
|
||||
let MaybeEnabledStorage::Enabled(ctx) = ctx else {
|
||||
return Ok(());
|
||||
};
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Create a file.
|
||||
let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))?;
|
||||
let data = bytes::Bytes::from_static("foo".as_bytes());
|
||||
let (stream, len) = wrap_stream(data);
|
||||
ctx.client.upload(stream, len, &path, None, &cancel).await?;
|
||||
|
||||
// Download it to obtain its etag.
|
||||
let mut opts = DownloadOpts::default();
|
||||
let download = ctx.client.download(&path, &opts, &cancel).await?;
|
||||
|
||||
// Download with the etag yields DownloadError::Unmodified.
|
||||
opts.etag = Some(download.etag);
|
||||
let result = ctx.client.download(&path, &opts, &cancel).await;
|
||||
assert!(
|
||||
matches!(result, Err(DownloadError::Unmodified)),
|
||||
"expected DownloadError::Unmodified, got {result:?}"
|
||||
);
|
||||
|
||||
// Replace the file contents.
|
||||
let data = bytes::Bytes::from_static("bar".as_bytes());
|
||||
let (stream, len) = wrap_stream(data);
|
||||
ctx.client.upload(stream, len, &path, None, &cancel).await?;
|
||||
|
||||
// A download with the old etag should yield the new file.
|
||||
let download = ctx.client.download(&path, &opts, &cancel).await?;
|
||||
assert_ne!(download.etag, opts.etag.unwrap(), "ETag did not change");
|
||||
|
||||
// A download with the new etag should yield Unmodified again.
|
||||
opts.etag = Some(download.etag);
|
||||
let result = ctx.client.download(&path, &opts, &cancel).await;
|
||||
assert!(
|
||||
matches!(result, Err(DownloadError::Unmodified)),
|
||||
"expected DownloadError::Unmodified, got {result:?}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_context(MaybeEnabledStorage)]
|
||||
#[tokio::test]
|
||||
async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
|
||||
@@ -364,7 +414,10 @@ async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
|
||||
// Normal download request
|
||||
ctx.client.copy_object(&path, &path_dest, &cancel).await?;
|
||||
|
||||
let dl = ctx.client.download(&path_dest, &cancel).await?;
|
||||
let dl = ctx
|
||||
.client
|
||||
.download(&path_dest, &DownloadOpts::default(), &cancel)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig);
|
||||
|
||||
|
||||
@@ -12,8 +12,8 @@ use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use futures_util::StreamExt;
|
||||
use remote_storage::{
|
||||
DownloadError, GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig,
|
||||
RemoteStorageKind, S3Config,
|
||||
DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
|
||||
RemoteStorageConfig, RemoteStorageKind, S3Config,
|
||||
};
|
||||
use test_context::test_context;
|
||||
use test_context::AsyncTestContext;
|
||||
@@ -121,7 +121,8 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
|
||||
|
||||
// A little check to ensure that our clock is not too far off from the S3 clock
|
||||
{
|
||||
let dl = retry(|| ctx.client.download(&path2, &cancel)).await?;
|
||||
let opts = DownloadOpts::default();
|
||||
let dl = retry(|| ctx.client.download(&path2, &opts, &cancel)).await?;
|
||||
let last_modified = dl.last_modified;
|
||||
let half_wt = WAIT_TIME.mul_f32(0.5);
|
||||
let t0_hwt = t0 + half_wt;
|
||||
@@ -159,7 +160,12 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
|
||||
let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
|
||||
println!("after recovery to t2: {t2_files_recovered:?}");
|
||||
assert_eq!(t2_files, t2_files_recovered);
|
||||
let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
|
||||
let path2_recovered_t2 = download_to_vec(
|
||||
ctx.client
|
||||
.download(&path2, &DownloadOpts::default(), &cancel)
|
||||
.await?,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(path2_recovered_t2, new_data.as_bytes());
|
||||
|
||||
// after recovery to t1: path1 is back, path2 has the old content
|
||||
@@ -170,7 +176,12 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
|
||||
let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
|
||||
println!("after recovery to t1: {t1_files_recovered:?}");
|
||||
assert_eq!(t1_files, t1_files_recovered);
|
||||
let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
|
||||
let path2_recovered_t1 = download_to_vec(
|
||||
ctx.client
|
||||
.download(&path2, &DownloadOpts::default(), &cancel)
|
||||
.await?,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(path2_recovered_t1, old_data.as_bytes());
|
||||
|
||||
// after recovery to t0: everything is gone except for path1
|
||||
@@ -416,7 +427,7 @@ async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
|
||||
let started_at = std::time::Instant::now();
|
||||
let mut stream = ctx
|
||||
.client
|
||||
.download(&path, &cancel)
|
||||
.download(&path, &DownloadOpts::default(), &cancel)
|
||||
.await
|
||||
.expect("download succeeds")
|
||||
.download_stream;
|
||||
@@ -491,7 +502,7 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
|
||||
{
|
||||
let stream = ctx
|
||||
.client
|
||||
.download(&path, &cancel)
|
||||
.download(&path, &DownloadOpts::default(), &cancel)
|
||||
.await
|
||||
.expect("download succeeds")
|
||||
.download_stream;
|
||||
|
||||
@@ -27,7 +27,7 @@ use crate::tenant::Generation;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
|
||||
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::pausable_failpoint;
|
||||
@@ -153,7 +153,9 @@ async fn download_object<'a>(
|
||||
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let download = storage.download(src_path, cancel).await?;
|
||||
let download = storage
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
.await?;
|
||||
|
||||
pausable_failpoint!("before-downloading-layer-stream-pausable");
|
||||
|
||||
@@ -204,7 +206,9 @@ async fn download_object<'a>(
|
||||
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let mut download = storage.download(src_path, cancel).await?;
|
||||
let mut download = storage
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
.await?;
|
||||
|
||||
pausable_failpoint!("before-downloading-layer-stream-pausable");
|
||||
|
||||
@@ -344,7 +348,9 @@ async fn do_download_index_part(
|
||||
|
||||
let index_part_bytes = download_retry_forever(
|
||||
|| async {
|
||||
let download = storage.download(&remote_path, cancel).await?;
|
||||
let download = storage
|
||||
.download(&remote_path, &DownloadOpts::default(), cancel)
|
||||
.await?;
|
||||
|
||||
let mut bytes = Vec::new();
|
||||
|
||||
@@ -526,10 +532,15 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
.with_context(|| format!("tempfile creation {temp_path}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let download = match storage.download(&remote_path, cancel).await {
|
||||
let download = match storage
|
||||
.download(&remote_path, &DownloadOpts::default(), cancel)
|
||||
.await
|
||||
{
|
||||
Ok(dl) => dl,
|
||||
Err(DownloadError::NotFound) => {
|
||||
storage.download(&remote_preserved_path, cancel).await?
|
||||
storage
|
||||
.download(&remote_preserved_path, &DownloadOpts::default(), cancel)
|
||||
.await?
|
||||
}
|
||||
Err(other) => Err(other)?,
|
||||
};
|
||||
|
||||
@@ -49,7 +49,7 @@ use futures::Future;
|
||||
use metrics::UIntGauge;
|
||||
use pageserver_api::models::SecondaryProgress;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
|
||||
use remote_storage::{DownloadError, DownloadOpts, Etag, GenericRemoteStorage};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, instrument, warn, Instrument};
|
||||
@@ -944,27 +944,26 @@ impl<'a> TenantDownloader<'a> {
|
||||
) -> Result<HeatMapDownload, UpdateError> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
// TODO: pull up etag check into the request, to do a conditional GET rather than
|
||||
// issuing a GET and then maybe ignoring the response body
|
||||
// (https://github.com/neondatabase/neon/issues/6199)
|
||||
tracing::debug!("Downloading heatmap for secondary tenant",);
|
||||
|
||||
let heatmap_path = remote_heatmap_path(tenant_shard_id);
|
||||
let cancel = &self.secondary_state.cancel;
|
||||
let opts = DownloadOpts {
|
||||
etag: prev_etag.cloned(),
|
||||
};
|
||||
|
||||
backoff::retry(
|
||||
|| async {
|
||||
let download = self
|
||||
let download = match self
|
||||
.remote_storage
|
||||
.download(&heatmap_path, cancel)
|
||||
.download(&heatmap_path, &opts, cancel)
|
||||
.await
|
||||
.map_err(UpdateError::from)?;
|
||||
{
|
||||
Ok(download) => download,
|
||||
Err(DownloadError::Unmodified) => return Ok(HeatMapDownload::Unmodified),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
|
||||
SECONDARY_MODE.download_heatmap.inc();
|
||||
|
||||
if Some(&download.etag) == prev_etag {
|
||||
Ok(HeatMapDownload::Unmodified)
|
||||
} else {
|
||||
let mut heatmap_bytes = Vec::new();
|
||||
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
|
||||
@@ -973,7 +972,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
last_modified: download.last_modified,
|
||||
bytes: heatmap_bytes,
|
||||
}))
|
||||
}
|
||||
},
|
||||
|e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
@@ -984,6 +982,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
.await
|
||||
.ok_or_else(|| UpdateError::Cancelled)
|
||||
.and_then(|x| x)
|
||||
.inspect(|_| SECONDARY_MODE.download_heatmap.inc())
|
||||
}
|
||||
|
||||
/// Download heatmap layers that are not present on local disk, or update their
|
||||
|
||||
@@ -28,8 +28,9 @@ use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_time
|
||||
use pageserver::tenant::TENANTS_SEGMENT_NAME;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{
|
||||
GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig, RemoteStorageKind,
|
||||
S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
|
||||
RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
};
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -488,7 +489,10 @@ async fn download_object_with_retries(
|
||||
let cancel = CancellationToken::new();
|
||||
for trial in 0..MAX_RETRIES {
|
||||
let mut buf = Vec::new();
|
||||
let download = match remote_client.download(key, &cancel).await {
|
||||
let download = match remote_client
|
||||
.download(key, &DownloadOpts::default(), &cancel)
|
||||
.await
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(e) => {
|
||||
error!("Failed to download object for key {key}: {e}");
|
||||
|
||||
Reference in New Issue
Block a user