From 68491147f51277af9596fcbe32690ce8f0d431c9 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 23 Jun 2025 15:30:02 +0100 Subject: [PATCH] libs: introduce SegmentSize type Fixes: https://github.com/neondatabase/neon/issues/612 --- libs/postgres_ffi/benches/waldecoder.rs | 15 ++++--- libs/postgres_ffi/src/lib.rs | 4 +- libs/postgres_ffi/src/wal_generator.rs | 12 +++--- libs/postgres_ffi/src/waldecoder_handler.rs | 8 ++-- libs/postgres_ffi/src/walrecord.rs | 2 +- libs/postgres_ffi/src/xlog_utils.rs | 30 ++++++------- libs/postgres_ffi/wal_craft/src/lib.rs | 2 +- .../wal_craft/src/xlog_utils_test.rs | 4 +- libs/utils/src/lsn.rs | 15 ++++--- .../benches/bench_interpret_wal.rs | 4 +- pageserver/src/basebackup.rs | 4 +- pageserver/src/import_datadir.rs | 4 +- pageserver/src/walingest.rs | 6 +-- safekeeper/src/copy_timeline.rs | 6 +-- safekeeper/src/http/routes.rs | 2 +- safekeeper/src/metrics.rs | 2 +- safekeeper/src/pull_timeline.rs | 4 +- safekeeper/src/safekeeper.rs | 2 +- safekeeper/src/state.rs | 2 +- safekeeper/src/timeline.rs | 8 ++-- safekeeper/src/timeline_eviction.rs | 11 ++--- safekeeper/src/timeline_manager.rs | 4 +- safekeeper/src/wal_backup.rs | 24 +++++------ safekeeper/src/wal_backup_partial.rs | 4 +- safekeeper/src/wal_storage.rs | 43 +++++++++++-------- .../src/scan_safekeeper_metadata.rs | 4 +- 26 files changed, 120 insertions(+), 106 deletions(-) diff --git a/libs/postgres_ffi/benches/waldecoder.rs b/libs/postgres_ffi/benches/waldecoder.rs index b2a884c7db..583524376b 100644 --- a/libs/postgres_ffi/benches/waldecoder.rs +++ b/libs/postgres_ffi/benches/waldecoder.rs @@ -6,7 +6,7 @@ use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler; use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_versioninfo::PgMajorVersion; use pprof::criterion::{Output, PProfProfiler}; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; const KB: usize = 1024; @@ -22,23 +22,26 @@ criterion_main!(benches); fn bench_complete_record(c: &mut Criterion) { let mut g = c.benchmark_group("complete_record"); for size in [64, KB, 8 * KB, 128 * KB] { + let value_size = size as SegmentSize; // Kind of weird to change the group throughput per benchmark, but it's the only way // to vary it per benchmark. It works. - g.throughput(criterion::Throughput::Bytes(size as u64)); - g.bench_function(format!("size={size}"), |b| run_bench(b, size).unwrap()); + g.throughput(criterion::Throughput::Bytes(value_size as u64)); + g.bench_function(format!("size={size}"), |b| { + run_bench(b, value_size).unwrap() + }); } - fn run_bench(b: &mut Bencher, size: usize) -> anyhow::Result<()> { + fn run_bench(b: &mut Bencher, size: SegmentSize) -> anyhow::Result<()> { const PREFIX: &CStr = c""; let value_size = LogicalMessageGenerator::make_value_size(size, PREFIX); - let value = vec![1; value_size]; + let value = vec![1; value_size as usize]; let mut decoder = WalStreamDecoder::new(Lsn(0), PgMajorVersion::PG17); let msg = LogicalMessageGenerator::new(PREFIX, &value) .next() .unwrap() .encode(Lsn(0)); - assert_eq!(msg.len(), size); + assert_eq!(msg.len(), size as usize); b.iter(|| { let msg = msg.clone(); // Bytes::clone() is cheap diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 9297ac46c9..ac14aeeba1 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -12,7 +12,7 @@ use bytes::Bytes; use utils::bin_ser::SerializeError; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; pub use postgres_versioninfo::PgMajorVersion; @@ -241,7 +241,7 @@ pub use v14::xlog_utils::{ pub const BLCKSZ: u16 = 8192; pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); pub const XLOG_BLCKSZ: usize = 8192; -pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; +pub const WAL_SEGMENT_SIZE: SegmentSize = 16 * 1024 * 1024; pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; diff --git a/libs/postgres_ffi/src/wal_generator.rs b/libs/postgres_ffi/src/wal_generator.rs index a72b035e17..ae6d30aff3 100644 --- a/libs/postgres_ffi/src/wal_generator.rs +++ b/libs/postgres_ffi/src/wal_generator.rs @@ -2,7 +2,7 @@ use std::ffi::{CStr, CString}; use bytes::{Bytes, BytesMut}; use crc32c::crc32c_append; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC}; use super::xlog_utils::{ @@ -39,7 +39,7 @@ impl Record { // Construct the WAL record header. let mut header = XLogRecord { - xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32, + xl_tot_len: XLOG_SIZE_OF_XLOG_RECORD + data_header.len() as SegmentSize + self.data.len() as SegmentSize, xl_xid: 0, xl_prev: prev_lsn.into(), xl_info: self.info, @@ -158,7 +158,7 @@ impl WalGenerator { XLogLongPageHeaderData { std: page_header, xlp_sysid: Self::SYS_ID, - xlp_seg_size: WAL_SEGMENT_SIZE as u32, + xlp_seg_size: WAL_SEGMENT_SIZE, xlp_xlog_blcksz: XLOG_BLCKSZ as u32, } .encode() @@ -234,10 +234,10 @@ impl LogicalMessageGenerator { /// Computes how large a value must be to get a record of the given size. Convenience method to /// construct records of pre-determined size. Panics if the record size is too small. - pub fn make_value_size(record_size: usize, prefix: &CStr) -> usize { + pub fn make_value_size(record_size: SegmentSize, prefix: &CStr) -> SegmentSize { let xlog_header_size = XLOG_SIZE_OF_XLOG_RECORD; - let lm_header_size = size_of::(); - let prefix_size = prefix.to_bytes_with_nul().len(); + let lm_header_size = size_of::() as SegmentSize; + let prefix_size = prefix.to_bytes_with_nul().len() as SegmentSize; let data_header_size = match record_size - xlog_header_size - 2 { 0..=255 => 2, 256..=258 => panic!("impossible record_size {record_size}"), diff --git a/libs/postgres_ffi/src/waldecoder_handler.rs b/libs/postgres_ffi/src/waldecoder_handler.rs index 9cd40645ec..45bca02014 100644 --- a/libs/postgres_ffi/src/waldecoder_handler.rs +++ b/libs/postgres_ffi/src/waldecoder_handler.rs @@ -108,7 +108,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder { if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 { // parse long header - if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD { + if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD as usize{ return Ok(None); } @@ -123,7 +123,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder { self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64; } else if self.lsn.block_offset() == 0 { - if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD { + if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD as usize{ return Ok(None); } @@ -153,7 +153,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder { // peek xl_tot_len at the beginning of the record. // FIXME: assumes little-endian let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le(); - if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD { + if xl_tot_len < XLOG_SIZE_OF_XLOG_RECORD { return Err(WalDecodeError { msg: format!("invalid xl_tot_len {xl_tot_len}"), lsn: self.lsn, @@ -216,7 +216,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder { fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> { // We now have a record in the 'recordbuf' local variable. let xlogrec = - XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| { + XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD as usize]).map_err(|e| { WalDecodeError { msg: format!("xlog record deserialization failed {e}"), lsn: self.lsn, diff --git a/libs/postgres_ffi/src/walrecord.rs b/libs/postgres_ffi/src/walrecord.rs index d593123dc0..4db240ff80 100644 --- a/libs/postgres_ffi/src/walrecord.rs +++ b/libs/postgres_ffi/src/walrecord.rs @@ -266,7 +266,7 @@ pub fn decode_wal_record( xlogrec.xl_info ); - let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD; + let remaining: usize = (xlogrec.xl_tot_len - XLOG_SIZE_OF_XLOG_RECORD) as usize; if buf.remaining() != remaining { //TODO error diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index f7b6296053..31e85da9b1 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -35,7 +35,7 @@ use std::time::SystemTime; use utils::bin_ser::DeserializeError; use utils::bin_ser::SerializeError; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; pub const XLOG_FNAME_LEN: usize = 24; pub const XLP_BKP_REMOVABLE: u16 = 0x0004; @@ -43,9 +43,9 @@ pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001; pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8; pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2; -pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = size_of::(); -pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = size_of::(); -pub const XLOG_SIZE_OF_XLOG_RECORD: usize = size_of::(); +pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: SegmentSize = size_of::() as SegmentSize; +pub const XLOG_SIZE_OF_XLOG_LONG_PHD: SegmentSize = size_of::() as SegmentSize; +pub const XLOG_SIZE_OF_XLOG_RECORD: SegmentSize = size_of::() as SegmentSize; #[allow(clippy::identity_op)] pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2; @@ -58,19 +58,19 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2; /// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG. const XID_CHECKPOINT_INTERVAL: u32 = 1024; -pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo { +pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: SegmentSize) -> XLogSegNo { (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo } pub fn XLogSegNoOffsetToRecPtr( segno: XLogSegNo, offset: u32, - wal_segsz_bytes: usize, + wal_segsz_bytes: SegmentSize, ) -> XLogRecPtr { segno * (wal_segsz_bytes as u64) + (offset as u64) } -pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String { +pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: SegmentSize) -> String { format!( "{:>08X}{:>08X}{:>08X}", tli, @@ -81,7 +81,7 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize pub fn XLogFromFileName( fname: &OsStr, - wal_seg_size: usize, + wal_seg_size: SegmentSize, ) -> anyhow::Result<(XLogSegNo, TimeLineID)> { if let Some(fname_str) = fname.to_str() { let tli = u32::from_str_radix(&fname_str[0..8], 16)?; @@ -111,7 +111,7 @@ pub fn IsPartialXLogFileName(fname: &OsStr) -> bool { /// If LSN points to the beginning of the page, then shift it to first record, /// otherwise align on 8-bytes boundary (required for WAL records) -pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn { +pub fn normalize_lsn(lsn: Lsn, seg_sz: SegmentSize) -> Lsn { if lsn.0 % XLOG_BLCKSZ as u64 == 0 { let hdr_size = if lsn.0 % seg_sz as u64 == 0 { XLOG_SIZE_OF_XLOG_LONG_PHD @@ -227,7 +227,7 @@ pub use timestamp_conversions::{to_pg_timestamp, try_from_pg_timestamp}; // back. pub fn find_end_of_wal( data_dir: &Path, - wal_seg_size: usize, + wal_seg_size: SegmentSize, start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn. ) -> anyhow::Result { let mut result = start_lsn; @@ -431,14 +431,14 @@ impl CheckPoint { /// page of the segment and the page that contains the given LSN. /// We need this segment to start compute node. pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result { - let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE); + let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE as usize); let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE); let page_off = lsn.block_offset(); let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE); - let first_page_only = seg_off < XLOG_BLCKSZ; + let first_page_only = seg_off < XLOG_BLCKSZ as SegmentSize; // If first records starts in the middle of the page, pretend in page header // there is a fake record which ends where first real record starts. This // makes pg_waldump etc happy. @@ -460,12 +460,12 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result Result(test_name: &str) { continue; } let mut f = File::options().write(true).open(file.path()).unwrap(); - static ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE]; + static ZEROS: [u8; WAL_SEGMENT_SIZE as usize] = [0u8; WAL_SEGMENT_SIZE as usize]; f.write_all( &ZEROS[0..min( - WAL_SEGMENT_SIZE, + WAL_SEGMENT_SIZE as usize, (u64::from(*start_lsn) - seg_start_lsn) as usize, )], ) diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index 31e1dda23d..3d4237e2de 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -17,6 +17,9 @@ pub const XLOG_BLCKSZ: u32 = 8192; #[derive(Clone, Copy, Default, Eq, Ord, PartialEq, PartialOrd, Hash)] pub struct Lsn(pub u64); +/// Size of a Postgres WAL segment. These are always small enough to fit in a u32. +pub type SegmentSize = u32; + impl Serialize for Lsn { fn serialize(&self, serializer: S) -> Result where @@ -163,19 +166,19 @@ impl Lsn { /// Compute the offset into a segment #[inline] - pub fn segment_offset(self, seg_sz: usize) -> usize { - (self.0 % seg_sz as u64) as usize + pub fn segment_offset(self, seg_sz: SegmentSize) -> SegmentSize { + (self.0 % seg_sz as u64) as SegmentSize } /// Compute LSN of the segment start. #[inline] - pub fn segment_lsn(self, seg_sz: usize) -> Lsn { + pub fn segment_lsn(self, seg_sz: SegmentSize) -> Lsn { Lsn(self.0 - (self.0 % seg_sz as u64)) } /// Compute the segment number #[inline] - pub fn segment_number(self, seg_sz: usize) -> u64 { + pub fn segment_number(self, seg_sz: SegmentSize) -> u64 { self.0 / seg_sz as u64 } @@ -196,7 +199,7 @@ impl Lsn { /// Compute the block offset of the first byte of this Lsn within this /// segment #[inline] - pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 { + pub fn page_offset_in_segment(self, seg_sz: SegmentSize) -> u64 { (self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0 } @@ -463,7 +466,7 @@ mod tests { assert_eq!(Lsn(u64::MAX).widening_sub(0u64), i128::from(u64::MAX)); assert_eq!(Lsn(0).widening_sub(u64::MAX), -i128::from(u64::MAX)); - let seg_sz: usize = 16 * 1024 * 1024; + let seg_sz: SegmentSize = 16 * 1024 * 1024; assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7); assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64); diff --git a/libs/wal_decoder/benches/bench_interpret_wal.rs b/libs/wal_decoder/benches/bench_interpret_wal.rs index e3956eca05..657fcf7282 100644 --- a/libs/wal_decoder/benches/bench_interpret_wal.rs +++ b/libs/wal_decoder/benches/bench_interpret_wal.rs @@ -226,9 +226,9 @@ fn decode_interpret_main(bench: &BenchmarkData, shards: &[ShardIdentity]) { fn decode_interpret(bench: &BenchmarkData, shard: &[ShardIdentity]) -> anyhow::Result<()> { let mut decoder = WalStreamDecoder::new(bench.meta.start_lsn, bench.meta.pg_version); - let xlogoff: usize = bench.meta.start_lsn.segment_offset(WAL_SEGMENT_SIZE); + let xlogoff = bench.meta.start_lsn.segment_offset(WAL_SEGMENT_SIZE); - for chunk in bench.wal[xlogoff..].chunks(MAX_SEND_SIZE) { + for chunk in bench.wal[xlogoff as usize..].chunks(MAX_SEND_SIZE) { decoder.feed_bytes(chunk); while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() { assert!(lsn.is_aligned()); diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 115f0d9ebc..3a4a761ac7 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -29,7 +29,7 @@ use tokio::io; use tokio::io::AsyncWrite; use tokio_tar::{Builder, EntryType, Header}; use tracing::*; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; use crate::context::RequestContext; use crate::pgdatadir_mapping::Version; @@ -773,7 +773,7 @@ where self.lsn, ) .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?; - if wal_seg.len() != WAL_SEGMENT_SIZE { + if SegmentSize::try_from(wal_seg.len()) != Ok(WAL_SEGMENT_SIZE) { return Err(BasebackupError::Server(anyhow!( "wal_seg.len() != WAL_SEGMENT_SIZE, wal_seg.len()={}", wal_seg.len() diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 96fe0c1078..2b14f7fa4f 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -301,7 +301,7 @@ async fn import_wal( use std::io::Read; let nread = file.read_to_end(&mut buf)?; - if nread != WAL_SEGMENT_SIZE - offset { + if nread != WAL_SEGMENT_SIZE as usize - offset as usize { // Maybe allow this for .partial files? error!("read only {} bytes from WAL file", nread); } @@ -455,7 +455,7 @@ pub async fn import_wal_from_tar( } }; - waldecoder.feed_bytes(&bytes[offset..]); + waldecoder.feed_bytes(&bytes[offset as usize..]); let mut modification = tline.begin_modification(last_lsn); while last_lsn <= end_lsn { diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index a597aedee3..57bb6a95c5 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -2383,17 +2383,17 @@ mod tests { let started_at = std::time::Instant::now(); // Initialize walingest - let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE); + let xlogoff = startpoint.segment_offset(WAL_SEGMENT_SIZE); let mut decoder = WalStreamDecoder::new(startpoint, pg_version); let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx) .await .unwrap(); let mut modification = tline.begin_modification(startpoint); - println!("decoding {} bytes", bytes.len() - xlogoff); + println!("decoding {} bytes", bytes.len() - xlogoff as usize); // Decode and ingest wal. We process the wal in chunks because // that's what happens when we get bytes from safekeepers. - for chunk in bytes[xlogoff..].chunks(50) { + for chunk in bytes[xlogoff as usize..].chunks(50) { decoder.feed_bytes(chunk); while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() { let interpreted = InterpretedWalRecord::from_bytes_filtered( diff --git a/safekeeper/src/copy_timeline.rs b/safekeeper/src/copy_timeline.rs index 7984c2e2b9..d922671984 100644 --- a/safekeeper/src/copy_timeline.rs +++ b/safekeeper/src/copy_timeline.rs @@ -9,7 +9,7 @@ use tokio::fs::OpenOptions; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tracing::{info, warn}; use utils::id::TenantTimelineId; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; use crate::GlobalTimelines; use crate::control_file::FileStorage; @@ -100,7 +100,7 @@ pub async fn handle_request( } } - let wal_seg_size = state.server.wal_seg_size as usize; + let wal_seg_size = state.server.wal_seg_size; if wal_seg_size == 0 { bail!("wal_seg_size is not set"); } @@ -171,7 +171,7 @@ pub async fn handle_request( async fn copy_disk_segments( tli: &WalResidentTimeline, - wal_seg_size: usize, + wal_seg_size: SegmentSize, start_lsn: Lsn, end_lsn: Lsn, tli_dir_path: &Utf8PathBuf, diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 384c582678..efa3d9beec 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -103,7 +103,7 @@ async fn timeline_create_handler(mut request: Request) -> Result TimelinePersistentState { let mut state = TimelinePersistentState::empty(); - state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32; + state.server.wal_seg_size = WAL_SEGMENT_SIZE; state.tenant_id = TenantId::from([1u8; 16]); state.timeline_id = TimelineId::from([1u8; 16]); state diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index b6cf73be2e..c37bb4cb12 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -152,7 +152,7 @@ impl TimelinePersistentState { ServerInfo { pg_version: PgVersionId::from(PgMajorVersion::PG17), system_id: 0, /* Postgres system identifier */ - wal_seg_size: WAL_SEGMENT_SIZE as u32, + wal_seg_size: WAL_SEGMENT_SIZE, }, Lsn::INVALID, Lsn::INVALID, diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 2bee41537f..8f84bf2a03 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -23,7 +23,7 @@ use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::{NodeId, TenantId, TenantTimelineId}; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; use utils::sync::gate::Gate; use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics}; @@ -338,8 +338,8 @@ impl SharedState { Ok(Self::new(sk)) } - pub(crate) fn get_wal_seg_size(&self) -> usize { - self.sk.state().server.wal_seg_size as usize + pub(crate) fn get_wal_seg_size(&self) -> SegmentSize { + self.sk.state().server.wal_seg_size } fn get_safekeeper_info( @@ -747,7 +747,7 @@ impl Timeline { } /// Returns wal_seg_size. - pub async fn get_wal_seg_size(&self) -> usize { + pub async fn get_wal_seg_size(&self) -> SegmentSize { self.read_shared_state().await.get_wal_seg_size() } diff --git a/safekeeper/src/timeline_eviction.rs b/safekeeper/src/timeline_eviction.rs index 47b65a579a..c049dd231b 100644 --- a/safekeeper/src/timeline_eviction.rs +++ b/safekeeper/src/timeline_eviction.rs @@ -11,6 +11,7 @@ use tokio::fs::File; use tokio::io::{AsyncRead, AsyncWriteExt}; use tracing::{debug, info, instrument, warn}; use utils::crashsafe::durable_rename; +use utils::lsn::SegmentSize; use crate::metrics::{ EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED, EvictionEvent, NUM_EVICTED_TIMELINES, @@ -276,12 +277,12 @@ async fn compare_local_segment_with_remote( async fn do_validation( mgr: &Manager, file: &mut File, - wal_seg_size: usize, + wal_seg_size: SegmentSize, partial: &PartialRemoteSegment, storage: &GenericRemoteStorage, ) -> anyhow::Result<()> { - let local_size = file.metadata().await?.len() as usize; - if local_size != wal_seg_size { + let local_size = file.metadata().await?.len(); + if SegmentSize::try_from(local_size) != Ok(wal_seg_size) { anyhow::bail!( "local segment size is invalid: found {}, expected {}", local_size, @@ -296,12 +297,12 @@ async fn do_validation( // remote segment should have bytes excatly up to `flush_lsn` let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size); // let's compare the first `expected_remote_size` bytes - compare_n_bytes(&mut remote_reader, file, expected_remote_size).await?; + compare_n_bytes(&mut remote_reader, file, expected_remote_size as usize).await?; // and check that the remote segment ends here check_end(&mut remote_reader).await?; // if local segment is longer, the rest should be zeroes - read_n_zeroes(file, mgr.wal_seg_size - expected_remote_size).await?; + read_n_zeroes(file, (mgr.wal_seg_size - expected_remote_size) as usize).await?; // and check that the local segment ends here check_end(file).await?; diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index a68752bfdd..16301ec315 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -20,7 +20,7 @@ use tokio::task::{JoinError, JoinHandle}; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, info, info_span, instrument, warn}; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; use crate::SafeKeeperConf; use crate::control_file::{FileStorage, Storage}; @@ -198,7 +198,7 @@ pub(crate) struct Manager { // configuration & dependencies pub(crate) tli: ManagerTimeline, pub(crate) conf: SafeKeeperConf, - pub(crate) wal_seg_size: usize, + pub(crate) wal_seg_size: SegmentSize, pub(crate) walsenders: Arc, pub(crate) wal_backup: Arc, diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 0beb272a60..c40c9c1c39 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -23,7 +23,7 @@ use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::{NodeId, TenantTimelineId}; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; use utils::{backoff, pausable_failpoint}; use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS}; @@ -52,7 +52,7 @@ impl WalBackupTaskHandle { /// Do we have anything to upload to S3, i.e. should safekeepers run backup activity? pub(crate) fn is_wal_backup_required( - wal_seg_size: usize, + wal_seg_size: SegmentSize, num_computes: usize, state: &StateSnapshot, ) -> bool { @@ -210,7 +210,7 @@ impl WalBackup { struct WalBackupTask { timeline: WalResidentTimeline, timeline_dir: Utf8PathBuf, - wal_seg_size: usize, + wal_seg_size: SegmentSize, parallel_jobs: usize, commit_lsn_watch_rx: watch::Receiver, storage: Arc, @@ -338,7 +338,7 @@ async fn backup_lsn_range( storage: Arc, backup_lsn: &mut Lsn, end_lsn: Lsn, - wal_seg_size: usize, + wal_seg_size: SegmentSize, timeline_dir: &Utf8Path, parallel_jobs: usize, ) -> Result<()> { @@ -461,12 +461,12 @@ impl Segment { remote_timeline_path.join(self.object_name()) } - pub fn size(self) -> usize { - (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize + pub fn size(self) -> SegmentSize { + (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as SegmentSize } } -fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec { +fn get_segments(start: Lsn, end: Lsn, seg_size: SegmentSize) -> Vec { let first_seg = start.segment_number(seg_size); let last_seg = end.segment_number(seg_size); @@ -484,7 +484,7 @@ async fn backup_object( storage: &GenericRemoteStorage, source_file: &Utf8Path, target_file: &RemotePath, - size: usize, + size: SegmentSize, ) -> Result<()> { let file = File::open(&source_file) .await @@ -495,7 +495,7 @@ async fn backup_object( let cancel = CancellationToken::new(); storage - .upload_storage_object(file, size, target_file, &cancel) + .upload_storage_object(file, size as usize, target_file, &cancel) .await } @@ -503,7 +503,7 @@ pub(crate) async fn backup_partial_segment( storage: &GenericRemoteStorage, source_file: &Utf8Path, target_file: &RemotePath, - size: usize, + size: SegmentSize, ) -> Result<()> { let file = File::open(&source_file) .await @@ -519,7 +519,7 @@ pub(crate) async fn backup_partial_segment( storage .upload( file, - size, + size as usize, target_file, Some(StorageMetadata::from([("sk_type", "partial_segment")])), &cancel, @@ -647,7 +647,7 @@ pub async fn delete_objects(storage: &GenericRemoteStorage, paths: &[RemotePath] /// Copy segments from one timeline to another. Used in copy_timeline. pub async fn copy_s3_segments( storage: &GenericRemoteStorage, - wal_seg_size: usize, + wal_seg_size: SegmentSize, src_ttid: &TenantTimelineId, dst_ttid: &TenantTimelineId, from_segment: XLogSegNo, diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs index cdf68262dd..d7ec7a5c9a 100644 --- a/safekeeper/src/wal_backup_partial.rs +++ b/safekeeper/src/wal_backup_partial.rs @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn}; use utils::id::NodeId; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; use crate::SafeKeeperConf; use crate::metrics::{ @@ -151,7 +151,7 @@ impl State { } pub struct PartialBackup { - wal_seg_size: usize, + wal_seg_size: SegmentSize, tli: WalResidentTimeline, conf: SafeKeeperConf, local_prefix: Utf8PathBuf, diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index da00df2dd7..6b85e7fbed 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -28,7 +28,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tracing::*; use utils::crashsafe::durable_rename; use utils::id::TenantTimelineId; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; use crate::metrics::{ REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics, time_io_closure, @@ -92,7 +92,7 @@ pub struct PhysicalStorage { no_sync: bool, /// Size of WAL segment in bytes. - wal_seg_size: usize, + wal_seg_size: SegmentSize, pg_version: PgVersionId, system_id: u64, @@ -170,7 +170,7 @@ impl PhysicalStorage { state: &TimelinePersistentState, no_sync: bool, ) -> Result { - let wal_seg_size = state.server.wal_seg_size as usize; + let wal_seg_size = state.server.wal_seg_size; // Find out where stored WAL ends, starting at commit_lsn which is a // known recent record boundary (unless we don't have WAL at all). @@ -315,7 +315,12 @@ impl PhysicalStorage { /// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the /// segment was completed, closed, and flushed to disk. - async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result { + async fn write_in_segment( + &mut self, + segno: u64, + xlogoff: SegmentSize, + buf: &[u8], + ) -> Result { let mut file = if let Some(file) = self.file.take() { file } else { @@ -331,7 +336,7 @@ impl PhysicalStorage { // syscall, but needed in case of async). It does *not* fsyncs the file. file.flush().await?; - if xlogoff + buf.len() == self.wal_seg_size { + if xlogoff as usize + buf.len() == self.wal_seg_size as usize { // If we reached the end of a WAL segment, flush and close it. self.fdatasync_file(&file).await?; @@ -372,8 +377,8 @@ impl PhysicalStorage { let segno = self.write_lsn.segment_number(self.wal_seg_size); // If crossing a WAL boundary, only write up until we reach wal segment size. - let bytes_write = if xlogoff + buf.len() > self.wal_seg_size { - self.wal_seg_size - xlogoff + let bytes_write = if xlogoff as usize + buf.len() > self.wal_seg_size as usize { + (self.wal_seg_size - xlogoff) as usize } else { buf.len() }; @@ -604,7 +609,7 @@ impl Storage for PhysicalStorage { /// Remove all WAL segments in timeline_dir that match the given predicate. async fn remove_segments_from_disk( timeline_dir: &Utf8Path, - wal_seg_size: usize, + wal_seg_size: SegmentSize, remove_predicate: impl Fn(XLogSegNo) -> bool, ) -> Result<()> { let _timer = WAL_STORAGE_OPERATION_SECONDS @@ -645,7 +650,7 @@ async fn remove_segments_from_disk( pub struct WalReader { remote_path: RemotePath, timeline_dir: Utf8PathBuf, - wal_seg_size: usize, + wal_seg_size: SegmentSize, pos: Lsn, wal_segment: Option>>, @@ -683,7 +688,7 @@ impl WalReader { if start_pos < state .timeline_start_lsn - .segment_lsn(state.server.wal_seg_size as usize) + .segment_lsn(state.server.wal_seg_size) { bail!( "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline", @@ -695,7 +700,7 @@ impl WalReader { Ok(Self { remote_path: remote_timeline_path(ttid)?, timeline_dir, - wal_seg_size: state.server.wal_seg_size as usize, + wal_seg_size: state.server.wal_seg_size, pos: start_pos, wal_segment: None, wal_backup, @@ -743,12 +748,14 @@ impl WalReader { // How many bytes may we consume in total? let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size); - debug_assert!(seg_bytes.len() > pos_seg_offset); - debug_assert!(seg_bytes.len() > tl_start_seg_offset); + debug_assert!(seg_bytes.len() > pos_seg_offset as usize); + debug_assert!(seg_bytes.len() > tl_start_seg_offset as usize); // Copy as many bytes as possible into the buffer - let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len()); - buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]); + let len = ((tl_start_seg_offset - pos_seg_offset) as usize).min(buf.len()); + buf[0..len].copy_from_slice( + &seg_bytes[pos_seg_offset as usize..pos_seg_offset as usize + len], + ); self.pos += len as u64; @@ -770,7 +777,7 @@ impl WalReader { // How much to read and send in message? We cannot cross the WAL file // boundary, and we don't want send more than provided buffer. let xlogoff = self.pos.segment_offset(self.wal_seg_size); - let send_size = min(buf.len(), self.wal_seg_size - xlogoff); + let send_size = min(buf.len(), (self.wal_seg_size - xlogoff) as usize); // Read some data from the file. let buf = &mut buf[0..send_size]; @@ -831,7 +838,7 @@ impl WalReader { pub(crate) async fn open_wal_file( timeline_dir: &Utf8Path, segno: XLogSegNo, - wal_seg_size: usize, + wal_seg_size: SegmentSize, ) -> Result<(tokio::fs::File, bool)> { let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size); @@ -858,7 +865,7 @@ pub(crate) async fn open_wal_file( pub fn wal_file_paths( timeline_dir: &Utf8Path, segno: XLogSegNo, - wal_seg_size: usize, + wal_seg_size: SegmentSize, ) -> (Utf8PathBuf, Utf8PathBuf) { let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); let wal_file_path = timeline_dir.join(wal_file_name.clone()); diff --git a/storage_scrubber/src/scan_safekeeper_metadata.rs b/storage_scrubber/src/scan_safekeeper_metadata.rs index cf0a3d19e9..c2d0e2f8b3 100644 --- a/storage_scrubber/src/scan_safekeeper_metadata.rs +++ b/storage_scrubber/src/scan_safekeeper_metadata.rs @@ -13,7 +13,7 @@ use serde::Serialize; use tokio_postgres::types::PgLsn; use tracing::{debug, error, info}; use utils::id::{TenantId, TenantTimelineId, TimelineId}; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, SegmentSize}; use crate::cloud_admin_api::CloudAdminApiClient; use crate::metadata_stream::stream_listing; @@ -22,7 +22,7 @@ use crate::{ }; /// Generally we should ask safekeepers, but so far we use everywhere default 16MB. -const WAL_SEGSIZE: usize = 16 * 1024 * 1024; +const WAL_SEGSIZE: SegmentSize = 16 * 1024 * 1024; #[derive(Serialize)] pub struct MetadataSummary {