From 0f7629cc42e2b2839ea43412f38bc15cee39b051 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 2 May 2024 12:34:10 +0200 Subject: [PATCH] Add fix-local-start-lsn to patch local_start_lsn --- s3_scrubber/src/main.rs | 29 ++++-- .../src/validate_safekeeper_timeline.rs | 89 ++++++++++++++++++- safekeeper/src/patch_control_file.rs | 4 +- 3 files changed, 110 insertions(+), 12 deletions(-) diff --git a/s3_scrubber/src/main.rs b/s3_scrubber/src/main.rs index 07b6118931..2087250725 100644 --- a/s3_scrubber/src/main.rs +++ b/s3_scrubber/src/main.rs @@ -4,7 +4,7 @@ use pageserver_api::shard::TenantShardId; use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode}; use s3_scrubber::scan_pageserver_metadata::scan_metadata; use s3_scrubber::tenant_snapshot::SnapshotDownloader; -use s3_scrubber::validate_safekeeper_timeline::validate_timelines; +use s3_scrubber::validate_safekeeper_timeline::{fix_local_start_lsn, validate_timelines}; use s3_scrubber::{ init_logging, scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth, @@ -55,6 +55,14 @@ enum Command { #[arg(long, default_value = None)] dump_db_table: Option, }, + TenantSnapshot { + #[arg(long = "tenant-id")] + tenant_id: TenantId, + #[arg(long = "concurrency", short = 'j', default_value_t = 8)] + concurrency: usize, + #[arg(short, long)] + output_path: Utf8PathBuf, + }, #[command(verbatim_doc_comment)] ValidateTimelines { // points to db with debug dump @@ -64,13 +72,10 @@ enum Command { // validation script will be written to this file script_file: String, }, - TenantSnapshot { - #[arg(long = "tenant-id")] - tenant_id: TenantId, - #[arg(long = "concurrency", short = 'j', default_value_t = 8)] - concurrency: usize, - #[arg(short, long)] - output_path: Utf8PathBuf, + #[command(verbatim_doc_comment)] + FixLocalStartLsn { + // file with timelines to fix in "/" format, on each line + tli_list_file: String, }, } @@ -86,6 +91,7 @@ async fn main() -> anyhow::Result<()> { Command::PurgeGarbage { .. } => "purge-garbage", Command::TenantSnapshot { .. } => "tenant-snapshot", Command::ValidateTimelines { .. } => "validate-timelines", + Command::FixLocalStartLsn { .. } => "fix-local-start-lsn", }; let _guard = init_logging(&format!( "{}_{}_{}_{}.log", @@ -202,5 +208,12 @@ async fn main() -> anyhow::Result<()> { ) .await } + Command::FixLocalStartLsn { + tli_list_file, + } => { + fix_local_start_lsn( + tli_list_file, + ).await + } } } diff --git a/s3_scrubber/src/validate_safekeeper_timeline.rs b/s3_scrubber/src/validate_safekeeper_timeline.rs index 61b16ad86b..5d57f04849 100644 --- a/s3_scrubber/src/validate_safekeeper_timeline.rs +++ b/s3_scrubber/src/validate_safekeeper_timeline.rs @@ -1,7 +1,7 @@ use std::{ cmp::max, str::FromStr, - sync::{Arc, Mutex}, + sync::{Arc, Mutex}, time::Duration, }; use aws_sdk_s3::{types::ObjectIdentifier, Client}; @@ -10,7 +10,9 @@ use futures::stream::{StreamExt, TryStreamExt}; use postgres_ffi::{XLogFileName, PG_TLI}; use reqwest::Url; -use tokio::{fs::File, io::AsyncWriteExt}; +use safekeeper::patch_control_file; +use serde_json::json; +use tokio::{fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}}; use tokio_postgres::types::PgLsn; use tracing::{error, info, info_span, Instrument}; use utils::{ @@ -63,6 +65,32 @@ impl SafekeeperClient { Ok(response) } + pub async fn patch_control_file( + &self, + ttid: TenantTimelineId, + request: patch_control_file::Request, + ) -> anyhow::Result { + // /v1/tenant/:tenant_id/timeline/:timeline_id/control_file + + let req = self + .http_client + .patch(self.append_url(format!( + "v1/tenant/{}/timeline/{}/control_file", + ttid.tenant_id, ttid.timeline_id, + ))) + .json(&request) + .bearer_auth(&self.token); + + let response = req.send().await?; + let mut response: patch_control_file::Response = response.json().await?; + + // this field is noisy + response.old_control_file.acceptor_state.term_history.0.clear(); + response.new_control_file.acceptor_state.term_history.0.clear(); + + Ok(response) + } + fn append_url(&self, subpath: String) -> Url { // TODO fugly, but `.join` does not work when called (self.base_url.to_string() + &subpath) @@ -336,3 +364,60 @@ AND timeline_id != 'e7cfa4a2bd15c88ff011d69feef4b076';", Ok(()) } + +pub async fn fix_local_start_lsn(tli_list_file: String) -> anyhow::Result<()> { + let sk_api_config = SafekeeperApiConfig::from_env()?; + let sk_api_client = SafekeeperClient::new(sk_api_config); + + let mut tlis: Vec = Vec::new(); + + let file = File::open(tli_list_file).await?; + let reader = BufReader::new(file); + let mut lines = reader.lines(); + + while let Some(line) = lines.next_line().await? { + let ttid = TenantTimelineId::from_str(&line)?; + tlis.push(ttid); + } + drop(lines); + + info!("Read {} timelines from the file, starting applying new local_start_lsn", tlis.len()); + + for ttid in tlis { + let res = fix_one_timeline(&sk_api_client, ttid).await; + if res.is_err() { + error!("error while fixing timeline {}: {:?}", ttid, res); + } + } + + Ok(()) +} + +async fn fix_one_timeline(api_client: &SafekeeperClient, ttid: TenantTimelineId) -> anyhow::Result<()> { + // fetching current status from safekeeper HTTP API + let res = api_client.timeline_status(ttid).await; + if res.is_err() { + info!("skipping, failed to fetch info about timeline: {res:?}"); + } + + let status = res?; + info!("status from sk: {status:?}"); + + if status.timeline_start_lsn == status.local_start_lsn { + info!("nothing to do, LSNs are equal"); + return Ok(()); + } + + let request = patch_control_file::Request { + updates: json!({ + "local_start_lsn": status.timeline_start_lsn.to_string(), + }), + apply_fields: vec!["local_start_lsn".to_string()], + }; + + info!("sending request {:?}", request); + let res = api_client.patch_control_file(ttid, request).await?; + info!("patch finished: {:?}", res); + + Ok(()) +} diff --git a/safekeeper/src/patch_control_file.rs b/safekeeper/src/patch_control_file.rs index 2136d1b5f7..89f3144983 100644 --- a/safekeeper/src/patch_control_file.rs +++ b/safekeeper/src/patch_control_file.rs @@ -6,7 +6,7 @@ use tracing::info; use crate::{state::TimelinePersistentState, timeline::Timeline}; -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Request { /// JSON object with fields to update pub updates: serde_json::Value, @@ -14,7 +14,7 @@ pub struct Request { pub apply_fields: Vec, } -#[derive(Serialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct Response { pub old_control_file: TimelinePersistentState, pub new_control_file: TimelinePersistentState,