diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index e245381690..6e411a1584 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -663,7 +663,7 @@ impl Timeline for ObjectTimeline { assert!(old <= lsn); // Use old value of last_record_lsn as prev_record_lsn - self.prev_record_lsn.fetch_max(Lsn((old.0 + 7) & !7)); + self.prev_record_lsn.fetch_max(old.align()); // Also advance last_valid_lsn let old = self.last_valid_lsn.advance(lsn); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 831c7a5301..ac85aa04e3 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -362,7 +362,7 @@ impl PageServerHandler { timeline.wait_lsn(lsn)?; lsn } - None => timeline.get_last_valid_lsn(), + None => timeline.get_last_record_lsn(), }; { let mut writer = CopyDataSink { pgb }; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 337b31eb9e..c249ea7fc8 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -31,7 +31,7 @@ pub struct WalStreamDecoder { inputbuf: BytesMut, recordbuf: BytesMut, - crc_check: bool, + crc_check: bool, } #[derive(Error, Debug, Clone)] @@ -56,18 +56,16 @@ impl WalStreamDecoder { inputbuf: BytesMut::new(), recordbuf: BytesMut::new(), - crc_check, + crc_check, } } - pub fn available(&self) -> Lsn { - self.lsn + self.inputbuf.remaining() as u64 - } + pub fn available(&self) -> Lsn { + self.lsn + self.inputbuf.remaining() as u64 + } pub fn feed_bytes(&mut self, buf: &[u8]) { self.inputbuf.extend_from_slice(buf); - info!("WAL decoder: self.lsn={}, buf.len()={}, self.inputbuf.len()={}, self.inputbuf.remaining()={}", - self.lsn, buf.len(), self.inputbuf.len(), self.inputbuf.remaining()); } /// Attempt to decode another WAL record from the input that has been fed to the @@ -85,7 +83,6 @@ impl WalStreamDecoder { // parse long header if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD { - info!("self.inputbuf.remaining()={} < XLOG_SIZE_OF_XLOG_LONG_PHD at {}, self.recordbuf.len()={}", self.inputbuf.remaining(), self.lsn, self.recordbuf.len()); return Ok(None); } @@ -102,7 +99,6 @@ impl WalStreamDecoder { self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64; } else if self.lsn.block_offset() == 0 { if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD { - info!("self.inputbuf.remaining()={} < XLOG_SIZE_OF_XLOG_SHORT_PHD at {}, self.recordbuf.len()={}, self.contlen={}", self.inputbuf.remaining(), self.lsn, self.recordbuf.len(), self.contlen); return Ok(None); } @@ -110,7 +106,10 @@ impl WalStreamDecoder { if hdr.xlp_pageaddr != self.lsn.0 { return Err(WalDecodeError { - msg: "invalid xlog page header".into(), + msg: format!( + "invalid xlog page header: xlp_pageaddr={} vs. lsn={}", + hdr.xlp_pageaddr, self.lsn + ), lsn: self.lsn, }); } @@ -119,7 +118,6 @@ impl WalStreamDecoder { self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64; } else if self.padlen > 0 { if self.inputbuf.remaining() < self.padlen as usize { - info!("self.inputbuf.remaining()={} < self.padlen={}", self.inputbuf.remaining(), self.padlen); return Ok(None); } @@ -131,7 +129,6 @@ impl WalStreamDecoder { // need to have at least the xl_tot_len field if self.inputbuf.remaining() < 4 { - info!("self.inputbuf.remaining()={} < 4", self.inputbuf.remaining()); return Ok(None); } @@ -158,7 +155,6 @@ impl WalStreamDecoder { let n = min(self.contlen, pageleft) as usize; if self.inputbuf.remaining() < n { - info!("self.inputbuf.remaining()={} < n={} at {}, self.recordbuf.len()={}, self.contlen={}", self.inputbuf.remaining(), n, self.lsn, self.recordbuf.len(), self.contlen); return Ok(None); } @@ -185,21 +181,27 @@ impl WalStreamDecoder { self.padlen = self.lsn.calc_padding(8u32) as u32; } - // Check record CRC - if self.crc_check { - let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]); - crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]); - if crc != xlogrec.xl_crc { - info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}", + // Check record CRC + if self.crc_check { + let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]); + crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]); + if crc != xlogrec.xl_crc { + info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}", n, recordbuf.len(), self.lsn, xlogrec, recordbuf); - return Err(WalDecodeError { - msg: format!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}", n, buf.len(), self.lsn, xlogrec), - lsn: self.lsn, - }); - } - } + return Err(WalDecodeError { + msg: format!( + "WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}", + n, + buf.len(), + self.lsn, + xlogrec + ), + lsn: self.lsn, + }); + } + } - let result = (self.lsn, recordbuf); + let result = (self.lsn.align(), recordbuf); return Ok(Some(result)); } } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 3f6151724e..ce804aacf4 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -168,7 +168,7 @@ fn walreceiver_main( // too. Just for the sake of paranoia. startpoint += startpoint.calc_padding(8u32); - info!( + debug!( "last_record_lsn {} starting replication from {} for timeline {}, server is at {}...", last_rec_lsn, startpoint, timelineid, end_of_wal ); @@ -203,48 +203,49 @@ fn walreceiver_main( tenantid, )?; - info!("received XLogData between {} and {}", startlsn, endlsn); - //assert!(waldecoder.lsn + waldecoder.padlen as u64 == startlsn); + trace!("received XLogData between {} and {}", startlsn, endlsn); waldecoder.feed_bytes(data); - loop { - match waldecoder.poll_decode() { - Ok(Some((lsn, recdata))) => { - // Save old checkpoint value to compare with it after decoding WAL record - let old_checkpoint_bytes = checkpoint.encode(); - let decoded = decode_wal_record(recdata.clone()); - let lsn = Lsn((lsn.0 + 7) & !7); // align on 8 - restore_local_repo::save_decoded_record( - &mut checkpoint, - &*timeline, - &decoded, - recdata, - lsn, - )?; - last_rec_lsn = lsn; + loop { + match waldecoder.poll_decode() { + Ok(Some((lsn, recdata))) => { + // Save old checkpoint value to compare with it after decoding WAL record + let old_checkpoint_bytes = checkpoint.encode(); + let decoded = decode_wal_record(recdata.clone()); + restore_local_repo::save_decoded_record( + &mut checkpoint, + &*timeline, + &decoded, + recdata, + lsn, + )?; + last_rec_lsn = lsn; - let new_checkpoint_bytes = checkpoint.encode(); - // Check if checkpoint data was updated by save_decoded_record - if new_checkpoint_bytes != old_checkpoint_bytes { - timeline.put_page_image( - RelishTag::Checkpoint, - 0, - lsn, - new_checkpoint_bytes, - false, - )?; + let new_checkpoint_bytes = checkpoint.encode(); + // Check if checkpoint data was updated by save_decoded_record + if new_checkpoint_bytes != old_checkpoint_bytes { + timeline.put_page_image( + RelishTag::Checkpoint, + 0, + lsn, + new_checkpoint_bytes, + false, + )?; + } + } + Ok(None) => { + trace!( + "End of replication stream {}..{} at {}", + startlsn, endlsn, last_rec_lsn + ); + break; + } + Err(e) => { + info!("Decode error {}", e); + return Err(e.into()); + } } - } - Ok(None) => { - info!("End of replication stream {}..{} at {}", startlsn, endlsn, last_rec_lsn); - break; - } - Err(e) => { - info!("Decode error {}", e); - return Err(e.into()); - } - } - } + } // Update the last_valid LSN value in the page cache one more time. We updated // it in the loop above, between each WAL record, but we might have received // a partial record after the last completed record. Our page cache's value diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 3fa6ef83a8..7279f28b5d 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -108,22 +108,22 @@ fn find_end_of_wal_segment( segno: XLogSegNo, tli: TimeLineID, wal_seg_size: usize, - is_partial: bool, - rec_offs: &mut usize, - rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD], - crc: &mut u32, - check_contrec: bool + is_partial: bool, + rec_offs: &mut usize, + rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD], + crc: &mut u32, + check_contrec: bool, ) -> u32 { let mut offs: usize = 0; let mut contlen: usize = 0; let mut buf = [0u8; XLOG_BLCKSZ]; let file_name = XLogFileName(tli, segno, wal_seg_size); let mut last_valid_rec_pos: usize = 0; - let file_path = data_dir.join(if is_partial { - file_name.clone() + ".partial" - } else { - file_name - }); + let file_path = data_dir.join(if is_partial { + file_name.clone() + ".partial" + } else { + file_name + }); let mut file = File::open(&file_path).unwrap(); while offs < wal_seg_size { @@ -143,22 +143,21 @@ fn find_end_of_wal_segment( break; } if offs == 0 { - info!("xlp_rem_len={}", xlp_rem_len); offs = XLOG_SIZE_OF_XLOG_LONG_PHD; if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 { - if check_contrec { - let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize; - contlen = xlp_rem_len as usize; - assert!(*rec_offs + contlen == xl_tot_len); - } else { - offs += ((xlp_rem_len + 7) & !7) as usize; - } + if check_contrec { + let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize; + contlen = xlp_rem_len as usize; + assert!(*rec_offs + contlen == xl_tot_len); + } else { + offs += ((xlp_rem_len + 7) & !7) as usize; + } } else if *rec_offs != 0 { - // There is incompleted page at previous segment but not cont record: - // it means that current segment is not valid and we have to return back. - info!("CONTRECORD flag is missed in page header"); - return 0; - } + // There is incompleted page at previous segment but no cont record: + // it means that current segment is not valid and we have to return back. + info!("CONTRECORD flag is missed in page header"); + return 0; + } } else { offs += XLOG_SIZE_OF_XLOG_SHORT_PHD; } @@ -179,13 +178,14 @@ fn find_end_of_wal_segment( // read the rest of the record, or as much as fits on this page. let n = min(contlen, pageleft); - let mut hdr_len: usize = 0; + let mut hdr_len: usize = 0; if *rec_offs < XLOG_SIZE_OF_XLOG_RECORD { - // copy header + // copy header hdr_len = min(XLOG_SIZE_OF_XLOG_RECORD - *rec_offs, n); - rec_hdr[*rec_offs..*rec_offs + hdr_len].copy_from_slice(&buf[page_offs..page_offs + hdr_len]); + rec_hdr[*rec_offs..*rec_offs + hdr_len] + .copy_from_slice(&buf[page_offs..page_offs + hdr_len]); } - *crc = crc32c_append(*crc, &buf[page_offs+hdr_len..page_offs+n]); + *crc = crc32c_append(*crc, &buf[page_offs + hdr_len..page_offs + n]); *rec_offs += n; offs += n; contlen -= n; @@ -193,12 +193,14 @@ fn find_end_of_wal_segment( if contlen == 0 { *crc = crc32c_append(*crc, &rec_hdr[0..XLOG_RECORD_CRC_OFFS]); offs = (offs + 7) & !7; // pad on 8 bytes boundary */ - let wal_crc = LittleEndian::read_u32(&rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS+4]); + let wal_crc = LittleEndian::read_u32( + &rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS + 4], + ); if *crc == wal_crc { last_valid_rec_pos = offs; - // Reset rec_offs and crc for start of new record - *rec_offs = 0; - *crc = 0; + // Reset rec_offs and crc for start of new record + *rec_offs = 0; + *crc = 0; } else { info!( "CRC mismatch {} vs {} at offset {} lsn {}", @@ -252,62 +254,102 @@ pub fn find_end_of_wal( } } if high_segno > 0 { - let mut high_offs = 0; - if precise { - let mut crc: u32 = 0; - let mut rec_offs: usize = 0; - let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD]; + let mut high_offs = 0; + if precise { + let mut crc: u32 = 0; + let mut rec_offs: usize = 0; + let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD]; + let file_name = XLogFileName(high_tli, high_segno, wal_seg_size); - /* - * To be able to calculate CRC of records crossing segment boundary, - * we need to parse previous segment - */ - assert!(high_segno > 1); - let prev_offs = find_end_of_wal_segment(data_dir, high_segno-1, high_tli, wal_seg_size, false, &mut rec_offs, &mut rec_hdr, &mut crc, false); - if prev_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD { - info!("Segment {} doesn't contain any valid record {}", high_segno-1, prev_offs); - } - assert!(prev_offs as usize > XLOG_SIZE_OF_XLOG_LONG_PHD); - high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size, high_ispartial, &mut rec_offs, &mut rec_hdr, &mut crc, true); + /* + * To be able to calculate CRC of records crossing segment boundary, + * we need to parse previous segment. + * FIXME: handle case when wal record is larger than WAL segment + */ + if high_segno > 1 { + let prev_offs = find_end_of_wal_segment( + data_dir, + high_segno - 1, + high_tli, + wal_seg_size, + false, + &mut rec_offs, + &mut rec_hdr, + &mut crc, + false, + ); + if prev_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD { + info!( + "Segment {} doesn't contain any valid record {}", + high_segno - 1, + prev_offs + ); + } + assert!(prev_offs as usize > XLOG_SIZE_OF_XLOG_LONG_PHD); + high_offs = find_end_of_wal_segment( + data_dir, + high_segno, + high_tli, + wal_seg_size, + high_ispartial, + &mut rec_offs, + &mut rec_hdr, + &mut crc, + true, + ); - let file_name = XLogFileName(high_tli, high_segno, wal_seg_size); - if high_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD { - // If last segment contais no valid records, then return back to previous segment - let wal_dir = data_dir.join("pg_wal"); - let file_path = if high_ispartial { - wal_dir.join(file_name.clone() + ".partial") - } else { - wal_dir.join(file_name.clone()) - }; - info!("Remove empty WAL segment {:?}", &file_path); - if let Err(e) = fs::remove_file(&file_path) { - info!("Failed to remove file {:?}: {}", &file_path, e); - } - high_ispartial = false; // previous segment should not be partial - high_segno -= 1; - high_offs = prev_offs; - } - // If last segment is not marked as partial, it means that next segment - // was not written. Let's make this segment partial once again. + if high_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD { + // If last segment contais no valid records, then return back to previous segment + let wal_dir = data_dir.join("pg_wal"); + let file_path = if high_ispartial { + wal_dir.join(file_name.clone() + ".partial") + } else { + wal_dir.join(file_name.clone()) + }; + info!("Remove empty WAL segment {:?}", &file_path); + if let Err(e) = fs::remove_file(&file_path) { + info!("Failed to remove file {:?}: {}", &file_path, e); + } + high_ispartial = false; // previous segment should not be partial + high_segno -= 1; + high_offs = prev_offs; + } + } else { + high_offs = find_end_of_wal_segment( + data_dir, + high_segno, + high_tli, + wal_seg_size, + high_ispartial, + &mut rec_offs, + &mut rec_hdr, + &mut crc, + false, + ); + } + + // If last segment is not marked as partial, it means that next segment + // was not written. Let's make this segment partial once again. if !high_ispartial { - let wal_dir = data_dir.join("pg_wal"); - if let Err(e) = fs::rename( - wal_dir.join(file_name.clone()), - wal_dir.join(file_name.clone() + ".partial") - ) { - info!( - "Failed to rename {} to {}.partial: {}", - &file_name, &file_name, e); - } - } - } else { - /* - * Move the starting pointer to the start of the next segment, if the - * highest one we saw was completed. - */ + let wal_dir = data_dir.join("pg_wal"); + if let Err(e) = fs::rename( + wal_dir.join(file_name.clone()), + wal_dir.join(file_name.clone() + ".partial"), + ) { + info!( + "Failed to rename {} to {}.partial: {}", + &file_name, &file_name, e + ); + } + } + } else { + /* + * Move the starting pointer to the start of the next segment, if the + * highest one we saw was completed. + */ if !high_ispartial { - high_segno += 1; - } + high_segno += 1; + } } let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size); return (high_ptr, high_tli); diff --git a/vendor/postgres b/vendor/postgres index 603feabe77..d125cc56b8 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 603feabe776fee7ad173b921a7d4947a87a76e34 +Subproject commit d125cc56b8f493a5d3886a4534e4b1a6db198a8f diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 2efe8018b5..aa3acf05fe 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -27,8 +27,8 @@ use crate::replication::HotStandbyFeedback; use crate::send_wal::SendWalHandler; use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; -use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ}; use pageserver::waldecoder::WalStreamDecoder; +use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ}; pub const SK_MAGIC: u32 = 0xcafeceefu32; pub const SK_FORMAT_VERSION: u32 = 1; @@ -85,8 +85,6 @@ pub struct SafeKeeperInfo { pub commit_lsn: Lsn, /// locally flushed part of WAL pub flush_lsn: Lsn, - /// locally flushed last record LSN - pub safe_lsn: Lsn, /// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers pub restart_lsn: Lsn, } @@ -113,7 +111,6 @@ impl SafeKeeperInfo { }, commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ flush_lsn: Lsn(0), /* locally flushed part of WAL */ - safe_lsn: Lsn(0), /* locally flushed last record LSN */ restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ } } @@ -139,7 +136,6 @@ struct SafeKeeperRequest { struct SafeKeeperResponse { epoch: u64, flush_lsn: Lsn, - safe_lsn: Lsn, hs_feedback: HotStandbyFeedback, } @@ -280,7 +276,6 @@ impl<'pg> ReceiveWalConn<'pg> { /* Calculate WAL end based on local data */ let (flush_lsn, timeline_id) = this_timeline.find_end_of_wal(&swh.conf.data_dir, true); my_info.flush_lsn = flush_lsn; - my_info.safe_lsn = flush_lsn; // end_of_wal actually returns last record LSN my_info.server.timeline = timeline_id; info!( @@ -328,11 +323,14 @@ impl<'pg> ReceiveWalConn<'pg> { } info!( - "Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={} safe_lsn={}", - server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn, my_info.safe_lsn + "Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}", + server_info.timeline_id, + server_info.tenant_id, + self.peer_addr, + my_info.flush_lsn, ); - let mut last_rec_lsn = Lsn(0); - let mut decoder = WalStreamDecoder::new(last_rec_lsn, false); + let mut last_rec_lsn = Lsn(0); + let mut decoder = WalStreamDecoder::new(last_rec_lsn, false); // Main loop loop { @@ -355,47 +353,52 @@ impl<'pg> ReceiveWalConn<'pg> { let end_pos = req.end_lsn; let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; assert!(rec_size <= MAX_SEND_SIZE); - if rec_size != 0 { - debug!( - "received for {} bytes between {} and {}", - rec_size, start_pos, end_pos, - ); + if rec_size != 0 { + debug!( + "received for {} bytes between {} and {}", + rec_size, start_pos, end_pos, + ); - /* Receive message body (from the rest of the message) */ - let mut buf = Vec::with_capacity(rec_size); - msg_reader.read_to_end(&mut buf)?; - assert_eq!(buf.len(), rec_size); + /* Receive message body (from the rest of the message) */ + let mut buf = Vec::with_capacity(rec_size); + msg_reader.read_to_end(&mut buf)?; + assert_eq!(buf.len(), rec_size); - if decoder.available() != start_pos { - info!("Restart decoder from {} to {}", decoder.available(), start_pos); - decoder = WalStreamDecoder::new(start_pos, false); - } - decoder.feed_bytes(&buf); - //while let Ok(Some((lsn,_rec))) = decoder.poll_decode() { - loop { - match decoder.poll_decode() { - Err(e) => info!("Decode error {}", e), - Ok(None) => info!("Decode end"), - Ok(Some((lsn,_rec))) => { - last_rec_lsn = lsn; - continue; - } - } - break; - } - last_rec_lsn = Lsn((last_rec_lsn.0 + 7) & !7); // align record start position on 8 - info!("Receive WAL {}..{} last_rec_lsn={}", start_pos, end_pos, last_rec_lsn); + if decoder.available() != start_pos { + info!( + "Restart decoder from {} to {}", + decoder.available(), + start_pos + ); + decoder = WalStreamDecoder::new(start_pos, false); + } + decoder.feed_bytes(&buf); + loop { + match decoder.poll_decode() { + Err(e) => info!("Decode error {}", e), + Ok(None) => {}, + Ok(Some((lsn, _rec))) => { + last_rec_lsn = lsn; + continue; + } + } + break; + } + info!( + "Receive WAL {}..{} last_rec_lsn={}", + start_pos, end_pos, last_rec_lsn + ); - /* Save message in file */ - Self::write_wal_file( - swh, - start_pos, - timeline_id, - this_timeline.get(), - wal_seg_size, - &buf, - )?; - } + /* Save message in file */ + Self::write_wal_file( + swh, + start_pos, + timeline_id, + this_timeline.get(), + wal_seg_size, + &buf, + )?; + } my_info.restart_lsn = req.restart_lsn; my_info.commit_lsn = req.commit_lsn; @@ -410,11 +413,8 @@ impl<'pg> ReceiveWalConn<'pg> { my_info.epoch = prop.epoch; /* bump epoch */ sync_control_file = true; } - if end_pos > my_info.flush_lsn { - my_info.flush_lsn = end_pos; - } - if last_rec_lsn > my_info.safe_lsn { - my_info.safe_lsn = last_rec_lsn; + if last_rec_lsn > my_info.flush_lsn { + my_info.flush_lsn = last_rec_lsn; } /* * Update restart LSN in control file. @@ -434,7 +434,6 @@ impl<'pg> ReceiveWalConn<'pg> { let resp = SafeKeeperResponse { epoch: my_info.epoch, flush_lsn: my_info.flush_lsn, - safe_lsn: my_info.safe_lsn, hs_feedback: this_timeline.get().get_hs_feedback(), }; self.write_msg(&resp)?; @@ -443,10 +442,15 @@ impl<'pg> ReceiveWalConn<'pg> { * Ping wal sender that new data is available. * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper. */ - info!("Notify WAL senders min({}, {})={}", req.commit_lsn, my_info.safe_lsn, min(req.commit_lsn, my_info.safe_lsn)); + trace!( + "Notify WAL senders min({}, {})={}", + req.commit_lsn, + my_info.flush_lsn, + min(req.commit_lsn, my_info.flush_lsn) + ); this_timeline .get() - .notify_wal_senders(min(req.commit_lsn, my_info.safe_lsn)); + .notify_wal_senders(min(req.commit_lsn, my_info.flush_lsn)); } Ok(()) diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 6b083cabe0..39018f510f 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -217,7 +217,12 @@ impl ReplicationConn { data: &file_buf, }))?; - info!("Sent WAL to page server {}..{}, end_pos={}", start_pos, start_pos + send_size as u64, end_pos); + debug!( + "Sent WAL to page server {}..{}, end_pos={}", + start_pos, + start_pos + send_size as u64, + end_pos + ); start_pos += send_size as u64; diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs index 859d6db09d..f91be8c58c 100644 --- a/zenith_utils/src/lsn.rs +++ b/zenith_utils/src/lsn.rs @@ -24,6 +24,11 @@ impl Lsn { /// Maximum possible value for an LSN pub const MAX: Lsn = Lsn(u64::MAX); + /// Align LSN on 8-byte boundary (alignment of WAL records). + pub fn align(&self) -> Lsn { + Lsn((self.0 + 7) & !7) + } + /// Subtract a number, returning None on overflow. pub fn checked_sub>(self, other: T) -> Option { let other: u64 = other.into(); diff --git a/zenith_utils/src/seqwait.rs b/zenith_utils/src/seqwait.rs index 381ef9edca..d5ddc92b7a 100644 --- a/zenith_utils/src/seqwait.rs +++ b/zenith_utils/src/seqwait.rs @@ -209,11 +209,6 @@ where pub fn load(&self) -> T { self.internal.lock().unwrap().current } - - /// Set the current value. - pub fn store(&self, num: T) { - self.internal.lock().unwrap().current = num - } } #[cfg(test)]