Check bytes in segment intersection

This commit is contained in:
Arthur Petukhovsky
2023-12-28 17:55:04 +00:00
parent b162b4a9cf
commit 0ba55719b0

View File

@@ -4,7 +4,8 @@ use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_ffi::{WAL_SEGMENT_SIZE, dispatch_pgversion};
use postgres_ffi::v16::xlog_utils::{XLogSegNoOffsetToRecPtr, find_end_of_wal};
use remote_storage::RemoteStorageConfig;
use safekeeper::control_file::FileStorage;
use safekeeper::safekeeper::SafeKeeperState;
@@ -15,9 +16,10 @@ use tokio::signal::unix::{signal, SignalKind};
use tokio::task::JoinError;
use toml_edit::Document;
use utils::id::{TenantId, TimelineId, TenantTimelineId};
use utils::lsn::Lsn;
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::io::{ErrorKind, Write, Read, Seek};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
@@ -155,11 +157,50 @@ async fn main() -> anyhow::Result<()> {
panic!("Cannot find valid segment for timeline {}, this file doesn't exist {}", tli.ttid, valid_segnames.0);
};
let mut copy_start_lsn = XLogSegNoOffsetToRecPtr(segnum, 0, wal_seg_size);
if tli.control_file.timeline_start_lsn.0 > copy_start_lsn {
copy_start_lsn = tli.control_file.timeline_start_lsn.0;
}
let copy_start_lsn = Lsn(copy_start_lsn);
let copy_end_lsn = new_tli.control_file.local_start_lsn;
assert!(copy_end_lsn > copy_start_lsn);
if args.dryrun {
continue;
}
info!("ACTION: Copying bytes from {} to {}", valid_segname, new_segname);
let pg_version = tli.control_file.server.pg_version / 10000;
// find the flush_lsn from data2
let flush_lsn = dispatch_pgversion!(
pg_version,
pgv::xlog_utils::find_end_of_wal(
tli.timeline_dir.as_path().as_std_path(),
wal_seg_size,
tli.control_file.commit_lsn,
)?,
bail!("unsupported postgres version: {}", pg_version)
);
info!("ACTION: Copying bytes {} - {} (commit_lsn={}, flush_lsn={}) from {} to {}", copy_start_lsn, copy_end_lsn, tli.control_file.commit_lsn, flush_lsn, valid_segname, new_segname);
assert!(copy_end_lsn <= flush_lsn);
if flush_lsn > copy_end_lsn {
// check intersection from two segments
let valid_bytes = read_slice(&valid_segname, copy_end_lsn, flush_lsn, wal_seg_size)?;
let new_bytes = read_slice(&new_segname, copy_end_lsn, flush_lsn, wal_seg_size)?;
info!("Checking bytes intersection, from {} up to {}", copy_end_lsn, flush_lsn);
assert!(valid_bytes == new_bytes);
}
if copy_end_lsn > tli.control_file.commit_lsn {
info!("Missing some committed bytes");
continue;
}
// TODO:
}
Ok(())
@@ -224,3 +265,16 @@ async fn copy_directory(tli: &TimelineDirInfo, new_tli_dir: &Utf8Path) -> Result
// TODO:
Ok(())
}
fn read_slice(path: &Utf8Path, start: Lsn, end: Lsn, wal_seg_size: usize) -> Result<Vec<u8>> {
assert!(end >= start);
let start = start.segment_offset(wal_seg_size);
let end = end.segment_offset(wal_seg_size);
assert!(end >= start);
let mut buf = Vec::new();
let mut file = File::open(path)?;
file.seek(std::io::SeekFrom::Start(start as u64))?;
file.take((end - start) as u64).read_to_end(&mut buf)?;
Ok(buf)
}