diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index a03dc15b2e..47e555af10 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -320,7 +320,7 @@ impl PostgresNode { // Never clean up old WAL. TODO: We should use a replication // slot or something proper, to prevent the compute node - // from removing WAL that hasn't been streamed to the safekeepr or + // from removing WAL that hasn't been streamed to the safekeeper or // page server yet. (gh issue #349) self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index f87012403f..99bc4a50a9 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -264,7 +264,7 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa /// Scan PostgreSQL WAL files in given directory /// and load all records >= 'startpoint' into the repository. pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> { - let mut waldecoder = WalStreamDecoder::new(startpoint); + let mut waldecoder = WalStreamDecoder::new(startpoint, true); let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE); let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index bc072ff657..337b31eb9e 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -25,13 +25,13 @@ pub type MultiXactStatus = u32; pub struct WalStreamDecoder { lsn: Lsn, - startlsn: Lsn, // LSN where this record starts contlen: u32, padlen: u32, inputbuf: BytesMut, - recordbuf: BytesMut, + + crc_check: bool, } #[derive(Error, Debug, Clone)] @@ -46,21 +46,28 @@ pub struct WalDecodeError { // FIXME: This isn't a proper rust stream // impl WalStreamDecoder { - pub fn new(lsn: Lsn) -> WalStreamDecoder { + pub fn new(lsn: Lsn, crc_check: bool) -> WalStreamDecoder { WalStreamDecoder { lsn, - startlsn: Lsn(0), contlen: 0, padlen: 0, inputbuf: BytesMut::new(), recordbuf: BytesMut::new(), + + crc_check, } } + pub fn available(&self) -> Lsn { + self.lsn + self.inputbuf.remaining() as u64 + } + pub fn feed_bytes(&mut self, buf: &[u8]) { self.inputbuf.extend_from_slice(buf); + info!("WAL decoder: self.lsn={}, buf.len()={}, self.inputbuf.len()={}, self.inputbuf.remaining()={}", + self.lsn, buf.len(), self.inputbuf.len(), self.inputbuf.remaining()); } /// Attempt to decode another WAL record from the input that has been fed to the @@ -78,6 +85,7 @@ impl WalStreamDecoder { // parse long header if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD { + info!("self.inputbuf.remaining()={} < XLOG_SIZE_OF_XLOG_LONG_PHD at {}, self.recordbuf.len()={}", self.inputbuf.remaining(), self.lsn, self.recordbuf.len()); return Ok(None); } @@ -92,9 +100,9 @@ impl WalStreamDecoder { // TODO: verify the remaining fields in the header self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64; - continue; } else if self.lsn.block_offset() == 0 { if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD { + info!("self.inputbuf.remaining()={} < XLOG_SIZE_OF_XLOG_SHORT_PHD at {}, self.recordbuf.len()={}, self.contlen={}", self.inputbuf.remaining(), self.lsn, self.recordbuf.len(), self.contlen); return Ok(None); } @@ -109,9 +117,9 @@ impl WalStreamDecoder { // TODO: verify the remaining fields in the header self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64; - continue; } else if self.padlen > 0 { if self.inputbuf.remaining() < self.padlen as usize { + info!("self.inputbuf.remaining()={} < self.padlen={}", self.inputbuf.remaining(), self.padlen); return Ok(None); } @@ -123,11 +131,11 @@ impl WalStreamDecoder { // need to have at least the xl_tot_len field if self.inputbuf.remaining() < 4 { + info!("self.inputbuf.remaining()={} < 4", self.inputbuf.remaining()); return Ok(None); } // read xl_tot_len FIXME: assumes little-endian - self.startlsn = self.lsn; let xl_tot_len = self.inputbuf.get_u32_le(); if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD { return Err(WalDecodeError { @@ -142,7 +150,6 @@ impl WalStreamDecoder { self.recordbuf.put_u32_le(xl_tot_len); self.contlen = xl_tot_len - 4; - continue; } else { // we're continuing a record, possibly from previous page. let pageleft = self.lsn.remaining_in_block() as u32; @@ -151,6 +158,7 @@ impl WalStreamDecoder { let n = min(self.contlen, pageleft) as usize; if self.inputbuf.remaining() < n { + info!("self.inputbuf.remaining()={} < n={} at {}, self.recordbuf.len()={}, self.contlen={}", self.inputbuf.remaining(), n, self.lsn, self.recordbuf.len(), self.contlen); return Ok(None); } @@ -164,17 +172,10 @@ impl WalStreamDecoder { let recordbuf = recordbuf.freeze(); let mut buf = recordbuf.clone(); + let xlogrec = XLogRecord::from_bytes(&mut buf); + // XLOG_SWITCH records are special. If we see one, we need to skip // to the next WAL segment. - let xlogrec = XLogRecord::from_bytes(&mut buf); - let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]); - crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]); - if crc != xlogrec.xl_crc { - return Err(WalDecodeError { - msg: "WAL record crc mismatch".into(), - lsn: self.lsn, - }); - } if xlogrec.is_xlog_switch_record() { trace!("saw xlog switch record at {}", self.lsn); self.padlen = @@ -184,10 +185,23 @@ impl WalStreamDecoder { self.padlen = self.lsn.calc_padding(8u32) as u32; } + // Check record CRC + if self.crc_check { + let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]); + crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]); + if crc != xlogrec.xl_crc { + info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}", + n, recordbuf.len(), self.lsn, xlogrec, recordbuf); + return Err(WalDecodeError { + msg: format!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}", n, buf.len(), self.lsn, xlogrec), + lsn: self.lsn, + }); + } + } + let result = (self.lsn, recordbuf); return Ok(Some(result)); } - continue; } } // check record boundaries diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 069f84e2ff..3f6151724e 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -168,7 +168,7 @@ fn walreceiver_main( // too. Just for the sake of paranoia. startpoint += startpoint.calc_padding(8u32); - debug!( + info!( "last_record_lsn {} starting replication from {} for timeline {}, server is at {}...", last_rec_lsn, startpoint, timelineid, end_of_wal ); @@ -178,7 +178,7 @@ fn walreceiver_main( let copy_stream = rclient.copy_both_simple(&query)?; let mut physical_stream = ReplicationIter::new(copy_stream); - let mut waldecoder = WalStreamDecoder::new(startpoint); + let mut waldecoder = WalStreamDecoder::new(startpoint, true); let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?; let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; @@ -203,14 +203,17 @@ fn walreceiver_main( tenantid, )?; - trace!("received XLogData between {} and {}", startlsn, endlsn); - + info!("received XLogData between {} and {}", startlsn, endlsn); + //assert!(waldecoder.lsn + waldecoder.padlen as u64 == startlsn); waldecoder.feed_bytes(data); - while let Some((lsn, recdata)) = waldecoder.poll_decode()? { + loop { + match waldecoder.poll_decode() { + Ok(Some((lsn, recdata))) => { // Save old checkpoint value to compare with it after decoding WAL record let old_checkpoint_bytes = checkpoint.encode(); - let decoded = decode_wal_record(recdata.clone()); + let decoded = decode_wal_record(recdata.clone()); + let lsn = Lsn((lsn.0 + 7) & !7); // align on 8 restore_local_repo::save_decoded_record( &mut checkpoint, &*timeline, @@ -231,8 +234,17 @@ fn walreceiver_main( false, )?; } - } - + } + Ok(None) => { + info!("End of replication stream {}..{} at {}", startlsn, endlsn, last_rec_lsn); + break; + } + Err(e) => { + info!("Decode error {}", e); + return Err(e.into()); + } + } + } // Update the last_valid LSN value in the page cache one more time. We updated // it in the loop above, between each WAL record, but we might have received // a partial record after the last completed record. Our page cache's value diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 2b6297fa10..3fa6ef83a8 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -108,17 +108,23 @@ fn find_end_of_wal_segment( segno: XLogSegNo, tli: TimeLineID, wal_seg_size: usize, + is_partial: bool, + rec_offs: &mut usize, + rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD], + crc: &mut u32, + check_contrec: bool ) -> u32 { let mut offs: usize = 0; let mut contlen: usize = 0; - let mut wal_crc: u32 = 0; - let mut crc: u32 = 0; - let mut rec_offs: usize = 0; let mut buf = [0u8; XLOG_BLCKSZ]; let file_name = XLogFileName(tli, segno, wal_seg_size); let mut last_valid_rec_pos: usize = 0; - let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap(); - let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS]; + let file_path = data_dir.join(if is_partial { + file_name.clone() + ".partial" + } else { + file_name + }); + let mut file = File::open(&file_path).unwrap(); while offs < wal_seg_size { if offs % XLOG_BLCKSZ == 0 { @@ -133,14 +139,26 @@ fn find_end_of_wal_segment( let xlp_info = LittleEndian::read_u16(&buf[2..4]); let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]); if xlp_magic != XLOG_PAGE_MAGIC as u16 { - info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic); + info!("Invalid WAL file {:?} magic {}", &file_path, xlp_magic); break; } if offs == 0 { + info!("xlp_rem_len={}", xlp_rem_len); offs = XLOG_SIZE_OF_XLOG_LONG_PHD; if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 { - offs += ((xlp_rem_len + 7) & !7) as usize; - } + if check_contrec { + let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize; + contlen = xlp_rem_len as usize; + assert!(*rec_offs + contlen == xl_tot_len); + } else { + offs += ((xlp_rem_len + 7) & !7) as usize; + } + } else if *rec_offs != 0 { + // There is incompleted page at previous segment but not cont record: + // it means that current segment is not valid and we have to return back. + info!("CONTRECORD flag is missed in page header"); + return 0; + } } else { offs += XLOG_SIZE_OF_XLOG_SHORT_PHD; } @@ -150,9 +168,8 @@ fn find_end_of_wal_segment( if xl_tot_len == 0 { break; } - last_valid_rec_pos = offs; offs += 4; - rec_offs = 4; + *rec_offs = 4; contlen = xl_tot_len - 4; rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]); } else { @@ -162,34 +179,30 @@ fn find_end_of_wal_segment( // read the rest of the record, or as much as fits on this page. let n = min(contlen, pageleft); - if rec_offs < XLOG_RECORD_CRC_OFFS { - let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n); - rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]); + let mut hdr_len: usize = 0; + if *rec_offs < XLOG_SIZE_OF_XLOG_RECORD { + // copy header + hdr_len = min(XLOG_SIZE_OF_XLOG_RECORD - *rec_offs, n); + rec_hdr[*rec_offs..*rec_offs + hdr_len].copy_from_slice(&buf[page_offs..page_offs + hdr_len]); } - if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD { - let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS; - wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]); - crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]); - crc = !crc; - } else { - crc ^= 0xFFFFFFFFu32; - crc = crc32c_append(crc, &buf[page_offs..page_offs + n]); - crc = !crc; - } - rec_offs += n; + *crc = crc32c_append(*crc, &buf[page_offs+hdr_len..page_offs+n]); + *rec_offs += n; offs += n; contlen -= n; if contlen == 0 { - crc = !crc; - crc = crc32c_append(crc, &rec_hdr); + *crc = crc32c_append(*crc, &rec_hdr[0..XLOG_RECORD_CRC_OFFS]); offs = (offs + 7) & !7; // pad on 8 bytes boundary */ - if crc == wal_crc { + let wal_crc = LittleEndian::read_u32(&rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS+4]); + if *crc == wal_crc { last_valid_rec_pos = offs; + // Reset rec_offs and crc for start of new record + *rec_offs = 0; + *crc = 0; } else { info!( - "CRC mismatch {} vs {} at {}", - crc, wal_crc, last_valid_rec_pos + "CRC mismatch {} vs {} at offset {} lsn {}", + *crc, wal_crc, offs, last_valid_rec_pos ); break; } @@ -239,21 +252,67 @@ pub fn find_end_of_wal( } } if high_segno > 0 { - let mut high_offs = 0; - /* - * Move the starting pointer to the start of the next segment, if the - * highest one we saw was completed. - */ - if !high_ispartial { - high_segno += 1; - } else if precise { - /* otherwise locate last record in last partial segment */ - high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size); + let mut high_offs = 0; + if precise { + let mut crc: u32 = 0; + let mut rec_offs: usize = 0; + let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD]; + + /* + * To be able to calculate CRC of records crossing segment boundary, + * we need to parse previous segment + */ + assert!(high_segno > 1); + let prev_offs = find_end_of_wal_segment(data_dir, high_segno-1, high_tli, wal_seg_size, false, &mut rec_offs, &mut rec_hdr, &mut crc, false); + if prev_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD { + info!("Segment {} doesn't contain any valid record {}", high_segno-1, prev_offs); + } + assert!(prev_offs as usize > XLOG_SIZE_OF_XLOG_LONG_PHD); + high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size, high_ispartial, &mut rec_offs, &mut rec_hdr, &mut crc, true); + + let file_name = XLogFileName(high_tli, high_segno, wal_seg_size); + if high_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD { + // If last segment contais no valid records, then return back to previous segment + let wal_dir = data_dir.join("pg_wal"); + let file_path = if high_ispartial { + wal_dir.join(file_name.clone() + ".partial") + } else { + wal_dir.join(file_name.clone()) + }; + info!("Remove empty WAL segment {:?}", &file_path); + if let Err(e) = fs::remove_file(&file_path) { + info!("Failed to remove file {:?}: {}", &file_path, e); + } + high_ispartial = false; // previous segment should not be partial + high_segno -= 1; + high_offs = prev_offs; + } + // If last segment is not marked as partial, it means that next segment + // was not written. Let's make this segment partial once again. + if !high_ispartial { + let wal_dir = data_dir.join("pg_wal"); + if let Err(e) = fs::rename( + wal_dir.join(file_name.clone()), + wal_dir.join(file_name.clone() + ".partial") + ) { + info!( + "Failed to rename {} to {}.partial: {}", + &file_name, &file_name, e); + } + } + } else { + /* + * Move the starting pointer to the start of the next segment, if the + * highest one we saw was completed. + */ + if !high_ispartial { + high_segno += 1; + } } let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size); return (high_ptr, high_tli); } - (0, 0) + (0, 1) // First timeline is 1 } pub fn main() { diff --git a/vendor/postgres b/vendor/postgres index 022285aea5..603feabe77 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 022285aea5767d3c33e0a95e8a2cffec6421369d +Subproject commit 603feabe776fee7ad173b921a7d4947a87a76e34 diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index ac9b620b65..2efe8018b5 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -28,6 +28,7 @@ use crate::send_wal::SendWalHandler; use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ}; +use pageserver::waldecoder::WalStreamDecoder; pub const SK_MAGIC: u32 = 0xcafeceefu32; pub const SK_FORMAT_VERSION: u32 = 1; @@ -84,6 +85,8 @@ pub struct SafeKeeperInfo { pub commit_lsn: Lsn, /// locally flushed part of WAL pub flush_lsn: Lsn, + /// locally flushed last record LSN + pub safe_lsn: Lsn, /// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers pub restart_lsn: Lsn, } @@ -110,6 +113,7 @@ impl SafeKeeperInfo { }, commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ flush_lsn: Lsn(0), /* locally flushed part of WAL */ + safe_lsn: Lsn(0), /* locally flushed last record LSN */ restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ } } @@ -135,6 +139,7 @@ struct SafeKeeperRequest { struct SafeKeeperResponse { epoch: u64, flush_lsn: Lsn, + safe_lsn: Lsn, hs_feedback: HotStandbyFeedback, } @@ -275,6 +280,7 @@ impl<'pg> ReceiveWalConn<'pg> { /* Calculate WAL end based on local data */ let (flush_lsn, timeline_id) = this_timeline.find_end_of_wal(&swh.conf.data_dir, true); my_info.flush_lsn = flush_lsn; + my_info.safe_lsn = flush_lsn; // end_of_wal actually returns last record LSN my_info.server.timeline = timeline_id; info!( @@ -300,8 +306,8 @@ impl<'pg> ReceiveWalConn<'pg> { ); } my_info.server.node_id = prop.node_id; - this_timeline.get().set_info(&my_info); /* Need to persist our vote first */ + this_timeline.get().set_info(&my_info); this_timeline.get().save_control_file(true)?; let mut flushed_restart_lsn = Lsn(0); @@ -322,9 +328,11 @@ impl<'pg> ReceiveWalConn<'pg> { } info!( - "Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}", - server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn + "Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={} safe_lsn={}", + server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn, my_info.safe_lsn ); + let mut last_rec_lsn = Lsn(0); + let mut decoder = WalStreamDecoder::new(last_rec_lsn, false); // Main loop loop { @@ -347,27 +355,47 @@ impl<'pg> ReceiveWalConn<'pg> { let end_pos = req.end_lsn; let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; assert!(rec_size <= MAX_SEND_SIZE); + if rec_size != 0 { + debug!( + "received for {} bytes between {} and {}", + rec_size, start_pos, end_pos, + ); - debug!( - "received for {} bytes between {} and {}", - rec_size, start_pos, end_pos, - ); + /* Receive message body (from the rest of the message) */ + let mut buf = Vec::with_capacity(rec_size); + msg_reader.read_to_end(&mut buf)?; + assert_eq!(buf.len(), rec_size); - /* Receive message body (from the rest of the message) */ - let mut buf = Vec::with_capacity(rec_size); - msg_reader.read_to_end(&mut buf)?; - assert_eq!(buf.len(), rec_size); - - /* Save message in file */ - Self::write_wal_file( - swh, - start_pos, - timeline_id, - this_timeline.get(), - wal_seg_size, - &buf, - )?; + if decoder.available() != start_pos { + info!("Restart decoder from {} to {}", decoder.available(), start_pos); + decoder = WalStreamDecoder::new(start_pos, false); + } + decoder.feed_bytes(&buf); + //while let Ok(Some((lsn,_rec))) = decoder.poll_decode() { + loop { + match decoder.poll_decode() { + Err(e) => info!("Decode error {}", e), + Ok(None) => info!("Decode end"), + Ok(Some((lsn,_rec))) => { + last_rec_lsn = lsn; + continue; + } + } + break; + } + last_rec_lsn = Lsn((last_rec_lsn.0 + 7) & !7); // align record start position on 8 + info!("Receive WAL {}..{} last_rec_lsn={}", start_pos, end_pos, last_rec_lsn); + /* Save message in file */ + Self::write_wal_file( + swh, + start_pos, + timeline_id, + this_timeline.get(), + wal_seg_size, + &buf, + )?; + } my_info.restart_lsn = req.restart_lsn; my_info.commit_lsn = req.commit_lsn; @@ -385,12 +413,16 @@ impl<'pg> ReceiveWalConn<'pg> { if end_pos > my_info.flush_lsn { my_info.flush_lsn = end_pos; } + if last_rec_lsn > my_info.safe_lsn { + my_info.safe_lsn = last_rec_lsn; + } /* * Update restart LSN in control file. * To avoid negative impact on performance of extra fsync, do it only * when restart_lsn delta exceeds WAL segment size. */ sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn; + this_timeline.get().set_info(&my_info); this_timeline.get().save_control_file(sync_control_file)?; if sync_control_file { @@ -401,7 +433,8 @@ impl<'pg> ReceiveWalConn<'pg> { //info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32); let resp = SafeKeeperResponse { epoch: my_info.epoch, - flush_lsn: end_pos, + flush_lsn: my_info.flush_lsn, + safe_lsn: my_info.safe_lsn, hs_feedback: this_timeline.get().get_hs_feedback(), }; self.write_msg(&resp)?; @@ -410,9 +443,10 @@ impl<'pg> ReceiveWalConn<'pg> { * Ping wal sender that new data is available. * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper. */ + info!("Notify WAL senders min({}, {})={}", req.commit_lsn, my_info.safe_lsn, min(req.commit_lsn, my_info.safe_lsn)); this_timeline .get() - .notify_wal_senders(min(req.commit_lsn, end_pos)); + .notify_wal_senders(min(req.commit_lsn, my_info.safe_lsn)); } Ok(()) diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 8d38adc4cf..6b083cabe0 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -217,9 +217,9 @@ impl ReplicationConn { data: &file_buf, }))?; - start_pos += send_size as u64; + info!("Sent WAL to page server {}..{}, end_pos={}", start_pos, start_pos + send_size as u64, end_pos); - debug!("Sent WAL to page server up to {}", end_pos); + start_pos += send_size as u64; // Decide whether to reuse this file. If we don't set wal_file here // a new file will be opened next time. diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 6aaabeaa78..b5a49d2eb2 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -175,7 +175,7 @@ impl Timeline { } } - fn _stop_wal_senders(&self) { + pub fn stop_wal_senders(&self) { self.notify_wal_senders(END_REPLICATION_MARKER); } diff --git a/zenith_utils/src/seqwait.rs b/zenith_utils/src/seqwait.rs index d5ddc92b7a..381ef9edca 100644 --- a/zenith_utils/src/seqwait.rs +++ b/zenith_utils/src/seqwait.rs @@ -209,6 +209,11 @@ where pub fn load(&self) -> T { self.internal.lock().unwrap().current } + + /// Set the current value. + pub fn store(&self, num: T) { + self.internal.lock().unwrap().current = num + } } #[cfg(test)]