Files
neon/safekeeper/tests/walproposer_sim/walproposer_disk.rs
Arpad Müller bb2a3f9b02 Update Rust to 1.80.0 (#8518)
We keep the practice of keeping the compiler up to date, pointing to the
latest release. This is done by many other projects in the Rust ecosystem as well.

[Release notes](https://github.com/rust-lang/rust/blob/master/RELEASES.md#version-180-2024-07-25).

Prior update was in #8048
2024-07-26 11:17:33 +02:00

315 lines
9.8 KiB
Rust

use std::{ffi::CString, sync::Arc};
use byteorder::{LittleEndian, WriteBytesExt};
use crc32c::crc32c_append;
use parking_lot::{Mutex, MutexGuard};
use postgres_ffi::{
pg_constants::{
RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG,
XLR_BLOCK_ID_DATA_SHORT,
},
v16::{
wal_craft_test_export::{XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC},
xlog_utils::{
XLogSegNoOffsetToRecPtr, XlLogicalMessage, XLOG_RECORD_CRC_OFFS,
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
XLP_FIRST_IS_CONTRECORD,
},
XLogRecord,
},
WAL_SEGMENT_SIZE, XLOG_BLCKSZ,
};
use utils::lsn::Lsn;
use super::block_storage::BlockStorage;
/// Simulation implementation of walproposer WAL storage.
pub struct DiskWalProposer {
state: Mutex<State>,
}
impl DiskWalProposer {
pub fn new() -> Arc<DiskWalProposer> {
Arc::new(DiskWalProposer {
state: Mutex::new(State {
internal_available_lsn: Lsn(0),
prev_lsn: Lsn(0),
disk: BlockStorage::new(),
}),
})
}
pub fn lock(&self) -> MutexGuard<State> {
self.state.lock()
}
}
pub struct State {
// flush_lsn
internal_available_lsn: Lsn,
// needed for WAL generation
prev_lsn: Lsn,
// actual WAL storage
disk: BlockStorage,
}
impl State {
pub fn read(&self, pos: u64, buf: &mut [u8]) {
self.disk.read(pos, buf);
// TODO: fail on reading uninitialized data
}
pub fn write(&mut self, pos: u64, buf: &[u8]) {
self.disk.write(pos, buf);
}
/// Update the internal available LSN to the given value.
pub fn reset_to(&mut self, lsn: Lsn) {
self.internal_available_lsn = lsn;
}
/// Get current LSN.
pub fn flush_rec_ptr(&self) -> Lsn {
self.internal_available_lsn
}
/// Generate a new WAL record at the current LSN.
pub fn insert_logical_message(&mut self, prefix: &str, msg: &[u8]) -> anyhow::Result<()> {
let prefix_cstr = CString::new(prefix)?;
let prefix_bytes = prefix_cstr.as_bytes_with_nul();
let lm = XlLogicalMessage {
db_id: 0,
transactional: 0,
prefix_size: prefix_bytes.len() as ::std::os::raw::c_ulong,
message_size: msg.len() as ::std::os::raw::c_ulong,
};
let record_bytes = lm.encode();
let rdatas: Vec<&[u8]> = vec![&record_bytes, prefix_bytes, msg];
insert_wal_record(self, rdatas, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE)
}
}
fn insert_wal_record(
state: &mut State,
rdatas: Vec<&[u8]>,
rmid: u8,
info: u8,
) -> anyhow::Result<()> {
// bytes right after the header, in the same rdata block
let mut scratch = Vec::new();
let mainrdata_len: usize = rdatas.iter().map(|rdata| rdata.len()).sum();
if mainrdata_len > 0 {
if mainrdata_len > 255 {
scratch.push(XLR_BLOCK_ID_DATA_LONG);
// TODO: verify endiness
let _ = scratch.write_u32::<LittleEndian>(mainrdata_len as u32);
} else {
scratch.push(XLR_BLOCK_ID_DATA_SHORT);
scratch.push(mainrdata_len as u8);
}
}
let total_len: u32 = (XLOG_SIZE_OF_XLOG_RECORD + scratch.len() + mainrdata_len) as u32;
let size = maxalign(total_len);
assert!(size as usize > XLOG_SIZE_OF_XLOG_RECORD);
let start_bytepos = recptr_to_bytepos(state.internal_available_lsn);
let end_bytepos = start_bytepos + size as u64;
let start_recptr = bytepos_to_recptr(start_bytepos);
let end_recptr = bytepos_to_recptr(end_bytepos);
assert!(recptr_to_bytepos(start_recptr) == start_bytepos);
assert!(recptr_to_bytepos(end_recptr) == end_bytepos);
let mut crc = crc32c_append(0, &scratch);
for rdata in &rdatas {
crc = crc32c_append(crc, rdata);
}
let mut header = XLogRecord {
xl_tot_len: total_len,
xl_xid: 0,
xl_prev: state.prev_lsn.0,
xl_info: info,
xl_rmid: rmid,
__bindgen_padding_0: [0u8; 2usize],
xl_crc: crc,
};
// now we have the header and can finish the crc
let header_bytes = header.encode()?;
let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
header.xl_crc = crc;
let mut header_bytes = header.encode()?.to_vec();
assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_RECORD);
header_bytes.extend_from_slice(&scratch);
// finish rdatas
let mut rdatas = rdatas;
rdatas.insert(0, &header_bytes);
write_walrecord_to_disk(state, total_len as u64, rdatas, start_recptr, end_recptr)?;
state.internal_available_lsn = end_recptr;
state.prev_lsn = start_recptr;
Ok(())
}
fn write_walrecord_to_disk(
state: &mut State,
total_len: u64,
rdatas: Vec<&[u8]>,
start: Lsn,
end: Lsn,
) -> anyhow::Result<()> {
let mut curr_ptr = start;
let mut freespace = insert_freespace(curr_ptr);
let mut written: usize = 0;
assert!(freespace >= size_of::<u32>());
for mut rdata in rdatas {
while rdata.len() >= freespace {
assert!(
curr_ptr.segment_offset(WAL_SEGMENT_SIZE) >= XLOG_SIZE_OF_XLOG_SHORT_PHD
|| freespace == 0
);
state.write(curr_ptr.0, &rdata[..freespace]);
rdata = &rdata[freespace..];
written += freespace;
curr_ptr = Lsn(curr_ptr.0 + freespace as u64);
let mut new_page = XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: XLP_BKP_REMOVABLE,
xlp_tli: 1,
xlp_pageaddr: curr_ptr.0,
xlp_rem_len: (total_len - written as u64) as u32,
..Default::default() // Put 0 in padding fields.
};
if new_page.xlp_rem_len > 0 {
new_page.xlp_info |= XLP_FIRST_IS_CONTRECORD;
}
if curr_ptr.segment_offset(WAL_SEGMENT_SIZE) == 0 {
new_page.xlp_info |= XLP_LONG_HEADER;
let long_page = XLogLongPageHeaderData {
std: new_page,
xlp_sysid: 0,
xlp_seg_size: WAL_SEGMENT_SIZE as u32,
xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
};
let header_bytes = long_page.encode()?;
assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_LONG_PHD);
state.write(curr_ptr.0, &header_bytes);
curr_ptr = Lsn(curr_ptr.0 + header_bytes.len() as u64);
} else {
let header_bytes = new_page.encode()?;
assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_SHORT_PHD);
state.write(curr_ptr.0, &header_bytes);
curr_ptr = Lsn(curr_ptr.0 + header_bytes.len() as u64);
}
freespace = insert_freespace(curr_ptr);
}
assert!(
curr_ptr.segment_offset(WAL_SEGMENT_SIZE) >= XLOG_SIZE_OF_XLOG_SHORT_PHD
|| rdata.is_empty()
);
state.write(curr_ptr.0, rdata);
curr_ptr = Lsn(curr_ptr.0 + rdata.len() as u64);
written += rdata.len();
freespace -= rdata.len();
}
assert!(written == total_len as usize);
curr_ptr.0 = maxalign(curr_ptr.0);
assert!(curr_ptr == end);
Ok(())
}
fn maxalign<T>(size: T) -> T
where
T: std::ops::BitAnd<Output = T>
+ std::ops::Add<Output = T>
+ std::ops::Not<Output = T>
+ From<u8>,
{
(size + T::from(7)) & !T::from(7)
}
fn insert_freespace(ptr: Lsn) -> usize {
if ptr.block_offset() == 0 {
0
} else {
(XLOG_BLCKSZ as u64 - ptr.block_offset()) as usize
}
}
const XLP_BKP_REMOVABLE: u16 = 0x0004;
const USABLE_BYTES_IN_PAGE: u64 = (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64;
const USABLE_BYTES_IN_SEGMENT: u64 = ((WAL_SEGMENT_SIZE / XLOG_BLCKSZ) as u64
* USABLE_BYTES_IN_PAGE)
- (XLOG_SIZE_OF_XLOG_RECORD - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64;
fn bytepos_to_recptr(bytepos: u64) -> Lsn {
let fullsegs = bytepos / USABLE_BYTES_IN_SEGMENT;
let mut bytesleft = bytepos % USABLE_BYTES_IN_SEGMENT;
let seg_offset = if bytesleft < (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64 {
// fits on first page of segment
bytesleft + XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
} else {
// account for the first page on segment with long header
bytesleft -= (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64;
let fullpages = bytesleft / USABLE_BYTES_IN_PAGE;
bytesleft %= USABLE_BYTES_IN_PAGE;
XLOG_BLCKSZ as u64
+ fullpages * XLOG_BLCKSZ as u64
+ bytesleft
+ XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
};
Lsn(XLogSegNoOffsetToRecPtr(
fullsegs,
seg_offset as u32,
WAL_SEGMENT_SIZE,
))
}
fn recptr_to_bytepos(ptr: Lsn) -> u64 {
let fullsegs = ptr.segment_number(WAL_SEGMENT_SIZE);
let offset = ptr.segment_offset(WAL_SEGMENT_SIZE) as u64;
let fullpages = offset / XLOG_BLCKSZ as u64;
let offset = offset % XLOG_BLCKSZ as u64;
if fullpages == 0 {
fullsegs * USABLE_BYTES_IN_SEGMENT
+ if offset > 0 {
assert!(offset >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
offset - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
} else {
0
}
} else {
fullsegs * USABLE_BYTES_IN_SEGMENT
+ (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64
+ (fullpages - 1) * USABLE_BYTES_IN_PAGE
+ if offset > 0 {
assert!(offset >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
offset - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
} else {
0
}
}
}