From 0e026371eca1c595a45697dc06e536d159b9f3f4 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 14 Oct 2021 14:21:23 +0300 Subject: [PATCH] Optimize WAL decoding slightly. This adds a fast-path for the common case that the record doesn't cross a page boundary. We now split off a new Bytes directly from the original input buffer in that case, instead of copying the record to a new BytesMut. Shaves about 5% of the page server's CPU time on my laptop, in the 'test_bulk_insert' test. --- pageserver/src/waldecoder.rs | 101 ++++++++++++++++++--------------- postgres_ffi/src/xlog_utils.rs | 7 ++- 2 files changed, 62 insertions(+), 46 deletions(-) diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index cb94b9248b..b1e8e3b54f 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -72,6 +72,10 @@ impl WalStreamDecoder { /// Err(WalDecodeError): an error occured while decoding, meaning the input was invalid. /// pub fn poll_decode(&mut self) -> Result, WalDecodeError> { + let recordbuf; + + // Run state machine that validates page headers, and reassembles records + // that cross page boundaries. loop { // parse and verify page boundaries as we go if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 { @@ -120,29 +124,41 @@ impl WalStreamDecoder { self.lsn += self.padlen as u64; self.padlen = 0; } else if self.contlen == 0 { - // need to have at least the xl_tot_len field + assert!(self.recordbuf.is_empty()); + // need to have at least the xl_tot_len field if self.inputbuf.remaining() < 4 { return Ok(None); } - // read xl_tot_len FIXME: assumes little-endian + // peek xl_tot_len at the beginning of the record. + // FIXME: assumes little-endian self.startlsn = self.lsn; - let xl_tot_len = self.inputbuf.get_u32_le(); + let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le(); if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD { return Err(WalDecodeError { msg: format!("invalid xl_tot_len {}", xl_tot_len), lsn: self.lsn, }); } - self.lsn += 4; - self.recordbuf.clear(); - self.recordbuf.reserve(xl_tot_len as usize); - self.recordbuf.put_u32_le(xl_tot_len); - - self.contlen = xl_tot_len - 4; - continue; + // Fast path for the common case that the whole record fits on the page. + let pageleft = self.lsn.remaining_in_block() as u32; + if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft { + // Take the record from the 'inputbuf', and validate it. + recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize); + self.lsn += xl_tot_len as u64; + break; + } else { + // Need to assemble the record from pieces. Remember the size of the + // record, and loop back. On next iteration, we will reach the 'else' + // branch below, and copy the part of the record that was on this page + // to 'recordbuf'. Subsequent iterations will skip page headers, and + // append the continuations from the next pages to 'recordbuf'. + self.recordbuf.reserve(xl_tot_len as usize); + self.contlen = xl_tot_len; + continue; + } } else { // we're continuing a record, possibly from previous page. let pageleft = self.lsn.remaining_in_block() as u32; @@ -159,47 +175,42 @@ impl WalStreamDecoder { self.contlen -= n as u32; if self.contlen == 0 { - let recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()); - - let recordbuf = recordbuf.freeze(); - let mut buf = recordbuf.clone(); - - let xlogrec = XLogRecord::from_bytes(&mut buf); - - // XLOG_SWITCH records are special. If we see one, we need to skip - // to the next WAL segment. - if xlogrec.is_xlog_switch_record() { - trace!("saw xlog switch record at {}", self.lsn); - self.padlen = - self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64) as u32; - } else { - // Pad to an 8-byte boundary - self.padlen = self.lsn.calc_padding(8u32) as u32; - } - - let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]); - crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]); - if crc != xlogrec.xl_crc { - return Err(WalDecodeError { - msg: "WAL record crc mismatch".into(), - lsn: self.lsn, - }); - } - - // Always align resulting LSN on 0x8 boundary -- that is important for getPage() - // and WalReceiver integration. Since this code is used both for WalReceiver and - // initial WAL import let's force alignment right here. - let result = (self.lsn.align(), recordbuf); - return Ok(Some(result)); + // The record is now complete. + recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()).freeze(); + break; } continue; } } - // check record boundaries - // deal with continuation records + // We now have a record in the 'recordbuf' local variable. + let xlogrec = XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]); - // deal with xlog_switch records + let mut crc = 0; + crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]); + crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]); + if crc != xlogrec.xl_crc { + return Err(WalDecodeError { + msg: "WAL record crc mismatch".into(), + lsn: self.lsn, + }); + } + + // XLOG_SWITCH records are special. If we see one, we need to skip + // to the next WAL segment. + if xlogrec.is_xlog_switch_record() { + trace!("saw xlog switch record at {}", self.lsn); + self.padlen = self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64) as u32; + } else { + // Pad to an 8-byte boundary + self.padlen = self.lsn.calc_padding(8u32) as u32; + } + + // Always align resulting LSN on 0x8 boundary -- that is important for getPage() + // and WalReceiver integration. Since this code is used both for WalReceiver and + // initial WAL import let's force alignment right here. + let result = (self.lsn.align(), recordbuf); + Ok(Some(result)) } } diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 5cdaec4a2a..7826630a78 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -329,7 +329,12 @@ pub fn main() { } impl XLogRecord { - pub fn from_bytes(buf: &mut Bytes) -> XLogRecord { + pub fn from_slice(buf: &[u8]) -> XLogRecord { + use zenith_utils::bin_ser::LeSer; + XLogRecord::des(buf).unwrap() + } + + pub fn from_bytes(buf: &mut B) -> XLogRecord { use zenith_utils::bin_ser::LeSer; XLogRecord::des_from(&mut buf.reader()).unwrap() }