Compare commits

...

14 Commits

Author SHA1 Message Date
Arseny Sher
a8a2f62bc3 imactive 2022-01-11 18:14:59 +03:00
Konstantin Knizhnik
26060dd68e Disable write WAL to files at pageserver 2021-08-31 11:13:55 +03:00
Konstantin Knizhnik
73d823e53c Make it possible for WAL decoder to skip continuation records 2021-08-31 10:59:26 +03:00
Konstantin Knizhnik
112909c5e4 Handle wal records larger than WAL segment size in find_end_of_wal 2021-08-30 17:32:40 +03:00
Konstantin Knizhnik
07adc9dbda Fix unit test for find_end_of_wal 2021-08-27 14:59:07 +03:00
Konstantin Knizhnik
c05cedc626 Do not check cont record for second segment because itcontains dummy checkpoint record 2021-08-27 12:48:28 +03:00
Konstantin Knizhnik
815528e0ce Use last record LSN as flush position reported by safekeepers to walproposer to prevent moving VCL backward on compute node restart 2021-08-26 18:08:29 +03:00
Konstantin Knizhnik
a2e135b404 Maintain safe LSN position at safekeepers 2021-08-25 10:24:45 +03:00
Stas Kelvich
72de70a8cc Change test_restart_compute to expose safekeeper problems 2021-08-25 00:42:08 +03:00
Konstantin Knizhnik
4051c5d4ff Undo some redundant fixes 2021-08-20 12:31:53 +03:00
Konstantin Knizhnik
f86bf26466 Restore icluding postgresql.conf in basebackup 2021-08-20 11:23:57 +03:00
Konstantin Knizhnik
3ca4b638ac Merge with main 2021-08-20 10:55:34 +03:00
Konstantin Knizhnik
d61699b0f8 [refer #439] Fix submodule version 2021-08-19 19:56:49 +03:00
Konstantin Knizhnik
ead94feb05 [refer #439] Correctly handle LSN parameter in BASEBACKUP command 2021-08-19 19:53:22 +03:00
19 changed files with 557 additions and 378 deletions

View File

@@ -320,7 +320,7 @@ impl PostgresNode {
// Never clean up old WAL. TODO: We should use a replication
// slot or something proper, to prevent the compute node
// from removing WAL that hasn't been streamed to the safekeepr or
// from removing WAL that hasn't been streamed to the safekeeper or
// page server yet. (gh issue #349)
self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;

0
f Normal file
View File

View File

@@ -842,6 +842,34 @@ impl Timeline for LayeredTimeline {
fn get_prev_record_lsn(&self) -> Lsn {
self.prev_record_lsn.load()
}
///
/// Wait until WAL has been received up to the given LSN.
///
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
)
})?;
Ok(lsn)
}
}
impl LayeredTimeline {
@@ -1055,34 +1083,6 @@ impl LayeredTimeline {
Ok(layer_rc)
}
///
/// Wait until WAL has been received up to the given LSN.
///
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
)
})?;
Ok(lsn)
}
///
/// Flush to disk all data that was written with the put_* functions
///

View File

@@ -663,7 +663,7 @@ impl Timeline for ObjectTimeline {
assert!(old <= lsn);
// Use old value of last_record_lsn as prev_record_lsn
self.prev_record_lsn.fetch_max(Lsn((old.0 + 7) & !7));
self.prev_record_lsn.fetch_max(old.align());
// Also advance last_valid_lsn
let old = self.last_valid_lsn.advance(lsn);
@@ -712,6 +712,41 @@ impl Timeline for ObjectTimeline {
let iter = self.obj_store.objects(self.timelineid, lsn)?;
Ok(Box::new(ObjectHistory { lsn, iter }))
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, req_lsn: Lsn) -> Result<Lsn> {
let mut lsn = req_lsn;
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
trace!(
"Start waiting for LSN {}, valid LSN is {}",
lsn,
self.last_valid_lsn.load()
);
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
lsn,
self.last_valid_lsn.load(),
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
}
impl ObjectTimeline {
@@ -820,40 +855,6 @@ impl ObjectTimeline {
}
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
trace!(
"Start waiting for LSN {}, valid LSN is {}",
lsn,
self.last_valid_lsn.load()
);
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
lsn,
self.last_valid_lsn.load(),
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
///
/// Iterate through object versions with given key, in reverse LSN order.
///

View File

@@ -357,8 +357,13 @@ impl PageServerHandler {
/* Send a tarball of the latest snapshot on the timeline */
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
let req_lsn = match lsn {
Some(lsn) => {
timeline.wait_lsn(lsn)?;
lsn
}
None => timeline.get_last_record_lsn(),
};
{
let mut writer = CopyDataSink { pgb };
let mut basebackup = basebackup::Basebackup::new(
@@ -469,7 +474,7 @@ impl postgres_backend::Handler for PageServerHandler {
let (_, params_raw) = query_string.split_at("basebackup ".len());
let params = params_raw.split(" ").collect::<Vec<_>>();
ensure!(
params.len() == 2,
params.len() >= 2,
"invalid param number for basebackup command"
);
@@ -479,7 +484,7 @@ impl postgres_backend::Handler for PageServerHandler {
self.check_permission(Some(tenantid))?;
// TODO are there any tests with lsn option?
let lsn = if params.len() == 3 {
let lsn = if params.len() == 3 && params[2].len() != 0 {
Some(Lsn::from_str(params[2])?)
} else {
None
@@ -575,6 +580,10 @@ impl postgres_backend::Handler for PageServerHandler {
timeline.advance_last_valid_lsn(last_lsn);
break;
}
FeMessage::CopyFailed => {
info!("Copy failed");
break;
}
FeMessage::Sync => {}
_ => bail!("unexpected message {:?}", msg),
}

View File

@@ -203,6 +203,11 @@ pub trait Timeline: Send + Sync {
/// Relation size is increased implicitly and decreased with Truncate updates.
// TODO ordering guarantee?
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
}
pub trait History: Iterator<Item = Result<Modification>> {

View File

@@ -264,7 +264,7 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa
/// Scan PostgreSQL WAL files in given directory
/// and load all records >= 'startpoint' into the repository.
pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
@@ -427,7 +427,9 @@ pub fn save_decoded_record(
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
trace!(
"unlink twophaseFile for xid {} parsed_xact.xid {} here at {}",
decoded.xl_xid, parsed_xact.xid, lsn
decoded.xl_xid,
parsed_xact.xid,
lsn
);
timeline.put_unlink(
RelishTag::TwoPhase {

View File

@@ -25,13 +25,13 @@ pub type MultiXactStatus = u32;
pub struct WalStreamDecoder {
lsn: Lsn,
startlsn: Lsn, // LSN where this record starts
contlen: u32,
padlen: u32,
inputbuf: BytesMut,
recordbuf: BytesMut,
crc_check: bool,
}
#[derive(Error, Debug, Clone)]
@@ -46,19 +46,24 @@ pub struct WalDecodeError {
// FIXME: This isn't a proper rust stream
//
impl WalStreamDecoder {
pub fn new(lsn: Lsn) -> WalStreamDecoder {
pub fn new(lsn: Lsn, crc_check: bool) -> WalStreamDecoder {
WalStreamDecoder {
lsn,
startlsn: Lsn(0),
contlen: 0,
padlen: 0,
inputbuf: BytesMut::new(),
recordbuf: BytesMut::new(),
crc_check,
}
}
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
}
@@ -92,7 +97,9 @@ impl WalStreamDecoder {
// TODO: verify the remaining fields in the header
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
continue;
if !self.crc_check && self.contlen != hdr.std.xlp_rem_len {
self.contlen = hdr.std.xlp_rem_len; // skip continuation record
}
} else if self.lsn.block_offset() == 0 {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
return Ok(None);
@@ -102,14 +109,19 @@ impl WalStreamDecoder {
if hdr.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
msg: "invalid xlog page header".into(),
msg: format!(
"invalid xlog page header: xlp_pageaddr={} vs. lsn={}",
hdr.xlp_pageaddr, self.lsn
),
lsn: self.lsn,
});
}
// TODO: verify the remaining fields in the header
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
continue;
if !self.crc_check && self.contlen != hdr.xlp_rem_len {
self.contlen = hdr.xlp_rem_len; // skip continuation record
}
} else if self.padlen > 0 {
if self.inputbuf.remaining() < self.padlen as usize {
return Ok(None);
@@ -127,7 +139,6 @@ impl WalStreamDecoder {
}
// read xl_tot_len FIXME: assumes little-endian
self.startlsn = self.lsn;
let xl_tot_len = self.inputbuf.get_u32_le();
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
return Err(WalDecodeError {
@@ -142,7 +153,6 @@ impl WalStreamDecoder {
self.recordbuf.put_u32_le(xl_tot_len);
self.contlen = xl_tot_len - 4;
continue;
} else {
// we're continuing a record, possibly from previous page.
let pageleft = self.lsn.remaining_in_block() as u32;
@@ -164,17 +174,10 @@ impl WalStreamDecoder {
let recordbuf = recordbuf.freeze();
let mut buf = recordbuf.clone();
let xlogrec = XLogRecord::from_bytes(&mut buf);
// XLOG_SWITCH records are special. If we see one, we need to skip
// to the next WAL segment.
let xlogrec = XLogRecord::from_bytes(&mut buf);
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
return Err(WalDecodeError {
msg: "WAL record crc mismatch".into(),
lsn: self.lsn,
});
}
if xlogrec.is_xlog_switch_record() {
trace!("saw xlog switch record at {}", self.lsn);
self.padlen =
@@ -184,10 +187,29 @@ impl WalStreamDecoder {
self.padlen = self.lsn.calc_padding(8u32) as u32;
}
let result = (self.lsn, recordbuf);
// Check record CRC
if self.crc_check {
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}",
n, recordbuf.len(), self.lsn, xlogrec, recordbuf);
return Err(WalDecodeError {
msg: format!(
"WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}",
n,
buf.len(),
self.lsn,
xlogrec
),
lsn: self.lsn,
});
}
}
let result = (self.lsn.align(), recordbuf);
return Ok(Some(result));
}
continue;
}
}
// check record boundaries

View File

@@ -22,8 +22,6 @@ use postgres_types::PgLsn;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs;
use std::fs::{File, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::str::FromStr;
use std::sync::Mutex;
use std::thread;
@@ -178,7 +176,7 @@ fn walreceiver_main(
let copy_stream = rclient.copy_both_simple(&query)?;
let mut physical_stream = ReplicationIter::new(copy_stream);
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
@@ -194,45 +192,51 @@ fn walreceiver_main(
let endlsn = startlsn + data.len() as u64;
let prev_last_rec_lsn = last_rec_lsn;
write_wal_file(
conf,
startlsn,
&timelineid,
pg_constants::WAL_SEGMENT_SIZE,
data,
tenantid,
)?;
trace!("received XLogData between {} and {}", startlsn, endlsn);
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// Save old checkpoint value to compare with it after decoding WAL record
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(recdata.clone());
restore_local_repo::save_decoded_record(
&mut checkpoint,
&*timeline,
&decoded,
recdata,
lsn,
)?;
last_rec_lsn = lsn;
loop {
match waldecoder.poll_decode() {
Ok(Some((lsn, recdata))) => {
// Save old checkpoint value to compare with it after decoding WAL record
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(recdata.clone());
restore_local_repo::save_decoded_record(
&mut checkpoint,
&*timeline,
&decoded,
recdata,
lsn,
)?;
last_rec_lsn = lsn;
let new_checkpoint_bytes = checkpoint.encode();
// Check if checkpoint data was updated by save_decoded_record
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
RelishTag::Checkpoint,
0,
lsn,
new_checkpoint_bytes,
false,
)?;
let new_checkpoint_bytes = checkpoint.encode();
// Check if checkpoint data was updated by save_decoded_record
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
RelishTag::Checkpoint,
0,
lsn,
new_checkpoint_bytes,
false,
)?;
}
}
Ok(None) => {
trace!(
"End of replication stream {}..{} at {}",
startlsn,
endlsn,
last_rec_lsn
);
break;
}
Err(e) => {
info!("Decode error {}", e);
return Err(e.into());
}
}
}
// Update the last_valid LSN value in the page cache one more time. We updated
// it in the loop above, between each WAL record, but we might have received
// a partial record after the last completed record. Our page cache's value
@@ -407,98 +411,3 @@ pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
Err(IdentifyError.into())
}
}
fn write_wal_file(
conf: &PageServerConf,
startpos: Lsn,
timelineid: &ZTimelineId,
wal_seg_size: usize,
buf: &[u8],
tenantid: &ZTenantId,
) -> anyhow::Result<()> {
let mut bytes_left: usize = buf.len();
let mut bytes_written: usize = 0;
let mut partial;
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_dir = conf.wal_dir_path(timelineid, tenantid);
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size);
while bytes_left != 0 {
let bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach wal
* segment size.
*/
if xlogoff + bytes_left > wal_seg_size {
bytes_to_write = wal_seg_size - xlogoff;
} else {
bytes_to_write = bytes_left;
}
/* Open file */
let segno = start_pos.segment_number(wal_seg_size);
let wal_file_name = XLogFileName(
1, // FIXME: always use Postgres timeline 1
segno,
wal_seg_size,
);
let wal_file_path = wal_dir.join(wal_file_name.clone());
let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial");
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
/* Try to open existed partial file */
wal_file = file;
partial = true;
} else {
/* Create and fill new partial file */
partial = true;
match OpenOptions::new()
.create(true)
.write(true)
.open(&wal_file_partial_path)
{
Ok(mut file) => {
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
file.write_all(&ZERO_BLOCK)?;
}
wal_file = file;
}
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e.into());
}
}
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
// FIXME: Flush the file
//wal_file.sync_all()?;
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
start_pos += bytes_to_write as u64;
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if start_pos.segment_offset(wal_seg_size) == 0 {
xlogoff = 0;
if partial {
fs::rename(&wal_file_partial_path, &wal_file_path)?;
}
}
}
Ok(())
}

View File

@@ -108,17 +108,23 @@ fn find_end_of_wal_segment(
segno: XLogSegNo,
tli: TimeLineID,
wal_seg_size: usize,
is_partial: bool,
rec_offs: &mut usize,
rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD],
crc: &mut u32,
check_contrec: bool,
) -> u32 {
let mut offs: usize = 0;
let mut contlen: usize = 0;
let mut wal_crc: u32 = 0;
let mut crc: u32 = 0;
let mut rec_offs: usize = 0;
let mut buf = [0u8; XLOG_BLCKSZ];
let file_name = XLogFileName(tli, segno, wal_seg_size);
let mut last_valid_rec_pos: usize = 0;
let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];
let file_path = data_dir.join(if is_partial {
file_name.clone() + ".partial"
} else {
file_name
});
let mut file = File::open(&file_path).unwrap();
while offs < wal_seg_size {
if offs % XLOG_BLCKSZ == 0 {
@@ -133,13 +139,33 @@ fn find_end_of_wal_segment(
let xlp_info = LittleEndian::read_u16(&buf[2..4]);
let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
if xlp_magic != XLOG_PAGE_MAGIC as u16 {
info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic);
info!("Invalid WAL file {:?} magic {}", &file_path, xlp_magic);
break;
}
if offs == 0 {
offs = XLOG_SIZE_OF_XLOG_LONG_PHD;
if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
offs += ((xlp_rem_len + 7) & !7) as usize;
if check_contrec {
let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize;
contlen = xlp_rem_len as usize;
if *rec_offs + contlen < xl_tot_len
|| (*rec_offs + contlen != xl_tot_len
&& contlen != XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_LONG_PHD)
{
info!(
"Corrupted continuation record: offs={}, contlen={}, xl_tot_len={}",
*rec_offs, contlen, xl_tot_len
);
return 0;
}
} else {
offs += ((xlp_rem_len + 7) & !7) as usize;
}
} else if *rec_offs != 0 {
// There is incompleted page at previous segment but no cont record:
// it means that current segment is not valid and we have to return back.
info!("CONTRECORD flag is missed in page header");
return 0;
}
} else {
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
@@ -150,9 +176,8 @@ fn find_end_of_wal_segment(
if xl_tot_len == 0 {
break;
}
last_valid_rec_pos = offs;
offs += 4;
rec_offs = 4;
*rec_offs = 4;
contlen = xl_tot_len - 4;
rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
} else {
@@ -162,34 +187,33 @@ fn find_end_of_wal_segment(
// read the rest of the record, or as much as fits on this page.
let n = min(contlen, pageleft);
if rec_offs < XLOG_RECORD_CRC_OFFS {
let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]);
let mut hdr_len: usize = 0;
if *rec_offs < XLOG_SIZE_OF_XLOG_RECORD {
// copy header
hdr_len = min(XLOG_SIZE_OF_XLOG_RECORD - *rec_offs, n);
rec_hdr[*rec_offs..*rec_offs + hdr_len]
.copy_from_slice(&buf[page_offs..page_offs + hdr_len]);
}
if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
crc = !crc;
} else {
crc ^= 0xFFFFFFFFu32;
crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
crc = !crc;
}
rec_offs += n;
*crc = crc32c_append(*crc, &buf[page_offs + hdr_len..page_offs + n]);
*rec_offs += n;
offs += n;
contlen -= n;
if contlen == 0 {
crc = !crc;
crc = crc32c_append(crc, &rec_hdr);
*crc = crc32c_append(*crc, &rec_hdr[0..XLOG_RECORD_CRC_OFFS]);
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
if crc == wal_crc {
let wal_crc = LittleEndian::read_u32(
&rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS + 4],
);
if *crc == wal_crc {
last_valid_rec_pos = offs;
// Reset rec_offs and crc for start of new record
*rec_offs = 0;
*crc = 0;
} else {
info!(
"CRC mismatch {} vs {} at {}",
crc, wal_crc, last_valid_rec_pos
"CRC mismatch {} vs {} at offset {} lsn {}",
*crc, wal_crc, offs, last_valid_rec_pos
);
break;
}
@@ -240,20 +264,142 @@ pub fn find_end_of_wal(
}
if high_segno > 0 {
let mut high_offs = 0;
/*
* Move the starting pointer to the start of the next segment, if the
* highest one we saw was completed.
*/
if !high_ispartial {
high_segno += 1;
} else if precise {
/* otherwise locate last record in last partial segment */
high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size);
if precise {
let mut crc: u32 = 0;
let mut rec_offs: usize = 0;
let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD];
let wal_dir = data_dir.join("pg_wal");
/*
* To be able to calculate CRC of records crossing segment boundary,
* we need to parse previous segments.
* So first traverse segments in backward direction to locate record start
* and then traverse forward, accumulating CRC.
*/
let mut prev_segno = high_segno - 1;
let mut prev_offs: u32 = 0;
while prev_segno > 1 {
// TOFO: first segment constains dummy checkpoint record at the beginning
prev_offs = find_end_of_wal_segment(
data_dir,
prev_segno,
high_tli,
wal_seg_size,
false,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
false,
);
if prev_offs != 0 {
break;
}
prev_segno -= 1;
}
if prev_offs != 0 {
// found start of WAL record
let first_segno = prev_segno;
let first_offs = prev_offs;
while prev_segno + 1 < high_segno {
// now traverse record in forward direction, accumulating CRC
prev_segno += 1;
prev_offs = find_end_of_wal_segment(
data_dir,
prev_segno,
high_tli,
wal_seg_size,
false,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
true,
);
if prev_offs == 0 {
info!("Segment {} is corrupted", prev_segno,);
break;
}
}
if prev_offs != 0 {
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
high_tli,
wal_seg_size,
high_ispartial,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
true,
);
}
if high_offs == 0 {
// If last segment contais no valid records, then return back
info!("Last WAL segment {} contains no valid record, truncate WAL till {} segment",
high_segno, first_segno);
// Remove last segments containing corrupted WAL record
for segno in first_segno + 1..high_segno {
let file_name = XLogFileName(high_tli, segno, wal_seg_size);
let file_path = wal_dir.join(file_name);
if let Err(e) = fs::remove_file(&file_path) {
info!("Failed to remove file {:?}: {}", &file_path, e);
}
}
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
let file_path = if high_ispartial {
wal_dir.join(file_name.clone() + ".partial")
} else {
wal_dir.join(file_name.clone())
};
if let Err(e) = fs::remove_file(&file_path) {
info!("Failed to remove file {:?}: {}", &file_path, e);
}
high_ispartial = false; // previous segment should not be partial
high_segno = first_segno;
high_offs = first_offs;
}
} else {
// failed to locate previous segment
assert!(prev_segno <= 1);
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
high_tli,
wal_seg_size,
high_ispartial,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
false,
);
}
// If last segment is not marked as partial, it means that next segment
// was not written. Let's make this segment partial once again.
if !high_ispartial {
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
if let Err(e) = fs::rename(
wal_dir.join(file_name.clone()),
wal_dir.join(file_name.clone() + ".partial"),
) {
info!(
"Failed to rename {} to {}.partial: {}",
&file_name, &file_name, e
);
}
}
} else {
/*
* Move the starting pointer to the start of the next segment, if the
* highest one we saw was completed.
*/
if !high_ispartial {
high_segno += 1;
}
}
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
return (high_ptr, high_tli);
}
(0, 0)
(0, 1) // First timeline is 1
}
pub fn main() {
@@ -469,7 +615,7 @@ mod tests {
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true);
let wal_end = Lsn(wal_end);
println!("wal_end={}, tli={}", wal_end, tli);
assert_eq!(wal_end, "0/2000000".parse::<Lsn>().unwrap());
assert_eq!(wal_end, "0/1699D10".parse::<Lsn>().unwrap());
// 4. Get the actual end of WAL by pg_waldump
let waldump_path = top_path.join("tmp_install/bin/pg_waldump");

View File

@@ -9,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test restarting and recreating a postgres instance
#
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
@pytest.mark.parametrize('with_wal_acceptors', [True, False])
def test_restart_compute(
zenith_cli,
pageserver: ZenithPageserver,
@@ -31,31 +31,56 @@ def test_restart_compute(
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Create table, and insert a row
cur.execute('CREATE TABLE foo (t text)')
cur.execute("INSERT INTO foo VALUES ('bar')")
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
print("res = ", r)
# Stop and restart the Postgres instance
# Remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute',
wal_acceptors=wal_acceptor_connstrs)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the row
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (1, )
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
print("res = ", r)
# Insert another row
cur.execute("INSERT INTO foo VALUES ('bar2')")
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (2, )
cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
cur.execute('SELECT count(*) FROM t')
# Stop, and destroy the Postgres instance. Then recreate and restart it.
r = cur.fetchone()
assert r == (100001, )
print("res = ", r)
# Again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute',
wal_acceptors=wal_acceptor_connstrs)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (2, )
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
print("res = ", r)
# And again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute',
wal_acceptors=wal_acceptor_connstrs)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
print("res = ", r)

View File

@@ -249,10 +249,68 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:
print('Starting pageserver cleanup')
ps.stop()
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: str, pg_distrib_dir: str):
self.log_dir = log_dir
self.pg_install_path = pg_distrib_dir
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
self.env = os.environ.copy()
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
def _fixpath(self, command: List[str]) -> None:
if '/' not in command[0]:
command[0] = os.path.join(self.pg_bin_path, command[0])
def _build_env(self, env_add: Optional[Env]) -> Env:
if env_add is None:
return self.env
env = self.env.copy()
env.update(env_add)
return env
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries.
The command should be in list form, e.g. ['pgbench', '-p', '55432']
All the necessary environment variables will be set.
If the first argument (the command name) doesn't include a path (no '/'
characters present), then it will be edited to include the correct path.
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries, with stderr and stdout redirected to a file.
This is just like `run`, but for chatty programs.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
@zenfixture
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
return PgBin(test_output_dir, pg_distrib_dir)
class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int):
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, tenant_id: str, port: int):
super().__init__(host='localhost', port=port)
self.zenith_cli = zenith_cli
@@ -260,6 +318,7 @@ class Postgres(PgProtocol):
self.repo_dir = repo_dir
self.branch: Optional[str] = None # dubious, see asserts below
self.tenant_id = tenant_id
self.pg_bin = pg_bin
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<branch_name>/postgresql.conf
def create(
@@ -299,27 +358,32 @@ class Postgres(PgProtocol):
"""
assert self.branch is not None
print(f"Starting postgres on brach {self.branch}")
self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
self.running = True
self.pg_bin.run(['pg_controldata', self.pg_data_dir_path()])
return self
def pg_data_dir_path(self) -> str:
""" Path to data directory """
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch
return os.path.join(self.repo_dir, path)
def pg_xact_dir_path(self) -> str:
""" Path to pg_xact dir """
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_xact'
return os.path.join(self.repo_dir, path)
return os.path.join(self.pg_data_dir_path(), 'pg_xact')
def pg_twophase_dir_path(self) -> str:
""" Path to pg_twophase dir """
print(self.tenant_id)
print(self.branch)
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_twophase'
return os.path.join(self.repo_dir, path)
return os.path.join(self.pg_data_dir_path(), 'pg_twophase')
def config_file_path(self) -> str:
""" Path to postgresql.conf """
filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf'
return os.path.join(self.repo_dir, filename)
return os.path.join(self.pg_data_dir_path(), 'postgresql.conf')
def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
"""
@@ -411,13 +475,14 @@ class Postgres(PgProtocol):
class PostgresFactory:
""" An object representing multiple running postgres daemons. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431):
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431):
self.zenith_cli = zenith_cli
self.repo_dir = repo_dir
self.num_instances = 0
self.instances: List[Postgres] = []
self.initial_tenant: str = initial_tenant
self.base_port = base_port
self.pg_bin = pg_bin
def create_start(
self,
@@ -430,6 +495,7 @@ class PostgresFactory:
pg = Postgres(
zenith_cli=self.zenith_cli,
repo_dir=self.repo_dir,
pg_bin=self.pg_bin,
tenant_id=tenant_id or self.initial_tenant,
port=self.base_port + self.num_instances + 1,
)
@@ -503,8 +569,8 @@ def initial_tenant(pageserver: ZenithPageserver):
@zenfixture
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]:
pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant)
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]:
pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant)
yield pgfactory
@@ -512,67 +578,6 @@ def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Itera
print('Starting postgres cleanup')
pgfactory.stop_all()
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: str, pg_distrib_dir: str):
self.log_dir = log_dir
self.pg_install_path = pg_distrib_dir
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
self.env = os.environ.copy()
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
def _fixpath(self, command: List[str]) -> None:
if '/' not in command[0]:
command[0] = os.path.join(self.pg_bin_path, command[0])
def _build_env(self, env_add: Optional[Env]) -> Env:
if env_add is None:
return self.env
env = self.env.copy()
env.update(env_add)
return env
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries.
The command should be in list form, e.g. ['pgbench', '-p', '55432']
All the necessary environment variables will be set.
If the first argument (the command name) doesn't include a path (no '/'
characters present), then it will be edited to include the correct path.
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries, with stderr and stdout redirected to a file.
This is just like `run`, but for chatty programs.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
@zenfixture
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
return PgBin(test_output_dir, pg_distrib_dir)
def read_pid(path: Path):
""" Read content of file into number """
return int(path.read_text())

View File

@@ -27,6 +27,7 @@ use crate::replication::HotStandbyFeedback;
use crate::send_wal::SendWalHandler;
use crate::timeline::{Timeline, TimelineTools};
use crate::WalAcceptorConf;
use pageserver::waldecoder::WalStreamDecoder;
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
@@ -300,8 +301,8 @@ impl<'pg> ReceiveWalConn<'pg> {
);
}
my_info.server.node_id = prop.node_id;
this_timeline.get().set_info(&my_info);
/* Need to persist our vote first */
this_timeline.get().set_info(&my_info);
this_timeline.get().save_control_file(true)?;
let mut flushed_restart_lsn = Lsn(0);
@@ -322,9 +323,11 @@ impl<'pg> ReceiveWalConn<'pg> {
}
info!(
"Start streaming from timeline {} tenant {} address {:?} flush_lsn={}",
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}",
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn,
);
let mut last_rec_lsn = Lsn(0);
let mut decoder = WalStreamDecoder::new(last_rec_lsn, false);
// Main loop
loop {
@@ -347,27 +350,52 @@ impl<'pg> ReceiveWalConn<'pg> {
let end_pos = req.end_lsn;
let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
assert!(rec_size <= MAX_SEND_SIZE);
if rec_size != 0 {
debug!(
"received for {} bytes between {} and {}",
rec_size, start_pos, end_pos,
);
debug!(
"received for {} bytes between {} and {}",
rec_size, start_pos, end_pos,
);
/* Receive message body (from the rest of the message) */
let mut buf = Vec::with_capacity(rec_size);
msg_reader.read_to_end(&mut buf)?;
assert_eq!(buf.len(), rec_size);
/* Receive message body (from the rest of the message) */
let mut buf = Vec::with_capacity(rec_size);
msg_reader.read_to_end(&mut buf)?;
assert_eq!(buf.len(), rec_size);
/* Save message in file */
Self::write_wal_file(
swh,
start_pos,
timeline_id,
this_timeline.get(),
wal_seg_size,
&buf,
)?;
if decoder.available() != start_pos {
info!(
"Restart decoder from {} to {}",
decoder.available(),
start_pos
);
decoder = WalStreamDecoder::new(start_pos, false);
}
decoder.feed_bytes(&buf);
loop {
match decoder.poll_decode() {
Err(e) => info!("Decode error {}", e),
Ok(None) => {},
Ok(Some((lsn, _rec))) => {
last_rec_lsn = lsn;
continue;
}
}
break;
}
info!(
"Receive WAL {}..{} last_rec_lsn={}",
start_pos, end_pos, last_rec_lsn
);
/* Save message in file */
Self::write_wal_file(
swh,
start_pos,
timeline_id,
this_timeline.get(),
wal_seg_size,
&buf,
)?;
}
my_info.restart_lsn = req.restart_lsn;
my_info.commit_lsn = req.commit_lsn;
@@ -377,13 +405,13 @@ impl<'pg> ReceiveWalConn<'pg> {
* maximum (vcl) determined by WAL proposer during handshake.
* Switching epoch means that node completes recovery and start writing in the WAL new data.
*/
if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) {
if my_info.epoch < prop.epoch && end_pos >= max(my_info.flush_lsn, prop.vcl) {
info!("Switch to new epoch {}", prop.epoch);
my_info.epoch = prop.epoch; /* bump epoch */
sync_control_file = true;
}
if end_pos > my_info.flush_lsn {
my_info.flush_lsn = end_pos;
if last_rec_lsn > my_info.flush_lsn {
my_info.flush_lsn = last_rec_lsn;
}
/*
* Update restart LSN in control file.
@@ -391,6 +419,7 @@ impl<'pg> ReceiveWalConn<'pg> {
* when restart_lsn delta exceeds WAL segment size.
*/
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
this_timeline.get().set_info(&my_info);
this_timeline.get().save_control_file(sync_control_file)?;
if sync_control_file {
@@ -401,7 +430,7 @@ impl<'pg> ReceiveWalConn<'pg> {
//info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32);
let resp = SafeKeeperResponse {
epoch: my_info.epoch,
flush_lsn: end_pos,
flush_lsn: my_info.flush_lsn,
hs_feedback: this_timeline.get().get_hs_feedback(),
};
self.write_msg(&resp)?;
@@ -410,9 +439,15 @@ impl<'pg> ReceiveWalConn<'pg> {
* Ping wal sender that new data is available.
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
*/
trace!(
"Notify WAL senders min({}, {})={}",
req.commit_lsn,
my_info.flush_lsn,
min(req.commit_lsn, my_info.flush_lsn)
);
this_timeline
.get()
.notify_wal_senders(min(req.commit_lsn, end_pos));
.notify_wal_senders(min(req.commit_lsn, my_info.flush_lsn));
}
Ok(())

View File

@@ -76,8 +76,10 @@ impl ReplicationConn {
let feedback = HotStandbyFeedback::des(&m)?;
subscriber.add_hs_feedback(feedback);
}
FeMessage::Sync => {}
FeMessage::CopyFailed => return Err(anyhow!("Copy failed")),
_ => {
// We only handle `CopyData` messages. Anything else is ignored.
// We only handle `CopyData`, 'Sync', 'CopyFailed' messages. Anything else is ignored.
info!("unexpected message {:?}", msg);
}
}
@@ -215,9 +217,14 @@ impl ReplicationConn {
data: &file_buf,
}))?;
start_pos += send_size as u64;
debug!(
"Sent WAL to page server {}..{}, end_pos={}",
start_pos,
start_pos + send_size as u64,
end_pos
);
debug!("Sent WAL to page server up to {}", end_pos);
start_pos += send_size as u64;
// Decide whether to reuse this file. If we don't set wal_file here
// a new file will be opened next time.

View File

@@ -175,7 +175,7 @@ impl Timeline {
}
}
fn _stop_wal_senders(&self) {
pub fn stop_wal_senders(&self) {
self.notify_wal_senders(END_REPLICATION_MARKER);
}

View File

@@ -24,6 +24,11 @@ impl Lsn {
/// Maximum possible value for an LSN
pub const MAX: Lsn = Lsn(u64::MAX);
/// Align LSN on 8-byte boundary (alignment of WAL records).
pub fn align(&self) -> Lsn {
Lsn((self.0 + 7) & !7)
}
/// Subtract a number, returning None on overflow.
pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
let other: u64 = other.into();

View File

@@ -341,7 +341,7 @@ impl PostgresBackend {
// We prefer explicit pattern matching to wildcards, because
// this helps us spot the places where new variants are missing
FeMessage::CopyData(_) | FeMessage::CopyDone => {
FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => {
bail!("unexpected message type: {:?}", msg);
}
}

View File

@@ -31,6 +31,7 @@ pub enum FeMessage {
Terminate,
CopyData(Bytes),
CopyDone,
CopyFailed,
PasswordMessage(Bytes),
}
@@ -138,6 +139,7 @@ impl FeMessage {
b'X' => Ok(Some(FeMessage::Terminate)),
b'd' => Ok(Some(FeMessage::CopyData(body))),
b'c' => Ok(Some(FeMessage::CopyDone)),
b'f' => Ok(Some(FeMessage::CopyFailed)),
b'p' => Ok(Some(FeMessage::PasswordMessage(body))),
tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
}
@@ -338,6 +340,7 @@ pub enum BeMessage<'a> {
ControlFile,
CopyData(&'a [u8]),
CopyDone,
CopyFailed,
CopyInResponse,
CopyOutResponse,
CopyBothResponse,
@@ -546,6 +549,11 @@ impl<'a> BeMessage<'a> {
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
BeMessage::CopyFailed => {
buf.put_u8(b'f');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
BeMessage::CopyInResponse => {
buf.put_u8(b'G');
write_body(buf, |buf| {