From f43f8401ee72c48ec0d4c998721fdaf2b4794407 Mon Sep 17 00:00:00 2001 From: anastasia Date: Mon, 25 Oct 2021 14:37:36 +0300 Subject: [PATCH] Don't wait for wal-redo process for non-relational records replay --- pageserver/src/walredo.rs | 344 +++++++++++++++++++++----------------- 1 file changed, 187 insertions(+), 157 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 8cd696e8f3..4ceb4e2b37 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -153,6 +153,13 @@ struct WalRedoRequest { records: Vec<(Lsn, WALRecord)>, } +impl WalRedoRequest { + // Can this request be served by zenith redo funcitons + // or we need to pass it to wal-redo postgres process? + fn can_apply_in_zenith(&self) -> bool { + !matches!(self.rel, RelishTag::Relation(_)) + } +} /// An error happened in WAL redo #[derive(Debug, thiserror::Error)] pub enum WalRedoError { @@ -161,6 +168,8 @@ pub enum WalRedoError { #[error("cannot perform WAL redo now")] InvalidState, + #[error("cannot perform WAL redo for this request")] + InvalidRequest, } /// @@ -182,7 +191,6 @@ impl WalRedoManager for PostgresRedoManager { records: Vec<(Lsn, WALRecord)>, ) -> Result { let start_time; - let lock_time; let end_time; let request = WalRedoRequest { @@ -194,9 +202,16 @@ impl WalRedoManager for PostgresRedoManager { }; start_time = Instant::now(); - let result = { + let result; + + if request.can_apply_in_zenith() { + result = self.handle_apply_request_zenith(&request); + + end_time = Instant::now(); + WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64()); + } else { let mut process_guard = self.process.lock().unwrap(); - lock_time = Instant::now(); + let lock_time = Instant::now(); // launch the WAL redo process on first use if process_guard.is_none() { @@ -207,13 +222,14 @@ impl WalRedoManager for PostgresRedoManager { } let process = process_guard.as_mut().unwrap(); - self.runtime - .block_on(self.handle_apply_request(process, &request)) - }; - end_time = Instant::now(); + result = self + .runtime + .block_on(self.handle_apply_request_postgres(process, &request)); - WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); - WAL_REDO_TIME.observe(end_time.duration_since(lock_time).as_secs_f64()); + WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); + end_time = Instant::now(); + WAL_REDO_TIME.observe(end_time.duration_since(lock_time).as_secs_f64()); + } result } @@ -242,13 +258,47 @@ impl PostgresRedoManager { } /// - /// Process one request for WAL redo. + /// Process one request for WAL redo using wal-redo postgres /// - async fn handle_apply_request( + async fn handle_apply_request_postgres( &self, process: &mut PostgresRedoProcess, request: &WalRedoRequest, ) -> Result { + let blknum = request.blknum; + let lsn = request.lsn; + let base_img = request.base_img.clone(); + let records = &request.records; + let nrecords = records.len(); + + let start = Instant::now(); + + let apply_result: Result; + + if let RelishTag::Relation(rel) = request.rel { + // Relational WAL records are applied using wal-redo-postgres + let buf_tag = BufferTag { rel, blknum }; + apply_result = process.apply_wal_records(buf_tag, base_img, records).await; + + let duration = start.elapsed(); + + debug!( + "postgres applied {} WAL records in {} ms to reconstruct page image at LSN {}", + nrecords, + duration.as_millis(), + lsn + ); + + apply_result.map_err(WalRedoError::IoError) + } else { + Err(WalRedoError::InvalidRequest) + } + } + + /// + /// Process one request for WAL redo using custom zenith code + /// + fn handle_apply_request_zenith(&self, request: &WalRedoRequest) -> Result { let rel = request.rel; let blknum = request.blknum; let lsn = request.lsn; @@ -260,178 +310,158 @@ impl PostgresRedoManager { let start = Instant::now(); let apply_result: Result; - if let RelishTag::Relation(rel) = rel { - // Relational WAL records are applied using wal-redo-postgres - let buf_tag = BufferTag { rel, blknum }; - apply_result = process.apply_wal_records(buf_tag, base_img, records).await; + + // Non-relational WAL records are handled here, with custom code that has the + // same effects as the corresponding Postgres WAL redo function. + const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + let mut page = BytesMut::new(); + if let Some(fpi) = base_img { + // If full-page image is provided, then use it... + page.extend_from_slice(&fpi[..]); } else { - // Non-relational WAL records are handled here, with custom code that has the - // same effects as the corresponding Postgres WAL redo function. - const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - let mut page = BytesMut::new(); - if let Some(fpi) = base_img { - // If full-page image is provided, then use it... - page.extend_from_slice(&fpi[..]); - } else { - // otherwise initialize page with zeros - page.extend_from_slice(&ZERO_PAGE); + // otherwise initialize page with zeros + page.extend_from_slice(&ZERO_PAGE); + } + // Apply all collected WAL records + for (_lsn, record) in records { + let mut buf = record.rec.clone(); + + WAL_REDO_RECORD_COUNTER.inc(); + + // 1. Parse XLogRecord struct + // FIXME: refactor to avoid code duplication. + let xlogrec = XLogRecord::from_bytes(&mut buf); + + //move to main data + // TODO probably, we should store some records in our special format + // to avoid this weird parsing on replay + let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; + if buf.remaining() > skip { + buf.advance(skip); } - // Apply all collected WAL records - for (_lsn, record) in records { - let mut buf = record.rec.clone(); - WAL_REDO_RECORD_COUNTER.inc(); - - // 1. Parse XLogRecord struct - // FIXME: refactor to avoid code duplication. - let xlogrec = XLogRecord::from_bytes(&mut buf); - - //move to main data - // TODO probably, we should store some records in our special format - // to avoid this weird parsing on replay - let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; - if buf.remaining() > skip { - buf.advance(skip); - } - - if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { - // Transaction manager stuff - let rec_segno = match rel { - RelishTag::Slru { slru, segno } => { - assert!( - slru == SlruKind::Clog, - "Not valid XACT relish tag {:?}", - rel + if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { + // Transaction manager stuff + let rec_segno = match rel { + RelishTag::Slru { slru, segno } => { + assert!( + slru == SlruKind::Clog, + "Not valid XACT relish tag {:?}", + rel + ); + segno + } + _ => panic!("Not valid XACT relish tag {:?}", rel), + }; + let parsed_xact = + XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info); + if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT + || parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED + { + transaction_id_set_status( + parsed_xact.xid, + pg_constants::TRANSACTION_STATUS_COMMITTED, + &mut page, + ); + for subxact in &parsed_xact.subxacts { + let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + // only update xids on the requested page + if rec_segno == segno && blknum == rpageno { + transaction_id_set_status( + *subxact, + pg_constants::TRANSACTION_STATUS_COMMITTED, + &mut page, ); - segno - } - _ => panic!("Not valid XACT relish tag {:?}", rel), - }; - let parsed_xact = - XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info); - if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT - || parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED - { - transaction_id_set_status( - parsed_xact.xid, - pg_constants::TRANSACTION_STATUS_COMMITTED, - &mut page, - ); - for subxact in &parsed_xact.subxacts { - let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - // only update xids on the requested page - if rec_segno == segno && blknum == rpageno { - transaction_id_set_status( - *subxact, - pg_constants::TRANSACTION_STATUS_COMMITTED, - &mut page, - ); - } - } - } else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT - || parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED - { - transaction_id_set_status( - parsed_xact.xid, - pg_constants::TRANSACTION_STATUS_ABORTED, - &mut page, - ); - for subxact in &parsed_xact.subxacts { - let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - // only update xids on the requested page - if rec_segno == segno && blknum == rpageno { - transaction_id_set_status( - *subxact, - pg_constants::TRANSACTION_STATUS_ABORTED, - &mut page, - ); - } } } - } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { - // Multixact operations - let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - if let RelishTag::Slru { - slru, - segno: rec_segno, - } = rel - { - if slru == SlruKind::MultiXactMembers { - for i in 0..xlrec.nmembers { - let pageno = - i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - if segno == rec_segno && rpageno == blknum { - // update only target block - let offset = xlrec.moff + i; - let memberoff = mx_offset_to_member_offset(offset); - let flagsoff = mx_offset_to_flags_offset(offset); - let bshift = mx_offset_to_flags_bitshift(offset); - let mut flagsval = - LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); - flagsval &= !(((1 - << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - - 1) + } else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT + || parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED + { + transaction_id_set_status( + parsed_xact.xid, + pg_constants::TRANSACTION_STATUS_ABORTED, + &mut page, + ); + for subxact in &parsed_xact.subxacts { + let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + // only update xids on the requested page + if rec_segno == segno && blknum == rpageno { + transaction_id_set_status( + *subxact, + pg_constants::TRANSACTION_STATUS_ABORTED, + &mut page, + ); + } + } + } + } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { + // Multixact operations + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(&mut buf); + if let RelishTag::Slru { + slru, + segno: rec_segno, + } = rel + { + if slru == SlruKind::MultiXactMembers { + for i in 0..xlrec.nmembers { + let pageno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + if segno == rec_segno && rpageno == blknum { + // update only target block + let offset = xlrec.moff + i; + let memberoff = mx_offset_to_member_offset(offset); + let flagsoff = mx_offset_to_flags_offset(offset); + let bshift = mx_offset_to_flags_bitshift(offset); + let mut flagsval = + LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); + flagsval &= + !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift); - flagsval |= xlrec.members[i as usize].status << bshift; - LittleEndian::write_u32( - &mut page[flagsoff..flagsoff + 4], - flagsval, - ); - LittleEndian::write_u32( - &mut page[memberoff..memberoff + 4], - xlrec.members[i as usize].xid, - ); - } + flagsval |= xlrec.members[i as usize].status << bshift; + LittleEndian::write_u32( + &mut page[flagsoff..flagsoff + 4], + flagsval, + ); + LittleEndian::write_u32( + &mut page[memberoff..memberoff + 4], + xlrec.members[i as usize].xid, + ); } - } else { - // Multixact offsets SLRU - let offs = (xlrec.mid - % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32 - * 4) as usize; - LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff); } } else { - panic!(); + // Multixact offsets SLRU + let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32 + * 4) as usize; + LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff); } } else { panic!(); } + } else { + panic!(); } } - - apply_result = Ok::(page.freeze()); } + apply_result = Ok::(page.freeze()); + let duration = start.elapsed(); - let result: Result; - debug!( - "applied {} WAL records in {} ms to reconstruct page image at LSN {}", + "zenith applied {} WAL records in {} ms to reconstruct page image at LSN {}", nrecords, duration.as_millis(), lsn ); - if let Err(e) = apply_result { - error!("could not apply WAL records: {:#}", e); - result = Err(WalRedoError::IoError(e)); - } else { - let img = apply_result.unwrap(); - - result = Ok(img); - } - - // The caller is responsible for sending the response - result + apply_result.map_err(WalRedoError::IoError) } }