s3_scrubber: prepare for scrubbing buckets with generation-aware content (#5700)

## Problem

The scrubber didn't know how to find the latest `index_part.json` for a
timeline when generations were in use.

## Summary of changes

- Teach the scrubber to do the same dance the pageserver does when finding
the latest index_part.json (see the first sketch below).
- Teach the scrubber to understand layer files with generation suffixes.
- General improvement to testability: scan_metadata has a machine-readable
output that the testing `S3Scrubber` wrapper can read (see the second sketch
below).
- Existing test coverage of the scrubber was false-passing because it saw no
data at all: the test buckets store data under a prefix that the scrubber
wasn't configured with. Fix that by making the prefix configurable
(`BUCKET_PREFIX` / `prefix_in_bucket`).
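
For illustration, here is a minimal, self-contained sketch of the
generation-suffix handling the first two bullets refer to. The real code uses
the pageserver crate's `LayerFileName` and `Generation` types (via
`parse_layer_object_name` and `parse_remote_index_path` in the diff below);
the helper names here and the plain `&str`/`u32` types are stand-ins, and the
suffix is assumed to be 8 lowercase hex characters:

```rust
/// Split "key-XXXXXXXX" into (key, generation). Keys without a valid
/// 8-character suffix are treated as legacy, pre-generation objects.
fn split_generation_suffix(key: &str) -> (&str, Option<u32>) {
    if let Some((name, suffix)) = key.rsplit_once('-') {
        if suffix.len() == 8 {
            if let Ok(gen) = u32::from_str_radix(suffix, 16) {
                return (name, Some(gen));
            }
        }
    }
    (key, None)
}

/// Of all index_part.json* keys in a timeline prefix, the one with the
/// highest generation is current; a bare "index_part.json" is the legacy
/// fallback and sorts lowest.
fn latest_index_key<'a>(keys: &[&'a str]) -> Option<&'a str> {
    keys.iter()
        .copied()
        .max_by_key(|k| split_generation_suffix(k).1.map_or(0u64, |g| u64::from(g) + 1))
}

fn main() {
    let keys = [
        "index_part.json", // legacy: no generation suffix
        "index_part.json-00000002",
        "index_part.json-0000000a",
    ];
    assert_eq!(latest_index_key(&keys), Some("index_part.json-0000000a"));

    // Layer objects carry the same kind of suffix (real layer file names are
    // much longer; this is a shortened stand-in):
    let (layer, gen) = split_generation_suffix("000000-FFFFFF__016B59D8-0000000a");
    assert_eq!((layer, gen), ("000000-FFFFFF__016B59D8", Some(10)));
}
```

Keys without a suffix are treated as legacy objects, which is why a bare
`index_part.json` only wins when no generation-suffixed index exists.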

This is an incremental improvement: the more confidence we have in the
scrubber, the more we can use it in integration tests to validate the state
of remote storage.
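
To make the `--json` mode concrete, here is a condensed sketch of the
machine-readable output path. The real `MetadataSummary` has more fields,
keys its error/warning sets by `TenantTimelineId`, and lives in
`scan_metadata.rs`; `report` is a made-up helper, and serde's derive feature
plus `serde_json` are assumed, as in the dependency changes below:

```rust
use std::collections::HashSet;

use serde::Serialize;

#[derive(Serialize, Default)]
struct MetadataSummary {
    count: usize,
    // The real struct keys these by TenantTimelineId.
    with_errors: HashSet<String>,
    with_warnings: HashSet<String>,
}

/// With `--json`, dump the summary as JSON for machines; otherwise print a
/// human-readable one-liner.
fn report(summary: &MetadataSummary, json: bool) -> String {
    if json {
        // Mirrors the `.unwrap()` in the diff: serializing this struct
        // cannot realistically fail.
        serde_json::to_string(summary).expect("serialize summary")
    } else {
        format!(
            "Scanned {} timeline(s): {} with errors, {} with warnings",
            summary.count,
            summary.with_errors.len(),
            summary.with_warnings.len()
        )
    }
}

fn main() {
    let summary = MetadataSummary { count: 1, ..Default::default() };
    println!("{}", report(&summary, true));
    // -> {"count":1,"with_errors":[],"with_warnings":[]}
}
```

The Python `S3Scrubber.scan_metadata()` wrapper then runs
`scan-metadata --json` and `json.loads()`s the captured stdout, as in the
fixture change at the bottom of this diff.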

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
Authored by John Spray on 2023-11-03 17:36:02 +00:00; committed by GitHub.
Parent 5ceccdc7de, commit 306c4f9967.
13 changed files with 209 additions and 82 deletions

Cargo.lock generated
View File

@@ -4419,6 +4419,7 @@ dependencies = [
"itertools",
"pageserver",
"rand 0.8.5",
"remote_storage",
"reqwest",
"serde",
"serde_json",

View File

@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
///
/// See docs/rfcs/025-generation-numbers.md for detail on how generation
/// numbers are used.
#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub enum Generation {
// Generations with this magic value will not add a suffix to S3 keys, and will not
// be included in persisted index_part.json. This value is only to be used

View File

@@ -1542,7 +1542,7 @@ pub fn remote_index_path(
}
/// Given the key of an index, parse out the generation part of the name
pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
let file_name = match path.get_path().file_name() {
Some(f) => f,
None => {

View File

@@ -155,7 +155,7 @@ pub struct IndexLayerMetadata {
#[serde(default = "Generation::none")]
#[serde(skip_serializing_if = "Generation::is_none")]
pub(super) generation: Generation,
pub generation: Generation,
}
impl From<LayerFileMetadata> for IndexLayerMetadata {

View File

@@ -33,6 +33,7 @@ reqwest = { workspace = true, default-features = false, features = ["rustls-tls"
aws-config = { workspace = true, default-features = false, features = ["rustls", "credentials-sso"] }
pageserver = { path = "../pageserver" }
remote_storage = { path = "../libs/remote_storage" }
tracing.workspace = true
tracing-subscriber.workspace = true

View File

@@ -1,13 +1,18 @@
use std::collections::HashSet;
use anyhow::Context;
use aws_sdk_s3::Client;
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use tracing::{error, info, warn};
use utils::generation::Generation;
use crate::cloud_admin_api::BranchData;
use crate::{download_object_with_retries, list_objects_with_retries, RootTarget};
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget};
use futures_util::{pin_mut, StreamExt};
use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use remote_storage::RemotePath;
use utils::id::TenantTimelineId;
pub(crate) struct TimelineAnalysis {
@@ -68,6 +73,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
match s3_data.blob_data {
BlobDataParseResult::Parsed {
index_part,
index_part_generation,
mut s3_layers,
} => {
if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
@@ -107,33 +113,62 @@ pub(crate) async fn branch_cleanup_and_check_errors(
))
}
if !s3_layers.remove(&layer) {
let layer_map_key = (layer, metadata.generation);
if !s3_layers.remove(&layer_map_key) {
// FIXME: this will emit false positives if an index was
// uploaded concurrently with our scan. To make this check
// correct, we need to try sending a HEAD request for the
// layer we think is missing.
result.errors.push(format!(
"index_part.json contains a layer {} that is not present in S3",
layer.file_name(),
"index_part.json contains a layer {}{} that is not present in remote storage",
layer_map_key.0.file_name(),
layer_map_key.1.get_suffix()
))
}
}
if !s3_layers.is_empty() {
let orphan_layers: Vec<(LayerFileName, Generation)> = s3_layers
.into_iter()
.filter(|(_layer_name, gen)|
// A layer is only considered orphaned if it has a generation below
// the index. If the generation is >= the index, then the layer may
// be an upload from a running pageserver, or even an upload from
// a new generation that didn't upload an index yet.
//
// Even so, a layer that is not referenced by the index could just
// be something enqueued for deletion, so while this check is valid
// for indicating that a layer is garbage, it is not an indicator
// of a problem.
gen < &index_part_generation)
.collect();
if !orphan_layers.is_empty() {
result.errors.push(format!(
"index_part.json does not contain layers from S3: {:?}",
s3_layers
orphan_layers
.iter()
.map(|layer_name| layer_name.file_name())
.map(|(layer_name, gen)| format!(
"{}{}",
layer_name.file_name(),
gen.get_suffix()
))
.collect::<Vec<_>>(),
));
result
.garbage_keys
.extend(s3_layers.iter().map(|layer_name| {
result.garbage_keys.extend(orphan_layers.iter().map(
|(layer_name, layer_gen)| {
let mut key = s3_root.timeline_root(id).prefix_in_bucket;
let delimiter = s3_root.delimiter();
if !key.ends_with(delimiter) {
key.push_str(delimiter);
}
key.push_str(&layer_name.file_name());
key.push_str(&format!(
"{}{}",
&layer_name.file_name(),
layer_gen.get_suffix()
));
key
}));
},
));
}
}
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
@@ -178,70 +213,97 @@ pub(crate) struct S3TimelineBlobData {
pub(crate) enum BlobDataParseResult {
Parsed {
index_part: IndexPart,
s3_layers: HashSet<LayerFileName>,
index_part_generation: Generation,
s3_layers: HashSet<(LayerFileName, Generation)>,
},
Incorrect(Vec<String>),
}
fn parse_layer_object_name(name: &str) -> Result<(LayerFileName, Generation), String> {
match name.rsplit_once('-') {
// FIXME: this is gross, just use a regex?
Some((layer_filename, gen)) if gen.len() == 8 => {
let layer = layer_filename.parse::<LayerFileName>()?;
let gen =
Generation::parse_suffix(gen).ok_or("Malformed generation suffix".to_string())?;
Ok((layer, gen))
}
_ => Ok((name.parse::<LayerFileName>()?, Generation::none())),
}
}
pub(crate) async fn list_timeline_blobs(
s3_client: &Client,
id: TenantTimelineId,
s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
let mut s3_layers = HashSet::new();
let mut index_part_object = None;
let timeline_dir_target = s3_root.timeline_root(&id);
let mut continuation_token = None;
let mut errors = Vec::new();
let mut keys_to_remove = Vec::new();
loop {
let fetch_response =
list_objects_with_retries(s3_client, &timeline_dir_target, continuation_token.clone())
.await?;
let mut timeline_dir_target = s3_root.timeline_root(&id);
timeline_dir_target.delimiter = String::new();
let subdirectories = fetch_response.common_prefixes().unwrap_or_default();
if !subdirectories.is_empty() {
errors.push(format!(
"S3 list response should not contain any subdirectories, but got {subdirectories:?}"
));
}
let mut index_parts: Vec<ObjectIdentifier> = Vec::new();
for (object, key) in fetch_response
.contents()
.unwrap_or_default()
.iter()
.filter_map(|object| Some((object, object.key()?)))
{
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
match blob_name {
Some("index_part.json") => index_part_object = Some(object.clone()),
Some(maybe_layer_name) => match maybe_layer_name.parse::<LayerFileName>() {
Ok(new_layer) => {
s3_layers.insert(new_layer);
}
Err(e) => {
errors.push(
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
);
keys_to_remove.push(key.to_string());
}
},
None => {
errors.push(format!("S3 list response got an object with odd key {key}"));
let stream = stream_listing(s3_client, &timeline_dir_target);
pin_mut!(stream);
while let Some(obj) = stream.next().await {
let obj = obj?;
let key = match obj.key() {
Some(k) => k,
None => continue,
};
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
match blob_name {
Some(name) if name.starts_with("index_part.json") => {
tracing::info!("Index key {key}");
index_parts.push(obj)
}
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
Ok((new_layer, gen)) => {
tracing::info!("Parsed layer key: {} {:?}", new_layer, gen);
s3_layers.insert((new_layer, gen));
}
Err(e) => {
tracing::info!("Error parsing key {maybe_layer_name}");
errors.push(
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
);
keys_to_remove.push(key.to_string());
}
},
None => {
tracing::info!("Peculiar key {}", key);
errors.push(format!("S3 list response got an object with odd key {key}"));
keys_to_remove.push(key.to_string());
}
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
// Choose the index_part with the highest generation
let (index_part_object, index_part_generation) = match index_parts
.iter()
.filter_map(|k| {
let key = k.key().unwrap();
// Stripping the index key to the last part, because RemotePath doesn't
// like absolute paths, and depending on prefix_in_bucket it's possible
// for the keys we read back to start with a slash.
let basename = key.rsplit_once('/').unwrap().1;
parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (k, g))
})
.max_by_key(|i| i.1)
.map(|(k, g)| (k.clone(), g))
{
Some((key, gen)) => (Some(key), gen),
None => {
// Legacy/missing case: one or zero index parts, which did not have a generation
(index_parts.pop(), Generation::none())
}
};
if index_part_object.is_none() {
errors.push("S3 list response got no index_part.json file".to_string());
}
@@ -261,6 +323,7 @@ pub(crate) async fn list_timeline_blobs(
return Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers,
},
keys_to_remove,

View File

@@ -34,6 +34,9 @@ const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
#[derive(Debug, Clone)]
pub struct S3Target {
pub bucket_name: String,
/// This `prefix_in_bucket` is only equal to the PS/SK config of the same
/// name for the RootTarget: other instances of S3Target will have prefix_in_bucket
/// with extra parts.
pub prefix_in_bucket: String,
pub delimiter: String,
}
@@ -77,9 +80,13 @@ impl Display for NodeKind {
impl S3Target {
pub fn with_sub_segment(&self, new_segment: &str) -> Self {
let mut new_self = self.clone();
let _ = new_self.prefix_in_bucket.pop();
new_self.prefix_in_bucket =
[&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
if new_self.prefix_in_bucket.is_empty() {
new_self.prefix_in_bucket = format!("/{}/", new_segment);
} else {
let _ = new_self.prefix_in_bucket.pop();
new_self.prefix_in_bucket =
[&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
}
new_self
}
}
@@ -91,10 +98,10 @@ pub enum RootTarget {
}
impl RootTarget {
pub fn tenants_root(&self) -> &S3Target {
pub fn tenants_root(&self) -> S3Target {
match self {
Self::Pageserver(root) => root,
Self::Safekeeper(root) => root,
Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
Self::Safekeeper(root) => root.with_sub_segment("wal"),
}
}
@@ -133,6 +140,7 @@ impl RootTarget {
pub struct BucketConfig {
pub region: String,
pub bucket: String,
pub prefix_in_bucket: Option<String>,
/// Use SSO if this is set, else rely on AWS_* environment vars
pub sso_account_id: Option<String>,
@@ -155,10 +163,12 @@ impl BucketConfig {
let sso_account_id = env::var("SSO_ACCOUNT_ID").ok();
let region = env::var("REGION").context("'REGION' param retrieval")?;
let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
Ok(Self {
region,
bucket,
prefix_in_bucket,
sso_account_id,
})
}
@@ -191,14 +201,14 @@ pub fn init_logging(file_name: &str) -> WorkerGuard {
.with_target(false)
.with_ansi(false)
.with_writer(file_writer);
let stdout_logs = fmt::Layer::new()
.with_ansi(std::io::stdout().is_terminal())
let stderr_logs = fmt::Layer::new()
.with_ansi(std::io::stderr().is_terminal())
.with_target(false)
.with_writer(std::io::stdout);
.with_writer(std::io::stderr);
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(file_logs)
.with(stdout_logs)
.with(stderr_logs)
.init();
guard
@@ -250,15 +260,20 @@ fn init_remote(
let bucket_region = Region::new(bucket_config.region);
let delimiter = "/".to_string();
let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
let s3_root = match node_kind {
NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
bucket_name: bucket_config.bucket,
prefix_in_bucket: ["pageserver", "v1", TENANTS_SEGMENT_NAME, ""].join(&delimiter),
prefix_in_bucket: bucket_config
.prefix_in_bucket
.unwrap_or("pageserver/v1".to_string()),
delimiter,
}),
NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
bucket_name: bucket_config.bucket,
prefix_in_bucket: ["safekeeper", "v1", "wal", ""].join(&delimiter),
prefix_in_bucket: bucket_config
.prefix_in_bucket
.unwrap_or("safekeeper/v1".to_string()),
delimiter,
}),
};

View File

@@ -31,7 +31,10 @@ enum Command {
#[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
mode: PurgeMode,
},
ScanMetadata {},
ScanMetadata {
#[arg(short, long, default_value_t = false)]
json: bool,
},
}
#[tokio::main]
@@ -54,13 +57,17 @@ async fn main() -> anyhow::Result<()> {
));
match cli.command {
Command::ScanMetadata {} => match scan_metadata(bucket_config).await {
Command::ScanMetadata { json } => match scan_metadata(bucket_config).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
}
Ok(summary) => {
println!("{}", summary.summary_string());
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
}
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
} else {

View File

@@ -13,10 +13,10 @@ pub fn stream_tenants<'a>(
) -> impl Stream<Item = anyhow::Result<TenantId>> + 'a {
try_stream! {
let mut continuation_token = None;
let tenants_target = target.tenants_root();
loop {
let tenants_target = target.tenants_root();
let fetch_response =
list_objects_with_retries(s3_client, tenants_target, continuation_token.clone()).await?;
list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
let new_entry_ids = fetch_response
.common_prefixes()

View File

@@ -10,8 +10,10 @@ use aws_sdk_s3::Client;
use futures_util::{pin_mut, StreamExt, TryStreamExt};
use histogram::Histogram;
use pageserver::tenant::IndexPart;
use serde::Serialize;
use utils::id::TenantTimelineId;
#[derive(Serialize)]
pub struct MetadataSummary {
count: usize,
with_errors: HashSet<TenantTimelineId>,
@@ -25,7 +27,9 @@ pub struct MetadataSummary {
}
/// A histogram plus minimum and maximum tracking
#[derive(Serialize)]
struct MinMaxHisto {
#[serde(skip)]
histo: Histogram,
min: u64,
max: u64,
@@ -109,6 +113,7 @@ impl MetadataSummary {
self.count += 1;
if let BlobDataParseResult::Parsed {
index_part,
index_part_generation: _,
s3_layers: _,
} = &data.blob_data
{

View File

@@ -2968,24 +2968,33 @@ class S3Scrubber:
self.env = env
self.log_dir = log_dir
def scrubber_cli(self, args, timeout):
def scrubber_cli(self, args: list[str], timeout) -> str:
assert isinstance(self.env.pageserver_remote_storage, S3Storage)
s3_storage = self.env.pageserver_remote_storage
env = {
"REGION": s3_storage.bucket_region,
"BUCKET": s3_storage.bucket_name,
"BUCKET_PREFIX": s3_storage.prefix_in_bucket,
"RUST_LOG": "DEBUG",
}
env.update(s3_storage.access_env_vars())
if s3_storage.endpoint is not None:
env.update({"AWS_ENDPOINT_URL": s3_storage.endpoint})
base_args = [self.env.neon_binpath / "s3_scrubber"]
base_args = [str(self.env.neon_binpath / "s3_scrubber")]
args = base_args + args
(output_path, _, status_code) = subprocess_capture(
self.log_dir, args, echo_stderr=True, echo_stdout=True, env=env, check=False
(output_path, stdout, status_code) = subprocess_capture(
self.log_dir,
args,
echo_stderr=True,
echo_stdout=True,
env=env,
check=False,
capture_stdout=True,
timeout=timeout,
)
if status_code:
log.warning(f"Scrub command {args} failed")
@@ -2994,8 +3003,18 @@ class S3Scrubber:
raise RuntimeError("Remote storage scrub failed")
def scan_metadata(self):
self.scrubber_cli(["scan-metadata"], timeout=30)
assert stdout is not None
return stdout
def scan_metadata(self) -> Any:
stdout = self.scrubber_cli(["scan-metadata", "--json"], timeout=30)
try:
return json.loads(stdout)
except:
log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
log.error(stdout)
raise
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:

View File

@@ -35,6 +35,7 @@ def subprocess_capture(
echo_stderr=False,
echo_stdout=False,
capture_stdout=False,
timeout=None,
**kwargs: Any,
) -> Tuple[str, Optional[str], int]:
"""Run a process and bifurcate its output to files and the `log` logger
@@ -104,7 +105,7 @@ def subprocess_capture(
stderr_handler = OutputHandler(p.stderr, stderr_f, echo=echo_stderr, capture=False)
stderr_handler.start()
r = p.wait()
r = p.wait(timeout=timeout)
stdout_handler.join()
stderr_handler.join()

View File

@@ -21,6 +21,7 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
S3Scrubber,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
@@ -234,8 +235,22 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
assert len(suffixed_objects) > 0
assert len(legacy_objects) > 0
# Flush through deletions to get a clean state for scrub: we are implicitly validating
# that our generations-enabled pageserver was able to do deletions of layers
# from earlier which don't have a generation.
env.pageserver.http_client().deletion_queue_flush(execute=True)
assert get_deletion_queue_unexpected_errors(env.pageserver.http_client()) == 0
# Having written a mixture of generation-aware and legacy index_part.json,
# ensure the scrubber handles the situation as expected.
metadata_summary = S3Scrubber(
neon_env_builder.test_output_dir, neon_env_builder
).scan_metadata()
assert metadata_summary["count"] == 1 # Scrubber should have seen our timeline
assert not metadata_summary["with_errors"]
assert not metadata_summary["with_warnings"]
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_generations = True