s3_scrubber: add tenant-snapshot command

John Spray
2024-04-19 17:39:05 +01:00
parent 98be8b9430
commit e8cc41c858
6 changed files with 345 additions and 3 deletions

Cargo.lock (generated)

@@ -5084,6 +5084,7 @@ dependencies = [
"aws-smithy-async",
"bincode",
"bytes",
"camino",
"chrono",
"clap",
"crc32c",


@@ -25,6 +25,7 @@ async-stream.workspace = true
tokio-stream.workspace = true
futures-util.workspace = true
itertools.workspace = true
camino.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }


@@ -5,6 +5,7 @@ pub mod cloud_admin_api;
pub mod garbage;
pub mod metadata_stream;
pub mod scan_metadata;
pub mod tenant_snapshot;
use std::env;
use std::fmt::Display;
@@ -23,12 +24,12 @@ use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
use aws_sdk_s3::{Client, Config};
use aws_smithy_async::rt::sleep::TokioSleep;
use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use pageserver_api::shard::TenantShardId;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::io::IsTerminal;
use tokio::io::AsyncReadExt;
use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
@@ -240,7 +241,6 @@ pub fn init_logging(file_name: &str) -> WorkerGuard {
.with_ansi(false)
.with_writer(file_writer);
let stderr_logs = fmt::Layer::new()
.with_ansi(std::io::stderr().is_terminal())
.with_target(false)
.with_writer(std::io::stderr);
tracing_subscriber::registry()
@@ -396,3 +396,48 @@ async fn download_object_with_retries(
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}
async fn download_object_to_file(
s3_client: &Client,
bucket_name: &str,
key: &str,
version_id: Option<&str>,
local_path: &Utf8Path,
) -> anyhow::Result<()> {
let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
for _ in 0..MAX_RETRIES {
tokio::fs::remove_file(&tmp_path).await.ok();
let mut file = tokio::fs::File::create(&tmp_path)
.await
.context("Opening output file")?;
let request = s3_client.get_object().bucket(bucket_name).key(key);
let request = match version_id {
Some(version_id) => request.version_id(version_id),
None => request,
};
let response_stream = match request.send().await {
Ok(response) => response,
Err(e) => {
error!(
"Failed to download object for key {key} version {}: {e:#}",
version_id.unwrap_or("")
);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
let mut read_stream = response_stream.body.into_async_read();
tokio::io::copy(&mut read_stream, &mut file).await?;
tokio::fs::rename(&tmp_path, local_path).await?;
return Ok(());
}
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}


@@ -1,9 +1,12 @@
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use s3_scrubber::scan_metadata::scan_metadata;
use s3_scrubber::tenant_snapshot::SnapshotDownloader;
use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
use clap::{Parser, Subcommand};
use utils::id::TenantId;
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
@@ -38,6 +41,12 @@ enum Command {
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
},
TenantSnapshot {
#[arg(long = "tenant-id", num_args = 0..)]
tenant_id: TenantId,
#[arg(short, long)]
output_path: Utf8PathBuf,
},
}
#[tokio::main]
@@ -50,6 +59,7 @@ async fn main() -> anyhow::Result<()> {
Command::ScanMetadata { .. } => "scan",
Command::FindGarbage { .. } => "find-garbage",
Command::PurgeGarbage { .. } => "purge-garbage",
Command::TenantSnapshot { .. } => "tenant-snapshot",
};
let _guard = init_logging(&format!(
"{}_{}_{}_{}.log",
@@ -102,5 +112,12 @@ async fn main() -> anyhow::Result<()> {
Command::PurgeGarbage { input_path, mode } => {
purge_garbage(input_path, mode, !cli.delete).await
}
Command::TenantSnapshot {
tenant_id,
output_path,
} => {
let downloader = SnapshotDownloader::new(bucket_config, tenant_id, output_path)?;
downloader.download().await
}
}
}
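
The hunk above wires the new tenant-snapshot subcommand into the scrubber's clap CLI: a --tenant-id argument and an --output-path (-o) argument, dispatched to SnapshotDownloader. For illustration only, here is a minimal, self-contained sketch of that argument surface (not part of the commit; String stands in for the real TenantId and Utf8PathBuf types, and the tenant id value is a placeholder):

use clap::{Parser, Subcommand};

#[derive(Parser)]
struct Cli {
    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    // clap derives the kebab-case subcommand name "tenant-snapshot" from the variant name.
    TenantSnapshot {
        #[arg(long = "tenant-id")]
        tenant_id: String, // the real command parses a utils::id::TenantId
        #[arg(short, long)]
        output_path: String, // the real command uses camino::Utf8PathBuf
    },
}

fn main() {
    // Hypothetical invocation; the tenant id is a placeholder value.
    let cli = Cli::try_parse_from([
        "s3_scrubber",
        "tenant-snapshot",
        "--tenant-id",
        "1f359dd625e519a1a4e8d7509690f6fc",
        "--output-path",
        "./tenant-snapshot",
    ])
    .expect("argument parsing failed");

    match cli.command {
        Command::TenantSnapshot {
            tenant_id,
            output_path,
        } => {
            println!("would snapshot tenant {tenant_id} into {output_path}");
        }
    }
}

In the real binary, the parsed values are handed to SnapshotDownloader::new(bucket_config, tenant_id, output_path) and downloader.download(), as shown in the hunk above.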


@@ -5,7 +5,7 @@ use tokio_stream::Stream;
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
use pageserver_api::shard::TenantShardId;
use utils::id::TimelineId;
use utils::id::{TenantId, TimelineId};
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
pub fn stream_tenants<'a>(
@@ -45,6 +45,62 @@ pub fn stream_tenants<'a>(
}
}
pub async fn stream_tenant_shards<'a>(
s3_client: &'a Client,
target: &'a RootTarget,
tenant_id: TenantId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
let mut continuation_token = None;
let shards_target = target.tenant_root(&TenantShardId::unsharded(tenant_id));
loop {
tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
let fetch_response =
list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
let fetch_response = match fetch_response {
Err(e) => {
tenant_shard_ids.push(Err(e));
break;
}
Ok(r) => r,
};
let new_entry_ids = fetch_response
.common_prefixes()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&target.tenants_root().prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
let first_part = entry_id_str.split('/').next().unwrap();
first_part
.parse::<TenantShardId>()
.with_context(|| format!("Incorrect entry id str: {first_part}"))
});
for i in new_entry_ids {
tenant_shard_ids.push(i);
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok(stream! {
for i in tenant_shard_ids {
let id = i?;
yield Ok(id);
}
})
}
/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
/// using ListObjectsv2. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
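
stream_tenant_shards discovers a tenant's shards by listing common prefixes under the unsharded tenant root, stripping the listing root and the trailing slash, and parsing what remains as a TenantShardId. A minimal sketch of that prefix handling (not from the commit; the "tenants/" root and the shard id value are hypothetical, and a plain &str stands in for the parsed TenantShardId):

// Strip the listing root and the trailing '/' from an S3 common prefix,
// leaving the shard id string that stream_tenant_shards parses.
fn shard_id_from_prefix<'a>(tenants_root: &str, prefix: &'a str) -> Option<&'a str> {
    prefix.strip_prefix(tenants_root)?.strip_suffix('/')
}

fn main() {
    // Hypothetical prefix as returned by ListObjectsV2 for shard 1 of a 4-shard tenant.
    let prefix = "tenants/1f359dd625e519a1a4e8d7509690f6fc-0104/";
    let shard = shard_id_from_prefix("tenants/", prefix).expect("unexpected prefix shape");
    assert_eq!(shard, "1f359dd625e519a1a4e8d7509690f6fc-0104");
    println!("parsed shard id: {shard}");
}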


@@ -0,0 +1,222 @@
use std::sync::Arc;
use crate::checks::{list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::{
download_object_to_file, init_remote, BucketConfig, NodeKind, RootTarget, S3Target,
TenantShardTimelineId,
};
use anyhow::Context;
use async_stream::stream;
use aws_sdk_s3::Client;
use camino::Utf8PathBuf;
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use utils::generation::Generation;
use utils::id::TenantId;
pub struct SnapshotDownloader {
s3_client: Arc<Client>,
s3_root: RootTarget,
bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
}
impl SnapshotDownloader {
pub fn new(
bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
) -> anyhow::Result<Self> {
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
Ok(Self {
s3_client,
s3_root,
bucket_config,
tenant_id,
output_path,
})
}
async fn download_layer(
&self,
local_path: Utf8PathBuf,
remote_timeline_path: S3Target,
layer_name: LayerFileName,
layer_metadata: IndexLayerMetadata,
) -> anyhow::Result<(LayerFileName, IndexLayerMetadata)> {
// Assumption: we always write layer files atomically, and layer files are immutable. Therefore if the file
// already exists on local disk, we assume it is fully correct and skip it.
if tokio::fs::try_exists(&local_path).await? {
tracing::debug!("{} already exists", local_path);
return Ok((layer_name, layer_metadata));
} else {
tracing::debug!("{} requires download...", local_path);
let remote_layer_path = format!(
"{}{}{}",
remote_timeline_path.prefix_in_bucket,
layer_name.file_name(),
layer_metadata.generation.get_suffix()
);
// List versions: the object might be deleted.
let versions = self
.s3_client
.list_object_versions()
.bucket(self.bucket_config.bucket.clone())
.prefix(&remote_layer_path)
.send()
.await?;
let Some(versions) = versions.versions else {
return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
};
let Some(version) = versions.first() else {
return Err(anyhow::anyhow!(
"Empty versions found for {remote_layer_path}"
));
};
download_object_to_file(
&self.s3_client,
&self.bucket_config.bucket,
&remote_layer_path,
version.version_id.as_deref(),
&local_path,
)
.await?;
tracing::debug!("Downloaded successfully to {local_path}");
}
Ok((layer_name, layer_metadata))
}
async fn download_timeline(
&self,
ttid: TenantShardTimelineId,
index_part: IndexPart,
index_part_generation: Generation,
) -> anyhow::Result<()> {
let timeline_root = self.s3_root.timeline_root(&ttid);
let layer_count = index_part.layer_metadata.len();
tracing::info!(
"Downloading {} layers for timeline {ttid}...",
index_part.layer_metadata.len()
);
tokio::fs::create_dir_all(self.output_path.join(format!(
"{}/timelines/{}",
ttid.tenant_shard_id, ttid.timeline_id
)))
.await?;
let index_bytes = serde_json::to_string(&index_part).unwrap();
let layers_stream = stream! {
for (layer_name, layer_metadata) in index_part.layer_metadata {
// Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use
// different layer names (remote-style has the generation suffix)
let local_path = self.output_path.join(format!(
"{}/timelines/{}/{}{}",
ttid.tenant_shard_id,
ttid.timeline_id,
layer_name.file_name(),
layer_metadata.generation.get_suffix()
));
yield self.download_layer(local_path, timeline_root.clone(), layer_name, layer_metadata);
}
};
let layer_results = layers_stream.buffered(8);
let mut layer_results = std::pin::pin!(layer_results);
let mut err = None;
let mut download_count = 0;
while let Some(i) = layer_results.next().await {
download_count += 1;
match i {
Ok((layer_name, layer_metadata)) => {
tracing::info!(
"[{download_count}/{layer_count}] OK: {} bytes {ttid} {}",
layer_metadata.file_size,
layer_name.file_name()
);
}
Err(e) => {
// Warn and continue: we will download what we can
tracing::warn!("Download error: {e}");
err = Some(e);
}
}
}
// Write index last, once all the layers it references are downloaded
let local_index_path = self.output_path.join(format!(
"{}/timelines/{}/index_part.json{}",
ttid.tenant_shard_id,
ttid.timeline_id,
index_part_generation.get_suffix()
));
tokio::fs::write(&local_index_path, index_bytes)
.await
.context("writing index")?;
if let Some(e) = err {
tracing::warn!("Some errors occurred, last error: {e}");
Err(e)
} else {
Ok(())
}
}
pub async fn download(&self) -> anyhow::Result<()> {
let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?;
// Generate a stream of TenantShardId
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
let mut shards = std::pin::pin!(shards);
while let Some(shard) = shards.next().await {
let shard = shard?;
// Generate a stream of TenantTimelineId
let timelines = stream_tenant_timelines(&s3_client, &self.s3_root, shard).await?;
// Generate a stream of S3TimelineBlobData
async fn load_timeline_index(
s3_client: &Client,
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
let data = list_timeline_blobs(s3_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| load_timeline_index(&s3_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
while let Some(i) = timelines.next().await {
let (ttid, data) = i?;
match data.blob_data {
BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _,
} => {
self.download_timeline(ttid, index_part, index_part_generation)
.await
.context("Downloading timeline")?;
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect(_) => {
tracing::error!("Bad metadata in timeline {ttid}");
}
};
}
}
Ok(())
}
}
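
For reference, the snapshot ends up on disk using remote-style names: each layer file keeps its generation suffix, and index_part.json (also carrying its generation suffix) is written only after the layers it references. A rough sketch of the resulting paths, assuming the format strings used in download_timeline above; the identifiers and the generation suffix value are placeholders:

use camino::{Utf8Path, Utf8PathBuf};

// Not part of the commit: a condensed restatement of the paths built by
// download_timeline/download_layer above.
fn layer_path(
    output: &Utf8Path,
    tenant_shard_id: &str,
    timeline_id: &str,
    layer_file_name: &str,
    generation_suffix: &str, // from Generation::get_suffix(); placeholder value here
) -> Utf8PathBuf {
    output.join(format!(
        "{tenant_shard_id}/timelines/{timeline_id}/{layer_file_name}{generation_suffix}"
    ))
}

fn index_path(
    output: &Utf8Path,
    tenant_shard_id: &str,
    timeline_id: &str,
    generation_suffix: &str,
) -> Utf8PathBuf {
    output.join(format!(
        "{tenant_shard_id}/timelines/{timeline_id}/index_part.json{generation_suffix}"
    ))
}

fn main() {
    let out = Utf8PathBuf::from("./tenant-snapshot".to_string());
    // Placeholder identifiers, purely for illustration.
    println!(
        "{}",
        layer_path(&out, "TENANT_SHARD_ID", "TIMELINE_ID", "LAYER_FILE_NAME", "-00000001")
    );
    println!("{}", index_path(&out, "TENANT_SHARD_ID", "TIMELINE_ID", "-00000001"));
}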