reuse previous decoded wal record when decoding a new one

This commit is contained in:
Thang Pham
2022-06-28 16:40:48 -04:00
parent 9bc2287bcf
commit b5d9e5b06f
5 changed files with 42 additions and 34 deletions

View File

@@ -16,6 +16,7 @@ use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
use postgres_ffi::xlog_utils::*;
@@ -268,10 +269,12 @@ fn import_wal<R: Repository>(
waldecoder.feed_bytes(&buf);
let mut nrecords = 0;
let mut modification = tline.begin_modification(last_lsn);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let mut modification = tline.begin_modification(lsn);
walingest.ingest_record(recdata, lsn, &mut modification)?;
modification.lsn = lsn;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
last_lsn = lsn;
nrecords += 1;
@@ -384,10 +387,12 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification(last_lsn);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let mut modification = tline.begin_modification(lsn);
walingest.ingest_record(recdata, lsn, &mut modification)?;
modification.lsn = lsn;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
last_lsn = lsn;
debug!("imported record at {} (end {})", lsn, end_lsn);

View File

@@ -546,12 +546,6 @@ pub struct DatadirModification<'a, R: Repository> {
}
impl<'a, R: Repository> DatadirModification<'a, R> {
pub fn clear(&mut self) {
self.pending_updates.clear();
self.pending_deletions.clear();
self.pending_nblocks = 0;
}
/// Initialize a completely new repository.
///
/// This inserts the directory metadata entries that are assumed to
@@ -914,14 +908,15 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
/// Finish this atomic update, writing all the updated keys to the
/// underlying timeline.
///
pub fn commit(&self) -> Result<()> {
pub fn commit(&mut self) -> Result<()> {
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in &self.pending_updates {
self.writer.put(key.clone(), self.lsn, value.clone())?;
for (key, value) in self.pending_updates.drain() {
self.writer.put(key, self.lsn, value)?;
}
for key_range in &self.pending_deletions {
self.writer.delete(key_range.clone(), self.lsn)?;
for key_range in self.pending_deletions.drain(..) {
self.writer.delete(key_range, self.lsn)?;
}
self.writer.finish_write(self.lsn);

View File

@@ -81,8 +81,10 @@ impl<'a, R: Repository> WalIngest<'a, R> {
recdata: Bytes,
lsn: Lsn,
modification: &mut DatadirModification<R>,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?;
decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -96,7 +98,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID
|| decoded.xl_rmid == pg_constants::RM_HEAP2_ID
{
self.ingest_heapam_record(&mut buf, modification, &mut decoded)?;
self.ingest_heapam_record(&mut buf, modification, decoded)?;
}
// Handle other special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
@@ -211,7 +213,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, &decoded)?;
self.ingest_relmap_page(modification, &xlrec, decoded)?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
@@ -246,7 +248,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
self.ingest_decoded_block(modification, lsn, &decoded, blk)?;
self.ingest_decoded_block(modification, lsn, decoded, blk)?;
}
// If checkpoint data was updated, store the new version in the repository

View File

@@ -23,6 +23,7 @@ use crate::{
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
@@ -153,19 +154,19 @@ pub async fn handle_walreceiver_connection(
// let mut n_records = 0;
// timer = std::time::Instant::now();
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(last_rec_lsn);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let _enter = info_span!("processing record", lsn = %lsn).entered();
// let _enter = info_span!("processing record", lsn = %lsn).entered();
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
modification.clear();
modification.lsn = lsn;
walingest.ingest_record(recdata, lsn, &mut modification)?;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
fail_point!("walreceiver-after-ingest");

View File

@@ -96,6 +96,7 @@ impl DecodedBkpBlock {
}
}
#[derive(Default)]
pub struct DecodedWALRecord {
pub xl_xid: TransactionId,
pub xl_info: u8,
@@ -505,7 +506,10 @@ impl XlMultiXactTruncate {
// block data
// ...
// main data
pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeError> {
pub fn decode_wal_record(
record: Bytes,
decoded: &mut DecodedWALRecord,
) -> Result<(), DeserializeError> {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
@@ -534,7 +538,9 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
let mut blocks_total_len: u32 = 0;
let mut main_data_len = 0;
let mut datatotal: u32 = 0;
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
if !decoded.blocks.is_empty() {
decoded.blocks.clear();
}
// 2. Decode the headers.
// XLogRecordBlockHeaders if any,
@@ -713,7 +719,7 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
blk.blkno
);
blocks.push(blk);
decoded.blocks.push(blk);
}
_ => {
@@ -724,7 +730,7 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
// 3. Decode blocks.
let mut ptr = record.len() - buf.remaining();
for blk in blocks.iter_mut() {
for blk in decoded.blocks.iter_mut() {
if blk.has_image {
blk.bimg_offset = ptr as u32;
ptr += blk.bimg_len as usize;
@@ -744,14 +750,13 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
assert_eq!(buf.remaining(), main_data_len as usize);
}
Ok(DecodedWALRecord {
xl_xid: xlogrec.xl_xid,
xl_info: xlogrec.xl_info,
xl_rmid: xlogrec.xl_rmid,
record,
blocks,
main_data_offset,
})
decoded.xl_xid = xlogrec.xl_xid;
decoded.xl_info = xlogrec.xl_info;
decoded.xl_rmid = xlogrec.xl_rmid;
decoded.record = record;
decoded.main_data_offset = main_data_offset;
Ok(())
}
///