Compare commits

...

4 Commits

Author SHA1 Message Date
Arthur Petukhovsky
f2e98b2b0a Write WAL bytes to data dir 2023-12-28 18:49:51 +00:00
Arthur Petukhovsky
0ba55719b0 Check bytes in segment intersection 2023-12-28 17:55:04 +00:00
Arthur Petukhovsky
b162b4a9cf Fix clap arg 2023-12-28 17:10:05 +00:00
Arthur Petukhovsky
bbc8da687e Add dryrun version of the script 2023-12-28 16:55:21 +00:00
4 changed files with 298 additions and 2 deletions

View File

@@ -221,6 +221,9 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", br
# This is useful for profiling and, to some extent, debug.
# Besides, debug info should not affect the performance.
debug = true
strip = true # Automatically strip symbols from the binary.
opt-level = "z" # Optimize for size.
lto = true
# disable debug symbols for all packages except this one to decrease binaries size
[profile.release.package."*"]

View File

@@ -0,0 +1,294 @@
use anyhow::{bail, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use postgres_ffi::{WAL_SEGMENT_SIZE, dispatch_pgversion};
use postgres_ffi::v16::xlog_utils::{XLogSegNoOffsetToRecPtr, find_end_of_wal};
use remote_storage::RemoteStorageConfig;
use safekeeper::control_file::FileStorage;
use safekeeper::safekeeper::SafeKeeperState;
use safekeeper::wal_storage::wal_file_paths;
use sd_notify::NotifyState;
use tokio::runtime::Handle;
use tokio::signal::unix::{signal, SignalKind};
use tokio::task::JoinError;
use toml_edit::Document;
use utils::id::{TenantId, TimelineId, TenantTimelineId};
use utils::lsn::Lsn;
use std::fs::{self, File, OpenOptions};
use std::io::{ErrorKind, Write, Read, Seek};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use storage_broker::Uri;
use tokio::sync::mpsc;
use tracing::*;
use utils::pid_file;
use metrics::set_build_info_metric;
use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
use safekeeper::{control_file, BROKER_RUNTIME};
use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
use utils::{
id::NodeId,
logging::{self, LogFormat},
project_build_tag, project_git_version,
sentry_init::init_sentry,
tcp_listener,
};
// Well-known file names inside a safekeeper data directory.
const PID_FILE_NAME: &str = "safekeeper.pid";
const ID_FILE_NAME: &str = "safekeeper.id";
// Per-timeline persistent state file; loaded by `read_timeline` below.
const CONTROL_FILE_NAME: &str = "safekeeper.control";

// Version/build identifiers baked into the binary (reported via `--version`).
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);

// Shown by `--help`: this binary is a one-off repair script.
const ABOUT: &str = r#"
Fixing the issue of some WAL files missing the prefix bytes.
"#;
/// Command-line arguments for the WAL backfill script.
#[derive(Parser)]
#[command(name = "Neon safekeeper", version = GIT_VERSION, about = ABOUT, long_about = None)]
struct Args {
    /// Path to the data2 directory: the source holding the known-good
    /// timelines whose WAL bytes are used for backfilling.
    datafrom: Utf8PathBuf,
    /// Path to the data directory: the target whose timelines are checked
    /// and repaired in place.
    datato: Utf8PathBuf,
    /// When true, only report what would be done without writing anything.
    #[arg(long, default_value = "false", action=ArgAction::Set)]
    dryrun: bool,
}
/// A timeline found on disk: its id, its directory, and the deserialized
/// control file describing its persistent state.
struct TimelineDirInfo {
    ttid: TenantTimelineId,
    timeline_dir: Utf8PathBuf,
    control_file: SafeKeeperState,
}
/// Entry point. For every timeline found in `datafrom` (the known-good data
/// dir), locates the matching timeline in `datato` and backfills the WAL bytes
/// that are missing at the start of the target's first local segment. With
/// `--dryrun=true` no writes (and no directory copies) are performed.
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
    // We want to allow multiple occurrences of the same arg (taking the last) so
    // that neon_local could generate command with defaults + overrides without
    // getting 'argument cannot be used multiple times' error. This seems to be
    // impossible with pure Derive API, so convert struct to Command, modify it,
    // parse arguments, and then fill the struct back.
    let cmd = <Args as clap::CommandFactory>::command().args_override_self(true);
    let mut matches = cmd.get_matches();
    let mut args = <Args as clap::FromArgMatches>::from_arg_matches_mut(&mut matches)?;

    logging::init(
        LogFormat::from_config("plain")?,
        logging::TracingErrorLayerEnablement::Disabled,
        logging::Output::Stdout,
    )?;

    let all_timelines = read_all_timelines(&args.datafrom).await?;
    let wal_seg_size = WAL_SEGMENT_SIZE;
    for tli in all_timelines {
        // Source timelines must be "complete": their local WAL starts at the
        // very beginning of the timeline.
        assert!(tli.control_file.local_start_lsn == tli.control_file.timeline_start_lsn);
        info!("Found timeline {}, start_lsn={}, commit_lsn={}", tli.ttid, tli.control_file.local_start_lsn, tli.control_file.commit_lsn);

        let new_tli_dir = args.datato.join(tli.ttid.tenant_id.to_string()).join(tli.ttid.timeline_id.to_string());
        // check existence: a timeline absent from the target is copied over
        // wholesale instead of being patched.
        if !new_tli_dir.exists() {
            info!("Timeline {} does not exist in the target directory {}", tli.ttid, new_tli_dir);
            if args.dryrun {
                continue;
            }
            copy_directory(&tli, &new_tli_dir).await?;
            continue;
        }

        let new_tli = read_timeline(tli.ttid.clone(), new_tli_dir.as_path().as_std_path()).await?;
        // If the target already starts at timeline_start_lsn there is nothing
        // to backfill.
        if new_tli.control_file.local_start_lsn == tli.control_file.timeline_start_lsn {
            info!("Timeline {} is already fixed in the target directory {}", tli.ttid, new_tli_dir);
            continue;
        }

        // The segment containing the target's first local byte is the one
        // missing its prefix.
        let segnum = new_tli.control_file.local_start_lsn.segment_number(wal_seg_size);
        // Each of these is a (full segment path, ".partial" sibling) pair,
        // per wal_file_paths' contract.
        let valid_segnames = wal_file_paths(&tli.timeline_dir, segnum, wal_seg_size)?;
        let new_segnames = wal_file_paths(&new_tli.timeline_dir, segnum, wal_seg_size)?;
        info!(
            "Timeline {} has local_start_lsn={}, timeline_start_lsn={}, commit_lsn={} //// can be fixed with bytes from {} up to commit_lsn={}",
            new_tli.ttid,
            new_tli.control_file.local_start_lsn,
            new_tli.control_file.timeline_start_lsn,
            new_tli.control_file.commit_lsn,
            valid_segnames.0,
            tli.control_file.commit_lsn,
        );
        // Both copies must agree on where the timeline begins.
        assert!(new_tli.control_file.timeline_start_lsn == tli.control_file.timeline_start_lsn);

        // Prefer the sealed segment file, fall back to the ".partial" one.
        let new_segname = if new_segnames.0.exists() {
            new_segnames.0
        } else if new_segnames.1.exists() {
            new_segnames.1
        } else {
            info!("Segment {} was already deleted, nothing to backfill", new_segnames.0);
            continue;
        };
        let valid_segname = if valid_segnames.0.exists() {
            valid_segnames.0
        } else if valid_segnames.1.exists() {
            valid_segnames.1
        } else {
            panic!("Cannot find valid segment for timeline {}, this file doesn't exist {}", tli.ttid, valid_segnames.0);
        };

        // Copy range: from the segment start (or timeline_start_lsn, if the
        // timeline begins inside this segment) up to the target's
        // local_start_lsn, i.e. exactly the missing prefix.
        let mut copy_start_lsn = XLogSegNoOffsetToRecPtr(segnum, 0, wal_seg_size);
        if tli.control_file.timeline_start_lsn.0 > copy_start_lsn {
            copy_start_lsn = tli.control_file.timeline_start_lsn.0;
        }
        let copy_start_lsn = Lsn(copy_start_lsn);
        let copy_end_lsn = new_tli.control_file.local_start_lsn;
        assert!(copy_end_lsn >= copy_start_lsn);
        if args.dryrun {
            continue;
        }

        // pg_version is stored with extra precision (divided by 10000 it
        // yields the major version used to dispatch below) — see the bail
        // message for the failure mode.
        let pg_version = tli.control_file.server.pg_version / 10000;
        // find the flush_lsn from data2
        let flush_lsn = dispatch_pgversion!(
            pg_version,
            pgv::xlog_utils::find_end_of_wal(
                tli.timeline_dir.as_path().as_std_path(),
                wal_seg_size,
                tli.control_file.commit_lsn,
            )?,
            bail!("unsupported postgres version: {}", pg_version)
        );

        info!("ACTION: Copying bytes {} - {} (commit_lsn={}, flush_lsn={}) from {} to {}", copy_start_lsn, copy_end_lsn, tli.control_file.commit_lsn, flush_lsn, valid_segname, new_segname);
        assert!(copy_end_lsn <= flush_lsn);
        assert!(new_tli.control_file.commit_lsn >= new_tli.control_file.local_start_lsn);
        if flush_lsn > copy_end_lsn {
            // check intersection from two segments: where source and target
            // overlap, their WAL bytes must be identical before we write
            // anything.
            let valid_bytes = read_slice(&valid_segname, copy_end_lsn, flush_lsn, wal_seg_size)?;
            let new_bytes = read_slice(&new_segname, copy_end_lsn, flush_lsn, wal_seg_size)?;
            info!("Checking bytes intersection, from {} up to {}", copy_end_lsn, flush_lsn);
            assert!(valid_bytes == new_bytes);
        }
        if copy_end_lsn > tli.control_file.commit_lsn {
            info!("Missing some committed bytes");
        }

        // Read the missing prefix from the valid segment and splice it into
        // the target segment at the same in-segment offset.
        let valid_slice = read_slice(&valid_segname, copy_start_lsn, copy_end_lsn, wal_seg_size)?;
        info!("Read {} bytes, going to write", valid_slice.len());
        assert!(valid_slice.len() == (copy_end_lsn.0 - copy_start_lsn.0) as usize);
        write_slice(&new_segname, copy_start_lsn, wal_seg_size, &valid_slice)?;
        info!("Success, timeline {} WAL is fixed", new_tli.ttid);
    }
    Ok(())
}
/// Scans `dir`, laid out as `<tenant_id>/<timeline_id>/`, and loads the
/// control file of every timeline directory found.
///
/// Non-directory entries and directories whose names do not parse as
/// tenant/timeline ids are skipped with a log message rather than treated
/// as errors; I/O failures while listing directories are propagated.
async fn read_all_timelines(dir: &Utf8Path) -> Result<Vec<TimelineDirInfo>> {
    info!("Reading all timelines from {:?}", dir);
    let mut timelines = Vec::new();
    for tenant_entry in fs::read_dir(dir).with_context(|| format!("Failed to read {:?}", dir))? {
        let tenant_entry = tenant_entry.with_context(|| format!("Failed to read {:?}", dir))?;
        let tenant_path = tenant_entry.path();
        if !tenant_path.is_dir() {
            info!("Skipping non-directory {:?}", tenant_path);
            continue;
        }
        // Only directories whose name parses as a tenant id are descended into.
        let dirname = tenant_path.file_name().unwrap().to_str().unwrap();
        let tenant_id = match TenantId::from_str(dirname) {
            Ok(id) => id,
            Err(_) => {
                info!("Skipping non-tenant directory {:?}", tenant_path);
                continue;
            }
        };
        for timeline_entry in
            fs::read_dir(&tenant_path).with_context(|| format!("Failed to read {:?}", tenant_path))?
        {
            let timeline_entry =
                timeline_entry.with_context(|| format!("Failed to read {:?}", tenant_path))?;
            let timeline_path = timeline_entry.path();
            if !timeline_path.is_dir() {
                info!("Skipping non-directory {:?}", timeline_path);
                continue;
            }
            let dirname = timeline_path.file_name().unwrap().to_str().unwrap();
            let timeline_id = match TimelineId::from_str(dirname) {
                Ok(id) => id,
                Err(_) => {
                    info!("Skipping non-timeline directory {:?}", timeline_path);
                    continue;
                }
            };
            let ttid = TenantTimelineId::new(tenant_id, timeline_id);
            timelines.push(read_timeline(ttid, &timeline_path).await?);
        }
    }
    Ok(timelines)
}
/// Loads the safekeeper control file from `dir` and bundles it with the
/// timeline id and (UTF-8) directory path into a `TimelineDirInfo`.
async fn read_timeline(ttid: TenantTimelineId, dir: &Path) -> Result<TimelineDirInfo> {
    let control_file = FileStorage::load_control_file(dir.join(CONTROL_FILE_NAME))?;
    let timeline_dir = Utf8PathBuf::from_path_buf(dir.to_path_buf()).expect("valid utf8");
    Ok(TimelineDirInfo {
        ttid,
        timeline_dir,
        control_file,
    })
}
async fn copy_directory(tli: &TimelineDirInfo, new_tli_dir: &Utf8Path) -> Result<()> {
info!("ACTION: Copying timeline {} to {}", tli.ttid, new_tli_dir);
// TODO:
Ok(())
}
/// Reads the bytes in `[start, end)` from the WAL segment file at `path`.
///
/// Both LSNs are reduced to offsets within a segment, so the caller must
/// ensure the range lies inside a single segment (asserted in offset form).
/// NOTE(review): if the file is shorter than `end`'s offset, fewer bytes than
/// requested are returned without an error — callers assert the length where
/// it matters.
fn read_slice(path: &Utf8Path, start: Lsn, end: Lsn, wal_seg_size: usize) -> Result<Vec<u8>> {
    assert!(end >= start);
    let start_off = start.segment_offset(wal_seg_size);
    let end_off = end.segment_offset(wal_seg_size);
    assert!(end_off >= start_off);
    let len = end_off - start_off;
    // Preallocate: the expected byte count is known up front.
    let mut buf = Vec::with_capacity(len);
    let mut file = File::open(path)?;
    file.seek(std::io::SeekFrom::Start(start_off as u64))?;
    file.take(len as u64).read_to_end(&mut buf)?;
    Ok(buf)
}
/// Writes `buf` into the WAL segment file at `path`, starting at the
/// in-segment offset of `start`. The file must already exist; it is opened
/// for writing without truncation, so bytes outside the written range are
/// left untouched.
fn write_slice(path: &Utf8Path, start: Lsn, wal_seg_size: usize, buf: &[u8]) -> Result<()> {
    let offset = start.segment_offset(wal_seg_size);
    let mut segment = OpenOptions::new().write(true).open(path)?;
    segment.seek(std::io::SeekFrom::Start(offset as u64))?;
    segment.write_all(buf)?;
    Ok(())
}

View File

@@ -115,7 +115,6 @@ impl FileStorage {
pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
let mut control_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&control_file_path)
.with_context(|| {
format!(

View File

@@ -725,7 +725,7 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
}
/// Helper returning full path to WAL segment file and its .partial brother.
fn wal_file_paths(
pub fn wal_file_paths(
timeline_dir: &Utf8Path,
segno: XLogSegNo,
wal_seg_size: usize,