From 0ba55719b0d7fdb3b97168cd6cba8bb8cd36cca4 Mon Sep 17 00:00:00 2001
From: Arthur Petukhovsky
Date: Thu, 28 Dec 2023 17:55:04 +0000
Subject: [PATCH] Check bytes in segment intersection

---
 safekeeper/src/bin/gluegun.rs | 74 ++++++++++++++++++++++++++++++++++--
 1 file changed, 71 insertions(+), 3 deletions(-)

diff --git a/safekeeper/src/bin/gluegun.rs b/safekeeper/src/bin/gluegun.rs
index f0d04d4754..634a03ac61 100644
--- a/safekeeper/src/bin/gluegun.rs
+++ b/safekeeper/src/bin/gluegun.rs
@@ -4,7 +4,8 @@ use clap::{ArgAction, Parser};
 use futures::future::BoxFuture;
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt, StreamExt};
-use postgres_ffi::WAL_SEGMENT_SIZE;
+use postgres_ffi::{WAL_SEGMENT_SIZE, dispatch_pgversion};
+use postgres_ffi::v16::xlog_utils::{XLogSegNoOffsetToRecPtr, find_end_of_wal};
 use remote_storage::RemoteStorageConfig;
 use safekeeper::control_file::FileStorage;
 use safekeeper::safekeeper::SafeKeeperState;
@@ -15,9 +16,10 @@ use tokio::signal::unix::{signal, SignalKind};
 use tokio::task::JoinError;
 use toml_edit::Document;
 use utils::id::{TenantId, TimelineId, TenantTimelineId};
+use utils::lsn::Lsn;
 
 use std::fs::{self, File};
-use std::io::{ErrorKind, Write};
+use std::io::{ErrorKind, Write, Read, Seek};
 use std::path::Path;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -155,11 +157,50 @@ async fn main() -> anyhow::Result<()> {
             panic!("Cannot find valid segment for timeline {}, this file doesn't exist {}", tli.ttid, valid_segnames.0);
         };
 
+        let mut copy_start_lsn = XLogSegNoOffsetToRecPtr(segnum, 0, wal_seg_size);
+        if tli.control_file.timeline_start_lsn.0 > copy_start_lsn {
+            copy_start_lsn = tli.control_file.timeline_start_lsn.0;
+        }
+
+        let copy_start_lsn = Lsn(copy_start_lsn);
+        let copy_end_lsn = new_tli.control_file.local_start_lsn;
+
+        assert!(copy_end_lsn > copy_start_lsn);
+
         if args.dryrun {
             continue;
         }
 
-        info!("ACTION: Copying bytes from {} to {}", valid_segname, new_segname);
+        let pg_version = tli.control_file.server.pg_version / 10000;
+        // find the flush_lsn from data2
+        let flush_lsn = dispatch_pgversion!(
+            pg_version,
+            pgv::xlog_utils::find_end_of_wal(
+                tli.timeline_dir.as_path().as_std_path(),
+                wal_seg_size,
+                tli.control_file.commit_lsn,
+            )?,
+            bail!("unsupported postgres version: {}", pg_version)
+        );
+
+        info!("ACTION: Copying bytes {} - {} (commit_lsn={}, flush_lsn={}) from {} to {}", copy_start_lsn, copy_end_lsn, tli.control_file.commit_lsn, flush_lsn, valid_segname, new_segname);
+
+        assert!(copy_end_lsn <= flush_lsn);
+
+        if flush_lsn > copy_end_lsn {
+            // check intersection from two segments
+            let valid_bytes = read_slice(&valid_segname, copy_end_lsn, flush_lsn, wal_seg_size)?;
+            let new_bytes = read_slice(&new_segname, copy_end_lsn, flush_lsn, wal_seg_size)?;
+            info!("Checking bytes intersection, from {} up to {}", copy_end_lsn, flush_lsn);
+            assert!(valid_bytes == new_bytes);
+        }
+
+        if copy_end_lsn > tli.control_file.commit_lsn {
+            info!("Missing some committed bytes");
+            continue;
+        }
+
+        // TODO:
     }
 
     Ok(())
@@ -224,3 +265,30 @@ async fn copy_directory(tli: &TimelineDirInfo, new_tli_dir: &Utf8Path) -> Result
     // TODO:
     Ok(())
 }
+
+/// Reads the bytes in `[start, end)` from the WAL segment file at `path`.
+///
+/// Both LSNs are converted with `Lsn::segment_offset(wal_seg_size)` and the file is indexed
+/// by those offsets alone, so the caller must pass LSNs that belong to the segment in `path`.
+///
+/// Returns an error if the file cannot be opened or holds fewer bytes than requested.
+/// Panics if `end` precedes `start`, either as LSNs or as in-segment offsets.
+fn read_slice(path: &Utf8Path, start: Lsn, end: Lsn, wal_seg_size: usize) -> Result<Vec<u8>> {
+    assert!(end >= start, "invalid LSN range {} - {}", start, end);
+    let start_offset = start.segment_offset(wal_seg_size);
+    let end_offset = end.segment_offset(wal_seg_size);
+    // With `end >= start`, a smaller end offset means `end` lies in a later segment.
+    assert!(
+        end_offset >= start_offset,
+        "LSN range {} - {} crosses a segment boundary",
+        start,
+        end
+    );
+
+    let mut file = File::open(path)?;
+    file.seek(std::io::SeekFrom::Start(start_offset as u64))?;
+    // `read_exact` reports a truncated file instead of silently returning a short slice.
+    let mut buf = vec![0u8; end_offset - start_offset];
+    file.read_exact(&mut buf)?;
+    Ok(buf)
+}