From 9bbbfe5ee0cd2c052500a710854bd5714cdcdb76 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Wed, 1 May 2024 21:57:17 +0200 Subject: [PATCH] Add ad-hoc script to sk tli validation --- Cargo.lock | 1 + s3_scrubber/Cargo.toml | 1 + s3_scrubber/src/lib.rs | 28 +- s3_scrubber/src/main.rs | 24 ++ .../src/validate_safekeeper_timeline.rs | 338 ++++++++++++++++++ 5 files changed, 391 insertions(+), 1 deletion(-) create mode 100644 s3_scrubber/src/validate_safekeeper_timeline.rs diff --git a/Cargo.lock b/Cargo.lock index 438b68493b..b2a210e914 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5194,6 +5194,7 @@ dependencies = [ "rand 0.8.5", "remote_storage", "reqwest 0.12.4", + "safekeeper", "serde", "serde_json", "serde_with", diff --git a/s3_scrubber/Cargo.toml b/s3_scrubber/Cargo.toml index dd5d453a2b..3931225422 100644 --- a/s3_scrubber/Cargo.toml +++ b/s3_scrubber/Cargo.toml @@ -40,6 +40,7 @@ aws-config = { workspace = true, default-features = false, features = ["rustls", pageserver = { path = "../pageserver" } pageserver_api = { path = "../libs/pageserver_api" } remote_storage = { path = "../libs/remote_storage" } +safekeeper = { path = "../safekeeper" } tracing.workspace = true tracing-subscriber.workspace = true diff --git a/s3_scrubber/src/lib.rs b/s3_scrubber/src/lib.rs index e976e66748..8eadd09096 100644 --- a/s3_scrubber/src/lib.rs +++ b/s3_scrubber/src/lib.rs @@ -7,6 +7,7 @@ pub mod metadata_stream; pub mod scan_pageserver_metadata; pub mod scan_safekeeper_metadata; pub mod tenant_snapshot; +pub mod validate_safekeeper_timeline; use std::env; use std::fmt::Display; @@ -36,7 +37,7 @@ use tracing::error; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use utils::fs_ext; -use utils::id::{TenantId, TimelineId}; +use utils::id::{TenantId, TimelineId, TenantTimelineId}; const MAX_RETRIES: usize = 20; const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN"; @@ -184,6 +185,12 @@ impl RootTarget { 
.with_sub_segment(&id.timeline_id.to_string()) } + pub fn safekeeper_timeline_root(&self, id: &TenantTimelineId) -> S3Target { + self.tenants_root() + .with_sub_segment(&id.tenant_id.to_string()) + .with_sub_segment(&id.timeline_id.to_string()) + } + pub fn bucket_name(&self) -> &str { match self { Self::Pageserver(root) => &root.bucket_name, @@ -256,6 +263,25 @@ impl ConsoleConfig { } } +pub struct SafekeeperApiConfig { + pub token: String, + pub base_url: Url, +} + +impl SafekeeperApiConfig { + pub fn from_env() -> anyhow::Result { + let base_url: Url = env::var("SAFEKEEPER_API_URL") + .context("'SAFEKEEPER_API_URL' param retrieval")? + .parse() + .context("'SAFEKEEPER_API_URL' param parsing")?; + + let token = env::var("SAFEKEEPER_API_TOKEN") + .context("'SAFEKEEPER_API_TOKEN' environment variable fetch")?; + + Ok(Self { base_url, token }) + } +} + pub fn init_logging(file_name: &str) -> WorkerGuard { let (file_writer, guard) = tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name)); diff --git a/s3_scrubber/src/main.rs b/s3_scrubber/src/main.rs index e49c280b99..07b6118931 100644 --- a/s3_scrubber/src/main.rs +++ b/s3_scrubber/src/main.rs @@ -4,6 +4,7 @@ use pageserver_api::shard::TenantShardId; use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode}; use s3_scrubber::scan_pageserver_metadata::scan_metadata; use s3_scrubber::tenant_snapshot::SnapshotDownloader; +use s3_scrubber::validate_safekeeper_timeline::validate_timelines; use s3_scrubber::{ init_logging, scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth, @@ -54,6 +55,15 @@ enum Command { #[arg(long, default_value = None)] dump_db_table: Option, }, + #[command(verbatim_doc_comment)] + ValidateTimelines { + // points to db with debug dump + dump_db_connstr: String, + // table in the db with debug dump + dump_db_table: String, + // validation script will be written to this file + script_file: String, + }, 
TenantSnapshot { #[arg(long = "tenant-id")] tenant_id: TenantId, @@ -75,6 +85,7 @@ async fn main() -> anyhow::Result<()> { Command::FindGarbage { .. } => "find-garbage", Command::PurgeGarbage { .. } => "purge-garbage", Command::TenantSnapshot { .. } => "tenant-snapshot", + Command::ValidateTimelines { .. } => "validate-timelines", }; let _guard = init_logging(&format!( "{}_{}_{}_{}.log", @@ -178,5 +189,18 @@ async fn main() -> anyhow::Result<()> { SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?; downloader.download().await } + Command::ValidateTimelines { + dump_db_connstr, + dump_db_table, + script_file, + } => { + validate_timelines( + bucket_config.clone(), + dump_db_connstr, + dump_db_table, + script_file, + ) + .await + } } } diff --git a/s3_scrubber/src/validate_safekeeper_timeline.rs b/s3_scrubber/src/validate_safekeeper_timeline.rs new file mode 100644 index 0000000000..61b16ad86b --- /dev/null +++ b/s3_scrubber/src/validate_safekeeper_timeline.rs @@ -0,0 +1,338 @@ +use std::{ + cmp::max, + str::FromStr, + sync::{Arc, Mutex}, +}; + +use aws_sdk_s3::{types::ObjectIdentifier, Client}; +use futures::stream::{StreamExt, TryStreamExt}; + +use postgres_ffi::{XLogFileName, PG_TLI}; +use reqwest::Url; + +use tokio::{fs::File, io::AsyncWriteExt}; +use tokio_postgres::types::PgLsn; +use tracing::{error, info, info_span, Instrument}; +use utils::{ + id::{TenantId, TenantTimelineId, TimelineId}, + lsn::Lsn, +}; + +use crate::{ + init_remote, metadata_stream::stream_listing, + BucketConfig, NodeKind, RootTarget, SafekeeperApiConfig, +}; + +/// Generally we should ask safekeepers, but so far we use everywhere default 16MB. 
+const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
+
+/// Thin HTTP client for the safekeeper management API, authenticated with a bearer token.
+pub struct SafekeeperClient {
+    token: String,
+    base_url: Url,
+    http_client: reqwest::Client,
+}
+
+impl SafekeeperClient {
+    /// Builds a client from the base URL and token carried by `config`.
+    pub fn new(config: SafekeeperApiConfig) -> Self {
+        Self {
+            token: config.token,
+            base_url: config.base_url,
+            http_client: reqwest::Client::new(),
+        }
+    }
+
+    /// Fetches the status of `ttid` via `GET v1/tenant/:tenant_id/timeline/:timeline_id`.
+    ///
+    /// The `term_history` of the returned status is cleared because it is noisy when logged.
+    ///
+    /// # Errors
+    /// Fails if the request cannot be sent, the safekeeper answers with a non-success
+    /// HTTP status, or the body cannot be decoded as a `TimelineStatus`.
+    pub async fn timeline_status(
+        &self,
+        ttid: TenantTimelineId,
+    ) -> anyhow::Result<safekeeper::http::routes::TimelineStatus> {
+        let url = self.append_url(format!(
+            "v1/tenant/{}/timeline/{}",
+            ttid.tenant_id, ttid.timeline_id
+        ));
+
+        let response = self
+            .http_client
+            .get(url)
+            .bearer_auth(&self.token)
+            .send()
+            .await?
+            // Report 4xx/5xx as an HTTP error instead of a confusing JSON decode error.
+            .error_for_status()?;
+
+        let mut status: safekeeper::http::routes::TimelineStatus = response.json().await?;
+
+        // this field is noisy
+        status.acceptor_state.term_history.clear();
+
+        Ok(status)
+    }
+
+    /// Appends `subpath` to the base URL, inserting exactly one `/` between them.
+    ///
+    /// Plain string concatenation is used on purpose: `Url::join` replaces the last path
+    /// segment of a base URL that does not end with `/`, which is not what we want here.
+    ///
+    /// # Panics
+    /// Panics if the concatenated string is not a valid URL.
+    fn append_url(&self, subpath: String) -> Url {
+        let mut full = self.base_url.to_string();
+        if !full.ends_with('/') {
+            full.push('/');
+        }
+        full.push_str(subpath.trim_start_matches('/'));
+
+        full.parse()
+            .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
+    }
+}
+
+/// One row of the safekeeper debug dump table, as selected by `validate_timelines`.
+#[derive(Debug)]
+struct DumpedTimeline {
+    ttid: TenantTimelineId,
+    timeline_start_lsn: Lsn,
+    local_start_lsn: Lsn,
+    backup_lsn: Lsn,
+    sk_id: u64,
+}
+
+/// Converts one row of the dump query (columns: tenant_id, timeline_id, timeline_start_lsn,
+/// local_start_lsn, backup_lsn, sk_id) into a `DumpedTimeline`.
+///
+/// # Errors
+/// Fails if a column has an unexpected type, an id is not parseable, or `sk_id` is negative.
+fn parse_dumped_row(row: &tokio_postgres::Row) -> anyhow::Result<DumpedTimeline> {
+    let tenant_id: &str = row.try_get(0)?;
+    let tenant_id = TenantId::from_str(tenant_id)
+        .map_err(|e| anyhow::anyhow!("failed to parse tenant_id {tenant_id:?}: {e:?}"))?;
+
+    let timeline_id: &str = row.try_get(1)?;
+    let timeline_id = TimelineId::from_str(timeline_id)
+        .map_err(|e| anyhow::anyhow!("failed to parse timeline_id {timeline_id:?}: {e:?}"))?;
+
+    let timeline_start_lsn: PgLsn = row.try_get(2)?;
+    let local_start_lsn: PgLsn = row.try_get(3)?;
+    let backup_lsn: PgLsn = row.try_get(4)?;
+
+    let sk_id: i64 = row.try_get(5)?;
+    anyhow::ensure!(sk_id >= 0, "negative sk_id {sk_id} in the dump table");
+
+    Ok(DumpedTimeline {
+        ttid: TenantTimelineId::new(tenant_id, timeline_id),
+        timeline_start_lsn: Lsn(u64::from(timeline_start_lsn)),
+        local_start_lsn: Lsn(u64::from(local_start_lsn)),
+        backup_lsn: Lsn(u64::from(backup_lsn)),
+        // Lossless: checked to be non-negative just above.
+        sk_id: sk_id as u64,
+    })
+}
+
+/// Check a single safekeeper timeline with local_start_lsn != timeline_start_lsn.
+/// The function finds the segment at local_start_lsn and appends shell commands that
+/// verify it to the shared validation script.
+///
+/// If backup_lsn > local_start_lsn, the script downloads the full segment from S3 with
+/// `aws s3 cp` and checks it with pg_waldump; a zero exit code means success.
+///
+/// Otherwise (backup_lsn <= local_start_lsn) the segment is expected to be partial. The
+/// script downloads the partial segment uploaded by the current safekeeper and a partial
+/// segment with the same name prefix uploaded by another safekeeper; validation succeeds
+/// when the two files are byte-identical. If either partial segment is missing in S3, the
+/// timeline is skipped and nothing is appended.
+///
+/// # Errors
+/// Fails if the safekeeper status cannot be fetched, the S3 listing fails, or the live
+/// status contradicts the dumped row.
+async fn validate_timeline(
+    s3_client: &Client,
+    root: &RootTarget,
+    tli: DumpedTimeline,
+    api_client: &SafekeeperClient,
+    shared: Arc<SharedTimelines>,
+) -> anyhow::Result<()> {
+    info!("found timeline {tli:?}");
+
+    // fetching current status from safekeeper HTTP API
+    let status = match api_client.timeline_status(tli.ttid).await {
+        Ok(status) => status,
+        Err(e) => {
+            info!("skipping, failed to fetch info about timeline: {e:?}");
+            return Err(e);
+        }
+    };
+    info!("status from sk: {status:?}");
+
+    // Path to the timeline directory in S3
+    let timeline_dir_target = root.safekeeper_timeline_root(&tli.ttid);
+
+    // These compare external data (dump table vs. live safekeeper). A mismatch must fail
+    // only this timeline; a panic here would abort the whole run and lose the script.
+    anyhow::ensure!(
+        status.backup_lsn >= tli.backup_lsn,
+        "backup_lsn went backwards: dump has {}, safekeeper reports {}",
+        tli.backup_lsn,
+        status.backup_lsn
+    );
+    anyhow::ensure!(
+        status.timeline_start_lsn == tli.timeline_start_lsn,
+        "timeline_start_lsn mismatch: dump has {}, safekeeper reports {}",
+        tli.timeline_start_lsn,
+        status.timeline_start_lsn
+    );
+
+    if status.timeline_start_lsn == status.local_start_lsn {
+        info!("nothing to do, LSNs are equal");
+        return Ok(());
+    }
+
+    anyhow::ensure!(
+        status.local_start_lsn == tli.local_start_lsn,
+        "local_start_lsn mismatch: dump has {}, safekeeper reports {}",
+        tli.local_start_lsn,
+        status.local_start_lsn
+    );
+    let timeline_start_lsn = status.timeline_start_lsn;
+    let local_start_lsn = status.local_start_lsn;
+
+    let segno = local_start_lsn.segment_number(WAL_SEGSIZE);
+    let segfile = XLogFileName(PG_TLI, segno, WAL_SEGSIZE);
+
+    if status.backup_lsn <= status.local_start_lsn {
+        // we have partial segments, let's find them in S3 and compare in script
+        info!("timeline without full backed up segment");
+        let mut target = timeline_dir_target;
+        // Empty delimiter + segment file name as prefix: list every object of this segment.
+        target.delimiter = String::new();
+        target.prefix_in_bucket += &segfile;
+        let vec: Vec<ObjectIdentifier> = stream_listing(s3_client, &target).try_collect().await?;
+        info!("found partial files: {:?}", vec);
+
+        let expected_suffix = format!("_sk{}.partial", tli.sk_id);
+        let segment_of_interest = match vec.iter().find(|obj| obj.key.ends_with(&expected_suffix))
+        {
+            Some(seg) => seg,
+            None => {
+                info!("haven't found a partial segment, skipping");
+                return Ok(());
+            }
+        };
+
+        let partial_prefix = segment_of_interest
+            .key
+            .strip_suffix(&expected_suffix)
+            .expect("key was selected by ends_with(expected_suffix)");
+
+        // find another partial segment with the same name prefix (i.e. the same LSNs)
+        let another_segment = match vec.iter().find(|obj| {
+            obj.key != segment_of_interest.key && obj.key.starts_with(partial_prefix)
+        }) {
+            Some(seg) => seg,
+            None => {
+                info!("haven't found another partial segment to compare to");
+                return Ok(());
+            }
+        };
+
+        shared.append_script(&[
+            String::new(),
+            format!(
+                "aws s3 cp s3://{}/{} {}",
+                root.bucket_name(),
+                segment_of_interest.key,
+                "our_segment"
+            ),
+            format!(
+                "aws s3 cp s3://{}/{} {}",
+                root.bucket_name(),
+                another_segment.key,
+                "their_segment"
+            ),
+            // if equal
+            "if cmp -s our_segment their_segment; then".to_string(),
+            format!("    echo '{}' >> valid.log", tli.ttid),
+            "else".to_string(),
+            format!("    echo '{}' >> invalid.log", tli.ttid),
+            "fi".to_string(),
+            "rm our_segment their_segment".to_string(),
+        ]);
+
+        return Ok(());
+    }
+
+    // we have only full segment, let's download it and run pg_waldump
+    let key = timeline_dir_target.prefix_in_bucket + &segfile;
+    info!(
+        "starting s3 download from bucket {}, key {}",
+        root.bucket_name(),
+        key
+    );
+
+    let segment_start_lsn = local_start_lsn.segment_lsn(WAL_SEGSIZE);
+
+    // When the timeline starts in the middle of this segment, tell pg_waldump to start
+    // reading at timeline_start_lsn instead of at the segment boundary.
+    let start_arg = if segment_start_lsn < timeline_start_lsn {
+        format!(" -s {}", timeline_start_lsn)
+    } else {
+        String::new()
+    };
+
+    shared.append_script(&[
+        String::new(),
+        format!("aws s3 cp s3://{}/{} {}", root.bucket_name(), key, segfile),
+        format!(
+            "/usr/local/v{}/bin/pg_waldump{} {} > /dev/null",
+            // NOTE(review): assumes pg_version is the numeric server version
+            // (e.g. 150004 -> 15) - confirm against the safekeeper API.
+            status.pg_info.pg_version / 10000,
+            start_arg,
+            segfile,
+        ),
+        "if [ $? -ne 0 ]; then".to_string(),
+        format!("    echo '{}' >> invalid.log", tli.ttid),
+        "else".to_string(),
+        format!("    echo '{}' >> valid.log", tli.ttid),
+        "fi".to_string(),
+        format!("rm {}", segfile),
+    ]);
+
+    Ok(())
+}
+
+/// Lines of the generated validation script, in the order they were appended.
+pub struct InternalTimelines {
+    validation_script: Vec<String>,
+}
+
+// Shared memory for validation tasks, generated script will be inserted here.
+pub struct SharedTimelines {
+    data: Mutex<InternalTimelines>,
+}
+
+impl SharedTimelines {
+    /// Appends `arr` to the script under a single lock acquisition, so the lines produced
+    /// for one timeline are never interleaved with lines of another.
+    fn append_script(&self, arr: &[String]) {
+        let mut lock = self
+            .data
+            .lock()
+            .expect("validation script mutex is poisoned");
+        lock.validation_script.extend_from_slice(arr);
+    }
+}
+
+/// Generates a bash script that validates safekeeper timelines whose `local_start_lsn`
+/// differs from `timeline_start_lsn`.
+///
+/// Candidate timelines are read from table `dump_db_table` of the database at
+/// `dump_db_connstr`, each one is checked against the safekeeper HTTP API (configured via
+/// `SafekeeperApiConfig::from_env`) and the S3 bucket in `bucket_config`, and the resulting
+/// script is written to `script_file`. A failure on an individual timeline is logged and
+/// does not stop the run.
+///
+/// # Errors
+/// Fails if the database, the safekeeper API configuration or the S3 client cannot be set
+/// up, if the query fails, or if the script file cannot be written.
+pub async fn validate_timelines(
+    bucket_config: BucketConfig,
+    dump_db_connstr: String,
+    dump_db_table: String,
+    script_file: String,
+) -> anyhow::Result<()> {
+    info!(
+        "checking bucket {}, region {}, dump_db_table {}",
+        bucket_config.bucket, bucket_config.region, dump_db_table
+    );
+
+    let shared = Arc::new(SharedTimelines {
+        data: Mutex::new(InternalTimelines {
+            validation_script: vec![
+                "#!/bin/bash".to_string(),
+                "echo 'Starting the validation script'".to_string(),
+            ],
+        }),
+    });
+
+    // Use the native TLS implementation (Neon requires TLS)
+    let tls_connector = postgres_native_tls::MakeTlsConnector::new(
+        native_tls::TlsConnector::new()
+            .map_err(|e| anyhow::anyhow!("failed to create TLS connector: {e}"))?,
+    );
+    let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
+    // The connection object performs the actual communication with the database,
+    // so spawn it off to run on its own.
+    tokio::spawn(async move {
+        if let Err(e) = connection.await {
+            error!("connection error: {}", e);
+        }
+    });
+
+    let sk_api_config = SafekeeperApiConfig::from_env()?;
+    let sk_api_client = SafekeeperClient::new(sk_api_config);
+
+    // Double any embedded quote so the table name stays a single quoted identifier.
+    let quoted_table = dump_db_table.replace('"', "\"\"");
+
+    // NOTE(review): the sk_id filter and the excluded timeline ids are hard-coded for one
+    // particular ad-hoc run; they should become parameters before this is reused.
+    let query = format!(
+        "
+        select
+            tenant_id, timeline_id, timeline_start_lsn, local_start_lsn, backup_lsn, sk_id
+        from \"{}\"
+        where
+            timeline_start_lsn != local_start_lsn
+AND sk_id = 51
+AND timeline_id != '13a865b39537d5538e0ea74c926d9c6f'
+AND timeline_id != 'c5e944b5c13628ba8fe128b01e7e663d'
+AND timeline_id != 'e7cfa4a2bd15c88ff011d69feef4b076';",
+        quoted_table,
+    );
+
+    info!("query is {}", query);
+    let rows = client.query(&query, &[]).await?;
+    info!("loaded {} timelines", rows.len());
+
+    let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?;
+
+    // Plain references are `Copy`, so every per-timeline future can capture its own copy.
+    let s3_client: &Client = &s3_client;
+    let target: &RootTarget = &target;
+    let sk_api_client = &sk_api_client;
+
+    let checks = futures::stream::iter(rows.iter()).map(|row| {
+        let shared = Arc::clone(&shared);
+        async move {
+            let dumped_tli = parse_dumped_row(row)?;
+            let ttid = dumped_tli.ttid;
+            validate_timeline(s3_client, target, dumped_tli, sk_api_client, shared)
+                .instrument(info_span!("validate", ttid=%ttid))
+                .await
+        }
+    });
+
+    // Run tasks concurrently.
+    const CONCURRENCY: usize = 10;
+    let mut results = checks.buffered(CONCURRENCY);
+
+    while let Some(r) = results.next().await {
+        if let Err(e) = r {
+            error!("failed to process the timeline, error: {:?}", e);
+        }
+    }
+
+    // Build the script text while holding the lock, then release it before any `.await`:
+    // a std::sync::Mutex guard must not be held across an await point.
+    let script = {
+        let guard = shared
+            .data
+            .lock()
+            .expect("validation script mutex is poisoned");
+        let mut text = guard.validation_script.join("\n");
+        text.push('\n');
+        text
+    };
+
+    // Save resulting script to the file.
+    let mut file = File::create(script_file).await?;
+    file.write_all(script.as_bytes()).await?;
+    file.flush().await?;
+    drop(file);
+
+    Ok(())
+}