Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-12 07:52:55 +00:00
s3_scrubber: add tenant-snapshot (#7444)
## Problem

Downloading tenant data for analysis/debug with `aws s3 cp` works well for small tenants, but for larger tenants one is unlikely to end up with an index that matches the layer files, due to the time the download takes.

## Summary of changes

- Add a `tenant-snapshot` command to the scrubber, which reads timeline indices and then downloads the layers referenced in the index, even if they were deleted. The result is a snapshot of the tenant's remote storage state that should be usable when imported (#7399).
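For reference, a minimal sketch of driving the new subcommand from the regression-test harness via the `S3Scrubber` fixture method added in this change; `neon_env_builder`, `tenant_id` and `output_path` are assumed to come from a surrounding test:

```python
# Sketch: exercise the new tenant-snapshot subcommand through the test fixture.
from fixtures.neon_fixtures import S3Scrubber

scrubber = S3Scrubber(neon_env_builder)
# Under the hood this runs: tenant-snapshot --tenant-id <tenant_id> --output-path <output_path>
scrubber.tenant_snapshot(tenant_id, output_path)
```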

Cargo.lock (generated, 1 line changed)
@@ -5085,6 +5085,7 @@ dependencies = [
 "aws-smithy-async",
 "bincode",
 "bytes",
 "camino",
 "chrono",
 "clap",
 "crc32c",

s3_scrubber/Cargo.toml
@@ -25,6 +25,7 @@ async-stream.workspace = true
tokio-stream.workspace = true
futures-util.workspace = true
itertools.workspace = true
camino.workspace = true

tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }

s3_scrubber/src/lib.rs
@@ -5,6 +5,7 @@ pub mod cloud_admin_api;
pub mod garbage;
pub mod metadata_stream;
pub mod scan_metadata;
pub mod tenant_snapshot;

use std::env;
use std::fmt::Display;
@@ -23,17 +24,18 @@ use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
use aws_sdk_s3::{Client, Config};
use aws_smithy_async::rt::sleep::TokioSleep;

use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use pageserver_api::shard::TenantShardId;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::io::IsTerminal;
use tokio::io::AsyncReadExt;
use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use utils::id::TimelineId;
use utils::fs_ext;
use utils::id::{TenantId, TimelineId};

const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
@@ -147,6 +149,23 @@ impl RootTarget {
        self.tenants_root().with_sub_segment(&tenant_id.to_string())
    }

    pub(crate) fn tenant_shards_prefix(&self, tenant_id: &TenantId) -> S3Target {
        // Only pageserver remote storage contains tenant-shards
        assert!(matches!(self, Self::Pageserver(_)));
        let Self::Pageserver(root) = self else {
            panic!();
        };

        S3Target {
            bucket_name: root.bucket_name.clone(),
            prefix_in_bucket: format!(
                "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
                root.prefix_in_bucket
            ),
            delimiter: root.delimiter.clone(),
        }
    }

    pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
        match self {
            Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
@@ -240,7 +259,6 @@ pub fn init_logging(file_name: &str) -> WorkerGuard {
        .with_ansi(false)
        .with_writer(file_writer);
    let stderr_logs = fmt::Layer::new()
        .with_ansi(std::io::stderr().is_terminal())
        .with_target(false)
        .with_writer(std::io::stderr);
    tracing_subscriber::registry()
@@ -396,3 +414,50 @@ async fn download_object_with_retries(

    anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}

async fn download_object_to_file(
    s3_client: &Client,
    bucket_name: &str,
    key: &str,
    version_id: Option<&str>,
    local_path: &Utf8Path,
) -> anyhow::Result<()> {
    let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
    for _ in 0..MAX_RETRIES {
        tokio::fs::remove_file(&tmp_path)
            .await
            .or_else(fs_ext::ignore_not_found)?;

        let mut file = tokio::fs::File::create(&tmp_path)
            .await
            .context("Opening output file")?;

        let request = s3_client.get_object().bucket(bucket_name).key(key);

        let request = match version_id {
            Some(version_id) => request.version_id(version_id),
            None => request,
        };

        let response_stream = match request.send().await {
            Ok(response) => response,
            Err(e) => {
                error!(
                    "Failed to download object for key {key} version {}: {e:#}",
                    version_id.unwrap_or("")
                );
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };

        let mut read_stream = response_stream.body.into_async_read();

        tokio::io::copy(&mut read_stream, &mut file).await?;

        tokio::fs::rename(&tmp_path, local_path).await?;
        return Ok(());
    }

    anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}

s3_scrubber/src/main.rs
@@ -1,9 +1,12 @@
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use s3_scrubber::scan_metadata::scan_metadata;
use s3_scrubber::tenant_snapshot::SnapshotDownloader;
use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};

use clap::{Parser, Subcommand};
use utils::id::TenantId;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
@@ -38,6 +41,14 @@ enum Command {
        #[arg(long = "tenant-id", num_args = 0..)]
        tenant_ids: Vec<TenantShardId>,
    },
    TenantSnapshot {
        #[arg(long = "tenant-id")]
        tenant_id: TenantId,
        #[arg(long = "concurrency", short = 'j', default_value_t = 8)]
        concurrency: usize,
        #[arg(short, long)]
        output_path: Utf8PathBuf,
    },
}

#[tokio::main]
@@ -50,6 +61,7 @@ async fn main() -> anyhow::Result<()> {
        Command::ScanMetadata { .. } => "scan",
        Command::FindGarbage { .. } => "find-garbage",
        Command::PurgeGarbage { .. } => "purge-garbage",
        Command::TenantSnapshot { .. } => "tenant-snapshot",
    };
    let _guard = init_logging(&format!(
        "{}_{}_{}_{}.log",
@@ -102,5 +114,14 @@ async fn main() -> anyhow::Result<()> {
        Command::PurgeGarbage { input_path, mode } => {
            purge_garbage(input_path, mode, !cli.delete).await
        }
        Command::TenantSnapshot {
            tenant_id,
            output_path,
            concurrency,
        } => {
            let downloader =
                SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?;
            downloader.download().await
        }
    }
}

s3_scrubber/src/metadata_stream.rs
@@ -5,7 +5,7 @@ use tokio_stream::Stream;

use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
use pageserver_api::shard::TenantShardId;
use utils::id::TimelineId;
use utils::id::{TenantId, TimelineId};

/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
pub fn stream_tenants<'a>(
@@ -45,6 +45,62 @@ pub fn stream_tenants<'a>(
    }
}

pub async fn stream_tenant_shards<'a>(
    s3_client: &'a Client,
    target: &'a RootTarget,
    tenant_id: TenantId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
    let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
    let mut continuation_token = None;
    let shards_target = target.tenant_shards_prefix(&tenant_id);

    loop {
        tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
        let fetch_response =
            list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
        let fetch_response = match fetch_response {
            Err(e) => {
                tenant_shard_ids.push(Err(e));
                break;
            }
            Ok(r) => r,
        };

        let new_entry_ids = fetch_response
            .common_prefixes()
            .iter()
            .filter_map(|prefix| prefix.prefix())
            .filter_map(|prefix| -> Option<&str> {
                prefix
                    .strip_prefix(&target.tenants_root().prefix_in_bucket)?
                    .strip_suffix('/')
            })
            .map(|entry_id_str| {
                let first_part = entry_id_str.split('/').next().unwrap();

                first_part
                    .parse::<TenantShardId>()
                    .with_context(|| format!("Incorrect entry id str: {first_part}"))
            });

        for i in new_entry_ids {
            tenant_shard_ids.push(i);
        }

        match fetch_response.next_continuation_token {
            Some(new_token) => continuation_token = Some(new_token),
            None => break,
        }
    }

    Ok(stream! {
        for i in tenant_shard_ids {
            let id = i?;
            yield Ok(id);
        }
    })
}

/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
/// using ListObjectsv2. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.

s3_scrubber/src/tenant_snapshot.rs (new file, 293 lines)
@@ -0,0 +1,293 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::checks::{list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::{
    download_object_to_file, init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId,
};
use anyhow::Context;
use async_stream::stream;
use aws_sdk_s3::Client;
use camino::Utf8PathBuf;
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use utils::generation::Generation;
use utils::id::TenantId;

pub struct SnapshotDownloader {
    s3_client: Arc<Client>,
    s3_root: RootTarget,
    bucket_config: BucketConfig,
    tenant_id: TenantId,
    output_path: Utf8PathBuf,
    concurrency: usize,
}

impl SnapshotDownloader {
    pub fn new(
        bucket_config: BucketConfig,
        tenant_id: TenantId,
        output_path: Utf8PathBuf,
        concurrency: usize,
    ) -> anyhow::Result<Self> {
        let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
        Ok(Self {
            s3_client,
            s3_root,
            bucket_config,
            tenant_id,
            output_path,
            concurrency,
        })
    }

    async fn download_layer(
        &self,
        ttid: TenantShardTimelineId,
        layer_name: LayerFileName,
        layer_metadata: IndexLayerMetadata,
    ) -> anyhow::Result<(LayerFileName, IndexLayerMetadata)> {
        // Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use
        // different layer names (remote-style has the generation suffix)
        let local_path = self.output_path.join(format!(
            "{}/timelines/{}/{}{}",
            ttid.tenant_shard_id,
            ttid.timeline_id,
            layer_name.file_name(),
            layer_metadata.generation.get_suffix()
        ));

        // We should only be called for layers that are owned by the input TTID
        assert_eq!(layer_metadata.shard, ttid.tenant_shard_id.to_index());

        // Assumption: we always write layer files atomically, and layer files are immutable. Therefore if the file
        // already exists on local disk, we assume it is fully correct and skip it.
        if tokio::fs::try_exists(&local_path).await? {
            tracing::debug!("{} already exists", local_path);
            return Ok((layer_name, layer_metadata));
        } else {
            tracing::debug!("{} requires download...", local_path);

            let timeline_root = self.s3_root.timeline_root(&ttid);
            let remote_layer_path = format!(
                "{}{}{}",
                timeline_root.prefix_in_bucket,
                layer_name.file_name(),
                layer_metadata.generation.get_suffix()
            );

            // List versions: the object might be deleted.
            let versions = self
                .s3_client
                .list_object_versions()
                .bucket(self.bucket_config.bucket.clone())
                .prefix(&remote_layer_path)
                .send()
                .await?;
            let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
                return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
            };
            download_object_to_file(
                &self.s3_client,
                &self.bucket_config.bucket,
                &remote_layer_path,
                version.version_id.as_deref(),
                &local_path,
            )
            .await?;

            tracing::debug!("Downloaded successfully to {local_path}");
        }

        Ok((layer_name, layer_metadata))
    }

    /// Download many layers belonging to the same TTID, with some concurrency
    async fn download_layers(
        &self,
        ttid: TenantShardTimelineId,
        layers: Vec<(LayerFileName, IndexLayerMetadata)>,
    ) -> anyhow::Result<()> {
        let layer_count = layers.len();
        tracing::info!("Downloading {} layers for timeline {ttid}...", layer_count);
        let layers_stream = stream! {
            for (layer_name, layer_metadata) in layers {
                yield self.download_layer(ttid, layer_name, layer_metadata);
            }
        };

        tokio::fs::create_dir_all(self.output_path.join(format!(
            "{}/timelines/{}",
            ttid.tenant_shard_id, ttid.timeline_id
        )))
        .await?;

        let layer_results = layers_stream.buffered(self.concurrency);
        let mut layer_results = std::pin::pin!(layer_results);

        let mut err = None;
        let mut download_count = 0;
        while let Some(i) = layer_results.next().await {
            download_count += 1;
            match i {
                Ok((layer_name, layer_metadata)) => {
                    tracing::info!(
                        "[{download_count}/{layer_count}] OK: {} bytes {ttid} {}",
                        layer_metadata.file_size,
                        layer_name.file_name()
                    );
                }
                Err(e) => {
                    // Warn and continue: we will download what we can
                    tracing::warn!("Download error: {e}");
                    err = Some(e);
                }
            }
        }
        if let Some(e) = err {
            tracing::warn!("Some errors occurred downloading {ttid} layers, last error: {e}");
            Err(e)
        } else {
            Ok(())
        }
    }

    async fn download_timeline(
        &self,
        ttid: TenantShardTimelineId,
        index_part: IndexPart,
        index_part_generation: Generation,
        ancestor_layers: &mut HashMap<
            TenantShardTimelineId,
            HashMap<LayerFileName, IndexLayerMetadata>,
        >,
    ) -> anyhow::Result<()> {
        let index_bytes = serde_json::to_string(&index_part).unwrap();

        let layers = index_part
            .layer_metadata
            .into_iter()
            .filter_map(|(layer_name, layer_metadata)| {
                if layer_metadata.shard.shard_count != ttid.tenant_shard_id.shard_count {
                    // Accumulate ancestor layers for later download
                    let ancestor_ttid = TenantShardTimelineId::new(
                        TenantShardId {
                            tenant_id: ttid.tenant_shard_id.tenant_id,
                            shard_number: layer_metadata.shard.shard_number,
                            shard_count: layer_metadata.shard.shard_count,
                        },
                        ttid.timeline_id,
                    );
                    let ancestor_ttid_layers = ancestor_layers.entry(ancestor_ttid).or_default();
                    use std::collections::hash_map::Entry;
                    match ancestor_ttid_layers.entry(layer_name) {
                        Entry::Occupied(entry) => {
                            // Descendant shards that reference a layer from an ancestor should always have metadata
                            // matching their siblings', because it is read atomically during a shard split.
                            assert_eq!(entry.get(), &layer_metadata);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(layer_metadata);
                        }
                    }
                    None
                } else {
                    Some((layer_name, layer_metadata))
                }
            })
            .collect();

        let download_result = self.download_layers(ttid, layers).await;

        // Write the index last, once all the layers it references are downloaded
        let local_index_path = self.output_path.join(format!(
            "{}/timelines/{}/index_part.json{}",
            ttid.tenant_shard_id,
            ttid.timeline_id,
            index_part_generation.get_suffix()
        ));
        tokio::fs::write(&local_index_path, index_bytes)
            .await
            .context("writing index")?;

        download_result
    }

    pub async fn download(&self) -> anyhow::Result<()> {
        let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?;

        // Generate a stream of TenantShardId
        let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
        let shards: Vec<TenantShardId> = shards.try_collect().await?;

        // Only read from shards that have the highest count: avoids redundantly downloading
        // from ancestor shards.
        let Some(shard_count) = shards.iter().map(|s| s.shard_count).max() else {
            anyhow::bail!("No shards found");
        };

        // We will build a collection of layers in ancestor shards to download (this will only
        // happen if this tenant has been split at some point)
        let mut ancestor_layers: HashMap<
            TenantShardTimelineId,
            HashMap<LayerFileName, IndexLayerMetadata>,
        > = Default::default();

        for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
            // Generate a stream of TenantTimelineId
            let timelines = stream_tenant_timelines(&s3_client, &self.s3_root, shard).await?;

            // Generate a stream of S3TimelineBlobData
            async fn load_timeline_index(
                s3_client: &Client,
                target: &RootTarget,
                ttid: TenantShardTimelineId,
            ) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
                let data = list_timeline_blobs(s3_client, ttid, target).await?;
                Ok((ttid, data))
            }
            let timelines = timelines.map_ok(|ttid| load_timeline_index(&s3_client, &target, ttid));
            let mut timelines = std::pin::pin!(timelines.try_buffered(8));

            while let Some(i) = timelines.next().await {
                let (ttid, data) = i?;
                match data.blob_data {
                    BlobDataParseResult::Parsed {
                        index_part,
                        index_part_generation,
                        s3_layers: _,
                    } => {
                        self.download_timeline(
                            ttid,
                            index_part,
                            index_part_generation,
                            &mut ancestor_layers,
                        )
                        .await
                        .context("Downloading timeline")?;
                    }
                    BlobDataParseResult::Relic => {}
                    BlobDataParseResult::Incorrect(_) => {
                        tracing::error!("Bad metadata in timeline {ttid}");
                    }
                };
            }
        }

        for (ttid, layers) in ancestor_layers.into_iter() {
            tracing::info!(
                "Downloading {} layers from ancestor timeline {ttid}...",
                layers.len()
            );

            self.download_layers(ttid, layers.into_iter().collect())
                .await?;
        }

        Ok(())
    }
}

test_runner/fixtures/neon_fixtures.py
@@ -2310,20 +2310,24 @@ class NeonPageserver(PgProtocol):
        # The entries in the list are regular expressions.
        self.allowed_errors: List[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)

    def timeline_dir(self, tenant_id: TenantId, timeline_id: Optional[TimelineId] = None) -> Path:
    def timeline_dir(
        self,
        tenant_shard_id: Union[TenantId, TenantShardId],
        timeline_id: Optional[TimelineId] = None,
    ) -> Path:
        """Get a timeline directory's path based on the repo directory of the test environment"""
        if timeline_id is None:
            return self.tenant_dir(tenant_id) / "timelines"
        return self.tenant_dir(tenant_id) / "timelines" / str(timeline_id)
            return self.tenant_dir(tenant_shard_id) / "timelines"
        return self.tenant_dir(tenant_shard_id) / "timelines" / str(timeline_id)

    def tenant_dir(
        self,
        tenant_id: Optional[TenantId] = None,
        tenant_shard_id: Optional[Union[TenantId, TenantShardId]] = None,
    ) -> Path:
        """Get a tenant directory's path based on the repo directory of the test environment"""
        if tenant_id is None:
        if tenant_shard_id is None:
            return self.workdir / "tenants"
        return self.workdir / "tenants" / str(tenant_id)
        return self.workdir / "tenants" / str(tenant_shard_id)

    def start(
        self,
@@ -2510,8 +2514,10 @@
        client = self.http_client()
        return client.tenant_location_conf(tenant_id, config, **kwargs)

    def read_tenant_location_conf(self, tenant_id: TenantId) -> dict[str, Any]:
        path = self.tenant_dir(tenant_id) / "config-v1"
    def read_tenant_location_conf(
        self, tenant_shard_id: Union[TenantId, TenantShardId]
    ) -> dict[str, Any]:
        path = self.tenant_dir(tenant_shard_id) / "config-v1"
        log.info(f"Reading location conf from {path}")
        bytes = open(path, "r").read()
        try:
@@ -3715,7 +3721,7 @@ class S3Scrubber:
            log.warning(f"Scrub environment: {env}")
            log.warning(f"Output at: {output_path}")

            raise RuntimeError("Remote storage scrub failed")
            raise RuntimeError(f"Scrubber failed while running {args}")

        assert stdout is not None
        return stdout
@@ -3730,6 +3736,13 @@
            log.error(stdout)
            raise

    def tenant_snapshot(self, tenant_id: TenantId, output_path: Path):
        stdout = self.scrubber_cli(
            ["tenant-snapshot", "--tenant-id", str(tenant_id), "--output-path", str(output_path)],
            timeout=30,
        )
        log.info(f"tenant-snapshot output: {stdout}")


def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) -> Path:
    """Compute the path to a working directory for an individual test."""

test_runner/fixtures/remote_storage.py
@@ -252,8 +252,11 @@ class S3Storage:

        log.info(f"deleted {cnt} objects from remote storage")

    def tenants_path(self) -> str:
        return f"{self.prefix_in_bucket}/tenants"

    def tenant_path(self, tenant_id: TenantId) -> str:
        return f"{self.prefix_in_bucket}/tenants/{tenant_id}"
        return f"{self.tenants_path()}/{tenant_id}"

    def heatmap_key(self, tenant_id: TenantId) -> str:
        return f"{self.tenant_path(tenant_id)}/{TENANT_HEATMAP_FILE_NAME}"
@@ -262,6 +265,9 @@
        r = self.client.get_object(Bucket=self.bucket_name, Key=self.heatmap_key(tenant_id))
        return json.loads(r["Body"].read().decode("utf-8"))

    def mock_remote_tenant_path(self, tenant_id: TenantId):
        assert self.real is False


RemoteStorage = Union[LocalFsStorage, S3Storage]

test_runner/fixtures/types.py
@@ -156,7 +156,11 @@ class TenantShardId:
        raise ValueError(f"Invalid TenantShardId '{input}'")

    def __str__(self):
        return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"
        if self.shard_count > 0:
            return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"
        else:
            # Unsharded case: equivalent of Rust TenantShardId::unsharded(tenant_id)
            return str(self.tenant_id)

    def __repr__(self):
        return self.__str__()

test_runner/regress/test_s3_scrubber.py (new file, 111 lines)
@@ -0,0 +1,111 @@
import os
import shutil
from typing import Optional

import pytest
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    S3Scrubber,
)
from fixtures.remote_storage import S3Storage, s3_storage
from fixtures.types import TenantShardId
from fixtures.workload import Workload


@pytest.mark.parametrize("shard_count", [None, 4])
def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
    """
    Test the `tenant-snapshot` subcommand, which grabs data from remote storage

    This is only a support/debug tool, but worth testing to ensure the tool does not regress.
    """

    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
    neon_env_builder.num_pageservers = shard_count if shard_count is not None else 1

    env = neon_env_builder.init_start()
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline
    branch = "main"

    # Do some work
    workload = Workload(env, tenant_id, timeline_id, branch)
    workload.init()

    # Multiple write/flush passes to generate multiple layers
    for _n in range(0, 3):
        workload.write_rows(128)

    # Do some more work after a restart, so that we have multiple generations
    for pageserver in env.pageservers:
        pageserver.stop()
        pageserver.start()

    for _n in range(0, 3):
        workload.write_rows(128)

    # If we're doing multiple shards, split: this is important to exercise
    # the scrubber's ability to understand the references from child shards to parent shards' layers
    if shard_count is not None:
        tenant_shard_ids = env.storage_controller.tenant_shard_split(
            tenant_id, shard_count=shard_count
        )

        # Write after shard split: this will result in shards containing a mixture of owned
        # and parent layers in their index.
        workload.write_rows(128)
    else:
        tenant_shard_ids = [TenantShardId(tenant_id, 0, 0)]

    output_path = neon_env_builder.test_output_dir / "snapshot"
    os.makedirs(output_path)

    scrubber = S3Scrubber(neon_env_builder)
    scrubber.tenant_snapshot(tenant_id, output_path)

    assert len(os.listdir(output_path)) > 0

    workload.stop()

    # Stop pageservers
    for pageserver in env.pageservers:
        pageserver.stop()

    # Drop all shards' local storage
    for tenant_shard_id in tenant_shard_ids:
        pageserver = env.get_tenant_pageserver(tenant_shard_id)
        shutil.rmtree(pageserver.timeline_dir(tenant_shard_id, timeline_id))

    # Replace remote storage contents with the snapshot we downloaded
    assert isinstance(env.pageserver_remote_storage, S3Storage)

    remote_tenant_path = env.pageserver_remote_storage.tenant_path(tenant_id)

    # Delete current remote storage contents
    bucket = env.pageserver_remote_storage.bucket_name
    remote_client = env.pageserver_remote_storage.client
    deleted = 0
    for object in remote_client.list_objects_v2(Bucket=bucket, Prefix=remote_tenant_path)[
        "Contents"
    ]:
        key = object["Key"]
        remote_client.delete_object(Key=key, Bucket=bucket)
        deleted += 1
    assert deleted > 0

    # Upload from snapshot
    for root, _dirs, files in os.walk(output_path):
        for file in files:
            full_local_path = os.path.join(root, file)
            full_remote_path = (
                env.pageserver_remote_storage.tenants_path()
                + "/"
                + full_local_path.removeprefix(f"{output_path}/")
            )
            remote_client.upload_file(full_local_path, bucket, full_remote_path)

    for pageserver in env.pageservers:
        pageserver.start()

    # Check we can read everything
    workload.validate()