Compare commits

...

4 Commits

Author SHA1 Message Date
Arthur Petukhovsky
e58ea049de Commit latest script updates 2024-05-03 16:01:30 +02:00
Arthur Petukhovsky
a33e271cd7 Download from s3 right away 2024-05-03 11:12:50 +02:00
Arthur Petukhovsky
0f7629cc42 Add fix-local-start-lsn to patch local_start_lsn 2024-05-03 00:37:09 +02:00
Arthur Petukhovsky
9bbbfe5ee0 Add ad-hoc script to sk tli validation 2024-05-03 00:37:08 +02:00
6 changed files with 611 additions and 3 deletions

1
Cargo.lock generated
View File

@@ -5194,6 +5194,7 @@ dependencies = [
"rand 0.8.5",
"remote_storage",
"reqwest 0.12.4",
"safekeeper",
"serde",
"serde_json",
"serde_with",

View File

@@ -40,6 +40,7 @@ aws-config = { workspace = true, default-features = false, features = ["rustls",
pageserver = { path = "../pageserver" }
pageserver_api = { path = "../libs/pageserver_api" }
remote_storage = { path = "../libs/remote_storage" }
safekeeper = { path = "../safekeeper" }
tracing.workspace = true
tracing-subscriber.workspace = true

View File

@@ -7,6 +7,7 @@ pub mod metadata_stream;
pub mod scan_pageserver_metadata;
pub mod scan_safekeeper_metadata;
pub mod tenant_snapshot;
pub mod validate_safekeeper_timeline;
use std::env;
use std::fmt::Display;
@@ -36,7 +37,7 @@ use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use utils::fs_ext;
use utils::id::{TenantId, TimelineId};
use utils::id::{TenantId, TimelineId, TenantTimelineId};
const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
@@ -184,6 +185,12 @@ impl RootTarget {
.with_sub_segment(&id.timeline_id.to_string())
}
pub fn safekeeper_timeline_root(&self, id: &TenantTimelineId) -> S3Target {
self.tenants_root()
.with_sub_segment(&id.tenant_id.to_string())
.with_sub_segment(&id.timeline_id.to_string())
}
pub fn bucket_name(&self) -> &str {
match self {
Self::Pageserver(root) => &root.bucket_name,
@@ -256,6 +263,25 @@ impl ConsoleConfig {
}
}
pub struct SafekeeperApiConfig {
pub token: String,
pub base_url: Url,
}
impl SafekeeperApiConfig {
pub fn from_env() -> anyhow::Result<Self> {
let base_url: Url = env::var("SAFEKEEPER_API_URL")
.context("'SAFEKEEPER_API_URL' param retrieval")?
.parse()
.context("'SAFEKEEPER_API_URL' param parsing")?;
let token = env::var("SAFEKEEPER_API_TOKEN")
.context("'SAFEKEEPER_API_TOKEN' environment variable fetch")?;
Ok(Self { base_url, token })
}
}
pub fn init_logging(file_name: &str) -> WorkerGuard {
let (file_writer, guard) =
tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
@@ -266,6 +292,7 @@ pub fn init_logging(file_name: &str) -> WorkerGuard {
.with_writer(file_writer);
let stderr_logs = fmt::Layer::new()
.with_target(false)
.with_ansi(false)
.with_writer(std::io::stderr);
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))

View File

@@ -4,6 +4,7 @@ use pageserver_api::shard::TenantShardId;
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use s3_scrubber::scan_pageserver_metadata::scan_metadata;
use s3_scrubber::tenant_snapshot::SnapshotDownloader;
use s3_scrubber::validate_safekeeper_timeline::{fix_local_start_lsn, validate_timelines};
use s3_scrubber::{
init_logging, scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig,
NodeKind, TraversingDepth,
@@ -62,6 +63,20 @@ enum Command {
#[arg(short, long)]
output_path: Utf8PathBuf,
},
#[command(verbatim_doc_comment)]
ValidateTimelines {
// points to db with debug dump
dump_db_connstr: String,
// table in the db with debug dump
dump_db_table: String,
// validation script will be written to this file
script_file: String,
},
#[command(verbatim_doc_comment)]
FixLocalStartLsn {
// file with timelines to fix in "<tenant>/<timeline>" format, on each line
tli_list_file: String,
},
}
#[tokio::main]
@@ -75,6 +90,8 @@ async fn main() -> anyhow::Result<()> {
Command::FindGarbage { .. } => "find-garbage",
Command::PurgeGarbage { .. } => "purge-garbage",
Command::TenantSnapshot { .. } => "tenant-snapshot",
Command::ValidateTimelines { .. } => "validate-timelines",
Command::FixLocalStartLsn { .. } => "fix-local-start-lsn",
};
let _guard = init_logging(&format!(
"{}_{}_{}_{}.log",
@@ -178,5 +195,25 @@ async fn main() -> anyhow::Result<()> {
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?;
downloader.download().await
}
Command::ValidateTimelines {
dump_db_connstr,
dump_db_table,
script_file,
} => {
validate_timelines(
bucket_config.clone(),
dump_db_connstr,
dump_db_table,
script_file,
)
.await
}
Command::FixLocalStartLsn {
tli_list_file,
} => {
fix_local_start_lsn(
tli_list_file,
).await
}
}
}

View File

@@ -0,0 +1,542 @@
use std::{
cmp::{max, min},
str::FromStr,
sync::{Arc, Mutex}, time::Duration,
};
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use camino::Utf8Path;
use futures::stream::{StreamExt, TryStreamExt};
use postgres_ffi::{XLogFileName, PG_TLI};
use reqwest::Url;
use safekeeper::patch_control_file;
use serde_json::json;
use tokio::{fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, process::Command};
use tokio_postgres::types::PgLsn;
use tracing::{error, info, info_span, Instrument};
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
};
use crate::{
download_object_to_file, init_remote, metadata_stream::stream_listing, BucketConfig, NodeKind, RootTarget, SafekeeperApiConfig
};
/// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
pub struct SafekeeperClient {
token: String,
base_url: Url,
http_client: reqwest::Client,
}
impl SafekeeperClient {
pub fn new(config: SafekeeperApiConfig) -> Self {
Self {
token: config.token,
base_url: config.base_url,
http_client: reqwest::Client::new(),
}
}
pub async fn timeline_status(
&self,
ttid: TenantTimelineId,
) -> anyhow::Result<safekeeper::http::routes::TimelineStatus> {
// /v1/tenant/:tenant_id/timeline/:timeline_id
let req = self
.http_client
.get(self.append_url(format!(
"v1/tenant/{}/timeline/{}",
ttid.tenant_id, ttid.timeline_id
)))
.bearer_auth(&self.token);
let response = req.send().await?;
let mut response: safekeeper::http::routes::TimelineStatus = response.json().await?;
// this field is noisy
response.acceptor_state.term_history.clear();
Ok(response)
}
pub async fn patch_control_file(
&self,
ttid: TenantTimelineId,
request: patch_control_file::Request,
) -> anyhow::Result<patch_control_file::Response> {
// /v1/tenant/:tenant_id/timeline/:timeline_id/control_file
let req = self
.http_client
.patch(self.append_url(format!(
"v1/tenant/{}/timeline/{}/control_file",
ttid.tenant_id, ttid.timeline_id,
)))
.json(&request)
.bearer_auth(&self.token);
let response = req.send().await?;
let mut response: patch_control_file::Response = response.json().await?;
// this field is noisy
response.old_control_file.acceptor_state.term_history.0.clear();
response.new_control_file.acceptor_state.term_history.0.clear();
Ok(response)
}
fn append_url(&self, subpath: String) -> Url {
// TODO fugly, but `.join` does not work when called
(self.base_url.to_string() + &subpath)
.parse()
.unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
}
}
#[derive(Debug)]
struct DumpedTimeline {
ttid: TenantTimelineId,
timeline_start_lsn: Lsn,
local_start_lsn: Lsn,
backup_lsn: Lsn,
sk_id: u64,
}
fn parse_commit_lsn(formatted_string: &str) -> Option<Lsn> {
// Split the string by the underscore '_'
let parts: Vec<&str> = formatted_string.split('_').collect();
// The fourth element (index 3) is `commit_lsn.0`, formatted as a hexadecimal string
if parts.len() > 3 {
Some(Lsn(u64::from_str_radix(parts[3], 16).ok()?))
} else {
None
}
}
/// Check a single safekeeper timeline with local_start_lsn != timeline_start_lsn.
/// The function will find the segment at local_start_lsn and will try to generate a script to verify it.
///
/// If backup_lsn > local_start_lsn, this segment is no longer available locally and we have only full segment in S3.
/// In this case, segment will be downloaded with `aws s3 cp` and checked with pg_waldump, zero exitcode means success.
///
/// Otherwise, when backup_lsn < local_start_lsn, this segment should be partial and present on 3 safekeepers.
/// Script will download partial segment from S3 for the current safekeeper and other safekeeper. Successful validation
/// means that partial segment on current safekeeper is equal to the partial segment on the other safekeeper.
async fn validate_timeline(
s3_client: &Client,
root: &RootTarget,
tli: DumpedTimeline,
api_client: &SafekeeperClient,
shared: Arc<SharedTimelines>,
) -> anyhow::Result<()> {
info!("found timeline {tli:?}");
// fetching current status from safekeeper HTTP API
let res = api_client.timeline_status(tli.ttid).await;
if res.is_err() {
info!("skipping, failed to fetch info about timeline: {res:?}");
}
let status = res?;
info!("status from sk: {status:?}");
// Path to the timeline directory in S3
let timeline_dir_target = root.safekeeper_timeline_root(&tli.ttid);
assert!(status.backup_lsn >= tli.backup_lsn);
assert!(status.timeline_start_lsn == tli.timeline_start_lsn);
if status.timeline_start_lsn == status.local_start_lsn {
info!("nothing to do, LSNs are equal");
return Ok(());
}
assert!(status.local_start_lsn == tli.local_start_lsn);
let timeline_start_lsn = status.timeline_start_lsn;
let local_start_lsn = status.local_start_lsn;
let segno = local_start_lsn.segment_number(WAL_SEGSIZE);
let segfile = XLogFileName(PG_TLI, segno, WAL_SEGSIZE);
if status.backup_lsn <= status.local_start_lsn {
// we have partial segments, let's find them in S3 and compare in script
info!("timeline without full backed up segment");
let mut target = timeline_dir_target;
target.delimiter = "".to_string();
target.prefix_in_bucket += &segfile;
let vec: Vec<ObjectIdentifier> = stream_listing(s3_client, &target).try_collect().await?;
info!("found partial files: {:?}", vec);
let expected_suffix = format!("_sk{}.partial", tli.sk_id);
let segment_of_interest = vec.iter().find(|obj| obj.key.ends_with(&expected_suffix));
let segment_of_interest = match segment_of_interest {
Some(seg) => seg,
None => {
info!("haven't found a partial segment, skipping");
return Ok(());
}
};
let partial_prefix = segment_of_interest
.key
.strip_suffix(&expected_suffix)
.unwrap();
let another_segment = vec.iter().find(|obj| {
obj.key != segment_of_interest.key
});
if another_segment.is_none() {
info!("haven't found another partial segment to compare to");
return Ok(());
}
let another_segment = another_segment.unwrap();
let ours_commit_lsn = parse_commit_lsn(&segment_of_interest.key);
if ours_commit_lsn.is_none() {
info!("failed to parse commit_lsn from {}", &segment_of_interest.key);
return Ok(());
}
let ours_commit_lsn = ours_commit_lsn.unwrap();
let their_commit_lsn = parse_commit_lsn(&another_segment.key);
if their_commit_lsn.is_none() {
info!("failed to parse commit_lsn from {}", &another_segment.key);
return Ok(());
}
let their_commit_lsn = their_commit_lsn.unwrap();
let min_commit_lsn = min(ours_commit_lsn, their_commit_lsn);
let strip_offset = min_commit_lsn.segment_offset(WAL_SEGSIZE);
info!("found ours_commit_lsn={ours_commit_lsn}, their_commit_lsn={their_commit_lsn}, min={min_commit_lsn}, offset={strip_offset}");
let ours_str = format!("./tmp/ours");
let ours = Utf8Path::new(&ours_str);
download_object_to_file(
s3_client,
root.bucket_name(),
&segment_of_interest.key,
None,
&ours,
).await?;
let their_str = format!("./tmp/their");
let their = Utf8Path::new(&their_str);
download_object_to_file(
s3_client,
root.bucket_name(),
&another_segment.key,
None,
&their,
).await?;
let cmd = format!(
"truncate --size={} ./tmp/ours && truncate --size={} ./tmp/their && cmp -l ./tmp/ours ./tmp/their",
strip_offset,
strip_offset,
);
// Execute the command using `bash -c`
let mut process = Command::new("bash")
.arg("-c")
.arg(&cmd)
.spawn()?;
// Await the process to finish and check the exit status
let status = process.wait().await?;
// let's delete the file
tokio::fs::remove_file(ours).await?;
tokio::fs::remove_file(their).await?;
if status.success() {
info!("Command executed successfully.");
shared.append_script(&[
format!("echo '{}' >> valid.log", tli.ttid),
]);
} else {
shared.append_script(&[
format!("echo '{}' >> invalid.log", tli.ttid),
]);
info!("Command failed with status: {}", status);
if let Some(code) = status.code() {
info!("Exit code: {}", code);
} else {
info!("Process terminated by signal");
}
}
return Ok(());
}
// we have only full segment, let's download it and run pg_waldump
let key = timeline_dir_target.prefix_in_bucket + &segfile;
info!(
"starting s3 download from bucket {}, key {}",
root.bucket_name(),
key
);
let segment_start_lsn = local_start_lsn.segment_lsn(WAL_SEGSIZE);
let path_str = format!("./tmp/{}", segfile);
let path = Utf8Path::new(&path_str);
download_object_to_file(
s3_client,
root.bucket_name(),
&key,
None,
&path,
).await?;
info!("downloaded file to {}", path);
let waldump_cmd = format!(
"./pg_install/v{}/bin/pg_waldump -i{} ./tmp/{}",
status.pg_info.pg_version / 10000,
if segment_start_lsn < timeline_start_lsn {
format!(" -s {}", timeline_start_lsn)
} else {
"".to_string()
},
segfile,
);
info!("running {}", waldump_cmd);
// Execute the command using `bash -c`
let mut process = Command::new("bash")
.arg("-c")
.arg(&waldump_cmd)
.spawn()?;
// Await the process to finish and check the exit status
let status = process.wait().await?;
// let's delete the file
tokio::fs::remove_file(path).await?;
if status.success() {
info!("Command executed successfully.");
shared.append_script(&[
format!("echo '{}' >> valid.log", tli.ttid),
]);
} else {
shared.append_script(&[
format!("echo '{}' >> invalid.log", tli.ttid),
]);
info!("Command failed with status: {}", status);
if let Some(code) = status.code() {
info!("Exit code: {}", code);
} else {
info!("Process terminated by signal");
}
}
// shared.append_script(&[
// String::new(),
// format!("aws s3 cp s3://{}/{} {}", root.bucket_name(), key, segfile),
// format!(
// "/usr/local/v{}/bin/pg_waldump{} {} > /dev/null",
// status.pg_info.pg_version / 10000,
// if segment_start_lsn < timeline_start_lsn {
// format!(" -s {}", timeline_start_lsn)
// } else {
// "".to_string()
// },
// segfile,
// ),
// "if [ $? -ne 0 ]; then".to_string(),
// format!(" echo '{}' >> invalid.log", tli.ttid),
// "else".to_string(),
// format!(" echo '{}' >> valid.log", tli.ttid),
// "fi".to_string(),
// format!("rm {}", segfile),
// ]);
Ok(())
}
pub struct InternalTimelines {
validation_script: Vec<String>,
}
// Shared memory for validation tasks, generated script will be inserted here.
pub struct SharedTimelines {
data: Mutex<InternalTimelines>,
}
impl SharedTimelines {
fn append_script(&self, arr: &[String]) {
let mut lock = self.data.lock().unwrap();
lock.validation_script.extend_from_slice(arr);
}
}
pub async fn validate_timelines(
bucket_config: BucketConfig,
dump_db_connstr: String,
dump_db_table: String,
script_file: String,
) -> anyhow::Result<()> {
info!(
"checking bucket {}, region {}, dump_db_table {}",
bucket_config.bucket, bucket_config.region, dump_db_table
);
let shared = Arc::new(SharedTimelines {
data: Mutex::new(InternalTimelines {
validation_script: vec![
"#!/bin/bash".to_string(),
"echo 'Starting the validation script'".to_string(),
],
}),
});
// Use the native TLS implementation (Neon requires TLS)
let tls_connector =
postgres_native_tls::MakeTlsConnector::new(native_tls::TlsConnector::new().unwrap());
let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
let sk_api_config = SafekeeperApiConfig::from_env()?;
let sk_api_client = SafekeeperClient::new(sk_api_config);
let query = format!(
"
select
tenant_id, timeline_id, timeline_start_lsn, local_start_lsn, backup_lsn, sk_id
from \"{}\"
where
timeline_start_lsn != local_start_lsn
AND sk_id = 51
AND timeline_id != '13a865b39537d5538e0ea74c926d9c6f'
AND timeline_id != 'c5e944b5c13628ba8fe128b01e7e663d'
AND timeline_id != 'e7cfa4a2bd15c88ff011d69feef4b076';",
dump_db_table,
);
info!("query is {}", query);
let timelines = client.query(&query, &[]).await?;
info!("loaded {} timelines", timelines.len());
let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?;
let checks = futures::stream::iter(timelines.iter().map(Ok)).map_ok(|row| {
let tenant_id = TenantId::from_str(row.get(0)).expect("failed to parse tenant_id");
let timeline_id = TimelineId::from_str(row.get(1)).expect("failed to parse tenant_id");
let timeline_start_lsn_pg: PgLsn = row.get(2);
let local_start_lsn_pg: PgLsn = row.get(3);
let backup_lsn_pg: PgLsn = row.get(4);
let sk_id: i64 = row.get(5);
let sk_id = sk_id as u64;
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
let shared = shared.clone();
let dumped_tli = DumpedTimeline {
ttid,
timeline_start_lsn: Lsn(u64::from(timeline_start_lsn_pg)),
local_start_lsn: Lsn(u64::from(local_start_lsn_pg)),
backup_lsn: Lsn(u64::from(backup_lsn_pg)),
sk_id,
};
validate_timeline(&s3_client, &target, dumped_tli, &sk_api_client, shared)
.instrument(info_span!("validate", ttid=%ttid))
});
// Run tasks concurrently.
const CONCURRENCY: usize = 1;
let mut timelines = checks.try_buffered(CONCURRENCY);
while let Some(r) = timelines.next().await {
if r.is_err() {
error!("failed to process the timeline, error: {:?}", r);
}
}
// Save resulting script to the file.
let mut file = File::create(script_file).await?;
for line in &shared.data.lock().unwrap().validation_script {
file.write_all(line.as_bytes()).await?;
file.write_all(b"\n").await?;
}
file.flush().await?;
drop(file);
Ok(())
}
pub async fn fix_local_start_lsn(tli_list_file: String) -> anyhow::Result<()> {
let sk_api_config = SafekeeperApiConfig::from_env()?;
let sk_api_client = SafekeeperClient::new(sk_api_config);
let mut tlis: Vec<TenantTimelineId> = Vec::new();
let file = File::open(tli_list_file).await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
let ttid = TenantTimelineId::from_str(&line)?;
tlis.push(ttid);
}
drop(lines);
info!("Read {} timelines from the file, starting applying new local_start_lsn", tlis.len());
for ttid in tlis {
let res = fix_one_timeline(&sk_api_client, ttid).await;
if res.is_err() {
error!("error while fixing timeline {}: {:?}", ttid, res);
}
}
Ok(())
}
async fn fix_one_timeline(api_client: &SafekeeperClient, ttid: TenantTimelineId) -> anyhow::Result<()> {
// fetching current status from safekeeper HTTP API
let res = api_client.timeline_status(ttid).await;
if res.is_err() {
info!("skipping, failed to fetch info about timeline: {res:?}");
}
let status = res?;
info!("status from sk: {status:?}");
if status.timeline_start_lsn == status.local_start_lsn {
info!("nothing to do, LSNs are equal");
return Ok(());
}
let request = patch_control_file::Request {
updates: json!({
"local_start_lsn": status.timeline_start_lsn.to_string(),
}),
apply_fields: vec!["local_start_lsn".to_string()],
};
info!("sending request {:?}", request);
let res = api_client.patch_control_file(ttid, request).await?;
info!("patch finished: {:?}", res);
Ok(())
}

View File

@@ -6,7 +6,7 @@ use tracing::info;
use crate::{state::TimelinePersistentState, timeline::Timeline};
#[derive(Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Request {
/// JSON object with fields to update
pub updates: serde_json::Value,
@@ -14,7 +14,7 @@ pub struct Request {
pub apply_fields: Vec<String>,
}
#[derive(Serialize)]
#[derive(Serialize, Deserialize, Debug)]
pub struct Response {
pub old_control_file: TimelinePersistentState,
pub new_control_file: TimelinePersistentState,