mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
Maintain safe LSN position at safekeepers
This commit is contained in:
@@ -320,7 +320,7 @@ impl PostgresNode {
|
||||
|
||||
// Never clean up old WAL. TODO: We should use a replication
|
||||
// slot or something proper, to prevent the compute node
|
||||
// from removing WAL that hasn't been streamed to the safekeepr or
|
||||
// from removing WAL that hasn't been streamed to the safekeeper or
|
||||
// page server yet. (gh issue #349)
|
||||
self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;
|
||||
|
||||
|
||||
@@ -264,7 +264,7 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa
|
||||
/// Scan PostgreSQL WAL files in given directory
|
||||
/// and load all records >= 'startpoint' into the repository.
|
||||
pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
|
||||
|
||||
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
|
||||
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
|
||||
|
||||
@@ -25,13 +25,13 @@ pub type MultiXactStatus = u32;
|
||||
/// Incremental decoder state for a stream of PostgreSQL WAL bytes.
///
/// Bytes are appended with `feed_bytes` and complete records are extracted
/// from them by the decode loop of this type.
pub struct WalStreamDecoder {
|
||||
// Current decode position. Initialized from the `lsn` passed to `new` and
// advanced past page headers as they are parsed (presumably also past
// record bytes as they are consumed -- confirm in the decode loop).
lsn: Lsn,
|
||||
|
||||
startlsn: Lsn, // LSN where this record starts
|
||||
// Number of bytes of the current record that still have to be read; set to
// `xl_tot_len - 4` once the length field has been consumed.
contlen: u32,
|
||||
// Number of padding bytes to skip before the next record starts.
padlen: u32,
|
||||
|
||||
// Bytes handed to `feed_bytes` that the decoder has not consumed yet.
inputbuf: BytesMut,
|
||||
|
||||
// Bytes of the record currently being assembled (may span several pages).
recordbuf: BytesMut,
|
||||
|
||||
// When true, the CRC of each completed record is verified before the
// record is returned; a mismatch is reported as a `WalDecodeError`.
crc_check: bool,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug, Clone)]
|
||||
@@ -46,21 +46,28 @@ pub struct WalDecodeError {
|
||||
// FIXME: This isn't a proper rust stream
|
||||
//
|
||||
impl WalStreamDecoder {
|
||||
pub fn new(lsn: Lsn) -> WalStreamDecoder {
|
||||
pub fn new(lsn: Lsn, crc_check: bool) -> WalStreamDecoder {
|
||||
WalStreamDecoder {
|
||||
lsn,
|
||||
|
||||
startlsn: Lsn(0),
|
||||
contlen: 0,
|
||||
padlen: 0,
|
||||
|
||||
inputbuf: BytesMut::new(),
|
||||
recordbuf: BytesMut::new(),
|
||||
|
||||
crc_check,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the decoder's current LSN advanced by the number of bytes that
/// are still unread in the input buffer.
pub fn available(&self) -> Lsn {
    let unread = self.inputbuf.remaining() as u64;
    self.lsn + unread
}
|
||||
|
||||
/// Appends `buf` to the decoder's input buffer and logs the resulting
/// buffer state at info level.
pub fn feed_bytes(&mut self, buf: &[u8]) {
    self.inputbuf.extend_from_slice(buf);

    // Snapshot the sizes after the append so the log line reflects the new state.
    let fed = buf.len();
    let buffered = self.inputbuf.len();
    let unread = self.inputbuf.remaining();
    info!("WAL decoder: self.lsn={}, buf.len()={}, self.inputbuf.len()={}, self.inputbuf.remaining()={}",
          self.lsn, fed, buffered, unread);
}
|
||||
|
||||
/// Attempt to decode another WAL record from the input that has been fed to the
|
||||
@@ -78,6 +85,7 @@ impl WalStreamDecoder {
|
||||
// parse long header
|
||||
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
|
||||
info!("self.inputbuf.remaining()={} < XLOG_SIZE_OF_XLOG_LONG_PHD at {}, self.recordbuf.len()={}", self.inputbuf.remaining(), self.lsn, self.recordbuf.len());
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -92,9 +100,9 @@ impl WalStreamDecoder {
|
||||
// TODO: verify the remaining fields in the header
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
|
||||
continue;
|
||||
} else if self.lsn.block_offset() == 0 {
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
|
||||
info!("self.inputbuf.remaining()={} < XLOG_SIZE_OF_XLOG_SHORT_PHD at {}, self.recordbuf.len()={}, self.contlen={}", self.inputbuf.remaining(), self.lsn, self.recordbuf.len(), self.contlen);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -109,9 +117,9 @@ impl WalStreamDecoder {
|
||||
// TODO: verify the remaining fields in the header
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
|
||||
continue;
|
||||
} else if self.padlen > 0 {
|
||||
if self.inputbuf.remaining() < self.padlen as usize {
|
||||
info!("self.inputbuf.remaining()={} < self.padlen={}", self.inputbuf.remaining(), self.padlen);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -123,11 +131,11 @@ impl WalStreamDecoder {
|
||||
// need to have at least the xl_tot_len field
|
||||
|
||||
if self.inputbuf.remaining() < 4 {
|
||||
info!("self.inputbuf.remaining()={} < 4", self.inputbuf.remaining());
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// read xl_tot_len FIXME: assumes little-endian
|
||||
self.startlsn = self.lsn;
|
||||
let xl_tot_len = self.inputbuf.get_u32_le();
|
||||
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
|
||||
return Err(WalDecodeError {
|
||||
@@ -142,7 +150,6 @@ impl WalStreamDecoder {
|
||||
self.recordbuf.put_u32_le(xl_tot_len);
|
||||
|
||||
self.contlen = xl_tot_len - 4;
|
||||
continue;
|
||||
} else {
|
||||
// we're continuing a record, possibly from previous page.
|
||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||
@@ -151,6 +158,7 @@ impl WalStreamDecoder {
|
||||
let n = min(self.contlen, pageleft) as usize;
|
||||
|
||||
if self.inputbuf.remaining() < n {
|
||||
info!("self.inputbuf.remaining()={} < n={} at {}, self.recordbuf.len()={}, self.contlen={}", self.inputbuf.remaining(), n, self.lsn, self.recordbuf.len(), self.contlen);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -164,17 +172,10 @@ impl WalStreamDecoder {
|
||||
let recordbuf = recordbuf.freeze();
|
||||
let mut buf = recordbuf.clone();
|
||||
|
||||
let xlogrec = XLogRecord::from_bytes(&mut buf);
|
||||
|
||||
// XLOG_SWITCH records are special. If we see one, we need to skip
|
||||
// to the next WAL segment.
|
||||
let xlogrec = XLogRecord::from_bytes(&mut buf);
|
||||
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
||||
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
||||
if crc != xlogrec.xl_crc {
|
||||
return Err(WalDecodeError {
|
||||
msg: "WAL record crc mismatch".into(),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
if xlogrec.is_xlog_switch_record() {
|
||||
trace!("saw xlog switch record at {}", self.lsn);
|
||||
self.padlen =
|
||||
@@ -184,10 +185,23 @@ impl WalStreamDecoder {
|
||||
self.padlen = self.lsn.calc_padding(8u32) as u32;
|
||||
}
|
||||
|
||||
// Check record CRC
|
||||
if self.crc_check {
|
||||
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
||||
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
||||
if crc != xlogrec.xl_crc {
|
||||
info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}",
|
||||
n, recordbuf.len(), self.lsn, xlogrec, recordbuf);
|
||||
return Err(WalDecodeError {
|
||||
msg: format!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}", n, buf.len(), self.lsn, xlogrec),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let result = (self.lsn, recordbuf);
|
||||
return Ok(Some(result));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// check record boundaries
|
||||
|
||||
@@ -168,7 +168,7 @@ fn walreceiver_main(
|
||||
// too. Just for the sake of paranoia.
|
||||
startpoint += startpoint.calc_padding(8u32);
|
||||
|
||||
debug!(
|
||||
info!(
|
||||
"last_record_lsn {} starting replication from {} for timeline {}, server is at {}...",
|
||||
last_rec_lsn, startpoint, timelineid, end_of_wal
|
||||
);
|
||||
@@ -178,7 +178,7 @@ fn walreceiver_main(
|
||||
let copy_stream = rclient.copy_both_simple(&query)?;
|
||||
let mut physical_stream = ReplicationIter::new(copy_stream);
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
|
||||
|
||||
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?;
|
||||
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
|
||||
@@ -203,14 +203,17 @@ fn walreceiver_main(
|
||||
tenantid,
|
||||
)?;
|
||||
|
||||
trace!("received XLogData between {} and {}", startlsn, endlsn);
|
||||
|
||||
info!("received XLogData between {} and {}", startlsn, endlsn);
|
||||
//assert!(waldecoder.lsn + waldecoder.padlen as u64 == startlsn);
|
||||
waldecoder.feed_bytes(data);
|
||||
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
loop {
|
||||
match waldecoder.poll_decode() {
|
||||
Ok(Some((lsn, recdata))) => {
|
||||
// Save old checkpoint value to compare with it after decoding WAL record
|
||||
let old_checkpoint_bytes = checkpoint.encode();
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
let lsn = Lsn((lsn.0 + 7) & !7); // align on 8
|
||||
restore_local_repo::save_decoded_record(
|
||||
&mut checkpoint,
|
||||
&*timeline,
|
||||
@@ -231,8 +234,17 @@ fn walreceiver_main(
|
||||
false,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Ok(None) => {
|
||||
info!("End of replication stream {}..{} at {}", startlsn, endlsn, last_rec_lsn);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Decode error {}", e);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
// Update the last_valid LSN value in the page cache one more time. We updated
|
||||
// it in the loop above, between each WAL record, but we might have received
|
||||
// a partial record after the last completed record. Our page cache's value
|
||||
|
||||
@@ -108,17 +108,23 @@ fn find_end_of_wal_segment(
|
||||
segno: XLogSegNo,
|
||||
tli: TimeLineID,
|
||||
wal_seg_size: usize,
|
||||
is_partial: bool,
|
||||
rec_offs: &mut usize,
|
||||
rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD],
|
||||
crc: &mut u32,
|
||||
check_contrec: bool
|
||||
) -> u32 {
|
||||
let mut offs: usize = 0;
|
||||
let mut contlen: usize = 0;
|
||||
let mut wal_crc: u32 = 0;
|
||||
let mut crc: u32 = 0;
|
||||
let mut rec_offs: usize = 0;
|
||||
let mut buf = [0u8; XLOG_BLCKSZ];
|
||||
let file_name = XLogFileName(tli, segno, wal_seg_size);
|
||||
let mut last_valid_rec_pos: usize = 0;
|
||||
let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
|
||||
let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];
|
||||
let file_path = data_dir.join(if is_partial {
|
||||
file_name.clone() + ".partial"
|
||||
} else {
|
||||
file_name
|
||||
});
|
||||
let mut file = File::open(&file_path).unwrap();
|
||||
|
||||
while offs < wal_seg_size {
|
||||
if offs % XLOG_BLCKSZ == 0 {
|
||||
@@ -133,14 +139,26 @@ fn find_end_of_wal_segment(
|
||||
let xlp_info = LittleEndian::read_u16(&buf[2..4]);
|
||||
let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
|
||||
if xlp_magic != XLOG_PAGE_MAGIC as u16 {
|
||||
info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic);
|
||||
info!("Invalid WAL file {:?} magic {}", &file_path, xlp_magic);
|
||||
break;
|
||||
}
|
||||
if offs == 0 {
|
||||
info!("xlp_rem_len={}", xlp_rem_len);
|
||||
offs = XLOG_SIZE_OF_XLOG_LONG_PHD;
|
||||
if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
|
||||
offs += ((xlp_rem_len + 7) & !7) as usize;
|
||||
}
|
||||
if check_contrec {
|
||||
let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize;
|
||||
contlen = xlp_rem_len as usize;
|
||||
assert!(*rec_offs + contlen == xl_tot_len);
|
||||
} else {
|
||||
offs += ((xlp_rem_len + 7) & !7) as usize;
|
||||
}
|
||||
} else if *rec_offs != 0 {
|
||||
// There is incompleted page at previous segment but not cont record:
|
||||
// it means that current segment is not valid and we have to return back.
|
||||
info!("CONTRECORD flag is missed in page header");
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
|
||||
}
|
||||
@@ -150,9 +168,8 @@ fn find_end_of_wal_segment(
|
||||
if xl_tot_len == 0 {
|
||||
break;
|
||||
}
|
||||
last_valid_rec_pos = offs;
|
||||
offs += 4;
|
||||
rec_offs = 4;
|
||||
*rec_offs = 4;
|
||||
contlen = xl_tot_len - 4;
|
||||
rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
|
||||
} else {
|
||||
@@ -162,34 +179,30 @@ fn find_end_of_wal_segment(
|
||||
|
||||
// read the rest of the record, or as much as fits on this page.
|
||||
let n = min(contlen, pageleft);
|
||||
if rec_offs < XLOG_RECORD_CRC_OFFS {
|
||||
let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
|
||||
rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]);
|
||||
let mut hdr_len: usize = 0;
|
||||
if *rec_offs < XLOG_SIZE_OF_XLOG_RECORD {
|
||||
// copy header
|
||||
hdr_len = min(XLOG_SIZE_OF_XLOG_RECORD - *rec_offs, n);
|
||||
rec_hdr[*rec_offs..*rec_offs + hdr_len].copy_from_slice(&buf[page_offs..page_offs + hdr_len]);
|
||||
}
|
||||
if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
|
||||
let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
|
||||
wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
|
||||
crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
|
||||
crc = !crc;
|
||||
} else {
|
||||
crc ^= 0xFFFFFFFFu32;
|
||||
crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
|
||||
crc = !crc;
|
||||
}
|
||||
rec_offs += n;
|
||||
*crc = crc32c_append(*crc, &buf[page_offs+hdr_len..page_offs+n]);
|
||||
*rec_offs += n;
|
||||
offs += n;
|
||||
contlen -= n;
|
||||
|
||||
if contlen == 0 {
|
||||
crc = !crc;
|
||||
crc = crc32c_append(crc, &rec_hdr);
|
||||
*crc = crc32c_append(*crc, &rec_hdr[0..XLOG_RECORD_CRC_OFFS]);
|
||||
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
|
||||
if crc == wal_crc {
|
||||
let wal_crc = LittleEndian::read_u32(&rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS+4]);
|
||||
if *crc == wal_crc {
|
||||
last_valid_rec_pos = offs;
|
||||
// Reset rec_offs and crc for start of new record
|
||||
*rec_offs = 0;
|
||||
*crc = 0;
|
||||
} else {
|
||||
info!(
|
||||
"CRC mismatch {} vs {} at {}",
|
||||
crc, wal_crc, last_valid_rec_pos
|
||||
"CRC mismatch {} vs {} at offset {} lsn {}",
|
||||
*crc, wal_crc, offs, last_valid_rec_pos
|
||||
);
|
||||
break;
|
||||
}
|
||||
@@ -239,21 +252,67 @@ pub fn find_end_of_wal(
|
||||
}
|
||||
}
|
||||
if high_segno > 0 {
|
||||
let mut high_offs = 0;
|
||||
/*
|
||||
* Move the starting pointer to the start of the next segment, if the
|
||||
* highest one we saw was completed.
|
||||
*/
|
||||
if !high_ispartial {
|
||||
high_segno += 1;
|
||||
} else if precise {
|
||||
/* otherwise locate last record in last partial segment */
|
||||
high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size);
|
||||
let mut high_offs = 0;
|
||||
if precise {
|
||||
let mut crc: u32 = 0;
|
||||
let mut rec_offs: usize = 0;
|
||||
let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD];
|
||||
|
||||
/*
|
||||
* To be able to calculate CRC of records crossing segment boundary,
|
||||
* we need to parse previous segment
|
||||
*/
|
||||
assert!(high_segno > 1);
|
||||
let prev_offs = find_end_of_wal_segment(data_dir, high_segno-1, high_tli, wal_seg_size, false, &mut rec_offs, &mut rec_hdr, &mut crc, false);
|
||||
if prev_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD {
|
||||
info!("Segment {} doesn't contain any valid record {}", high_segno-1, prev_offs);
|
||||
}
|
||||
assert!(prev_offs as usize > XLOG_SIZE_OF_XLOG_LONG_PHD);
|
||||
high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size, high_ispartial, &mut rec_offs, &mut rec_hdr, &mut crc, true);
|
||||
|
||||
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
|
||||
if high_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD {
|
||||
// If the last segment contains no valid records, fall back to the previous segment
|
||||
let wal_dir = data_dir.join("pg_wal");
|
||||
let file_path = if high_ispartial {
|
||||
wal_dir.join(file_name.clone() + ".partial")
|
||||
} else {
|
||||
wal_dir.join(file_name.clone())
|
||||
};
|
||||
info!("Remove empty WAL segment {:?}", &file_path);
|
||||
if let Err(e) = fs::remove_file(&file_path) {
|
||||
info!("Failed to remove file {:?}: {}", &file_path, e);
|
||||
}
|
||||
high_ispartial = false; // previous segment should not be partial
|
||||
high_segno -= 1;
|
||||
high_offs = prev_offs;
|
||||
}
|
||||
// If last segment is not marked as partial, it means that next segment
|
||||
// was not written. Let's make this segment partial once again.
|
||||
if !high_ispartial {
|
||||
let wal_dir = data_dir.join("pg_wal");
|
||||
if let Err(e) = fs::rename(
|
||||
wal_dir.join(file_name.clone()),
|
||||
wal_dir.join(file_name.clone() + ".partial")
|
||||
) {
|
||||
info!(
|
||||
"Failed to rename {} to {}.partial: {}",
|
||||
&file_name, &file_name, e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/*
|
||||
* Move the starting pointer to the start of the next segment, if the
|
||||
* highest one we saw was completed.
|
||||
*/
|
||||
if !high_ispartial {
|
||||
high_segno += 1;
|
||||
}
|
||||
}
|
||||
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
|
||||
return (high_ptr, high_tli);
|
||||
}
|
||||
(0, 0)
|
||||
(0, 1) // First timeline is 1
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 022285aea5...603feabe77
@@ -28,6 +28,7 @@ use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::{Timeline, TimelineTools};
|
||||
use crate::WalAcceptorConf;
|
||||
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
|
||||
use pageserver::waldecoder::WalStreamDecoder;
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 1;
|
||||
@@ -84,6 +85,8 @@ pub struct SafeKeeperInfo {
|
||||
pub commit_lsn: Lsn,
|
||||
/// locally flushed part of WAL
|
||||
pub flush_lsn: Lsn,
|
||||
/// locally flushed last record LSN
|
||||
pub safe_lsn: Lsn,
|
||||
/// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers
|
||||
pub restart_lsn: Lsn,
|
||||
}
|
||||
@@ -110,6 +113,7 @@ impl SafeKeeperInfo {
|
||||
},
|
||||
commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */
|
||||
flush_lsn: Lsn(0), /* locally flushed part of WAL */
|
||||
safe_lsn: Lsn(0), /* locally flushed last record LSN */
|
||||
restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */
|
||||
}
|
||||
}
|
||||
@@ -135,6 +139,7 @@ struct SafeKeeperRequest {
|
||||
// Reply written back to the sender (via `write_msg`) after a received WAL
// message has been processed.
struct SafeKeeperResponse {
|
||||
// Copied from the safekeeper's own `epoch` at the time of the reply.
epoch: u64,
|
||||
// Locally flushed part of WAL.
flush_lsn: Lsn,
|
||||
// Locally flushed last record LSN.
safe_lsn: Lsn,
|
||||
// Hot standby feedback obtained from the timeline (`get_hs_feedback`).
hs_feedback: HotStandbyFeedback,
|
||||
}
|
||||
|
||||
@@ -275,6 +280,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
/* Calculate WAL end based on local data */
|
||||
let (flush_lsn, timeline_id) = this_timeline.find_end_of_wal(&swh.conf.data_dir, true);
|
||||
my_info.flush_lsn = flush_lsn;
|
||||
my_info.safe_lsn = flush_lsn; // end_of_wal actually returns last record LSN
|
||||
my_info.server.timeline = timeline_id;
|
||||
|
||||
info!(
|
||||
@@ -300,8 +306,8 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
);
|
||||
}
|
||||
my_info.server.node_id = prop.node_id;
|
||||
this_timeline.get().set_info(&my_info);
|
||||
/* Need to persist our vote first */
|
||||
this_timeline.get().set_info(&my_info);
|
||||
this_timeline.get().save_control_file(true)?;
|
||||
|
||||
let mut flushed_restart_lsn = Lsn(0);
|
||||
@@ -322,9 +328,11 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
}
|
||||
|
||||
info!(
|
||||
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}",
|
||||
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn
|
||||
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={} safe_lsn={}",
|
||||
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn, my_info.safe_lsn
|
||||
);
|
||||
let mut last_rec_lsn = Lsn(0);
|
||||
let mut decoder = WalStreamDecoder::new(last_rec_lsn, false);
|
||||
|
||||
// Main loop
|
||||
loop {
|
||||
@@ -347,27 +355,47 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
let end_pos = req.end_lsn;
|
||||
let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
|
||||
assert!(rec_size <= MAX_SEND_SIZE);
|
||||
if rec_size != 0 {
|
||||
debug!(
|
||||
"received for {} bytes between {} and {}",
|
||||
rec_size, start_pos, end_pos,
|
||||
);
|
||||
|
||||
debug!(
|
||||
"received for {} bytes between {} and {}",
|
||||
rec_size, start_pos, end_pos,
|
||||
);
|
||||
/* Receive message body (from the rest of the message) */
|
||||
let mut buf = Vec::with_capacity(rec_size);
|
||||
msg_reader.read_to_end(&mut buf)?;
|
||||
assert_eq!(buf.len(), rec_size);
|
||||
|
||||
/* Receive message body (from the rest of the message) */
|
||||
let mut buf = Vec::with_capacity(rec_size);
|
||||
msg_reader.read_to_end(&mut buf)?;
|
||||
assert_eq!(buf.len(), rec_size);
|
||||
|
||||
/* Save message in file */
|
||||
Self::write_wal_file(
|
||||
swh,
|
||||
start_pos,
|
||||
timeline_id,
|
||||
this_timeline.get(),
|
||||
wal_seg_size,
|
||||
&buf,
|
||||
)?;
|
||||
if decoder.available() != start_pos {
|
||||
info!("Restart decoder from {} to {}", decoder.available(), start_pos);
|
||||
decoder = WalStreamDecoder::new(start_pos, false);
|
||||
}
|
||||
decoder.feed_bytes(&buf);
|
||||
//while let Ok(Some((lsn,_rec))) = decoder.poll_decode() {
|
||||
loop {
|
||||
match decoder.poll_decode() {
|
||||
Err(e) => info!("Decode error {}", e),
|
||||
Ok(None) => info!("Decode end"),
|
||||
Ok(Some((lsn,_rec))) => {
|
||||
last_rec_lsn = lsn;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
last_rec_lsn = Lsn((last_rec_lsn.0 + 7) & !7); // align record start position on 8
|
||||
info!("Receive WAL {}..{} last_rec_lsn={}", start_pos, end_pos, last_rec_lsn);
|
||||
|
||||
/* Save message in file */
|
||||
Self::write_wal_file(
|
||||
swh,
|
||||
start_pos,
|
||||
timeline_id,
|
||||
this_timeline.get(),
|
||||
wal_seg_size,
|
||||
&buf,
|
||||
)?;
|
||||
}
|
||||
my_info.restart_lsn = req.restart_lsn;
|
||||
my_info.commit_lsn = req.commit_lsn;
|
||||
|
||||
@@ -385,12 +413,16 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
if end_pos > my_info.flush_lsn {
|
||||
my_info.flush_lsn = end_pos;
|
||||
}
|
||||
if last_rec_lsn > my_info.safe_lsn {
|
||||
my_info.safe_lsn = last_rec_lsn;
|
||||
}
|
||||
/*
|
||||
* Update restart LSN in control file.
|
||||
* To avoid negative impact on performance of extra fsync, do it only
|
||||
* when restart_lsn delta exceeds WAL segment size.
|
||||
*/
|
||||
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
|
||||
this_timeline.get().set_info(&my_info);
|
||||
this_timeline.get().save_control_file(sync_control_file)?;
|
||||
|
||||
if sync_control_file {
|
||||
@@ -401,7 +433,8 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
//info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32);
|
||||
let resp = SafeKeeperResponse {
|
||||
epoch: my_info.epoch,
|
||||
flush_lsn: end_pos,
|
||||
flush_lsn: my_info.flush_lsn,
|
||||
safe_lsn: my_info.safe_lsn,
|
||||
hs_feedback: this_timeline.get().get_hs_feedback(),
|
||||
};
|
||||
self.write_msg(&resp)?;
|
||||
@@ -410,9 +443,10 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
* Ping wal sender that new data is available.
|
||||
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
|
||||
*/
|
||||
info!("Notify WAL senders min({}, {})={}", req.commit_lsn, my_info.safe_lsn, min(req.commit_lsn, my_info.safe_lsn));
|
||||
this_timeline
|
||||
.get()
|
||||
.notify_wal_senders(min(req.commit_lsn, end_pos));
|
||||
.notify_wal_senders(min(req.commit_lsn, my_info.safe_lsn));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -217,9 +217,9 @@ impl ReplicationConn {
|
||||
data: &file_buf,
|
||||
}))?;
|
||||
|
||||
start_pos += send_size as u64;
|
||||
info!("Sent WAL to page server {}..{}, end_pos={}", start_pos, start_pos + send_size as u64, end_pos);
|
||||
|
||||
debug!("Sent WAL to page server up to {}", end_pos);
|
||||
start_pos += send_size as u64;
|
||||
|
||||
// Decide whether to reuse this file. If we don't set wal_file here
|
||||
// a new file will be opened next time.
|
||||
|
||||
@@ -175,7 +175,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn _stop_wal_senders(&self) {
|
||||
pub fn stop_wal_senders(&self) {
|
||||
self.notify_wal_senders(END_REPLICATION_MARKER);
|
||||
}
|
||||
|
||||
|
||||
@@ -209,6 +209,11 @@ where
|
||||
pub fn load(&self) -> T {
    // Hold the lock only long enough to copy the current value out.
    // `unwrap` panics if `lock()` reports an error.
    let state = self.internal.lock().unwrap();
    state.current
}
|
||||
|
||||
/// Set the current value.
|
||||
pub fn store(&self, num: T) {
|
||||
self.internal.lock().unwrap().current = num
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user