libs: introduce SegmentSize type

Fixes: https://github.com/neondatabase/neon/issues/612
John Spray
2025-06-23 15:30:02 +01:00
parent 7c4c36f5ac
commit 68491147f5
26 changed files with 120 additions and 106 deletions
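In short, the commit replaces the ad-hoc usize previously used for WAL segment sizes with a dedicated alias exported from utils::lsn, and narrows the related constants and signatures to it. A minimal sketch of the pattern, assembled from the hunks below (the helper function name is illustrative, not part of the change):

use bytes::BytesMut;
use utils::lsn::SegmentSize; // new alias: pub type SegmentSize = u32;

pub const WAL_SEGMENT_SIZE: SegmentSize = 16 * 1024 * 1024;

fn alloc_segment_buf() -> BytesMut {
    // Widen explicitly only at the boundary where a usize is genuinely required,
    // e.g. when sizing or indexing buffers.
    BytesMut::with_capacity(WAL_SEGMENT_SIZE as usize)
}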

View File

@@ -6,7 +6,7 @@ use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_versioninfo::PgMajorVersion;
use pprof::criterion::{Output, PProfProfiler};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
const KB: usize = 1024;
@@ -22,23 +22,26 @@ criterion_main!(benches);
fn bench_complete_record(c: &mut Criterion) {
let mut g = c.benchmark_group("complete_record");
for size in [64, KB, 8 * KB, 128 * KB] {
let value_size = size as SegmentSize;
// Kind of weird to change the group throughput per benchmark, but it's the only way
// to vary it per benchmark. It works.
g.throughput(criterion::Throughput::Bytes(size as u64));
g.bench_function(format!("size={size}"), |b| run_bench(b, size).unwrap());
g.throughput(criterion::Throughput::Bytes(value_size as u64));
g.bench_function(format!("size={size}"), |b| {
run_bench(b, value_size).unwrap()
});
}
fn run_bench(b: &mut Bencher, size: usize) -> anyhow::Result<()> {
fn run_bench(b: &mut Bencher, size: SegmentSize) -> anyhow::Result<()> {
const PREFIX: &CStr = c"";
let value_size = LogicalMessageGenerator::make_value_size(size, PREFIX);
let value = vec![1; value_size];
let value = vec![1; value_size as usize];
let mut decoder = WalStreamDecoder::new(Lsn(0), PgMajorVersion::PG17);
let msg = LogicalMessageGenerator::new(PREFIX, &value)
.next()
.unwrap()
.encode(Lsn(0));
assert_eq!(msg.len(), size);
assert_eq!(msg.len(), size as usize);
b.iter(|| {
let msg = msg.clone(); // Bytes::clone() is cheap

View File

@@ -12,7 +12,7 @@
use bytes::Bytes;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
pub use postgres_versioninfo::PgMajorVersion;
@@ -241,7 +241,7 @@ pub use v14::xlog_utils::{
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
pub const XLOG_BLCKSZ: usize = 8192;
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const WAL_SEGMENT_SIZE: SegmentSize = 16 * 1024 * 1024;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;

View File

@@ -2,7 +2,7 @@ use std::ffi::{CStr, CString};
use bytes::{Bytes, BytesMut};
use crc32c::crc32c_append;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
use super::xlog_utils::{
@@ -39,7 +39,7 @@ impl Record {
// Construct the WAL record header.
let mut header = XLogRecord {
xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32,
xl_tot_len: XLOG_SIZE_OF_XLOG_RECORD + data_header.len() as SegmentSize + self.data.len() as SegmentSize,
xl_xid: 0,
xl_prev: prev_lsn.into(),
xl_info: self.info,
@@ -158,7 +158,7 @@ impl<R: RecordGenerator> WalGenerator<R> {
XLogLongPageHeaderData {
std: page_header,
xlp_sysid: Self::SYS_ID,
xlp_seg_size: WAL_SEGMENT_SIZE as u32,
xlp_seg_size: WAL_SEGMENT_SIZE,
xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
}
.encode()
@@ -234,10 +234,10 @@ impl LogicalMessageGenerator {
/// Computes how large a value must be to get a record of the given size. Convenience method to
/// construct records of pre-determined size. Panics if the record size is too small.
pub fn make_value_size(record_size: usize, prefix: &CStr) -> usize {
pub fn make_value_size(record_size: SegmentSize, prefix: &CStr) -> SegmentSize {
let xlog_header_size = XLOG_SIZE_OF_XLOG_RECORD;
let lm_header_size = size_of::<XlLogicalMessage>();
let prefix_size = prefix.to_bytes_with_nul().len();
let lm_header_size = size_of::<XlLogicalMessage>() as SegmentSize;
let prefix_size = prefix.to_bytes_with_nul().len() as SegmentSize;
let data_header_size = match record_size - xlog_header_size - 2 {
0..=255 => 2,
256..=258 => panic!("impossible record_size {record_size}"),

View File

@@ -108,7 +108,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
// parse long header
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD as usize {
return Ok(None);
}
@@ -123,7 +123,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
} else if self.lsn.block_offset() == 0 {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD as usize {
return Ok(None);
}
@@ -153,7 +153,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
// peek xl_tot_len at the beginning of the record.
// FIXME: assumes little-endian
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
if xl_tot_len < XLOG_SIZE_OF_XLOG_RECORD {
return Err(WalDecodeError {
msg: format!("invalid xl_tot_len {xl_tot_len}"),
lsn: self.lsn,
@@ -216,7 +216,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
// We now have a record in the 'recordbuf' local variable.
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD as usize]).map_err(|e| {
WalDecodeError {
msg: format!("xlog record deserialization failed {e}"),
lsn: self.lsn,

View File

@@ -266,7 +266,7 @@ pub fn decode_wal_record(
xlogrec.xl_info
);
let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
let remaining: usize = (xlogrec.xl_tot_len - XLOG_SIZE_OF_XLOG_RECORD) as usize;
if buf.remaining() != remaining {
//TODO error

View File

@@ -35,7 +35,7 @@ use std::time::SystemTime;
use utils::bin_ser::DeserializeError;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
pub const XLOG_FNAME_LEN: usize = 24;
pub const XLP_BKP_REMOVABLE: u16 = 0x0004;
@@ -43,9 +43,9 @@ pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = size_of::<XLogPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = size_of::<XLogLongPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = size_of::<XLogRecord>();
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: SegmentSize = size_of::<XLogPageHeaderData>() as SegmentSize;
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: SegmentSize = size_of::<XLogLongPageHeaderData>() as SegmentSize;
pub const XLOG_SIZE_OF_XLOG_RECORD: SegmentSize = size_of::<XLogRecord>() as SegmentSize;
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
@@ -58,19 +58,19 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: SegmentSize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
}
pub fn XLogSegNoOffsetToRecPtr(
segno: XLogSegNo,
offset: u32,
wal_segsz_bytes: usize,
wal_segsz_bytes: SegmentSize,
) -> XLogRecPtr {
segno * (wal_segsz_bytes as u64) + (offset as u64)
}
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: SegmentSize) -> String {
format!(
"{:>08X}{:>08X}{:>08X}",
tli,
@@ -81,7 +81,7 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
pub fn XLogFromFileName(
fname: &OsStr,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
if let Some(fname_str) = fname.to_str() {
let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
@@ -111,7 +111,7 @@ pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
/// If LSN points to the beginning of the page, then shift it to first record,
/// otherwise align on 8-bytes boundary (required for WAL records)
pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
pub fn normalize_lsn(lsn: Lsn, seg_sz: SegmentSize) -> Lsn {
if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
XLOG_SIZE_OF_XLOG_LONG_PHD
@@ -227,7 +227,7 @@ pub use timestamp_conversions::{to_pg_timestamp, try_from_pg_timestamp};
// back.
pub fn find_end_of_wal(
data_dir: &Path,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn.
) -> anyhow::Result<Lsn> {
let mut result = start_lsn;
@@ -431,14 +431,14 @@ impl CheckPoint {
/// page of the segment and the page that contains the given LSN.
/// We need this segment to start compute node.
pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE as usize);
let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
let page_off = lsn.block_offset();
let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
let first_page_only = seg_off < XLOG_BLCKSZ;
let first_page_only = seg_off < XLOG_BLCKSZ as SegmentSize;
// If first records starts in the middle of the page, pretend in page header
// there is a fake record which ends where first real record starts. This
// makes pg_waldump etc happy.
@@ -460,12 +460,12 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
xlp_tli: PG_TLI,
xlp_pageaddr: pageaddr,
xlp_rem_len: shdr_rem_len as u32,
xlp_rem_len: shdr_rem_len,
..Default::default() // Put 0 in padding fields.
}
},
xlp_sysid: system_id,
xlp_seg_size: WAL_SEGMENT_SIZE as u32,
xlp_seg_size: WAL_SEGMENT_SIZE,
xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
};
@@ -473,7 +473,7 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
seg_buf.extend_from_slice(&hdr_bytes);
//zero out the rest of the file
seg_buf.resize(WAL_SEGMENT_SIZE, 0);
seg_buf.resize(WAL_SEGMENT_SIZE as usize, 0);
if !first_page_only {
let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;

View File

@@ -389,7 +389,7 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
let xlog_switch_record_end: PgLsn =
client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
if u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
if (u64::from(xlog_switch_record_end) % XLOG_BLCKSZ as u64) as u32
!= XLOG_SIZE_OF_XLOG_SHORT_PHD
{
warn!(

View File

@@ -81,10 +81,10 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
continue;
}
let mut f = File::options().write(true).open(file.path()).unwrap();
static ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
static ZEROS: [u8; WAL_SEGMENT_SIZE as usize] = [0u8; WAL_SEGMENT_SIZE as usize];
f.write_all(
&ZEROS[0..min(
WAL_SEGMENT_SIZE,
WAL_SEGMENT_SIZE as usize,
(u64::from(*start_lsn) - seg_start_lsn) as usize,
)],
)

View File

@@ -17,6 +17,9 @@ pub const XLOG_BLCKSZ: u32 = 8192;
#[derive(Clone, Copy, Default, Eq, Ord, PartialEq, PartialOrd, Hash)]
pub struct Lsn(pub u64);
/// Size of a Postgres WAL segment. These are always small enough to fit in a u32.
pub type SegmentSize = u32;
impl Serialize for Lsn {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -163,19 +166,19 @@ impl Lsn {
/// Compute the offset into a segment
#[inline]
pub fn segment_offset(self, seg_sz: usize) -> usize {
(self.0 % seg_sz as u64) as usize
pub fn segment_offset(self, seg_sz: SegmentSize) -> SegmentSize {
(self.0 % seg_sz as u64) as SegmentSize
}
/// Compute LSN of the segment start.
#[inline]
pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
pub fn segment_lsn(self, seg_sz: SegmentSize) -> Lsn {
Lsn(self.0 - (self.0 % seg_sz as u64))
}
/// Compute the segment number
#[inline]
pub fn segment_number(self, seg_sz: usize) -> u64 {
pub fn segment_number(self, seg_sz: SegmentSize) -> u64 {
self.0 / seg_sz as u64
}
@@ -196,7 +199,7 @@ impl Lsn {
/// Compute the block offset of the first byte of this Lsn within this
/// segment
#[inline]
pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
pub fn page_offset_in_segment(self, seg_sz: SegmentSize) -> u64 {
(self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
}
@@ -463,7 +466,7 @@ mod tests {
assert_eq!(Lsn(u64::MAX).widening_sub(0u64), i128::from(u64::MAX));
assert_eq!(Lsn(0).widening_sub(u64::MAX), -i128::from(u64::MAX));
let seg_sz: usize = 16 * 1024 * 1024;
let seg_sz: SegmentSize = 16 * 1024 * 1024;
assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7);
assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);

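For reference, a usage sketch mirroring the updated unit test above: the Lsn segment helpers now take and return SegmentSize, and callers widen to usize only where they index into buffers (the test name and the final cast are illustrative):

use utils::lsn::{Lsn, SegmentSize};

#[test]
fn segment_size_helpers() {
    let seg_sz: SegmentSize = 16 * 1024 * 1024;     // 16 MiB WAL segments
    let lsn = Lsn(0x0100_0007);
    assert_eq!(lsn.segment_offset(seg_sz), 7);      // offset is now a SegmentSize (u32)
    assert_eq!(lsn.segment_number(seg_sz), 1u64);   // segment numbers remain u64
    let _idx = lsn.segment_offset(seg_sz) as usize; // widen only when indexing a buffer
}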
View File

@@ -226,9 +226,9 @@ fn decode_interpret_main(bench: &BenchmarkData, shards: &[ShardIdentity]) {
fn decode_interpret(bench: &BenchmarkData, shard: &[ShardIdentity]) -> anyhow::Result<()> {
let mut decoder = WalStreamDecoder::new(bench.meta.start_lsn, bench.meta.pg_version);
let xlogoff: usize = bench.meta.start_lsn.segment_offset(WAL_SEGMENT_SIZE);
let xlogoff = bench.meta.start_lsn.segment_offset(WAL_SEGMENT_SIZE);
for chunk in bench.wal[xlogoff..].chunks(MAX_SEND_SIZE) {
for chunk in bench.wal[xlogoff as usize..].chunks(MAX_SEND_SIZE) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
assert!(lsn.is_aligned());

View File

@@ -29,7 +29,7 @@ use tokio::io;
use tokio::io::AsyncWrite;
use tokio_tar::{Builder, EntryType, Header};
use tracing::*;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
use crate::context::RequestContext;
use crate::pgdatadir_mapping::Version;
@@ -773,7 +773,7 @@ where
self.lsn,
)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
if wal_seg.len() != WAL_SEGMENT_SIZE {
if SegmentSize::try_from(wal_seg.len()) != Ok(WAL_SEGMENT_SIZE) {
return Err(BasebackupError::Server(anyhow!(
"wal_seg.len() != WAL_SEGMENT_SIZE, wal_seg.len()={}",
wal_seg.len()

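The length check above compares through a fallible conversion rather than an `as` cast, so a segment whose length does not fit in u32 can never compare equal by accident. A minimal sketch of the idiom (the helper name is hypothetical):

use utils::lsn::SegmentSize;

fn is_exact_segment(len: usize, seg_size: SegmentSize) -> bool {
    // u32::try_from(usize) fails for oversized lengths, so the comparison is lossless.
    SegmentSize::try_from(len) == Ok(seg_size)
}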
View File

@@ -301,7 +301,7 @@ async fn import_wal(
use std::io::Read;
let nread = file.read_to_end(&mut buf)?;
if nread != WAL_SEGMENT_SIZE - offset {
if nread != WAL_SEGMENT_SIZE as usize - offset as usize {
// Maybe allow this for .partial files?
error!("read only {} bytes from WAL file", nread);
}
@@ -455,7 +455,7 @@ pub async fn import_wal_from_tar(
}
};
waldecoder.feed_bytes(&bytes[offset..]);
waldecoder.feed_bytes(&bytes[offset as usize..]);
let mut modification = tline.begin_modification(last_lsn);
while last_lsn <= end_lsn {

View File

@@ -2383,17 +2383,17 @@ mod tests {
let started_at = std::time::Instant::now();
// Initialize walingest
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let xlogoff = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
.await
.unwrap();
let mut modification = tline.begin_modification(startpoint);
println!("decoding {} bytes", bytes.len() - xlogoff);
println!("decoding {} bytes", bytes.len() - xlogoff as usize);
// Decode and ingest wal. We process the wal in chunks because
// that's what happens when we get bytes from safekeepers.
for chunk in bytes[xlogoff..].chunks(50) {
for chunk in bytes[xlogoff as usize..].chunks(50) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
let interpreted = InterpretedWalRecord::from_bytes_filtered(

View File

@@ -9,7 +9,7 @@ use tokio::fs::OpenOptions;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tracing::{info, warn};
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
use crate::GlobalTimelines;
use crate::control_file::FileStorage;
@@ -100,7 +100,7 @@ pub async fn handle_request(
}
}
let wal_seg_size = state.server.wal_seg_size as usize;
let wal_seg_size = state.server.wal_seg_size;
if wal_seg_size == 0 {
bail!("wal_seg_size is not set");
}
@@ -171,7 +171,7 @@ pub async fn handle_request(
async fn copy_disk_segments(
tli: &WalResidentTimeline,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
start_lsn: Lsn,
end_lsn: Lsn,
tli_dir_path: &Utf8PathBuf,

View File

@@ -103,7 +103,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let server_info = ServerInfo {
pg_version: request_data.pg_version,
system_id: request_data.system_id.unwrap_or(0),
wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE),
};
let global_timelines = get_global_timelines(&request);
global_timelines

View File

@@ -831,7 +831,7 @@ impl Collector for TimelineCollector {
if tli.last_removed_segno != 0 {
let segno_count = tli
.flush_lsn
.segment_number(tli.persisted_state.server.wal_seg_size as usize)
.segment_number(tli.persisted_state.server.wal_seg_size)
- tli.last_removed_segno;
let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
self.disk_usage

View File

@@ -27,7 +27,7 @@ use tracing::{error, info, instrument};
use utils::crashsafe::fsync_async_opt;
use utils::id::{NodeId, TenantTimelineId};
use utils::logging::SecretString;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
use utils::pausable_failpoint;
use crate::control_file::CONTROL_FILE_NAME;
@@ -100,7 +100,7 @@ pub struct SnapshotContext {
pub term: Term,
pub last_log_term: Term,
pub flush_lsn: Lsn,
pub wal_seg_size: usize,
pub wal_seg_size: SegmentSize,
// used to remove WAL hold off in Drop.
pub tli: WalResidentTimeline,
}

View File

@@ -1439,7 +1439,7 @@ mod tests {
fn test_sk_state() -> TimelinePersistentState {
let mut state = TimelinePersistentState::empty();
state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
state.server.wal_seg_size = WAL_SEGMENT_SIZE;
state.tenant_id = TenantId::from([1u8; 16]);
state.timeline_id = TimelineId::from([1u8; 16]);
state

View File

@@ -152,7 +152,7 @@ impl TimelinePersistentState {
ServerInfo {
pg_version: PgVersionId::from(PgMajorVersion::PG17),
system_id: 0, /* Postgres system identifier */
wal_seg_size: WAL_SEGMENT_SIZE as u32,
wal_seg_size: WAL_SEGMENT_SIZE,
},
Lsn::INVALID,
Lsn::INVALID,

View File

@@ -23,7 +23,7 @@ use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::{NodeId, TenantId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
use utils::sync::gate::Gate;
use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
@@ -338,8 +338,8 @@ impl SharedState {
Ok(Self::new(sk))
}
pub(crate) fn get_wal_seg_size(&self) -> usize {
self.sk.state().server.wal_seg_size as usize
pub(crate) fn get_wal_seg_size(&self) -> SegmentSize {
self.sk.state().server.wal_seg_size
}
fn get_safekeeper_info(
@@ -747,7 +747,7 @@ impl Timeline {
}
/// Returns wal_seg_size.
pub async fn get_wal_seg_size(&self) -> usize {
pub async fn get_wal_seg_size(&self) -> SegmentSize {
self.read_shared_state().await.get_wal_seg_size()
}

View File

@@ -11,6 +11,7 @@ use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tracing::{debug, info, instrument, warn};
use utils::crashsafe::durable_rename;
use utils::lsn::SegmentSize;
use crate::metrics::{
EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED, EvictionEvent, NUM_EVICTED_TIMELINES,
@@ -276,12 +277,12 @@ async fn compare_local_segment_with_remote(
async fn do_validation(
mgr: &Manager,
file: &mut File,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
let local_size = file.metadata().await?.len() as usize;
if local_size != wal_seg_size {
let local_size = file.metadata().await?.len();
if SegmentSize::try_from(local_size) != Ok(wal_seg_size) {
anyhow::bail!(
"local segment size is invalid: found {}, expected {}",
local_size,
@@ -296,12 +297,12 @@ async fn do_validation(
// remote segment should have bytes exactly up to `flush_lsn`
let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
// let's compare the first `expected_remote_size` bytes
compare_n_bytes(&mut remote_reader, file, expected_remote_size).await?;
compare_n_bytes(&mut remote_reader, file, expected_remote_size as usize).await?;
// and check that the remote segment ends here
check_end(&mut remote_reader).await?;
// if local segment is longer, the rest should be zeroes
read_n_zeroes(file, mgr.wal_seg_size - expected_remote_size).await?;
read_n_zeroes(file, (mgr.wal_seg_size - expected_remote_size) as usize).await?;
// and check that the local segment ends here
check_end(file).await?;

View File

@@ -20,7 +20,7 @@ use tokio::task::{JoinError, JoinHandle};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, info, info_span, instrument, warn};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
use crate::SafeKeeperConf;
use crate::control_file::{FileStorage, Storage};
@@ -198,7 +198,7 @@ pub(crate) struct Manager {
// configuration & dependencies
pub(crate) tli: ManagerTimeline,
pub(crate) conf: SafeKeeperConf,
pub(crate) wal_seg_size: usize,
pub(crate) wal_seg_size: SegmentSize,
pub(crate) walsenders: Arc<WalSenders>,
pub(crate) wal_backup: Arc<WalBackup>,

View File

@@ -23,7 +23,7 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
use utils::{backoff, pausable_failpoint};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
@@ -52,7 +52,7 @@ impl WalBackupTaskHandle {
/// Do we have anything to upload to S3, i.e. should safekeepers run backup activity?
pub(crate) fn is_wal_backup_required(
wal_seg_size: usize,
wal_seg_size: SegmentSize,
num_computes: usize,
state: &StateSnapshot,
) -> bool {
@@ -210,7 +210,7 @@ impl WalBackup {
struct WalBackupTask {
timeline: WalResidentTimeline,
timeline_dir: Utf8PathBuf,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
parallel_jobs: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
storage: Arc<GenericRemoteStorage>,
@@ -338,7 +338,7 @@ async fn backup_lsn_range(
storage: Arc<GenericRemoteStorage>,
backup_lsn: &mut Lsn,
end_lsn: Lsn,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
timeline_dir: &Utf8Path,
parallel_jobs: usize,
) -> Result<()> {
@@ -461,12 +461,12 @@ impl Segment {
remote_timeline_path.join(self.object_name())
}
pub fn size(self) -> usize {
(u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
pub fn size(self) -> SegmentSize {
(u64::from(self.end_lsn) - u64::from(self.start_lsn)) as SegmentSize
}
}
fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
fn get_segments(start: Lsn, end: Lsn, seg_size: SegmentSize) -> Vec<Segment> {
let first_seg = start.segment_number(seg_size);
let last_seg = end.segment_number(seg_size);
@@ -484,7 +484,7 @@ async fn backup_object(
storage: &GenericRemoteStorage,
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
size: SegmentSize,
) -> Result<()> {
let file = File::open(&source_file)
.await
@@ -495,7 +495,7 @@ async fn backup_object(
let cancel = CancellationToken::new();
storage
.upload_storage_object(file, size, target_file, &cancel)
.upload_storage_object(file, size as usize, target_file, &cancel)
.await
}
@@ -503,7 +503,7 @@ pub(crate) async fn backup_partial_segment(
storage: &GenericRemoteStorage,
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
size: SegmentSize,
) -> Result<()> {
let file = File::open(&source_file)
.await
@@ -519,7 +519,7 @@ pub(crate) async fn backup_partial_segment(
storage
.upload(
file,
size,
size as usize,
target_file,
Some(StorageMetadata::from([("sk_type", "partial_segment")])),
&cancel,
@@ -647,7 +647,7 @@ pub async fn delete_objects(storage: &GenericRemoteStorage, paths: &[RemotePath]
/// Copy segments from one timeline to another. Used in copy_timeline.
pub async fn copy_s3_segments(
storage: &GenericRemoteStorage,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
src_ttid: &TenantTimelineId,
dst_ttid: &TenantTimelineId,
from_segment: XLogSegNo,

View File

@@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use utils::id::NodeId;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
use crate::SafeKeeperConf;
use crate::metrics::{
@@ -151,7 +151,7 @@ impl State {
}
pub struct PartialBackup {
wal_seg_size: usize,
wal_seg_size: SegmentSize,
tli: WalResidentTimeline,
conf: SafeKeeperConf,
local_prefix: Utf8PathBuf,

View File

@@ -28,7 +28,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tracing::*;
use utils::crashsafe::durable_rename;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
use crate::metrics::{
REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics, time_io_closure,
@@ -92,7 +92,7 @@ pub struct PhysicalStorage {
no_sync: bool,
/// Size of WAL segment in bytes.
wal_seg_size: usize,
wal_seg_size: SegmentSize,
pg_version: PgVersionId,
system_id: u64,
@@ -170,7 +170,7 @@ impl PhysicalStorage {
state: &TimelinePersistentState,
no_sync: bool,
) -> Result<PhysicalStorage> {
let wal_seg_size = state.server.wal_seg_size as usize;
let wal_seg_size = state.server.wal_seg_size;
// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
@@ -315,7 +315,12 @@ impl PhysicalStorage {
/// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the
/// segment was completed, closed, and flushed to disk.
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<bool> {
async fn write_in_segment(
&mut self,
segno: u64,
xlogoff: SegmentSize,
buf: &[u8],
) -> Result<bool> {
let mut file = if let Some(file) = self.file.take() {
file
} else {
@@ -331,7 +336,7 @@ impl PhysicalStorage {
// syscall, but needed in case of async). It does *not* fsyncs the file.
file.flush().await?;
if xlogoff + buf.len() == self.wal_seg_size {
if xlogoff as usize + buf.len() == self.wal_seg_size as usize {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&file).await?;
@@ -372,8 +377,8 @@ impl PhysicalStorage {
let segno = self.write_lsn.segment_number(self.wal_seg_size);
// If crossing a WAL boundary, only write up until we reach wal segment size.
let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
self.wal_seg_size - xlogoff
let bytes_write = if xlogoff as usize + buf.len() > self.wal_seg_size as usize {
(self.wal_seg_size - xlogoff) as usize
} else {
buf.len()
};
@@ -604,7 +609,7 @@ impl Storage for PhysicalStorage {
/// Remove all WAL segments in timeline_dir that match the given predicate.
async fn remove_segments_from_disk(
timeline_dir: &Utf8Path,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
remove_predicate: impl Fn(XLogSegNo) -> bool,
) -> Result<()> {
let _timer = WAL_STORAGE_OPERATION_SECONDS
@@ -645,7 +650,7 @@ async fn remove_segments_from_disk(
pub struct WalReader {
remote_path: RemotePath,
timeline_dir: Utf8PathBuf,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
pos: Lsn,
wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,
@@ -683,7 +688,7 @@ impl WalReader {
if start_pos
< state
.timeline_start_lsn
.segment_lsn(state.server.wal_seg_size as usize)
.segment_lsn(state.server.wal_seg_size)
{
bail!(
"Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
@@ -695,7 +700,7 @@ impl WalReader {
Ok(Self {
remote_path: remote_timeline_path(ttid)?,
timeline_dir,
wal_seg_size: state.server.wal_seg_size as usize,
wal_seg_size: state.server.wal_seg_size,
pos: start_pos,
wal_segment: None,
wal_backup,
@@ -743,12 +748,14 @@ impl WalReader {
// How many bytes may we consume in total?
let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
debug_assert!(seg_bytes.len() > pos_seg_offset);
debug_assert!(seg_bytes.len() > tl_start_seg_offset);
debug_assert!(seg_bytes.len() > pos_seg_offset as usize);
debug_assert!(seg_bytes.len() > tl_start_seg_offset as usize);
// Copy as many bytes as possible into the buffer
let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
let len = ((tl_start_seg_offset - pos_seg_offset) as usize).min(buf.len());
buf[0..len].copy_from_slice(
&seg_bytes[pos_seg_offset as usize..pos_seg_offset as usize + len],
);
self.pos += len as u64;
@@ -770,7 +777,7 @@ impl WalReader {
// How much to read and send in message? We cannot cross the WAL file
// boundary, and we don't want send more than provided buffer.
let xlogoff = self.pos.segment_offset(self.wal_seg_size);
let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
let send_size = min(buf.len(), (self.wal_seg_size - xlogoff) as usize);
// Read some data from the file.
let buf = &mut buf[0..send_size];
@@ -831,7 +838,7 @@ impl WalReader {
pub(crate) async fn open_wal_file(
timeline_dir: &Utf8Path,
segno: XLogSegNo,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
) -> Result<(tokio::fs::File, bool)> {
let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size);
@@ -858,7 +865,7 @@ pub(crate) async fn open_wal_file(
pub fn wal_file_paths(
timeline_dir: &Utf8Path,
segno: XLogSegNo,
wal_seg_size: usize,
wal_seg_size: SegmentSize,
) -> (Utf8PathBuf, Utf8PathBuf) {
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = timeline_dir.join(wal_file_name.clone());

View File

@@ -13,7 +13,7 @@ use serde::Serialize;
use tokio_postgres::types::PgLsn;
use tracing::{debug, error, info};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, SegmentSize};
use crate::cloud_admin_api::CloudAdminApiClient;
use crate::metadata_stream::stream_listing;
@@ -22,7 +22,7 @@ use crate::{
};
/// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
const WAL_SEGSIZE: SegmentSize = 16 * 1024 * 1024;
#[derive(Serialize)]
pub struct MetadataSummary {