diff --git a/Cargo.lock b/Cargo.lock
index 4d9b20dae4..3e9a7198a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4419,6 +4419,7 @@ dependencies = [
  "itertools",
  "pageserver",
  "rand 0.8.5",
+ "remote_storage",
  "reqwest",
  "serde",
  "serde_json",
diff --git a/libs/utils/src/generation.rs b/libs/utils/src/generation.rs
index 88d50905c6..49e290dab8 100644
--- a/libs/utils/src/generation.rs
+++ b/libs/utils/src/generation.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
 ///
 /// See docs/rfcs/025-generation-numbers.md for detail on how generation
 /// numbers are used.
-#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
+#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
 pub enum Generation {
     // Generations with this magic value will not add a suffix to S3 keys, and will not
     // be included in persisted index_part.json. This value is only to be used
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 76e75253fb..bbf6a0c5c5 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -1542,7 +1542,7 @@ pub fn remote_index_path(
 }
 
 /// Given the key of an index, parse out the generation part of the name
-pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
+pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
     let file_name = match path.get_path().file_name() {
         Some(f) => f,
         None => {
diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs
index 7cf963ca9d..fdab74a8be 100644
--- a/pageserver/src/tenant/remote_timeline_client/index.rs
+++ b/pageserver/src/tenant/remote_timeline_client/index.rs
@@ -155,7 +155,7 @@ pub struct IndexLayerMetadata {
 
     #[serde(default = "Generation::none")]
     #[serde(skip_serializing_if = "Generation::is_none")]
-    pub(super) generation: Generation,
+    pub generation: Generation,
 }
 
 impl From<&LayerFileMetadata> for IndexLayerMetadata {
diff --git a/s3_scrubber/Cargo.toml b/s3_scrubber/Cargo.toml
index f3ea6e222c..0f3e5630e8 100644
--- a/s3_scrubber/Cargo.toml
+++ b/s3_scrubber/Cargo.toml
@@ -33,6 +33,7 @@
 reqwest = { workspace = true, default-features = false, features = ["rustls-tls"] }
 aws-config = { workspace = true, default-features = false, features = ["rustls", "credentials-sso"] }
 
 pageserver = { path = "../pageserver" }
+remote_storage = { path = "../libs/remote_storage" }
 tracing.workspace = true
 tracing-subscriber.workspace = true
diff --git a/s3_scrubber/src/checks.rs b/s3_scrubber/src/checks.rs
index c11f1f9779..64702fca3d 100644
--- a/s3_scrubber/src/checks.rs
+++ b/s3_scrubber/src/checks.rs
@@ -1,13 +1,18 @@
 use std::collections::HashSet;
 
 use anyhow::Context;
-use aws_sdk_s3::Client;
+use aws_sdk_s3::{types::ObjectIdentifier, Client};
 use tracing::{error, info, warn};
+use utils::generation::Generation;
 
 use crate::cloud_admin_api::BranchData;
-use crate::{download_object_with_retries, list_objects_with_retries, RootTarget};
+use crate::metadata_stream::stream_listing;
+use crate::{download_object_with_retries, RootTarget};
+use futures_util::{pin_mut, StreamExt};
+use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
 use pageserver::tenant::storage_layer::LayerFileName;
 use pageserver::tenant::IndexPart;
+use remote_storage::RemotePath;
 use utils::id::TenantTimelineId;
 
 pub(crate) struct TimelineAnalysis {
@@ -68,6 +73,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
         match s3_data.blob_data {
             BlobDataParseResult::Parsed {
                 index_part,
+                index_part_generation,
                 mut s3_layers,
             } => {
                 if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
@@ -107,33 +113,62 @@ pub(crate) async fn branch_cleanup_and_check_errors(
                         ))
                     }
 
-                    if !s3_layers.remove(&layer) {
+                    let layer_map_key = (layer, metadata.generation);
+                    if !s3_layers.remove(&layer_map_key) {
+                        // FIXME: this will emit false positives if an index was
+                        // uploaded concurrently with our scan. To make this check
+                        // correct, we need to try sending a HEAD request for the
+                        // layer we think is missing.
                         result.errors.push(format!(
-                            "index_part.json contains a layer {} that is not present in S3",
-                            layer.file_name(),
+                            "index_part.json contains a layer {}{} that is not present in remote storage",
+                            layer_map_key.0.file_name(),
+                            layer_map_key.1.get_suffix()
                         ))
                     }
                 }
 
-                if !s3_layers.is_empty() {
+                let orphan_layers: Vec<(LayerFileName, Generation)> = s3_layers
+                    .into_iter()
+                    .filter(|(_layer_name, gen)|
+                        // A layer is only considered orphaned if it has a generation below
+                        // the index. If the generation is >= the index, then the layer may
+                        // be an upload from a running pageserver, or even an upload from
+                        // a new generation that didn't upload an index yet.
+                        //
+                        // Even so, a layer that is not referenced by the index could just
+                        // be something enqueued for deletion, so while this check is valid
+                        // for indicating that a layer is garbage, it is not an indicator
+                        // of a problem.
+                        gen < &index_part_generation)
+                    .collect();
+
+                if !orphan_layers.is_empty() {
                     result.errors.push(format!(
                         "index_part.json does not contain layers from S3: {:?}",
-                        s3_layers
+                        orphan_layers
                             .iter()
-                            .map(|layer_name| layer_name.file_name())
+                            .map(|(layer_name, gen)| format!(
+                                "{}{}",
+                                layer_name.file_name(),
+                                gen.get_suffix()
+                            ))
                            .collect::<Vec<_>>(),
                    ));
-                    result
-                        .garbage_keys
-                        .extend(s3_layers.iter().map(|layer_name| {
+                    result.garbage_keys.extend(orphan_layers.iter().map(
+                        |(layer_name, layer_gen)| {
                             let mut key = s3_root.timeline_root(id).prefix_in_bucket;
                             let delimiter = s3_root.delimiter();
                             if !key.ends_with(delimiter) {
                                 key.push_str(delimiter);
                             }
-                            key.push_str(&layer_name.file_name());
+                            key.push_str(&format!(
+                                "{}{}",
+                                &layer_name.file_name(),
+                                layer_gen.get_suffix()
+                            ));
                             key
-                        }));
+                        },
+                    ));
                 }
             }
             BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
@@ -178,70 +213,97 @@ pub(crate) struct S3TimelineBlobData {
 pub(crate) enum BlobDataParseResult {
     Parsed {
         index_part: IndexPart,
-        s3_layers: HashSet<LayerFileName>,
+        index_part_generation: Generation,
+        s3_layers: HashSet<(LayerFileName, Generation)>,
     },
     Incorrect(Vec<String>),
 }
 
+fn parse_layer_object_name(name: &str) -> Result<(LayerFileName, Generation), String> {
+    match name.rsplit_once('-') {
+        // FIXME: this is gross, just use a regex?
+        Some((layer_filename, gen)) if gen.len() == 8 => {
+            let layer = layer_filename.parse::<LayerFileName>()?;
+            let gen =
+                Generation::parse_suffix(gen).ok_or("Malformed generation suffix".to_string())?;
+            Ok((layer, gen))
+        }
+        _ => Ok((name.parse::<LayerFileName>()?, Generation::none())),
+    }
+}
+
 pub(crate) async fn list_timeline_blobs(
     s3_client: &Client,
     id: TenantTimelineId,
     s3_root: &RootTarget,
 ) -> anyhow::Result<S3TimelineBlobData> {
     let mut s3_layers = HashSet::new();
-    let mut index_part_object = None;
-
-    let timeline_dir_target = s3_root.timeline_root(&id);
-    let mut continuation_token = None;
 
     let mut errors = Vec::new();
     let mut keys_to_remove = Vec::new();
 
-    loop {
-        let fetch_response =
-            list_objects_with_retries(s3_client, &timeline_dir_target, continuation_token.clone())
-                .await?;
+    let mut timeline_dir_target = s3_root.timeline_root(&id);
+    timeline_dir_target.delimiter = String::new();
 
-        let subdirectories = fetch_response.common_prefixes().unwrap_or_default();
-        if !subdirectories.is_empty() {
-            errors.push(format!(
-                "S3 list response should not contain any subdirectories, but got {subdirectories:?}"
-            ));
-        }
+    let mut index_parts: Vec<ObjectIdentifier> = Vec::new();
 
-        for (object, key) in fetch_response
-            .contents()
-            .unwrap_or_default()
-            .iter()
-            .filter_map(|object| Some((object, object.key()?)))
-        {
-            let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
-            match blob_name {
-                Some("index_part.json") => index_part_object = Some(object.clone()),
-                Some(maybe_layer_name) => match maybe_layer_name.parse::<LayerFileName>() {
-                    Ok(new_layer) => {
-                        s3_layers.insert(new_layer);
-                    }
-                    Err(e) => {
-                        errors.push(
-                            format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
-                        );
-                        keys_to_remove.push(key.to_string());
-                    }
-                },
-                None => {
-                    errors.push(format!("S3 list response got an object with odd key {key}"));
+    let stream = stream_listing(s3_client, &timeline_dir_target);
+    pin_mut!(stream);
+    while let Some(obj) = stream.next().await {
+        let obj = obj?;
+        let key = match obj.key() {
+            Some(k) => k,
+            None => continue,
+        };
+
+        let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
+        match blob_name {
+            Some(name) if name.starts_with("index_part.json") => {
+                tracing::info!("Index key {key}");
+                index_parts.push(obj)
+            }
+            Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
+                Ok((new_layer, gen)) => {
+                    tracing::info!("Parsed layer key: {} {:?}", new_layer, gen);
+                    s3_layers.insert((new_layer, gen));
+                }
+                Err(e) => {
+                    tracing::info!("Error parsing key {maybe_layer_name}");
+                    errors.push(
+                        format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
+                    );
                     keys_to_remove.push(key.to_string());
                 }
+            },
+            None => {
+                tracing::info!("Peculiar key {}", key);
+                errors.push(format!("S3 list response got an object with odd key {key}"));
+                keys_to_remove.push(key.to_string());
             }
         }
-
-        match fetch_response.next_continuation_token {
-            Some(new_token) => continuation_token = Some(new_token),
-            None => break,
-        }
     }
 
+    // Choose the index_part with the highest generation
+    let (index_part_object, index_part_generation) = match index_parts
+        .iter()
+        .filter_map(|k| {
+            let key = k.key().unwrap();
+            // Stripping the index key to the last part, because RemotePath doesn't
+            // like absolute paths, and depending on prefix_in_bucket it's possible
+            // for the keys we read back to start with a slash.
+            let basename = key.rsplit_once('/').unwrap().1;
+            parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (k, g))
+        })
+        .max_by_key(|i| i.1)
+        .map(|(k, g)| (k.clone(), g))
+    {
+        Some((key, gen)) => (Some(key), gen),
+        None => {
+            // Legacy/missing case: one or zero index parts, which did not have a generation
+            (index_parts.pop(), Generation::none())
+        }
+    };
+
     if index_part_object.is_none() {
         errors.push("S3 list response got no index_part.json file".to_string());
     }
@@ -261,6 +323,7 @@ pub(crate) async fn list_timeline_blobs(
     return Ok(S3TimelineBlobData {
         blob_data: BlobDataParseResult::Parsed {
             index_part,
+            index_part_generation,
            s3_layers,
        },
        keys_to_remove,
diff --git a/s3_scrubber/src/lib.rs b/s3_scrubber/src/lib.rs
index 98e7ed5334..d892438633 100644
--- a/s3_scrubber/src/lib.rs
+++ b/s3_scrubber/src/lib.rs
@@ -34,6 +34,9 @@ const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
 #[derive(Debug, Clone)]
 pub struct S3Target {
     pub bucket_name: String,
+    /// This `prefix_in_bucket` is only equal to the PS/SK config of the same
+    /// name for the RootTarget: other instances of S3Target will have prefix_in_bucket
+    /// with extra parts.
     pub prefix_in_bucket: String,
     pub delimiter: String,
 }
@@ -77,9 +80,13 @@ impl Display for NodeKind {
 impl S3Target {
     pub fn with_sub_segment(&self, new_segment: &str) -> Self {
         let mut new_self = self.clone();
-        let _ = new_self.prefix_in_bucket.pop();
-        new_self.prefix_in_bucket =
-            [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
+        if new_self.prefix_in_bucket.is_empty() {
+            new_self.prefix_in_bucket = format!("/{}/", new_segment);
+        } else {
+            let _ = new_self.prefix_in_bucket.pop();
+            new_self.prefix_in_bucket =
+                [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
+        }
         new_self
     }
 }
@@ -91,10 +98,10 @@ pub enum RootTarget {
 }
 
 impl RootTarget {
-    pub fn tenants_root(&self) -> &S3Target {
+    pub fn tenants_root(&self) -> S3Target {
         match self {
-            Self::Pageserver(root) => root,
-            Self::Safekeeper(root) => root,
+            Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
+            Self::Safekeeper(root) => root.with_sub_segment("wal"),
         }
     }
 
@@ -133,6 +140,7 @@ impl RootTarget {
 pub struct BucketConfig {
     pub region: String,
     pub bucket: String,
+    pub prefix_in_bucket: Option<String>,
 
     /// Use SSO if this is set, else rely on AWS_* environment vars
     pub sso_account_id: Option<String>,
@@ -155,10 +163,12 @@ impl BucketConfig {
         let sso_account_id = env::var("SSO_ACCOUNT_ID").ok();
         let region = env::var("REGION").context("'REGION' param retrieval")?;
         let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
+        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
 
         Ok(Self {
             region,
             bucket,
+            prefix_in_bucket,
             sso_account_id,
         })
     }
@@ -191,14 +201,14 @@ pub fn init_logging(file_name: &str) -> WorkerGuard {
         .with_target(false)
         .with_ansi(false)
         .with_writer(file_writer);
-    let stdout_logs = fmt::Layer::new()
-        .with_ansi(std::io::stdout().is_terminal())
+    let stderr_logs = fmt::Layer::new()
+        .with_ansi(std::io::stderr().is_terminal())
         .with_target(false)
-        .with_writer(std::io::stdout);
+        .with_writer(std::io::stderr);
     tracing_subscriber::registry()
         .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
         .with(file_logs)
-        .with(stdout_logs)
+        .with(stderr_logs)
        .init();
 
     guard
@@ -250,15 +260,20 @@ fn init_remote(
     let bucket_region = Region::new(bucket_config.region);
     let delimiter = "/".to_string();
     let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
+
     let s3_root = match node_kind {
         NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
             bucket_name: bucket_config.bucket,
-            prefix_in_bucket: ["pageserver", "v1", TENANTS_SEGMENT_NAME, ""].join(&delimiter),
+            prefix_in_bucket: bucket_config
+                .prefix_in_bucket
+                .unwrap_or("pageserver/v1".to_string()),
             delimiter,
         }),
         NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
             bucket_name: bucket_config.bucket,
-            prefix_in_bucket: ["safekeeper", "v1", "wal", ""].join(&delimiter),
+            prefix_in_bucket: bucket_config
+                .prefix_in_bucket
+                .unwrap_or("safekeeper/v1".to_string()),
             delimiter,
         }),
     };
diff --git a/s3_scrubber/src/main.rs b/s3_scrubber/src/main.rs
index 9a0d6c9ae8..1f0ceebdaf 100644
--- a/s3_scrubber/src/main.rs
+++ b/s3_scrubber/src/main.rs
@@ -31,7 +31,10 @@ enum Command {
         #[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
         mode: PurgeMode,
     },
-    ScanMetadata {},
+    ScanMetadata {
+        #[arg(short, long, default_value_t = false)]
+        json: bool,
+    },
 }
 
 #[tokio::main]
@@ -54,13 +57,17 @@ async fn main() -> anyhow::Result<()> {
     ));
 
     match cli.command {
-        Command::ScanMetadata {} => match scan_metadata(bucket_config).await {
+        Command::ScanMetadata { json } => match scan_metadata(bucket_config).await {
             Err(e) => {
                 tracing::error!("Failed: {e}");
                 Err(e)
             }
             Ok(summary) => {
-                println!("{}", summary.summary_string());
+                if json {
+                    println!("{}", serde_json::to_string(&summary).unwrap())
+                } else {
+                    println!("{}", summary.summary_string());
+                }
                 if summary.is_fatal() {
                     Err(anyhow::anyhow!("Fatal scrub errors detected"))
                 } else {
diff --git a/s3_scrubber/src/metadata_stream.rs b/s3_scrubber/src/metadata_stream.rs
index 125a370e90..8095071c1f 100644
--- a/s3_scrubber/src/metadata_stream.rs
+++ b/s3_scrubber/src/metadata_stream.rs
@@ -13,10 +13,10 @@ pub fn stream_tenants<'a>(
 ) -> impl Stream<Item = anyhow::Result<TenantId>> + 'a {
     try_stream! {
         let mut continuation_token = None;
+        let tenants_target = target.tenants_root();
         loop {
-            let tenants_target = target.tenants_root();
             let fetch_response =
-                list_objects_with_retries(s3_client, tenants_target, continuation_token.clone()).await?;
+                list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
 
             let new_entry_ids = fetch_response
                 .common_prefixes()
diff --git a/s3_scrubber/src/scan_metadata.rs b/s3_scrubber/src/scan_metadata.rs
index 33acd41a5b..ad82db1e76 100644
--- a/s3_scrubber/src/scan_metadata.rs
+++ b/s3_scrubber/src/scan_metadata.rs
@@ -10,8 +10,10 @@ use aws_sdk_s3::Client;
 use futures_util::{pin_mut, StreamExt, TryStreamExt};
 use histogram::Histogram;
 use pageserver::tenant::IndexPart;
+use serde::Serialize;
 use utils::id::TenantTimelineId;
 
+#[derive(Serialize)]
 pub struct MetadataSummary {
     count: usize,
     with_errors: HashSet<TenantTimelineId>,
@@ -25,7 +27,9 @@ pub struct MetadataSummary {
 }
 
 /// A histogram plus minimum and maximum tracking
+#[derive(Serialize)]
 struct MinMaxHisto {
+    #[serde(skip)]
     histo: Histogram,
     min: u64,
     max: u64,
@@ -109,6 +113,7 @@ impl MetadataSummary {
         self.count += 1;
         if let BlobDataParseResult::Parsed {
             index_part,
+            index_part_generation: _,
            s3_layers: _,
        } = &data.blob_data
        {
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 81a7b0750d..02faf715da 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2968,24 +2968,33 @@ class S3Scrubber:
         self.env = env
         self.log_dir = log_dir
 
-    def scrubber_cli(self, args, timeout):
+    def scrubber_cli(self, args: list[str], timeout) -> str:
         assert isinstance(self.env.pageserver_remote_storage, S3Storage)
         s3_storage = self.env.pageserver_remote_storage
 
         env = {
             "REGION": s3_storage.bucket_region,
             "BUCKET": s3_storage.bucket_name,
+            "BUCKET_PREFIX": s3_storage.prefix_in_bucket,
+            "RUST_LOG": "DEBUG",
         }
         env.update(s3_storage.access_env_vars())
 
         if s3_storage.endpoint is not None:
             env.update({"AWS_ENDPOINT_URL": s3_storage.endpoint})
 
-        base_args = [self.env.neon_binpath / "s3_scrubber"]
+        base_args = [str(self.env.neon_binpath / "s3_scrubber")]
         args = base_args + args
 
-        (output_path, _, status_code) = subprocess_capture(
-            self.log_dir, args, echo_stderr=True, echo_stdout=True, env=env, check=False
+        (output_path, stdout, status_code) = subprocess_capture(
+            self.log_dir,
+            args,
+            echo_stderr=True,
+            echo_stdout=True,
+            env=env,
+            check=False,
+            capture_stdout=True,
+            timeout=timeout,
         )
         if status_code:
             log.warning(f"Scrub command {args} failed")
@@ -2994,8 +3003,18 @@ class S3Scrubber:
 
             raise RuntimeError("Remote storage scrub failed")
 
-    def scan_metadata(self):
-        self.scrubber_cli(["scan-metadata"], timeout=30)
+        assert stdout is not None
+        return stdout
+
+    def scan_metadata(self) -> Any:
+        stdout = self.scrubber_cli(["scan-metadata", "--json"], timeout=30)
+
+        try:
+            return json.loads(stdout)
+        except:
+            log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
+            log.error(stdout)
+            raise
 
 
 def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py
index e54b82dfb4..ba8d70d5a9 100644
--- a/test_runner/fixtures/utils.py
+++ b/test_runner/fixtures/utils.py
@@ -35,6 +35,7 @@ def subprocess_capture(
     echo_stderr=False,
     echo_stdout=False,
     capture_stdout=False,
+    timeout=None,
     **kwargs: Any,
 ) -> Tuple[str, Optional[str], int]:
     """Run a process and bifurcate its output to files and the `log` logger
@@ -104,7 +105,7 @@ def subprocess_capture(
         stderr_handler = OutputHandler(p.stderr, stderr_f, echo=echo_stderr, capture=False)
         stderr_handler.start()
 
-        r = p.wait()
+        r = p.wait(timeout=timeout)
 
         stdout_handler.join()
         stderr_handler.join()
diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py
index 78ef6b9165..3e5021ae06 100644
--- a/test_runner/regress/test_pageserver_generations.py
+++ b/test_runner/regress/test_pageserver_generations.py
@@ -21,6 +21,7 @@ from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
     PgBin,
+    S3Scrubber,
     last_flush_lsn_upload,
     wait_for_last_flush_lsn,
 )
@@ -234,8 +235,22 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
     assert len(suffixed_objects) > 0
     assert len(legacy_objects) > 0
 
+    # Flush through deletions to get a clean state for scrub: we are implicitly validating
+    # that our generations-enabled pageserver was able to do deletions of layers
+    # from earlier which don't have a generation.
+    env.pageserver.http_client().deletion_queue_flush(execute=True)
+
     assert get_deletion_queue_unexpected_errors(env.pageserver.http_client()) == 0
 
+    # Having written a mixture of generation-aware and legacy index_part.json,
+    # ensure the scrubber handles the situation as expected.
+    metadata_summary = S3Scrubber(
+        neon_env_builder.test_output_dir, neon_env_builder
+    ).scan_metadata()
+    assert metadata_summary["count"] == 1  # Scrubber should have seen our timeline
+    assert not metadata_summary["with_errors"]
+    assert not metadata_summary["with_warnings"]
+
 
 def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
     neon_env_builder.enable_generations = True