diff --git a/Cargo.lock b/Cargo.lock index e565a0bb96..fc254a352a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6711,8 +6711,6 @@ version = "0.1.0" dependencies = [ "anyhow", "async-stream", - "aws-config", - "aws-sdk-s3", "camino", "chrono", "clap", diff --git a/storage_scrubber/Cargo.toml b/storage_scrubber/Cargo.toml index 7f6544b894..1c8b0e9f4a 100644 --- a/storage_scrubber/Cargo.toml +++ b/storage_scrubber/Cargo.toml @@ -5,8 +5,6 @@ edition = "2024" license.workspace = true [dependencies] -aws-config.workspace = true -aws-sdk-s3.workspace = true either.workspace = true anyhow.workspace = true hex.workspace = true diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs index 071f0b9756..25a157f108 100644 --- a/storage_scrubber/src/lib.rs +++ b/storage_scrubber/src/lib.rs @@ -12,14 +12,9 @@ pub mod tenant_snapshot; use std::env; use std::fmt::Display; -use std::sync::Arc; use std::time::{Duration, SystemTime}; use anyhow::Context; -use aws_config::retry::{RetryConfigBuilder, RetryMode}; -use aws_sdk_s3::Client; -use aws_sdk_s3::config::Region; -use aws_sdk_s3::error::DisplayErrorContext; use camino::{Utf8Path, Utf8PathBuf}; use clap::ValueEnum; use futures::{Stream, StreamExt}; @@ -28,7 +23,7 @@ use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_time use pageserver_api::shard::TenantShardId; use remote_storage::{ DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig, - RemoteStorageKind, S3Config, + RemoteStorageKind, VersionId, }; use reqwest::Url; use serde::{Deserialize, Serialize}; @@ -351,21 +346,6 @@ pub fn init_logging(file_name: &str) -> Option { } } -async fn init_s3_client(bucket_region: Region) -> Client { - let mut retry_config_builder = RetryConfigBuilder::new(); - - retry_config_builder - .set_max_attempts(Some(3)) - .set_mode(Some(RetryMode::Adaptive)); - - let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28()) - .region(bucket_region) - .retry_config(retry_config_builder.build()) - .load() - .await; - Client::new(&config) -} - fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str { match node_kind { NodeKind::Pageserver => "pageserver/v1/", @@ -385,23 +365,6 @@ fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeK } } -async fn init_remote_s3( - bucket_config: S3Config, - node_kind: NodeKind, -) -> anyhow::Result<(Arc, RootTarget)> { - let bucket_region = Region::new(bucket_config.bucket_region); - let s3_client = Arc::new(init_s3_client(bucket_region).await); - let default_prefix = default_prefix_in_bucket(node_kind).to_string(); - - let s3_root = make_root_target( - bucket_config.bucket_name, - bucket_config.prefix_in_bucket.unwrap_or(default_prefix), - node_kind, - ); - - Ok((s3_client, s3_root)) -} - async fn init_remote( mut storage_config: BucketConfig, node_kind: NodeKind, @@ -499,7 +462,7 @@ async fn list_objects_with_retries( remote_client.bucket_name().unwrap_or_default(), s3_target.prefix_in_bucket, s3_target.delimiter, - DisplayErrorContext(e), + e, ); let backoff_time = 1 << trial.min(5); tokio::time::sleep(Duration::from_secs(backoff_time)).await; @@ -549,14 +512,18 @@ async fn download_object_with_retries( anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times") } -async fn download_object_to_file_s3( - s3_client: &Client, - bucket_name: &str, - key: &str, - version_id: Option<&str>, +async fn download_object_to_file( + remote_storage: &GenericRemoteStorage, + key: &RemotePath, + version_id: Option, local_path: &Utf8Path, ) -> anyhow::Result<()> { + let opts = DownloadOpts { + version_id: version_id.clone(), + ..Default::default() + }; let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp")); + let cancel = CancellationToken::new(); for _ in 0..MAX_RETRIES { tokio::fs::remove_file(&tmp_path) .await @@ -566,28 +533,24 @@ async fn download_object_to_file_s3( .await .context("Opening output file")?; - let request = s3_client.get_object().bucket(bucket_name).key(key); + let res = remote_storage.download(key, &opts, &cancel).await; - let request = match version_id { - Some(version_id) => request.version_id(version_id), - None => request, - }; - - let response_stream = match request.send().await { + let download = match res { Ok(response) => response, Err(e) => { error!( - "Failed to download object for key {key} version {}: {e:#}", - version_id.unwrap_or("") + "Failed to download object for key {key} version {:?}: {e:#}", + &version_id.as_ref().unwrap_or(&VersionId(String::new())) ); tokio::time::sleep(Duration::from_secs(1)).await; continue; } }; - let mut read_stream = response_stream.body.into_async_read(); + //response_stream.download_stream - tokio::io::copy(&mut read_stream, &mut file).await?; + let mut body = tokio_util::io::StreamReader::new(download.download_stream); + tokio::io::copy(&mut body, &mut file).await?; tokio::fs::rename(&tmp_path, local_path).await?; return Ok(()); diff --git a/storage_scrubber/src/tenant_snapshot.rs b/storage_scrubber/src/tenant_snapshot.rs index e17409c20e..24231e32fc 100644 --- a/storage_scrubber/src/tenant_snapshot.rs +++ b/storage_scrubber/src/tenant_snapshot.rs @@ -1,31 +1,30 @@ use std::collections::HashMap; -use std::sync::Arc; use anyhow::Context; use async_stream::stream; -use aws_sdk_s3::Client; use camino::Utf8PathBuf; use futures::{StreamExt, TryStreamExt}; use pageserver::tenant::IndexPart; use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata; +use pageserver::tenant::remote_timeline_client::remote_layer_path; use pageserver::tenant::storage_layer::LayerName; use pageserver_api::shard::TenantShardId; -use remote_storage::{GenericRemoteStorage, S3Config}; +use remote_storage::GenericRemoteStorage; +use tokio_util::sync::CancellationToken; use utils::generation::Generation; use utils::id::TenantId; use crate::checks::{BlobDataParseResult, RemoteTimelineBlobData, list_timeline_blobs}; use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines}; use crate::{ - BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file_s3, - init_remote, init_remote_s3, + BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file, init_remote, }; pub struct SnapshotDownloader { - s3_client: Arc, - s3_root: RootTarget, + remote_client: GenericRemoteStorage, + #[allow(dead_code)] + target: RootTarget, bucket_config: BucketConfig, - bucket_config_s3: S3Config, tenant_id: TenantId, output_path: Utf8PathBuf, concurrency: usize, @@ -38,17 +37,13 @@ impl SnapshotDownloader { output_path: Utf8PathBuf, concurrency: usize, ) -> anyhow::Result { - let bucket_config_s3 = match &bucket_config.0.storage { - remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(), - _ => panic!("only S3 configuration is supported for snapshot downloading"), - }; - let (s3_client, s3_root) = - init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?; + let (remote_client, target) = + init_remote(bucket_config.clone(), NodeKind::Pageserver).await?; + Ok(Self { - s3_client, - s3_root, + remote_client, + target, bucket_config, - bucket_config_s3, tenant_id, output_path, concurrency, @@ -61,6 +56,7 @@ impl SnapshotDownloader { layer_name: LayerName, layer_metadata: LayerFileMetadata, ) -> anyhow::Result<(LayerName, LayerFileMetadata)> { + let cancel = CancellationToken::new(); // Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use // different layer names (remote-style has the generation suffix) let local_path = self.output_path.join(format!( @@ -82,30 +78,27 @@ impl SnapshotDownloader { } else { tracing::debug!("{} requires download...", local_path); - let timeline_root = self.s3_root.timeline_root(&ttid); - let remote_layer_path = format!( - "{}{}{}", - timeline_root.prefix_in_bucket, - layer_name, - layer_metadata.generation.get_suffix() + let remote_path = remote_layer_path( + &ttid.tenant_shard_id.tenant_id, + &ttid.timeline_id, + layer_metadata.shard, + &layer_name, + layer_metadata.generation, ); + let mode = remote_storage::ListingMode::NoDelimiter; // List versions: the object might be deleted. let versions = self - .s3_client - .list_object_versions() - .bucket(self.bucket_config_s3.bucket_name.clone()) - .prefix(&remote_layer_path) - .send() + .remote_client + .list_versions(Some(&remote_path), mode, None, &cancel) .await?; - let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else { - return Err(anyhow::anyhow!("No versions found for {remote_layer_path}")); + let Some(version) = versions.versions.first() else { + return Err(anyhow::anyhow!("No versions found for {remote_path}")); }; - download_object_to_file_s3( - &self.s3_client, - &self.bucket_config_s3.bucket_name, - &remote_layer_path, - version.version_id.as_deref(), + download_object_to_file( + &self.remote_client, + &remote_path, + version.version_id().cloned(), &local_path, ) .await?;