From e8cc41c858115800d0f135b8152af3f01d6a22bc Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 19 Apr 2024 17:39:05 +0100
Subject: [PATCH] s3_scrubber: add tenant-snapshot command

---
 Cargo.lock                         |   1 +
 s3_scrubber/Cargo.toml             |   1 +
 s3_scrubber/src/lib.rs             |  49 ++++++-
 s3_scrubber/src/main.rs            |  17 +++
 s3_scrubber/src/metadata_stream.rs |  58 +++++++-
 s3_scrubber/src/tenant_snapshot.rs | 222 +++++++++++++++++++++++++++++
 6 files changed, 345 insertions(+), 3 deletions(-)
 create mode 100644 s3_scrubber/src/tenant_snapshot.rs

diff --git a/Cargo.lock b/Cargo.lock
index 6faf4b72f0..a140daac7d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5084,6 +5084,7 @@ dependencies = [
  "aws-smithy-async",
  "bincode",
  "bytes",
+ "camino",
  "chrono",
  "clap",
  "crc32c",
diff --git a/s3_scrubber/Cargo.toml b/s3_scrubber/Cargo.toml
index 4d136472e0..0ee9112010 100644
--- a/s3_scrubber/Cargo.toml
+++ b/s3_scrubber/Cargo.toml
@@ -25,6 +25,7 @@ async-stream.workspace = true
 tokio-stream.workspace = true
 futures-util.workspace = true
 itertools.workspace = true
+camino.workspace = true
 
 tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
 chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }
diff --git a/s3_scrubber/src/lib.rs b/s3_scrubber/src/lib.rs
index d2842877d0..879dcede02 100644
--- a/s3_scrubber/src/lib.rs
+++ b/s3_scrubber/src/lib.rs
@@ -5,6 +5,7 @@ pub mod cloud_admin_api;
 pub mod garbage;
 pub mod metadata_stream;
 pub mod scan_metadata;
+pub mod tenant_snapshot;
 
 use std::env;
 use std::fmt::Display;
@@ -23,12 +24,12 @@
 use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
 use aws_sdk_s3::{Client, Config};
 use aws_smithy_async::rt::sleep::TokioSleep;
+use camino::{Utf8Path, Utf8PathBuf};
 use clap::ValueEnum;
 use pageserver::tenant::TENANTS_SEGMENT_NAME;
 use pageserver_api::shard::TenantShardId;
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
-use std::io::IsTerminal;
 use tokio::io::AsyncReadExt;
 use tracing::error;
 use tracing_appender::non_blocking::WorkerGuard;
@@ -240,7 +241,6 @@ pub fn init_logging(file_name: &str) -> WorkerGuard {
         .with_ansi(false)
         .with_writer(file_writer);
     let stderr_logs = fmt::Layer::new()
-        .with_ansi(std::io::stderr().is_terminal())
         .with_target(false)
         .with_writer(std::io::stderr);
     tracing_subscriber::registry()
@@ -396,3 +396,48 @@ async fn download_object_with_retries(
     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
 }
+
+async fn download_object_to_file(
+    s3_client: &Client,
+    bucket_name: &str,
+    key: &str,
+    version_id: Option<&str>,
+    local_path: &Utf8Path,
+) -> anyhow::Result<()> {
+    let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
+    for _ in 0..MAX_RETRIES {
+        tokio::fs::remove_file(&tmp_path).await.ok();
+
+        let mut file = tokio::fs::File::create(&tmp_path)
+            .await
+            .context("Opening output file")?;
+
+        let request = s3_client.get_object().bucket(bucket_name).key(key);
+
+        let request = match version_id {
+            Some(version_id) => request.version_id(version_id),
+            None => request,
+        };
+
+        let response_stream = match request.send().await {
+            Ok(response) => response,
+            Err(e) => {
+                error!(
+                    "Failed to download object for key {key} version {}: {e:#}",
+                    version_id.unwrap_or("")
+                );
+                tokio::time::sleep(Duration::from_secs(1)).await;
+                continue;
+            }
+        };
+
+        let mut read_stream = response_stream.body.into_async_read();
+
+        tokio::io::copy(&mut read_stream, &mut file).await?;
+
+        tokio::fs::rename(&tmp_path, local_path).await?;
+        return Ok(());
+    }
+
+    anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
+}
diff --git a/s3_scrubber/src/main.rs b/s3_scrubber/src/main.rs
index 957213856b..8fe514ab97 100644
--- a/s3_scrubber/src/main.rs
+++ b/s3_scrubber/src/main.rs
@@ -1,9 +1,12 @@
+use camino::Utf8PathBuf;
 use pageserver_api::shard::TenantShardId;
 use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
 use s3_scrubber::scan_metadata::scan_metadata;
+use s3_scrubber::tenant_snapshot::SnapshotDownloader;
 use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
 
 use clap::{Parser, Subcommand};
+use utils::id::TenantId;
 
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
@@ -38,6 +41,12 @@ enum Command {
         #[arg(long = "tenant-id", num_args = 0..)]
         tenant_ids: Vec<TenantShardId>,
     },
+    TenantSnapshot {
+        #[arg(long = "tenant-id", num_args = 0..)]
+        tenant_id: TenantId,
+        #[arg(short, long)]
+        output_path: Utf8PathBuf,
+    },
 }
 
 #[tokio::main]
@@ -50,6 +59,7 @@ async fn main() -> anyhow::Result<()> {
         Command::ScanMetadata { .. } => "scan",
         Command::FindGarbage { .. } => "find-garbage",
         Command::PurgeGarbage { .. } => "purge-garbage",
+        Command::TenantSnapshot { .. } => "tenant-snapshot",
     };
     let _guard = init_logging(&format!(
         "{}_{}_{}_{}.log",
@@ -102,5 +112,12 @@
         Command::PurgeGarbage { input_path, mode } => {
             purge_garbage(input_path, mode, !cli.delete).await
         }
+        Command::TenantSnapshot {
+            tenant_id,
+            output_path,
+        } => {
+            let downloader = SnapshotDownloader::new(bucket_config, tenant_id, output_path)?;
+            downloader.download().await
+        }
     }
 }
diff --git a/s3_scrubber/src/metadata_stream.rs b/s3_scrubber/src/metadata_stream.rs
index 073f37f319..a60cc9f337 100644
--- a/s3_scrubber/src/metadata_stream.rs
+++ b/s3_scrubber/src/metadata_stream.rs
@@ -5,7 +5,7 @@
 use tokio_stream::Stream;
 use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
 use pageserver_api::shard::TenantShardId;
-use utils::id::TimelineId;
+use utils::id::{TenantId, TimelineId};
 
 /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
 pub fn stream_tenants<'a>(
@@ -45,6 +45,62 @@ pub fn stream_tenants<'a>(
     }
 }
 
+pub async fn stream_tenant_shards<'a>(
+    s3_client: &'a Client,
+    target: &'a RootTarget,
+    tenant_id: TenantId,
+) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
+    let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
+    let mut continuation_token = None;
+    let shards_target = target.tenant_root(&TenantShardId::unsharded(tenant_id));
+
+    loop {
+        tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
+        let fetch_response =
+            list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
+        let fetch_response = match fetch_response {
+            Err(e) => {
+                tenant_shard_ids.push(Err(e));
+                break;
+            }
+            Ok(r) => r,
+        };
+
+        let new_entry_ids = fetch_response
+            .common_prefixes()
+            .iter()
+            .filter_map(|prefix| prefix.prefix())
+            .filter_map(|prefix| -> Option<&str> {
+                prefix
+                    .strip_prefix(&target.tenants_root().prefix_in_bucket)?
+                    .strip_suffix('/')
+            })
+            .map(|entry_id_str| {
+                let first_part = entry_id_str.split('/').next().unwrap();
+
+                first_part
+                    .parse::<TenantShardId>()
+                    .with_context(|| format!("Incorrect entry id str: {first_part}"))
+            });
+
+        for i in new_entry_ids {
+            tenant_shard_ids.push(i);
+        }
+
+        match fetch_response.next_continuation_token {
+            Some(new_token) => continuation_token = Some(new_token),
+            None => break,
+        }
+    }
+
+    Ok(stream! {
+        for i in tenant_shard_ids {
+            let id = i?;
+            yield Ok(id);
+        }
+    })
+}
+
 /// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
 /// using ListObjectsv2. The listing is done before the stream is built, so that this
 /// function can be used to generate concurrency on a stream using buffer_unordered.
diff --git a/s3_scrubber/src/tenant_snapshot.rs b/s3_scrubber/src/tenant_snapshot.rs
new file mode 100644
index 0000000000..9175108f96
--- /dev/null
+++ b/s3_scrubber/src/tenant_snapshot.rs
@@ -0,0 +1,222 @@
+use std::sync::Arc;
+
+use crate::checks::{list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData};
+use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
+use crate::{
+    download_object_to_file, init_remote, BucketConfig, NodeKind, RootTarget, S3Target,
+    TenantShardTimelineId,
+};
+use anyhow::Context;
+use async_stream::stream;
+use aws_sdk_s3::Client;
+use camino::Utf8PathBuf;
+use futures::{StreamExt, TryStreamExt};
+use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
+use pageserver::tenant::storage_layer::LayerFileName;
+use pageserver::tenant::IndexPart;
+use utils::generation::Generation;
+use utils::id::TenantId;
+
+pub struct SnapshotDownloader {
+    s3_client: Arc<Client>,
+    s3_root: RootTarget,
+    bucket_config: BucketConfig,
+    tenant_id: TenantId,
+    output_path: Utf8PathBuf,
+}
+
+impl SnapshotDownloader {
+    pub fn new(
+        bucket_config: BucketConfig,
+        tenant_id: TenantId,
+        output_path: Utf8PathBuf,
+    ) -> anyhow::Result<Self> {
+        let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
+        Ok(Self {
+            s3_client,
+            s3_root,
+            bucket_config,
+            tenant_id,
+            output_path,
+        })
+    }
+
+    async fn download_layer(
+        &self,
+        local_path: Utf8PathBuf,
+        remote_timeline_path: S3Target,
+        layer_name: LayerFileName,
+        layer_metadata: IndexLayerMetadata,
+    ) -> anyhow::Result<(LayerFileName, IndexLayerMetadata)> {
+        // Assumption: we always write layer files atomically, and layer files are immutable.
+        // Therefore if the file already exists on local disk, we assume it is fully correct and skip it.
+        if tokio::fs::try_exists(&local_path).await? {
+            tracing::debug!("{} already exists", local_path);
+            return Ok((layer_name, layer_metadata));
+        } else {
+            tracing::debug!("{} requires download...", local_path);
+            let remote_layer_path = format!(
+                "{}{}{}",
+                remote_timeline_path.prefix_in_bucket,
+                layer_name.file_name(),
+                layer_metadata.generation.get_suffix()
+            );
+
+            // List versions: the object might be deleted.
+            let versions = self
+                .s3_client
+                .list_object_versions()
+                .bucket(self.bucket_config.bucket.clone())
+                .prefix(&remote_layer_path)
+                .send()
+                .await?;
+            let Some(versions) = versions.versions else {
+                return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
+            };
+            let Some(version) = versions.first() else {
+                return Err(anyhow::anyhow!(
+                    "Empty versions found for {remote_layer_path}"
+                ));
+            };
+            download_object_to_file(
+                &self.s3_client,
+                &self.bucket_config.bucket,
+                &remote_layer_path,
+                version.version_id.as_deref(),
+                &local_path,
+            )
+            .await?;
+
+            tracing::debug!("Downloaded successfully to {local_path}");
+        }
+
+        Ok((layer_name, layer_metadata))
+    }
+
+    async fn download_timeline(
+        &self,
+        ttid: TenantShardTimelineId,
+        index_part: IndexPart,
+        index_part_generation: Generation,
+    ) -> anyhow::Result<()> {
+        let timeline_root = self.s3_root.timeline_root(&ttid);
+
+        let layer_count = index_part.layer_metadata.len();
+        tracing::info!(
+            "Downloading {} layers for timeline {ttid}...",
+            index_part.layer_metadata.len()
+        );
+
+        tokio::fs::create_dir_all(self.output_path.join(format!(
+            "{}/timelines/{}",
+            ttid.tenant_shard_id, ttid.timeline_id
+        )))
+        .await?;
+
+        let index_bytes = serde_json::to_string(&index_part).unwrap();
+        let layers_stream = stream! {
+            for (layer_name, layer_metadata) in index_part.layer_metadata {
+                // Note this is local as in a local copy of S3 data, not local as in the pageserver's
+                // local format. They use different layer names (remote-style has the generation suffix)
+                let local_path = self.output_path.join(format!(
+                    "{}/timelines/{}/{}{}",
+                    ttid.tenant_shard_id,
+                    ttid.timeline_id,
+                    layer_name.file_name(),
+                    layer_metadata.generation.get_suffix()
+                ));
+
+                yield self.download_layer(local_path, timeline_root.clone(), layer_name, layer_metadata);
+            }
+        };
+
+        let layer_results = layers_stream.buffered(8);
+        let mut layer_results = std::pin::pin!(layer_results);
+
+        let mut err = None;
+        let mut download_count = 0;
+        while let Some(i) = layer_results.next().await {
+            download_count += 1;
+            match i {
+                Ok((layer_name, layer_metadata)) => {
+                    tracing::info!(
+                        "[{download_count}/{layer_count}] OK: {} bytes {ttid} {}",
+                        layer_metadata.file_size,
+                        layer_name.file_name()
+                    );
+                }
+                Err(e) => {
+                    // Warn and continue: we will download what we can
+                    tracing::warn!("Download error: {e}");
+                    err = Some(e);
+                }
+            }
+        }
+
+        // Write index last, once all the layers it references are downloaded
+        let local_index_path = self.output_path.join(format!(
+            "{}/timelines/{}/index_part.json{}",
+            ttid.tenant_shard_id,
+            ttid.timeline_id,
+            index_part_generation.get_suffix()
+        ));
+        tokio::fs::write(&local_index_path, index_bytes)
+            .await
+            .context("writing index")?;
+
+        if let Some(e) = err {
+            tracing::warn!("Some errors occurred, last error: {e}");
+            Err(e)
+        } else {
+            Ok(())
+        }
+    }
+
+    pub async fn download(&self) -> anyhow::Result<()> {
+        let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?;
+
+        // Generate a stream of TenantShardId
+        let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
+        let mut shards = std::pin::pin!(shards);
+
+        while let Some(shard) = shards.next().await {
+            let shard = shard?;
+
+            // Generate a stream of TenantTimelineId
+            let timelines = stream_tenant_timelines(&s3_client, &self.s3_root, shard).await?;
+
+            // Generate a stream of S3TimelineBlobData
+            async fn load_timeline_index(
+                s3_client: &Client,
+                target: &RootTarget,
+                ttid: TenantShardTimelineId,
+            ) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
+                let data = list_timeline_blobs(s3_client, ttid, target).await?;
+                Ok((ttid, data))
+            }
+            let timelines = timelines.map_ok(|ttid| load_timeline_index(&s3_client, &target, ttid));
+            let mut timelines = std::pin::pin!(timelines.try_buffered(8));
+
+            while let Some(i) = timelines.next().await {
+                let (ttid, data) = i?;
+                match data.blob_data {
+                    BlobDataParseResult::Parsed {
+                        index_part,
+                        index_part_generation,
+                        s3_layers: _,
+                    } => {
+                        self.download_timeline(ttid, index_part, index_part_generation)
+                            .await
+                            .context("Downloading timeline")?;
+                    }
+                    BlobDataParseResult::Relic => {}
+                    BlobDataParseResult::Incorrect(_) => {
+                        tracing::error!("Bad metadata in timeline {ttid}");
+                    }
+                };
+            }
+        }
+
+        Ok(())
+    }
+}