Reuse remote_client from the SnapshotDownloader instead of recreating in download function (#11812)

## Problem
At the moment, remote_client and target are recreated in download
function. We could reuse it from SnapshotDownloader instance. This isn't
a problem per se, just a quality of life improvement but it caught my
attention when we were trying out snapshot downloading in one of the
older version and ran into a curious case of s3 clients behaving in two
different manners. One client that used `force_path_style` and other one
didn't.

**Logs from this run:**
```
2025-05-02T12:56:22.384626Z DEBUG /data/snappie/2739e7da34e625e3934ef0b76fa12483/timelines/d44b831adb0a6ba96792dc3a5cc30910/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014E8F20-00000000014E8F99-00000001 requires download...
2025-05-02T12:56:22.384689Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:apply_configuration: timeout settings for this operation: TimeoutConfig { connect_timeout: Set(3.1s), read_timeout: Disabled, operation_timeout: Disabled, operation_attempt_timeout: Disabled }
2025-05-02T12:56:22.384730Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op: entering 'serialization' phase
2025-05-02T12:56:22.384784Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op: entering 'before transmit' phase
2025-05-02T12:56:22.384813Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op: retry strategy has OKed initial request
2025-05-02T12:56:22.384841Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op: beginning attempt #1
2025-05-02T12:56:22.384870Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: resolving endpoint endpoint_params=EndpointResolverParams(TypeErasedBox[!Clone]:Params { bucket: Some("bucket"), region: Some("eu-north-1"), use_fips: false, use_dual_stack: false, endpoint: Some("https://s3.self-hosted.company.com"), force_path_style: false, accelerate: false, use_global_endpoint: false, use_object_lambda_endpoint: None, key: None, prefix: Some("/pageserver/tenants/2739e7da34e625e3934ef0b76fa12483/timelines/d44b831adb0a6ba96792dc3a5cc30910/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014E8F20-00000000014E8F99-00000001"), copy_source: None, disable_access_points: None, disable_multi_region_access_points: false, use_arn_region: None, use_s3_express_control_endpoint: None, disable_s3_express_session_auth: None }) endpoint_prefix=None
2025-05-02T12:56:22.384979Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: will use endpoint Endpoint { url: "https://neon.s3.self-hosted.company.com", headers: {}, properties: {"authSchemes": Array([Object({"signingRegion": String("eu-north-1"), "disableDoubleEncoding": Bool(true), "name": String("sigv4"), "signingName": String("s3")})])} }
2025-05-02T12:56:22.385042Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt:lazy_load_identity:provide_credentials{provider=default_chain}: loaded credentials provider=Environment
2025-05-02T12:56:22.385066Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt:lazy_load_identity: identity cache miss occurred; added new identity (took 35.958µs) new_expiration=2025-05-02T13:11:22.385028Z valid_for=899.999961437s partition=IdentityCachePartition(5)
2025-05-02T12:56:22.385090Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: loaded identity
2025-05-02T12:56:22.385162Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: entering 'transmit' phase
2025-05-02T12:56:22.385211Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: new TCP connector created in 361ns
2025-05-02T12:56:22.385288Z DEBUG resolving host="neon.s3.self-hosted.company.com"
2025-05-02T12:56:22.390796Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: encountered orchestrator error; halting
```
This commit is contained in:
Santosh Pingale
2025-05-08 16:09:15 +02:00
committed by GitHub
parent 42d93031a1
commit 659366060d

View File

@@ -24,7 +24,6 @@ pub struct SnapshotDownloader {
remote_client: GenericRemoteStorage,
#[allow(dead_code)]
target: RootTarget,
bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
concurrency: usize,
@@ -43,7 +42,6 @@ impl SnapshotDownloader {
Ok(Self {
remote_client,
target,
bucket_config,
tenant_id,
output_path,
concurrency,
@@ -218,11 +216,9 @@ impl SnapshotDownloader {
}
pub async fn download(&self) -> anyhow::Result<()> {
let (remote_client, target) =
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
// Generate a stream of TenantShardId
let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
let shards =
stream_tenant_shards(&self.remote_client, &self.target, self.tenant_id).await?;
let shards: Vec<TenantShardId> = shards.try_collect().await?;
// Only read from shards that have the highest count: avoids redundantly downloading
@@ -240,7 +236,8 @@ impl SnapshotDownloader {
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
// Generate a stream of TenantTimelineId
let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
let timelines =
stream_tenant_timelines(&self.remote_client, &self.target, shard).await?;
// Generate a stream of S3TimelineBlobData
async fn load_timeline_index(
@@ -251,8 +248,8 @@ impl SnapshotDownloader {
let data = list_timeline_blobs(remote_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines =
timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
let timelines = timelines
.map_ok(|ttid| load_timeline_index(&self.remote_client, &self.target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
while let Some(i) = timelines.next().await {