pageserver - remove lsn from WALRecord

This commit is contained in:
Patrick Insinger
2021-10-12 20:45:21 -07:00
committed by Patrick Insinger
parent f658263543
commit 1c29de81de
7 changed files with 30 additions and 33 deletions

View File

@@ -1695,7 +1695,7 @@ impl LayeredTimeline {
//
// If we don't have a base image, then the oldest WAL record better initialize
// the page
if data.page_img.is_none() && !data.records.first().unwrap().will_init {
if data.page_img.is_none() && !data.records.first().unwrap().1.will_init {
// FIXME: this ought to be an error?
warn!(
"Base image for page {}/{} at {} not found, but got {} WAL records",
@@ -1773,7 +1773,7 @@ impl Deref for LayeredTimelineWriter<'_> {
}
impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
fn put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> {
fn put_wal_record(&self, lsn: Lsn, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> {
if !rel.is_blocky() && blknum != 0 {
bail!(
"invalid request for block {} for non-blocky relish {}",
@@ -1781,11 +1781,11 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
rel
);
}
ensure!(rec.lsn.is_aligned(), "unaligned record LSN");
ensure!(lsn.is_aligned(), "unaligned record LSN");
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.tl.get_layer_for_write(seg, rec.lsn)?;
let delta_size = layer.put_wal_record(blknum, rec);
let layer = self.tl.get_layer_for_write(seg, lsn)?;
let delta_size = layer.put_wal_record(lsn, blknum, rec);
self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32);
Ok(())

View File

@@ -198,7 +198,7 @@ impl Layer for DeltaLayer {
.slice_range((Included(&minkey), Included(&maxkey)))
.iter()
.rev();
for ((_blknum, _lsn), blob_range) in iter {
for ((_blknum, pv_lsn), blob_range) in iter {
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
if let Some(img) = pv.page_image {
@@ -208,7 +208,7 @@ impl Layer for DeltaLayer {
break;
} else if let Some(rec) = pv.record {
let will_init = rec.will_init;
reconstruct_data.records.push(rec);
reconstruct_data.records.push((*pv_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;

View File

@@ -160,13 +160,13 @@ impl Layer for InMemoryLayer {
.get_block_lsn_range(blknum, ..=lsn)
.iter()
.rev();
for (_entry_lsn, entry) in iter {
for (entry_lsn, entry) in iter {
if let Some(img) = &entry.page_image {
reconstruct_data.page_img = Some(img.clone());
need_image = false;
break;
} else if let Some(rec) = &entry.record {
reconstruct_data.records.push(rec.clone());
reconstruct_data.records.push((*entry_lsn, rec.clone()));
if rec.will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
@@ -337,10 +337,10 @@ impl InMemoryLayer {
// Write operations
/// Remember new page version, as a WAL record over previous version
pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> u32 {
pub fn put_wal_record(&self, lsn: Lsn, blknum: u32, rec: WALRecord) -> u32 {
self.put_page_version(
blknum,
rec.lsn,
lsn,
PageVersion {
page_image: None,
record: Some(rec),

View File

@@ -78,7 +78,7 @@ pub struct PageVersion {
/// 'records' contains the records to apply over the base image.
///
pub struct PageReconstructData {
pub records: Vec<WALRecord>,
pub records: Vec<(Lsn, WALRecord)>,
pub page_img: Option<Bytes>,
}

View File

@@ -162,7 +162,7 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
fn put_wal_record(&self, tag: RelishTag, blknum: u32, rec: WALRecord) -> Result<()>;
fn put_wal_record(&self, lsn: Lsn, tag: RelishTag, blknum: u32, rec: WALRecord) -> Result<()>;
/// Like put_wal_record, but with ready-made image of the page.
fn put_page_image(&self, tag: RelishTag, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()>;
@@ -182,7 +182,6 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WALRecord {
pub lsn: Lsn, // LSN at the *end* of the record
pub will_init: bool,
pub rec: Bytes,
// Remember the offset of main_data in rec,
@@ -193,22 +192,19 @@ pub struct WALRecord {
impl WALRecord {
pub fn pack(&self, buf: &mut BytesMut) {
buf.put_u64(self.lsn.0);
buf.put_u8(self.will_init as u8);
buf.put_u32(self.main_data_offset);
buf.put_u32(self.rec.len() as u32);
buf.put_slice(&self.rec[..]);
}
pub fn unpack(buf: &mut Bytes) -> WALRecord {
let lsn = Lsn::from(buf.get_u64());
let will_init = buf.get_u8() != 0;
let main_data_offset = buf.get_u32();
let mut dst = vec![0u8; buf.get_u32() as usize];
buf.copy_to_slice(&mut dst);
let rec_len = buf.get_u32() as usize;
let rec = buf.split_to(rec_len);
WALRecord {
lsn,
will_init,
rec: Bytes::from(dst),
rec,
main_data_offset,
}
}
@@ -832,7 +828,7 @@ mod tests {
blknum: u32,
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
records: Vec<(Lsn, WALRecord)>,
) -> Result<Bytes, WalRedoError> {
let s = format!(
"redo for {} blk {} to get to {}, with {} and {} records",

View File

@@ -312,13 +312,12 @@ pub fn save_decoded_record(
});
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(tag, blk.blkno, rec)?;
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
}
let mut buf = decoded.record.clone();
@@ -656,12 +655,12 @@ fn save_xact_record(
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
let rec = WALRecord {
lsn,
will_init: false,
rec: decoded.record.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(
lsn,
RelishTag::Slru {
slru: SlruKind::Clog,
segno,
@@ -677,6 +676,7 @@ fn save_xact_record(
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
timeline.put_wal_record(
lsn,
RelishTag::Slru {
slru: SlruKind::Clog,
segno,
@@ -771,7 +771,6 @@ fn save_multixact_create_record(
decoded: &DecodedWALRecord,
) -> Result<()> {
let rec = WALRecord {
lsn,
will_init: false,
rec: decoded.record.clone(),
main_data_offset: decoded.main_data_offset as u32,
@@ -780,6 +779,7 @@ fn save_multixact_create_record(
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
timeline.put_wal_record(
lsn,
RelishTag::Slru {
slru: SlruKind::MultiXactOffsets,
segno,
@@ -799,6 +799,7 @@ fn save_multixact_create_record(
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
timeline.put_wal_record(
lsn,
RelishTag::Slru {
slru: SlruKind::MultiXactMembers,
segno,

View File

@@ -82,7 +82,7 @@ pub trait WalRedoManager: Send + Sync {
blknum: u32,
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
records: Vec<(Lsn, WALRecord)>,
) -> Result<Bytes, WalRedoError>;
}
@@ -99,7 +99,7 @@ impl crate::walredo::WalRedoManager for DummyRedoManager {
_blknum: u32,
_lsn: Lsn,
_base_img: Option<Bytes>,
_records: Vec<WALRecord>,
_records: Vec<(Lsn, WALRecord)>,
) -> Result<Bytes, WalRedoError> {
Err(WalRedoError::InvalidState)
}
@@ -150,7 +150,7 @@ struct WalRedoRequest {
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
records: Vec<(Lsn, WALRecord)>,
}
/// An error happened in WAL redo
@@ -179,7 +179,7 @@ impl WalRedoManager for PostgresRedoManager {
blknum: u32,
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
records: Vec<(Lsn, WALRecord)>,
) -> Result<Bytes, WalRedoError> {
let start_time;
let lock_time;
@@ -277,7 +277,7 @@ impl PostgresRedoManager {
page.extend_from_slice(&ZERO_PAGE);
}
// Apply all collected WAL records
for record in records {
for (_lsn, record) in records {
let mut buf = record.rec.clone();
WAL_REDO_RECORD_COUNTER.inc();
@@ -544,7 +544,7 @@ impl PostgresRedoProcess {
&mut self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[WALRecord],
records: &[(Lsn, WALRecord)],
) -> Result<Bytes, std::io::Error> {
let stdout = &mut self.stdout;
// Buffer the writes to avoid a lot of small syscalls.
@@ -570,11 +570,11 @@ impl PostgresRedoProcess {
}
// Send WAL records.
for rec in records.iter() {
for (lsn, rec) in records.iter() {
WAL_REDO_RECORD_COUNTER.inc();
stdin
.write_all(&build_apply_record_msg(rec.lsn, &rec.rec))
.write_all(&build_apply_record_msg(*lsn, &rec.rec))
.await?;
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",