Properly initialize first WAL segment on safekeepers.

Previously, its segment header and the page header of the first record weren't
initialized, because the compute streams data only starting from the first
record's LSN. Also fix a bug in the existing initialization code: xlp_rem_len
must not include the page header.

These changes make the first segment pg_waldump'able.
Author: Arseny Sher
Date: 2024-05-01 18:22:34 +03:00
Committed by: Arseny Sher
Parent: 5da3e2113a
Commit: ce4d3da3ae
4 changed files with 70 additions and 13 deletions
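
For illustration, a minimal sketch (not the committed code itself) of the corrected
header fields when the first real record begins mid-way into the segment's first
page at offset seg_off; constant names are the ones used in the diff below:

    // Pretend a fake record fills the gap between the page header and the first
    // real record, so pg_waldump and friends accept the page.
    // xlp_rem_len counts only record bytes, so the page header must be excluded.
    let (xlp_rem_len, xlp_info) = if seg_off > 0 {
        (
            seg_off - XLOG_SIZE_OF_XLOG_LONG_PHD, // subtract the header (the bug fix)
            pg_constants::XLP_FIRST_IS_CONTRECORD,
        )
    } else {
        (0, 0) // segment starts exactly at a record boundary: nothing to fake
    };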


@@ -331,7 +331,10 @@ impl CheckPoint {
     /// Returns 'true' if the XID was updated.
     pub fn update_next_xid(&mut self, xid: u32) -> bool {
         // nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparround.
-        let mut new_xid = std::cmp::max(xid.wrapping_add(1), pg_constants::FIRST_NORMAL_TRANSACTION_ID);
+        let mut new_xid = std::cmp::max(
+            xid.wrapping_add(1),
+            pg_constants::FIRST_NORMAL_TRANSACTION_ID,
+        );
         // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
         // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
         new_xid =
@@ -367,8 +370,16 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
     let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
     let first_page_only = seg_off < XLOG_BLCKSZ;
-    let (shdr_rem_len, infoflags) = if first_page_only {
-        (seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
+    // If first records starts in the middle of the page, pretend in page header
+    // there is a fake record which ends where first real record starts. This
+    // makes pg_waldump etc happy.
+    let (shdr_rem_len, infoflags) = if first_page_only && seg_off > 0 {
+        assert!(seg_off >= XLOG_SIZE_OF_XLOG_LONG_PHD);
+        // xlp_rem_len doesn't include page header, hence the subtraction.
+        (
+            seg_off - XLOG_SIZE_OF_XLOG_LONG_PHD,
+            pg_constants::XLP_FIRST_IS_CONTRECORD,
+        )
     } else {
         (0, 0)
     };
@@ -397,20 +408,22 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
     if !first_page_only {
         let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
+        // see comments above about XLP_FIRST_IS_CONTRECORD and xlp_rem_len.
+        let (xlp_rem_len, xlp_info) = if page_off > 0 {
+            assert!(page_off >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
+            (
+                (page_off - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64) as u32,
+                pg_constants::XLP_FIRST_IS_CONTRECORD,
+            )
+        } else {
+            (0, 0)
+        };
         let header = XLogPageHeaderData {
             xlp_magic: XLOG_PAGE_MAGIC as u16,
-            xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
-                pg_constants::XLP_FIRST_IS_CONTRECORD
-            } else {
-                0
-            },
+            xlp_info,
             xlp_tli: PG_TLI,
             xlp_pageaddr: lsn.page_lsn().0,
-            xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
-                page_off as u32
-            } else {
-                0u32
-            },
+            xlp_rem_len,
             ..Default::default() // Put 0 in padding fields.
         };
         let hdr_bytes = header.encode()?;


@@ -725,6 +725,18 @@ where
             self.state.inmem.commit_lsn
         );
+        // Before first WAL write initialize its segment. It makes first segment
+        // pg_waldump'able because stream from compute doesn't include its
+        // segment and page headers.
+        //
+        // If we fail before first WAL write flush this action would be
+        // repeated, that's ok because it is idempotent.
+        if self.wal_store.flush_lsn() == Lsn::INVALID {
+            self.wal_store
+                .initialize_first_segment(msg.start_streaming_at)
+                .await?;
+        }
         // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
         // intersection of our history and history from msg
@@ -1007,6 +1019,10 @@ mod tests {
             self.lsn
         }
+        async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
+            Ok(())
+        }
         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
             self.lsn = startpos + buf.len() as u64;
             Ok(())


@@ -38,6 +38,12 @@ pub trait Storage {
     /// LSN of last durably stored WAL record.
     fn flush_lsn(&self) -> Lsn;
+    /// Initialize segment by creating proper long header at the beginning of
+    /// the segment and short header at the page of given LSN. This is only used
+    /// for timeline initialization because compute will stream data only since
+    /// init_lsn. Other segment headers are included in compute stream.
+    async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()>;
     /// Write piece of WAL from buf to disk, but not necessarily sync it.
     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
@@ -78,6 +84,8 @@ pub struct PhysicalStorage {
     /// Size of WAL segment in bytes.
     wal_seg_size: usize,
+    pg_version: u32,
+    system_id: u64,
     /// Written to disk, but possibly still in the cache and not fully persisted.
     /// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
@@ -169,6 +177,8 @@ impl PhysicalStorage {
             timeline_dir,
             conf: conf.clone(),
             wal_seg_size,
+            pg_version: state.server.pg_version,
+            system_id: state.server.system_id,
             write_lsn,
             write_record_lsn: write_lsn,
             flush_record_lsn: flush_lsn,
@@ -324,6 +334,20 @@ impl Storage for PhysicalStorage {
         self.flush_record_lsn
     }
+    async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()> {
+        let segno = init_lsn.segment_number(self.wal_seg_size);
+        let (mut file, _) = self.open_or_create(segno).await?;
+        let major_pg_version = self.pg_version / 10000;
+        let wal_seg =
+            postgres_ffi::generate_wal_segment(segno, self.system_id, major_pg_version, init_lsn)?;
+        file.seek(SeekFrom::Start(0)).await?;
+        file.write_all(&wal_seg).await?;
+        file.flush().await?;
+        info!("initialized segno {} at lsn {}", segno, init_lsn);
+        // note: file is *not* fsynced
+        Ok(())
+    }
     /// Write WAL to disk.
     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
         // Disallow any non-sequential writes, which can result in gaps or overwrites.


@@ -182,6 +182,10 @@ impl wal_storage::Storage for DiskWALStorage {
         self.flush_record_lsn
     }
+    async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
+        Ok(())
+    }
     /// Write piece of WAL from buf to disk, but not necessarily sync it.
     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
         if self.write_lsn != startpos {