Use last record LSN as flush position reported by safekeepers to walproposer to prevent moving VCL backward on compute node restart

This commit is contained in:
Konstantin Knizhnik
2021-08-26 18:08:29 +03:00
parent a2e135b404
commit 815528e0ce
10 changed files with 267 additions and 213 deletions

View File

@@ -663,7 +663,7 @@ impl Timeline for ObjectTimeline {
assert!(old <= lsn);
// Use old value of last_record_lsn as prev_record_lsn
self.prev_record_lsn.fetch_max(Lsn((old.0 + 7) & !7));
self.prev_record_lsn.fetch_max(old.align());
// Also advance last_valid_lsn
let old = self.last_valid_lsn.advance(lsn);

View File

@@ -362,7 +362,7 @@ impl PageServerHandler {
timeline.wait_lsn(lsn)?;
lsn
}
None => timeline.get_last_valid_lsn(),
None => timeline.get_last_record_lsn(),
};
{
let mut writer = CopyDataSink { pgb };

View File

@@ -31,7 +31,7 @@ pub struct WalStreamDecoder {
inputbuf: BytesMut,
recordbuf: BytesMut,
crc_check: bool,
crc_check: bool,
}
#[derive(Error, Debug, Clone)]
@@ -56,18 +56,16 @@ impl WalStreamDecoder {
inputbuf: BytesMut::new(),
recordbuf: BytesMut::new(),
crc_check,
crc_check,
}
}
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
info!("WAL decoder: self.lsn={}, buf.len()={}, self.inputbuf.len()={}, self.inputbuf.remaining()={}",
self.lsn, buf.len(), self.inputbuf.len(), self.inputbuf.remaining());
}
/// Attempt to decode another WAL record from the input that has been fed to the
@@ -85,7 +83,6 @@ impl WalStreamDecoder {
// parse long header
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
info!("self.inputbuf.remaining()={} < XLOG_SIZE_OF_XLOG_LONG_PHD at {}, self.recordbuf.len()={}", self.inputbuf.remaining(), self.lsn, self.recordbuf.len());
return Ok(None);
}
@@ -102,7 +99,6 @@ impl WalStreamDecoder {
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
} else if self.lsn.block_offset() == 0 {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
info!("self.inputbuf.remaining()={} < XLOG_SIZE_OF_XLOG_SHORT_PHD at {}, self.recordbuf.len()={}, self.contlen={}", self.inputbuf.remaining(), self.lsn, self.recordbuf.len(), self.contlen);
return Ok(None);
}
@@ -110,7 +106,10 @@ impl WalStreamDecoder {
if hdr.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
msg: "invalid xlog page header".into(),
msg: format!(
"invalid xlog page header: xlp_pageaddr={} vs. lsn={}",
hdr.xlp_pageaddr, self.lsn
),
lsn: self.lsn,
});
}
@@ -119,7 +118,6 @@ impl WalStreamDecoder {
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
} else if self.padlen > 0 {
if self.inputbuf.remaining() < self.padlen as usize {
info!("self.inputbuf.remaining()={} < self.padlen={}", self.inputbuf.remaining(), self.padlen);
return Ok(None);
}
@@ -131,7 +129,6 @@ impl WalStreamDecoder {
// need to have at least the xl_tot_len field
if self.inputbuf.remaining() < 4 {
info!("self.inputbuf.remaining()={} < 4", self.inputbuf.remaining());
return Ok(None);
}
@@ -158,7 +155,6 @@ impl WalStreamDecoder {
let n = min(self.contlen, pageleft) as usize;
if self.inputbuf.remaining() < n {
info!("self.inputbuf.remaining()={} < n={} at {}, self.recordbuf.len()={}, self.contlen={}", self.inputbuf.remaining(), n, self.lsn, self.recordbuf.len(), self.contlen);
return Ok(None);
}
@@ -185,21 +181,27 @@ impl WalStreamDecoder {
self.padlen = self.lsn.calc_padding(8u32) as u32;
}
// Check record CRC
if self.crc_check {
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}",
// Check record CRC
if self.crc_check {
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}",
n, recordbuf.len(), self.lsn, xlogrec, recordbuf);
return Err(WalDecodeError {
msg: format!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}", n, buf.len(), self.lsn, xlogrec),
lsn: self.lsn,
});
}
}
return Err(WalDecodeError {
msg: format!(
"WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}",
n,
buf.len(),
self.lsn,
xlogrec
),
lsn: self.lsn,
});
}
}
let result = (self.lsn, recordbuf);
let result = (self.lsn.align(), recordbuf);
return Ok(Some(result));
}
}

View File

@@ -168,7 +168,7 @@ fn walreceiver_main(
// too. Just for the sake of paranoia.
startpoint += startpoint.calc_padding(8u32);
info!(
debug!(
"last_record_lsn {} starting replication from {} for timeline {}, server is at {}...",
last_rec_lsn, startpoint, timelineid, end_of_wal
);
@@ -203,48 +203,49 @@ fn walreceiver_main(
tenantid,
)?;
info!("received XLogData between {} and {}", startlsn, endlsn);
//assert!(waldecoder.lsn + waldecoder.padlen as u64 == startlsn);
trace!("received XLogData between {} and {}", startlsn, endlsn);
waldecoder.feed_bytes(data);
loop {
match waldecoder.poll_decode() {
Ok(Some((lsn, recdata))) => {
// Save old checkpoint value to compare with it after decoding WAL record
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(recdata.clone());
let lsn = Lsn((lsn.0 + 7) & !7); // align on 8
restore_local_repo::save_decoded_record(
&mut checkpoint,
&*timeline,
&decoded,
recdata,
lsn,
)?;
last_rec_lsn = lsn;
loop {
match waldecoder.poll_decode() {
Ok(Some((lsn, recdata))) => {
// Save old checkpoint value to compare with it after decoding WAL record
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(recdata.clone());
restore_local_repo::save_decoded_record(
&mut checkpoint,
&*timeline,
&decoded,
recdata,
lsn,
)?;
last_rec_lsn = lsn;
let new_checkpoint_bytes = checkpoint.encode();
// Check if checkpoint data was updated by save_decoded_record
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
RelishTag::Checkpoint,
0,
lsn,
new_checkpoint_bytes,
false,
)?;
let new_checkpoint_bytes = checkpoint.encode();
// Check if checkpoint data was updated by save_decoded_record
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
RelishTag::Checkpoint,
0,
lsn,
new_checkpoint_bytes,
false,
)?;
}
}
Ok(None) => {
trace!(
"End of replication stream {}..{} at {}",
startlsn, endlsn, last_rec_lsn
);
break;
}
Err(e) => {
info!("Decode error {}", e);
return Err(e.into());
}
}
}
Ok(None) => {
info!("End of replication stream {}..{} at {}", startlsn, endlsn, last_rec_lsn);
break;
}
Err(e) => {
info!("Decode error {}", e);
return Err(e.into());
}
}
}
}
// Update the last_valid LSN value in the page cache one more time. We updated
// it in the loop above, between each WAL record, but we might have received
// a partial record after the last completed record. Our page cache's value

View File

@@ -108,22 +108,22 @@ fn find_end_of_wal_segment(
segno: XLogSegNo,
tli: TimeLineID,
wal_seg_size: usize,
is_partial: bool,
rec_offs: &mut usize,
rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD],
crc: &mut u32,
check_contrec: bool
is_partial: bool,
rec_offs: &mut usize,
rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD],
crc: &mut u32,
check_contrec: bool,
) -> u32 {
let mut offs: usize = 0;
let mut contlen: usize = 0;
let mut buf = [0u8; XLOG_BLCKSZ];
let file_name = XLogFileName(tli, segno, wal_seg_size);
let mut last_valid_rec_pos: usize = 0;
let file_path = data_dir.join(if is_partial {
file_name.clone() + ".partial"
} else {
file_name
});
let file_path = data_dir.join(if is_partial {
file_name.clone() + ".partial"
} else {
file_name
});
let mut file = File::open(&file_path).unwrap();
while offs < wal_seg_size {
@@ -143,22 +143,21 @@ fn find_end_of_wal_segment(
break;
}
if offs == 0 {
info!("xlp_rem_len={}", xlp_rem_len);
offs = XLOG_SIZE_OF_XLOG_LONG_PHD;
if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
if check_contrec {
let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize;
contlen = xlp_rem_len as usize;
assert!(*rec_offs + contlen == xl_tot_len);
} else {
offs += ((xlp_rem_len + 7) & !7) as usize;
}
if check_contrec {
let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize;
contlen = xlp_rem_len as usize;
assert!(*rec_offs + contlen == xl_tot_len);
} else {
offs += ((xlp_rem_len + 7) & !7) as usize;
}
} else if *rec_offs != 0 {
// There is incompleted page at previous segment but not cont record:
// it means that current segment is not valid and we have to return back.
info!("CONTRECORD flag is missed in page header");
return 0;
}
// There is incompleted page at previous segment but no cont record:
// it means that current segment is not valid and we have to return back.
info!("CONTRECORD flag is missed in page header");
return 0;
}
} else {
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
}
@@ -179,13 +178,14 @@ fn find_end_of_wal_segment(
// read the rest of the record, or as much as fits on this page.
let n = min(contlen, pageleft);
let mut hdr_len: usize = 0;
let mut hdr_len: usize = 0;
if *rec_offs < XLOG_SIZE_OF_XLOG_RECORD {
// copy header
// copy header
hdr_len = min(XLOG_SIZE_OF_XLOG_RECORD - *rec_offs, n);
rec_hdr[*rec_offs..*rec_offs + hdr_len].copy_from_slice(&buf[page_offs..page_offs + hdr_len]);
rec_hdr[*rec_offs..*rec_offs + hdr_len]
.copy_from_slice(&buf[page_offs..page_offs + hdr_len]);
}
*crc = crc32c_append(*crc, &buf[page_offs+hdr_len..page_offs+n]);
*crc = crc32c_append(*crc, &buf[page_offs + hdr_len..page_offs + n]);
*rec_offs += n;
offs += n;
contlen -= n;
@@ -193,12 +193,14 @@ fn find_end_of_wal_segment(
if contlen == 0 {
*crc = crc32c_append(*crc, &rec_hdr[0..XLOG_RECORD_CRC_OFFS]);
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
let wal_crc = LittleEndian::read_u32(&rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS+4]);
let wal_crc = LittleEndian::read_u32(
&rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS + 4],
);
if *crc == wal_crc {
last_valid_rec_pos = offs;
// Reset rec_offs and crc for start of new record
*rec_offs = 0;
*crc = 0;
// Reset rec_offs and crc for start of new record
*rec_offs = 0;
*crc = 0;
} else {
info!(
"CRC mismatch {} vs {} at offset {} lsn {}",
@@ -252,62 +254,102 @@ pub fn find_end_of_wal(
}
}
if high_segno > 0 {
let mut high_offs = 0;
if precise {
let mut crc: u32 = 0;
let mut rec_offs: usize = 0;
let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD];
let mut high_offs = 0;
if precise {
let mut crc: u32 = 0;
let mut rec_offs: usize = 0;
let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD];
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
/*
* To be able to calculate CRC of records crossing segment boundary,
* we need to parse previous segment
*/
assert!(high_segno > 1);
let prev_offs = find_end_of_wal_segment(data_dir, high_segno-1, high_tli, wal_seg_size, false, &mut rec_offs, &mut rec_hdr, &mut crc, false);
if prev_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD {
info!("Segment {} doesn't contain any valid record {}", high_segno-1, prev_offs);
}
assert!(prev_offs as usize > XLOG_SIZE_OF_XLOG_LONG_PHD);
high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size, high_ispartial, &mut rec_offs, &mut rec_hdr, &mut crc, true);
/*
* To be able to calculate CRC of records crossing segment boundary,
* we need to parse previous segment.
* FIXME: handle case when wal record is larger than WAL segment
*/
if high_segno > 1 {
let prev_offs = find_end_of_wal_segment(
data_dir,
high_segno - 1,
high_tli,
wal_seg_size,
false,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
false,
);
if prev_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD {
info!(
"Segment {} doesn't contain any valid record {}",
high_segno - 1,
prev_offs
);
}
assert!(prev_offs as usize > XLOG_SIZE_OF_XLOG_LONG_PHD);
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
high_tli,
wal_seg_size,
high_ispartial,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
true,
);
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
if high_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD {
// If last segment contais no valid records, then return back to previous segment
let wal_dir = data_dir.join("pg_wal");
let file_path = if high_ispartial {
wal_dir.join(file_name.clone() + ".partial")
} else {
wal_dir.join(file_name.clone())
};
info!("Remove empty WAL segment {:?}", &file_path);
if let Err(e) = fs::remove_file(&file_path) {
info!("Failed to remove file {:?}: {}", &file_path, e);
}
high_ispartial = false; // previous segment should not be partial
high_segno -= 1;
high_offs = prev_offs;
}
// If last segment is not marked as partial, it means that next segment
// was not written. Let's make this segment partial once again.
if high_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD {
// If last segment contais no valid records, then return back to previous segment
let wal_dir = data_dir.join("pg_wal");
let file_path = if high_ispartial {
wal_dir.join(file_name.clone() + ".partial")
} else {
wal_dir.join(file_name.clone())
};
info!("Remove empty WAL segment {:?}", &file_path);
if let Err(e) = fs::remove_file(&file_path) {
info!("Failed to remove file {:?}: {}", &file_path, e);
}
high_ispartial = false; // previous segment should not be partial
high_segno -= 1;
high_offs = prev_offs;
}
} else {
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
high_tli,
wal_seg_size,
high_ispartial,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
false,
);
}
// If last segment is not marked as partial, it means that next segment
// was not written. Let's make this segment partial once again.
if !high_ispartial {
let wal_dir = data_dir.join("pg_wal");
if let Err(e) = fs::rename(
wal_dir.join(file_name.clone()),
wal_dir.join(file_name.clone() + ".partial")
) {
info!(
"Failed to rename {} to {}.partial: {}",
&file_name, &file_name, e);
}
}
} else {
/*
* Move the starting pointer to the start of the next segment, if the
* highest one we saw was completed.
*/
let wal_dir = data_dir.join("pg_wal");
if let Err(e) = fs::rename(
wal_dir.join(file_name.clone()),
wal_dir.join(file_name.clone() + ".partial"),
) {
info!(
"Failed to rename {} to {}.partial: {}",
&file_name, &file_name, e
);
}
}
} else {
/*
* Move the starting pointer to the start of the next segment, if the
* highest one we saw was completed.
*/
if !high_ispartial {
high_segno += 1;
}
high_segno += 1;
}
}
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
return (high_ptr, high_tli);

View File

@@ -27,8 +27,8 @@ use crate::replication::HotStandbyFeedback;
use crate::send_wal::SendWalHandler;
use crate::timeline::{Timeline, TimelineTools};
use crate::WalAcceptorConf;
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
use pageserver::waldecoder::WalStreamDecoder;
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 1;
@@ -85,8 +85,6 @@ pub struct SafeKeeperInfo {
pub commit_lsn: Lsn,
/// locally flushed part of WAL
pub flush_lsn: Lsn,
/// locally flushed last record LSN
pub safe_lsn: Lsn,
/// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers
pub restart_lsn: Lsn,
}
@@ -113,7 +111,6 @@ impl SafeKeeperInfo {
},
commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */
flush_lsn: Lsn(0), /* locally flushed part of WAL */
safe_lsn: Lsn(0), /* locally flushed last record LSN */
restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */
}
}
@@ -139,7 +136,6 @@ struct SafeKeeperRequest {
struct SafeKeeperResponse {
epoch: u64,
flush_lsn: Lsn,
safe_lsn: Lsn,
hs_feedback: HotStandbyFeedback,
}
@@ -280,7 +276,6 @@ impl<'pg> ReceiveWalConn<'pg> {
/* Calculate WAL end based on local data */
let (flush_lsn, timeline_id) = this_timeline.find_end_of_wal(&swh.conf.data_dir, true);
my_info.flush_lsn = flush_lsn;
my_info.safe_lsn = flush_lsn; // end_of_wal actually returns last record LSN
my_info.server.timeline = timeline_id;
info!(
@@ -328,11 +323,14 @@ impl<'pg> ReceiveWalConn<'pg> {
}
info!(
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={} safe_lsn={}",
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn, my_info.safe_lsn
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}",
server_info.timeline_id,
server_info.tenant_id,
self.peer_addr,
my_info.flush_lsn,
);
let mut last_rec_lsn = Lsn(0);
let mut decoder = WalStreamDecoder::new(last_rec_lsn, false);
let mut last_rec_lsn = Lsn(0);
let mut decoder = WalStreamDecoder::new(last_rec_lsn, false);
// Main loop
loop {
@@ -355,47 +353,52 @@ impl<'pg> ReceiveWalConn<'pg> {
let end_pos = req.end_lsn;
let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
assert!(rec_size <= MAX_SEND_SIZE);
if rec_size != 0 {
debug!(
"received for {} bytes between {} and {}",
rec_size, start_pos, end_pos,
);
if rec_size != 0 {
debug!(
"received for {} bytes between {} and {}",
rec_size, start_pos, end_pos,
);
/* Receive message body (from the rest of the message) */
let mut buf = Vec::with_capacity(rec_size);
msg_reader.read_to_end(&mut buf)?;
assert_eq!(buf.len(), rec_size);
/* Receive message body (from the rest of the message) */
let mut buf = Vec::with_capacity(rec_size);
msg_reader.read_to_end(&mut buf)?;
assert_eq!(buf.len(), rec_size);
if decoder.available() != start_pos {
info!("Restart decoder from {} to {}", decoder.available(), start_pos);
decoder = WalStreamDecoder::new(start_pos, false);
}
decoder.feed_bytes(&buf);
//while let Ok(Some((lsn,_rec))) = decoder.poll_decode() {
loop {
match decoder.poll_decode() {
Err(e) => info!("Decode error {}", e),
Ok(None) => info!("Decode end"),
Ok(Some((lsn,_rec))) => {
last_rec_lsn = lsn;
continue;
}
}
break;
}
last_rec_lsn = Lsn((last_rec_lsn.0 + 7) & !7); // align record start position on 8
info!("Receive WAL {}..{} last_rec_lsn={}", start_pos, end_pos, last_rec_lsn);
if decoder.available() != start_pos {
info!(
"Restart decoder from {} to {}",
decoder.available(),
start_pos
);
decoder = WalStreamDecoder::new(start_pos, false);
}
decoder.feed_bytes(&buf);
loop {
match decoder.poll_decode() {
Err(e) => info!("Decode error {}", e),
Ok(None) => {},
Ok(Some((lsn, _rec))) => {
last_rec_lsn = lsn;
continue;
}
}
break;
}
info!(
"Receive WAL {}..{} last_rec_lsn={}",
start_pos, end_pos, last_rec_lsn
);
/* Save message in file */
Self::write_wal_file(
swh,
start_pos,
timeline_id,
this_timeline.get(),
wal_seg_size,
&buf,
)?;
}
/* Save message in file */
Self::write_wal_file(
swh,
start_pos,
timeline_id,
this_timeline.get(),
wal_seg_size,
&buf,
)?;
}
my_info.restart_lsn = req.restart_lsn;
my_info.commit_lsn = req.commit_lsn;
@@ -410,11 +413,8 @@ impl<'pg> ReceiveWalConn<'pg> {
my_info.epoch = prop.epoch; /* bump epoch */
sync_control_file = true;
}
if end_pos > my_info.flush_lsn {
my_info.flush_lsn = end_pos;
}
if last_rec_lsn > my_info.safe_lsn {
my_info.safe_lsn = last_rec_lsn;
if last_rec_lsn > my_info.flush_lsn {
my_info.flush_lsn = last_rec_lsn;
}
/*
* Update restart LSN in control file.
@@ -434,7 +434,6 @@ impl<'pg> ReceiveWalConn<'pg> {
let resp = SafeKeeperResponse {
epoch: my_info.epoch,
flush_lsn: my_info.flush_lsn,
safe_lsn: my_info.safe_lsn,
hs_feedback: this_timeline.get().get_hs_feedback(),
};
self.write_msg(&resp)?;
@@ -443,10 +442,15 @@ impl<'pg> ReceiveWalConn<'pg> {
* Ping wal sender that new data is available.
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
*/
info!("Notify WAL senders min({}, {})={}", req.commit_lsn, my_info.safe_lsn, min(req.commit_lsn, my_info.safe_lsn));
trace!(
"Notify WAL senders min({}, {})={}",
req.commit_lsn,
my_info.flush_lsn,
min(req.commit_lsn, my_info.flush_lsn)
);
this_timeline
.get()
.notify_wal_senders(min(req.commit_lsn, my_info.safe_lsn));
.notify_wal_senders(min(req.commit_lsn, my_info.flush_lsn));
}
Ok(())

View File

@@ -217,7 +217,12 @@ impl ReplicationConn {
data: &file_buf,
}))?;
info!("Sent WAL to page server {}..{}, end_pos={}", start_pos, start_pos + send_size as u64, end_pos);
debug!(
"Sent WAL to page server {}..{}, end_pos={}",
start_pos,
start_pos + send_size as u64,
end_pos
);
start_pos += send_size as u64;

View File

@@ -24,6 +24,11 @@ impl Lsn {
/// Maximum possible value for an LSN
pub const MAX: Lsn = Lsn(u64::MAX);
/// Align LSN on 8-byte boundary (alignment of WAL records).
pub fn align(&self) -> Lsn {
Lsn((self.0 + 7) & !7)
}
/// Subtract a number, returning None on overflow.
pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
let other: u64 = other.into();

View File

@@ -209,11 +209,6 @@ where
pub fn load(&self) -> T {
self.internal.lock().unwrap().current
}
/// Set the current value.
pub fn store(&self, num: T) {
self.internal.lock().unwrap().current = num
}
}
#[cfg(test)]