Add fix-local-start-lsn to patch local_start_lsn

Arthur Petukhovsky
2024-05-02 12:34:10 +02:00
parent 9bbbfe5ee0
commit 0f7629cc42
3 changed files with 110 additions and 12 deletions
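
The new scrubber subcommand reads a plain-text list of timelines and, for each timeline whose local_start_lsn differs from timeline_start_lsn, PATCHes the safekeeper control file so the two match. Given clap's default kebab-case naming (consistent with the log-file name below), an invocation would look roughly like the following; the binary name and file name are placeholders:

    s3_scrubber fix-local-start-lsn timelines.txt

The input file carries one "<tenant_id>/<timeline_id>" pair per line; the IDs in this sketch are made up:

    9b379ee62ad8bd8ca7b25e60a0bbb7d5/4560b9eb866a07a15eb67e9fd0a4cd7d
    1f359dd625e519c1091a217c44b23e71/de200bd42b49cc1814412c7e592dd6e9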


@@ -4,7 +4,7 @@ use pageserver_api::shard::TenantShardId;
 use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
 use s3_scrubber::scan_pageserver_metadata::scan_metadata;
 use s3_scrubber::tenant_snapshot::SnapshotDownloader;
-use s3_scrubber::validate_safekeeper_timeline::validate_timelines;
+use s3_scrubber::validate_safekeeper_timeline::{fix_local_start_lsn, validate_timelines};
 use s3_scrubber::{
     init_logging, scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig,
     NodeKind, TraversingDepth,
@@ -55,6 +55,14 @@ enum Command {
         #[arg(long, default_value = None)]
         dump_db_table: Option<String>,
     },
+    TenantSnapshot {
+        #[arg(long = "tenant-id")]
+        tenant_id: TenantId,
+        #[arg(long = "concurrency", short = 'j', default_value_t = 8)]
+        concurrency: usize,
+        #[arg(short, long)]
+        output_path: Utf8PathBuf,
+    },
     #[command(verbatim_doc_comment)]
     ValidateTimelines {
         // points to db with debug dump
@@ -64,13 +72,10 @@ enum Command {
         // validation script will be written to this file
         script_file: String,
     },
-    TenantSnapshot {
-        #[arg(long = "tenant-id")]
-        tenant_id: TenantId,
-        #[arg(long = "concurrency", short = 'j', default_value_t = 8)]
-        concurrency: usize,
-        #[arg(short, long)]
-        output_path: Utf8PathBuf,
-    },
+    #[command(verbatim_doc_comment)]
+    FixLocalStartLsn {
+        // file with timelines to fix, in "<tenant>/<timeline>" format, one per line
+        tli_list_file: String,
+    },
 }
@@ -86,6 +91,7 @@ async fn main() -> anyhow::Result<()> {
         Command::PurgeGarbage { .. } => "purge-garbage",
         Command::TenantSnapshot { .. } => "tenant-snapshot",
         Command::ValidateTimelines { .. } => "validate-timelines",
+        Command::FixLocalStartLsn { .. } => "fix-local-start-lsn",
     };
     let _guard = init_logging(&format!(
         "{}_{}_{}_{}.log",
@@ -202,5 +208,12 @@ async fn main() -> anyhow::Result<()> {
             )
             .await
         }
+        Command::FixLocalStartLsn {
+            tli_list_file,
+        } => {
+            fix_local_start_lsn(
+                tli_list_file,
+            ).await
+        }
     }
 }


@@ -1,7 +1,7 @@
 use std::{
     cmp::max,
     str::FromStr,
-    sync::{Arc, Mutex},
+    sync::{Arc, Mutex}, time::Duration,
 };
 
 use aws_sdk_s3::{types::ObjectIdentifier, Client};
@@ -10,7 +10,9 @@ use futures::stream::{StreamExt, TryStreamExt};
 use postgres_ffi::{XLogFileName, PG_TLI};
 use reqwest::Url;
-use tokio::{fs::File, io::AsyncWriteExt};
+use safekeeper::patch_control_file;
+use serde_json::json;
+use tokio::{fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}};
 use tokio_postgres::types::PgLsn;
 use tracing::{error, info, info_span, Instrument};
 use utils::{
@@ -63,6 +65,32 @@ impl SafekeeperClient {
         Ok(response)
     }
 
+    pub async fn patch_control_file(
+        &self,
+        ttid: TenantTimelineId,
+        request: patch_control_file::Request,
+    ) -> anyhow::Result<patch_control_file::Response> {
+        // PATCH /v1/tenant/:tenant_id/timeline/:timeline_id/control_file
+        let req = self
+            .http_client
+            .patch(self.append_url(format!(
+                "v1/tenant/{}/timeline/{}/control_file",
+                ttid.tenant_id, ttid.timeline_id,
+            )))
+            .json(&request)
+            .bearer_auth(&self.token);
+
+        let response = req.send().await?;
+        let mut response: patch_control_file::Response = response.json().await?;
+
+        // term_history is noisy, clear it in both control files before logging
+        response.old_control_file.acceptor_state.term_history.0.clear();
+        response.new_control_file.acceptor_state.term_history.0.clear();
+
+        Ok(response)
+    }
+
     fn append_url(&self, subpath: String) -> Url {
         // TODO fugly, but `.join` does not work when called
         (self.base_url.to_string() + &subpath)
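
A note on the method above: send() only fails on transport errors, so a non-2xx reply would surface later as a JSON decode error. A stricter variant (a sketch, not what the commit does) would reject error statuses before decoding, using reqwest's error_for_status:

    let response = req.send().await?.error_for_status()?;
    let mut response: patch_control_file::Response = response.json().await?;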
@@ -336,3 +364,60 @@ AND timeline_id != 'e7cfa4a2bd15c88ff011d69feef4b076';",
 
     Ok(())
 }
+
+pub async fn fix_local_start_lsn(tli_list_file: String) -> anyhow::Result<()> {
+    let sk_api_config = SafekeeperApiConfig::from_env()?;
+    let sk_api_client = SafekeeperClient::new(sk_api_config);
+
+    let mut tlis: Vec<TenantTimelineId> = Vec::new();
+
+    let file = File::open(tli_list_file).await?;
+    let reader = BufReader::new(file);
+    let mut lines = reader.lines();
+    while let Some(line) = lines.next_line().await? {
+        let ttid = TenantTimelineId::from_str(&line)?;
+        tlis.push(ttid);
+    }
+    drop(lines);
+
+    info!(
+        "Read {} timelines from the file, applying new local_start_lsn",
+        tlis.len()
+    );
+
+    for ttid in tlis {
+        let res = fix_one_timeline(&sk_api_client, ttid).await;
+        if let Err(e) = res {
+            error!("error while fixing timeline {}: {:?}", ttid, e);
+        }
+    }
+
+    Ok(())
+}
+
+async fn fix_one_timeline(
+    api_client: &SafekeeperClient,
+    ttid: TenantTimelineId,
+) -> anyhow::Result<()> {
+    // fetch the current status from the safekeeper HTTP API
+    let res = api_client.timeline_status(ttid).await;
+    if let Err(e) = &res {
+        info!("skipping, failed to fetch info about timeline: {e:?}");
+    }
+    let status = res?;
+    info!("status from sk: {status:?}");
+
+    if status.timeline_start_lsn == status.local_start_lsn {
+        info!("nothing to do, LSNs are equal");
+        return Ok(());
+    }
+
+    let request = patch_control_file::Request {
+        updates: json!({
+            "local_start_lsn": status.timeline_start_lsn.to_string(),
+        }),
+        apply_fields: vec!["local_start_lsn".to_string()],
+    };
+
+    info!("sending request {:?}", request);
+    let res = api_client.patch_control_file(ttid, request).await?;
+    info!("patch finished: {:?}", res);
+
+    Ok(())
+}
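
Because fix_one_timeline returns early when timeline_start_lsn already equals local_start_lsn, re-running the command over the same timeline list is harmless: already-patched timelines are skipped and only the ones that previously failed get retried.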


@@ -6,7 +6,7 @@ use tracing::info;
 
 use crate::{state::TimelinePersistentState, timeline::Timeline};
 
-#[derive(Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct Request {
     /// JSON object with fields to update
     pub updates: serde_json::Value,
@@ -14,7 +14,7 @@ pub struct Request {
     pub apply_fields: Vec<String>,
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, Deserialize, Debug)]
 pub struct Response {
     pub old_control_file: TimelinePersistentState,
     pub new_control_file: TimelinePersistentState,
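
These derive changes are what let the scrubber half of this commit serialize Request into the PATCH body and deserialize Response back out. A minimal standalone sketch of the resulting wire format, using a local mirror of Request with serde's default field names (the LSN value is made up):

    use serde::Serialize;
    use serde_json::json;

    // Local mirror of patch_control_file::Request, for illustration only.
    #[derive(Serialize)]
    struct Request {
        updates: serde_json::Value,
        apply_fields: Vec<String>,
    }

    fn main() {
        let request = Request {
            updates: json!({ "local_start_lsn": "0/2000028" }),
            apply_fields: vec!["local_start_lsn".to_string()],
        };
        // prints: {"updates":{"local_start_lsn":"0/2000028"},"apply_fields":["local_start_lsn"]}
        println!("{}", serde_json::to_string(&request).expect("serialization cannot fail here"));
    }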