Compare commits

...

14 Commits

Author SHA1 Message Date
Arseny Sher
a8a2f62bc3 imactive 2022-01-11 18:14:59 +03:00
Konstantin Knizhnik
26060dd68e Disable write WAL to files at pageserver 2021-08-31 11:13:55 +03:00
Konstantin Knizhnik
73d823e53c Make it possible for WAL decoder to skip continuation records 2021-08-31 10:59:26 +03:00
Konstantin Knizhnik
112909c5e4 Handle wal records larger than WAL segment size in find_end_of_wal 2021-08-30 17:32:40 +03:00
Konstantin Knizhnik
07adc9dbda Fix unit test for find_end_of_wal 2021-08-27 14:59:07 +03:00
Konstantin Knizhnik
c05cedc626 Do not check cont record for second segment because it contains dummy checkpoint record 2021-08-27 12:48:28 +03:00
Konstantin Knizhnik
815528e0ce Use last record LSN as flush position reported by safekeepers to walproposer to prevent moving VCL backward on compute node restart 2021-08-26 18:08:29 +03:00
Konstantin Knizhnik
a2e135b404 Maintain safe LSN position at safekeepers 2021-08-25 10:24:45 +03:00
Stas Kelvich
72de70a8cc Change test_restart_compute to expose safekeeper problems 2021-08-25 00:42:08 +03:00
Konstantin Knizhnik
4051c5d4ff Undo some redundant fixes 2021-08-20 12:31:53 +03:00
Konstantin Knizhnik
f86bf26466 Restore including postgresql.conf in basebackup 2021-08-20 11:23:57 +03:00
Konstantin Knizhnik
3ca4b638ac Merge with main 2021-08-20 10:55:34 +03:00
Konstantin Knizhnik
d61699b0f8 [refer #439] Fix submodule version 2021-08-19 19:56:49 +03:00
Konstantin Knizhnik
ead94feb05 [refer #439] Correctly handle LSN parameter in BASEBACKUP command 2021-08-19 19:53:22 +03:00
19 changed files with 557 additions and 378 deletions

View File

@@ -320,7 +320,7 @@ impl PostgresNode {
         // Never clean up old WAL. TODO: We should use a replication
         // slot or something proper, to prevent the compute node
-        // from removing WAL that hasn't been streamed to the safekeepr or
+        // from removing WAL that hasn't been streamed to the safekeeper or
         // page server yet. (gh issue #349)
         self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;


View File

@@ -842,6 +842,34 @@ impl Timeline for LayeredTimeline {
     fn get_prev_record_lsn(&self) -> Lsn {
         self.prev_record_lsn.load()
     }
+
+    ///
+    /// Wait until WAL has been received up to the given LSN.
+    ///
+    fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
+        // When invalid LSN is requested, it means "don't wait, return latest version of the page"
+        // This is necessary for bootstrap.
+        if lsn == Lsn(0) {
+            let last_valid_lsn = self.last_valid_lsn.load();
+            trace!(
+                "walreceiver doesn't work yet last_valid_lsn {}, requested {}",
+                last_valid_lsn,
+                lsn
+            );
+            lsn = last_valid_lsn;
+        }
+        self.last_valid_lsn
+            .wait_for_timeout(lsn, TIMEOUT)
+            .with_context(|| {
+                format!(
+                    "Timed out while waiting for WAL record at LSN {} to arrive",
+                    lsn
+                )
+            })?;
+
+        Ok(lsn)
+    }
 }

 impl LayeredTimeline {
@@ -1055,34 +1083,6 @@ impl LayeredTimeline {
         Ok(layer_rc)
     }

-    ///
-    /// Wait until WAL has been received up to the given LSN.
-    ///
-    fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
-        // When invalid LSN is requested, it means "don't wait, return latest version of the page"
-        // This is necessary for bootstrap.
-        if lsn == Lsn(0) {
-            let last_valid_lsn = self.last_valid_lsn.load();
-            trace!(
-                "walreceiver doesn't work yet last_valid_lsn {}, requested {}",
-                last_valid_lsn,
-                lsn
-            );
-            lsn = last_valid_lsn;
-        }
-        self.last_valid_lsn
-            .wait_for_timeout(lsn, TIMEOUT)
-            .with_context(|| {
-                format!(
-                    "Timed out while waiting for WAL record at LSN {} to arrive",
-                    lsn
-                )
-            })?;
-
-        Ok(lsn)
-    }
-
     ///
     /// Flush to disk all data that was written with the put_* functions
     ///
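Note: moving wait_lsn into the Timeline trait lets callers (e.g. the basebackup path below) block until WAL up to a given LSN has arrived. A minimal caller sketch, assuming the trait and the get_page_at_lsn_nowait accessor seen later in this diff; read_page_at is a hypothetical helper, not part of this change:

    // Hypothetical helper: wait for WAL up to `lsn`, then read a page as of it.
    // Lsn(0) means "don't wait, use the latest valid LSN" (the bootstrap case).
    fn read_page_at(
        timeline: &dyn Timeline,
        rel: RelishTag,
        blknum: u32,
        lsn: Lsn,
    ) -> anyhow::Result<Bytes> {
        let lsn = timeline.wait_lsn(lsn)?; // returns the LSN actually used
        timeline.get_page_at_lsn_nowait(rel, blknum, lsn)
    }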

View File

@@ -663,7 +663,7 @@ impl Timeline for ObjectTimeline {
         assert!(old <= lsn);

         // Use old value of last_record_lsn as prev_record_lsn
-        self.prev_record_lsn.fetch_max(Lsn((old.0 + 7) & !7));
+        self.prev_record_lsn.fetch_max(old.align());

         // Also advance last_valid_lsn
         let old = self.last_valid_lsn.advance(lsn);
@@ -712,6 +712,41 @@ impl Timeline for ObjectTimeline {
         let iter = self.obj_store.objects(self.timelineid, lsn)?;
         Ok(Box::new(ObjectHistory { lsn, iter }))
     }
+
+    //
+    // Wait until WAL has been received up to the given LSN.
+    //
+    fn wait_lsn(&self, req_lsn: Lsn) -> Result<Lsn> {
+        let mut lsn = req_lsn;
+        // When invalid LSN is requested, it means "don't wait, return latest version of the page"
+        // This is necessary for bootstrap.
+        if lsn == Lsn(0) {
+            let last_valid_lsn = self.last_valid_lsn.load();
+            trace!(
+                "walreceiver doesn't work yet last_valid_lsn {}, requested {}",
+                last_valid_lsn,
+                lsn
+            );
+            lsn = last_valid_lsn;
+        }
+        trace!(
+            "Start waiting for LSN {}, valid LSN is {}",
+            lsn,
+            self.last_valid_lsn.load()
+        );
+        self.last_valid_lsn
+            .wait_for_timeout(lsn, TIMEOUT)
+            .with_context(|| {
+                format!(
+                    "Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
+                    lsn,
+                    self.last_valid_lsn.load(),
+                )
+            })?;
+        //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
+
+        Ok(lsn)
+    }
 }

 impl ObjectTimeline {
@@ -820,40 +855,6 @@ impl ObjectTimeline {
         }
     }

-    //
-    // Wait until WAL has been received up to the given LSN.
-    //
-    fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
-        // When invalid LSN is requested, it means "don't wait, return latest version of the page"
-        // This is necessary for bootstrap.
-        if lsn == Lsn(0) {
-            let last_valid_lsn = self.last_valid_lsn.load();
-            trace!(
-                "walreceiver doesn't work yet last_valid_lsn {}, requested {}",
-                last_valid_lsn,
-                lsn
-            );
-            lsn = last_valid_lsn;
-        }
-        trace!(
-            "Start waiting for LSN {}, valid LSN is {}",
-            lsn,
-            self.last_valid_lsn.load()
-        );
-        self.last_valid_lsn
-            .wait_for_timeout(lsn, TIMEOUT)
-            .with_context(|| {
-                format!(
-                    "Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
-                    lsn,
-                    self.last_valid_lsn.load(),
-                )
-            })?;
-        //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
-
-        Ok(lsn)
-    }
-
     ///
     /// Iterate through object versions with given key, in reverse LSN order.
     ///

View File

@@ -357,8 +357,13 @@ impl PageServerHandler {
         /* Send a tarball of the latest snapshot on the timeline */

-        let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
+        let req_lsn = match lsn {
+            Some(lsn) => {
+                timeline.wait_lsn(lsn)?;
+                lsn
+            }
+            None => timeline.get_last_record_lsn(),
+        };
         {
             let mut writer = CopyDataSink { pgb };
             let mut basebackup = basebackup::Basebackup::new(
@@ -469,7 +474,7 @@ impl postgres_backend::Handler for PageServerHandler {
             let (_, params_raw) = query_string.split_at("basebackup ".len());
             let params = params_raw.split(" ").collect::<Vec<_>>();
             ensure!(
-                params.len() == 2,
+                params.len() >= 2,
                 "invalid param number for basebackup command"
             );
@@ -479,7 +484,7 @@ impl postgres_backend::Handler for PageServerHandler {
             self.check_permission(Some(tenantid))?;

             // TODO are there any tests with lsn option?
-            let lsn = if params.len() == 3 {
+            let lsn = if params.len() == 3 && params[2].len() != 0 {
                 Some(Lsn::from_str(params[2])?)
             } else {
                 None
@@ -575,6 +580,10 @@ impl postgres_backend::Handler for PageServerHandler {
                     timeline.advance_last_valid_lsn(last_lsn);
                     break;
                 }
+                FeMessage::CopyFailed => {
+                    info!("Copy failed");
+                    break;
+                }
                 FeMessage::Sync => {}
                 _ => bail!("unexpected message {:?}", msg),
             }
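Note: the relaxed checks mean the BASEBACKUP command now tolerates a third, possibly empty, LSN parameter. A standalone sketch of the same parameter handling, assuming the Lsn type lives at the zenith_utils path used by this repo:

    use std::str::FromStr;
    use anyhow::ensure;
    use zenith_utils::lsn::Lsn; // assumed crate path for Lsn

    // Mirrors the handler above: a third, non-empty parameter is an explicit
    // LSN to wait for; otherwise the timeline's last record LSN is used.
    fn parse_basebackup_lsn(params: &[&str]) -> anyhow::Result<Option<Lsn>> {
        ensure!(params.len() >= 2, "invalid param number for basebackup command");
        if params.len() == 3 && !params[2].is_empty() {
            Ok(Some(Lsn::from_str(params[2])?))
        } else {
            Ok(None)
        }
    }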

View File

@@ -203,6 +203,11 @@ pub trait Timeline: Send + Sync {
     /// Relation size is increased implicitly and decreased with Truncate updates.
     // TODO ordering guarantee?
     fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
+
+    //
+    // Wait until WAL has been received up to the given LSN.
+    //
+    fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
 }

 pub trait History: Iterator<Item = Result<Modification>> {

View File

@@ -264,7 +264,7 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa
 /// Scan PostgreSQL WAL files in given directory
 /// and load all records >= 'startpoint' into the repository.
 pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
-    let mut waldecoder = WalStreamDecoder::new(startpoint);
+    let mut waldecoder = WalStreamDecoder::new(startpoint, true);

     let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
     let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
@@ -427,7 +427,9 @@ pub fn save_decoded_record(
             // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
             trace!(
                 "unlink twophaseFile for xid {} parsed_xact.xid {} here at {}",
-                decoded.xl_xid, parsed_xact.xid, lsn
+                decoded.xl_xid,
+                parsed_xact.xid,
+                lsn
             );
             timeline.put_unlink(
                 RelishTag::TwoPhase {

View File

@@ -25,13 +25,13 @@ pub type MultiXactStatus = u32;

 pub struct WalStreamDecoder {
     lsn: Lsn,
-    startlsn: Lsn, // LSN where this record starts

     contlen: u32,
     padlen: u32,

     inputbuf: BytesMut,

     recordbuf: BytesMut,
+
+    crc_check: bool,
 }

 #[derive(Error, Debug, Clone)]
@@ -46,19 +46,24 @@ pub struct WalDecodeError {
 // FIXME: This isn't a proper rust stream
 //
 impl WalStreamDecoder {
-    pub fn new(lsn: Lsn) -> WalStreamDecoder {
+    pub fn new(lsn: Lsn, crc_check: bool) -> WalStreamDecoder {
         WalStreamDecoder {
             lsn,
-            startlsn: Lsn(0),

             contlen: 0,
             padlen: 0,

             inputbuf: BytesMut::new(),
             recordbuf: BytesMut::new(),
+
+            crc_check,
         }
     }

+    pub fn available(&self) -> Lsn {
+        self.lsn + self.inputbuf.remaining() as u64
+    }
+
     pub fn feed_bytes(&mut self, buf: &[u8]) {
         self.inputbuf.extend_from_slice(buf);
     }
@@ -92,7 +97,9 @@ impl WalStreamDecoder {
                 // TODO: verify the remaining fields in the header

                 self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
-                continue;
+                if !self.crc_check && self.contlen != hdr.std.xlp_rem_len {
+                    self.contlen = hdr.std.xlp_rem_len; // skip continuation record
+                }
             } else if self.lsn.block_offset() == 0 {
                 if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
                     return Ok(None);
@@ -102,14 +109,19 @@ impl WalStreamDecoder {

                 if hdr.xlp_pageaddr != self.lsn.0 {
                     return Err(WalDecodeError {
-                        msg: "invalid xlog page header".into(),
+                        msg: format!(
+                            "invalid xlog page header: xlp_pageaddr={} vs. lsn={}",
+                            hdr.xlp_pageaddr, self.lsn
+                        ),
                         lsn: self.lsn,
                     });
                 }
                 // TODO: verify the remaining fields in the header

                 self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
-                continue;
+                if !self.crc_check && self.contlen != hdr.xlp_rem_len {
+                    self.contlen = hdr.xlp_rem_len; // skip continuation record
+                }
             } else if self.padlen > 0 {
                 if self.inputbuf.remaining() < self.padlen as usize {
                     return Ok(None);
@@ -127,7 +139,6 @@ impl WalStreamDecoder {
                 }

                 // read xl_tot_len FIXME: assumes little-endian
-                self.startlsn = self.lsn;
                 let xl_tot_len = self.inputbuf.get_u32_le();
                 if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
                     return Err(WalDecodeError {
@@ -142,7 +153,6 @@ impl WalStreamDecoder {
                 self.recordbuf.put_u32_le(xl_tot_len);

                 self.contlen = xl_tot_len - 4;
-                continue;
             } else {
                 // we're continuing a record, possibly from previous page.
                 let pageleft = self.lsn.remaining_in_block() as u32;
@@ -164,17 +174,10 @@ impl WalStreamDecoder {
                     let recordbuf = recordbuf.freeze();
                     let mut buf = recordbuf.clone();
+                    let xlogrec = XLogRecord::from_bytes(&mut buf);

                     // XLOG_SWITCH records are special. If we see one, we need to skip
                     // to the next WAL segment.
-                    let xlogrec = XLogRecord::from_bytes(&mut buf);
-
-                    let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
-                    crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
-                    if crc != xlogrec.xl_crc {
-                        return Err(WalDecodeError {
-                            msg: "WAL record crc mismatch".into(),
-                            lsn: self.lsn,
-                        });
-                    }
                     if xlogrec.is_xlog_switch_record() {
                         trace!("saw xlog switch record at {}", self.lsn);
                         self.padlen =
@@ -184,10 +187,29 @@ impl WalStreamDecoder {
                         self.padlen = self.lsn.calc_padding(8u32) as u32;
                     }

-                    let result = (self.lsn, recordbuf);
+                    // Check record CRC
+                    if self.crc_check {
+                        let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
+                        crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
+                        if crc != xlogrec.xl_crc {
+                            info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}",
+                                  n, recordbuf.len(), self.lsn, xlogrec, recordbuf);
+                            return Err(WalDecodeError {
+                                msg: format!(
+                                    "WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}",
+                                    n,
+                                    buf.len(),
+                                    self.lsn,
+                                    xlogrec
+                                ),
+                                lsn: self.lsn,
+                            });
+                        }
+                    }
+
+                    let result = (self.lsn.align(), recordbuf);
                     return Ok(Some(result));
                 }
-                continue;
             }
         }
     }

     // check record boundaries
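Note: with the new constructor flag, safekeepers can decode without CRC validation (they only need record boundaries) while the pageserver keeps it on. A minimal sketch of driving the decoder, with the record handling stubbed out:

    // Feed raw WAL bytes, then poll for complete records.
    fn decode_chunk(startpoint: Lsn, wal_chunk: &[u8]) -> Result<(), WalDecodeError> {
        let mut decoder = WalStreamDecoder::new(startpoint, false); // no CRC checks (safekeeper mode)
        decoder.feed_bytes(wal_chunk);
        while let Some((end_lsn, _rec)) = decoder.poll_decode()? {
            // end_lsn is the 8-byte-aligned end of the decoded record
            println!("decoded record ending at {}", end_lsn);
        }
        // available() reports the LSN up to which input has been buffered,
        // i.e. startpoint + wal_chunk.len().
        Ok(())
    }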

View File

@@ -22,8 +22,6 @@ use postgres_types::PgLsn;
 use std::cmp::{max, min};
 use std::collections::HashMap;
 use std::fs;
-use std::fs::{File, OpenOptions};
-use std::io::{Seek, SeekFrom, Write};
 use std::str::FromStr;
 use std::sync::Mutex;
 use std::thread;
@@ -178,7 +176,7 @@ fn walreceiver_main(
     let copy_stream = rclient.copy_both_simple(&query)?;
     let mut physical_stream = ReplicationIter::new(copy_stream);

-    let mut waldecoder = WalStreamDecoder::new(startpoint);
+    let mut waldecoder = WalStreamDecoder::new(startpoint, true);

     let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?;
     let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
@@ -194,45 +192,51 @@ fn walreceiver_main(
                 let endlsn = startlsn + data.len() as u64;
                 let prev_last_rec_lsn = last_rec_lsn;

-                write_wal_file(
-                    conf,
-                    startlsn,
-                    &timelineid,
-                    pg_constants::WAL_SEGMENT_SIZE,
-                    data,
-                    tenantid,
-                )?;
-
                 trace!("received XLogData between {} and {}", startlsn, endlsn);

                 waldecoder.feed_bytes(data);

-                while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
-                    // Save old checkpoint value to compare with it after decoding WAL record
-                    let old_checkpoint_bytes = checkpoint.encode();
-                    let decoded = decode_wal_record(recdata.clone());
-                    restore_local_repo::save_decoded_record(
-                        &mut checkpoint,
-                        &*timeline,
-                        &decoded,
-                        recdata,
-                        lsn,
-                    )?;
-                    last_rec_lsn = lsn;
-
-                    let new_checkpoint_bytes = checkpoint.encode();
-                    // Check if checkpoint data was updated by save_decoded_record
-                    if new_checkpoint_bytes != old_checkpoint_bytes {
-                        timeline.put_page_image(
-                            RelishTag::Checkpoint,
-                            0,
-                            lsn,
-                            new_checkpoint_bytes,
-                            false,
-                        )?;
-                    }
-                }
+                loop {
+                    match waldecoder.poll_decode() {
+                        Ok(Some((lsn, recdata))) => {
+                            // Save old checkpoint value to compare with it after decoding WAL record
+                            let old_checkpoint_bytes = checkpoint.encode();
+                            let decoded = decode_wal_record(recdata.clone());
+                            restore_local_repo::save_decoded_record(
+                                &mut checkpoint,
+                                &*timeline,
+                                &decoded,
+                                recdata,
+                                lsn,
+                            )?;
+                            last_rec_lsn = lsn;
+
+                            let new_checkpoint_bytes = checkpoint.encode();
+                            // Check if checkpoint data was updated by save_decoded_record
+                            if new_checkpoint_bytes != old_checkpoint_bytes {
+                                timeline.put_page_image(
+                                    RelishTag::Checkpoint,
+                                    0,
+                                    lsn,
+                                    new_checkpoint_bytes,
+                                    false,
+                                )?;
+                            }
+                        }
+                        Ok(None) => {
+                            trace!(
+                                "End of replication stream {}..{} at {}",
+                                startlsn,
+                                endlsn,
+                                last_rec_lsn
+                            );
+                            break;
+                        }
+                        Err(e) => {
+                            info!("Decode error {}", e);
+                            return Err(e.into());
+                        }
+                    }
+                }

                 // Update the last_valid LSN value in the page cache one more time. We updated
                 // it in the loop above, between each WAL record, but we might have received
                 // a partial record after the last completed record. Our page cache's value
@@ -407,98 +411,3 @@ pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
         Err(IdentifyError.into())
     }
 }
-
-fn write_wal_file(
-    conf: &PageServerConf,
-    startpos: Lsn,
-    timelineid: &ZTimelineId,
-    wal_seg_size: usize,
-    buf: &[u8],
-    tenantid: &ZTenantId,
-) -> anyhow::Result<()> {
-    let mut bytes_left: usize = buf.len();
-    let mut bytes_written: usize = 0;
-    let mut partial;
-    let mut start_pos = startpos;
-    const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
-    let wal_dir = conf.wal_dir_path(timelineid, tenantid);
-
-    /* Extract WAL location for this block */
-    let mut xlogoff = start_pos.segment_offset(wal_seg_size);
-
-    while bytes_left != 0 {
-        let bytes_to_write;
-
-        /*
-         * If crossing a WAL boundary, only write up until we reach wal
-         * segment size.
-         */
-        if xlogoff + bytes_left > wal_seg_size {
-            bytes_to_write = wal_seg_size - xlogoff;
-        } else {
-            bytes_to_write = bytes_left;
-        }
-
-        /* Open file */
-        let segno = start_pos.segment_number(wal_seg_size);
-        let wal_file_name = XLogFileName(
-            1, // FIXME: always use Postgres timeline 1
-            segno,
-            wal_seg_size,
-        );
-        let wal_file_path = wal_dir.join(wal_file_name.clone());
-        let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial");
-
-        {
-            let mut wal_file: File;
-            /* Try to open already completed segment */
-            if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
-                wal_file = file;
-                partial = false;
-            } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
-                /* Try to open existed partial file */
-                wal_file = file;
-                partial = true;
-            } else {
-                /* Create and fill new partial file */
-                partial = true;
-                match OpenOptions::new()
-                    .create(true)
-                    .write(true)
-                    .open(&wal_file_partial_path)
-                {
-                    Ok(mut file) => {
-                        for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
-                            file.write_all(&ZERO_BLOCK)?;
-                        }
-                        wal_file = file;
-                    }
-                    Err(e) => {
-                        error!("Failed to open log file {:?}: {}", &wal_file_path, e);
-                        return Err(e.into());
-                    }
-                }
-            }
-            wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
-            wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
-
-            // FIXME: Flush the file
-            //wal_file.sync_all()?;
-        }
-        /* Write was successful, advance our position */
-        bytes_written += bytes_to_write;
-        bytes_left -= bytes_to_write;
-        start_pos += bytes_to_write as u64;
-        xlogoff += bytes_to_write;
-
-        /* Did we reach the end of a WAL segment? */
-        if start_pos.segment_offset(wal_seg_size) == 0 {
-            xlogoff = 0;
-            if partial {
-                fs::rename(&wal_file_partial_path, &wal_file_path)?;
-            }
-        }
-    }
-    Ok(())
-}

View File

@@ -108,17 +108,23 @@ fn find_end_of_wal_segment(
     segno: XLogSegNo,
     tli: TimeLineID,
     wal_seg_size: usize,
+    is_partial: bool,
+    rec_offs: &mut usize,
+    rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD],
+    crc: &mut u32,
+    check_contrec: bool,
 ) -> u32 {
     let mut offs: usize = 0;
     let mut contlen: usize = 0;
-    let mut wal_crc: u32 = 0;
-    let mut crc: u32 = 0;
-    let mut rec_offs: usize = 0;
     let mut buf = [0u8; XLOG_BLCKSZ];
     let file_name = XLogFileName(tli, segno, wal_seg_size);
     let mut last_valid_rec_pos: usize = 0;
-    let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
-    let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];
+    let file_path = data_dir.join(if is_partial {
+        file_name.clone() + ".partial"
+    } else {
+        file_name
+    });
+    let mut file = File::open(&file_path).unwrap();

     while offs < wal_seg_size {
         if offs % XLOG_BLCKSZ == 0 {
@@ -133,13 +139,33 @@ fn find_end_of_wal_segment(
             let xlp_info = LittleEndian::read_u16(&buf[2..4]);
             let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
             if xlp_magic != XLOG_PAGE_MAGIC as u16 {
-                info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic);
+                info!("Invalid WAL file {:?} magic {}", &file_path, xlp_magic);
                 break;
             }
             if offs == 0 {
                 offs = XLOG_SIZE_OF_XLOG_LONG_PHD;
                 if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
-                    offs += ((xlp_rem_len + 7) & !7) as usize;
+                    if check_contrec {
+                        let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize;
+                        contlen = xlp_rem_len as usize;
+                        if *rec_offs + contlen < xl_tot_len
+                            || (*rec_offs + contlen != xl_tot_len
+                                && contlen != XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_LONG_PHD)
+                        {
+                            info!(
+                                "Corrupted continuation record: offs={}, contlen={}, xl_tot_len={}",
+                                *rec_offs, contlen, xl_tot_len
+                            );
+                            return 0;
+                        }
+                    } else {
+                        offs += ((xlp_rem_len + 7) & !7) as usize;
+                    }
+                } else if *rec_offs != 0 {
+                    // There is an incomplete record in the previous segment but no cont record:
+                    // it means that the current segment is not valid and we have to go back.
+                    info!("CONTRECORD flag is missing in page header");
+                    return 0;
                 }
             } else {
                 offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
@@ -150,9 +176,8 @@ fn find_end_of_wal_segment(
             if xl_tot_len == 0 {
                 break;
             }
-            last_valid_rec_pos = offs;
             offs += 4;
-            rec_offs = 4;
+            *rec_offs = 4;
             contlen = xl_tot_len - 4;
             rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
         } else {
@@ -162,34 +187,33 @@ fn find_end_of_wal_segment(
             // read the rest of the record, or as much as fits on this page.
             let n = min(contlen, pageleft);

-            if rec_offs < XLOG_RECORD_CRC_OFFS {
-                let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
-                rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]);
+            let mut hdr_len: usize = 0;
+            if *rec_offs < XLOG_SIZE_OF_XLOG_RECORD {
+                // copy header
+                hdr_len = min(XLOG_SIZE_OF_XLOG_RECORD - *rec_offs, n);
+                rec_hdr[*rec_offs..*rec_offs + hdr_len]
+                    .copy_from_slice(&buf[page_offs..page_offs + hdr_len]);
             }
-            if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
-                let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
-                wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
-                crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
-                crc = !crc;
-            } else {
-                crc ^= 0xFFFFFFFFu32;
-                crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
-                crc = !crc;
-            }
-            rec_offs += n;
+            *crc = crc32c_append(*crc, &buf[page_offs + hdr_len..page_offs + n]);
+            *rec_offs += n;
             offs += n;
             contlen -= n;

             if contlen == 0 {
-                crc = !crc;
-                crc = crc32c_append(crc, &rec_hdr);
+                *crc = crc32c_append(*crc, &rec_hdr[0..XLOG_RECORD_CRC_OFFS]);
                 offs = (offs + 7) & !7; // pad on 8 bytes boundary */
-                if crc == wal_crc {
+                let wal_crc = LittleEndian::read_u32(
+                    &rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS + 4],
+                );
+                if *crc == wal_crc {
                     last_valid_rec_pos = offs;
+                    // Reset rec_offs and crc for start of new record
+                    *rec_offs = 0;
+                    *crc = 0;
                 } else {
                     info!(
-                        "CRC mismatch {} vs {} at {}",
-                        crc, wal_crc, last_valid_rec_pos
+                        "CRC mismatch {} vs {} at offset {} lsn {}",
+                        *crc, wal_crc, offs, last_valid_rec_pos
                     );
                     break;
                 }
@@ -240,20 +264,142 @@ pub fn find_end_of_wal(
     }
     if high_segno > 0 {
         let mut high_offs = 0;
-        /*
-         * Move the starting pointer to the start of the next segment, if the
-         * highest one we saw was completed.
-         */
-        if !high_ispartial {
-            high_segno += 1;
-        } else if precise {
-            /* otherwise locate last record in last partial segment */
-            high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size);
+        if precise {
+            let mut crc: u32 = 0;
+            let mut rec_offs: usize = 0;
+            let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD];
+            let wal_dir = data_dir.join("pg_wal");
+
+            /*
+             * To be able to calculate CRC of records crossing segment boundary,
+             * we need to parse previous segments.
+             * So first traverse segments in backward direction to locate record start
+             * and then traverse forward, accumulating CRC.
+             */
+            let mut prev_segno = high_segno - 1;
+            let mut prev_offs: u32 = 0;
+            while prev_segno > 1 {
+                // TODO: first segment contains dummy checkpoint record at the beginning
+                prev_offs = find_end_of_wal_segment(
+                    data_dir,
+                    prev_segno,
+                    high_tli,
+                    wal_seg_size,
+                    false,
+                    &mut rec_offs,
+                    &mut rec_hdr,
+                    &mut crc,
+                    false,
+                );
+                if prev_offs != 0 {
+                    break;
+                }
+                prev_segno -= 1;
+            }
+            if prev_offs != 0 {
+                // found start of WAL record
+                let first_segno = prev_segno;
+                let first_offs = prev_offs;
+                while prev_segno + 1 < high_segno {
+                    // now traverse record in forward direction, accumulating CRC
+                    prev_segno += 1;
+                    prev_offs = find_end_of_wal_segment(
+                        data_dir,
+                        prev_segno,
+                        high_tli,
+                        wal_seg_size,
+                        false,
+                        &mut rec_offs,
+                        &mut rec_hdr,
+                        &mut crc,
+                        true,
+                    );
+                    if prev_offs == 0 {
+                        info!("Segment {} is corrupted", prev_segno);
+                        break;
+                    }
+                }
+                if prev_offs != 0 {
+                    high_offs = find_end_of_wal_segment(
+                        data_dir,
+                        high_segno,
+                        high_tli,
+                        wal_seg_size,
+                        high_ispartial,
+                        &mut rec_offs,
+                        &mut rec_hdr,
+                        &mut crc,
+                        true,
+                    );
+                }
+                if high_offs == 0 {
+                    // If the last segment contains no valid records, then go back
+                    info!("Last WAL segment {} contains no valid record, truncate WAL till {} segment",
+                          high_segno, first_segno);
+                    // Remove last segments containing corrupted WAL record
+                    for segno in first_segno + 1..high_segno {
+                        let file_name = XLogFileName(high_tli, segno, wal_seg_size);
+                        let file_path = wal_dir.join(file_name);
+                        if let Err(e) = fs::remove_file(&file_path) {
+                            info!("Failed to remove file {:?}: {}", &file_path, e);
+                        }
+                    }
+                    let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
+                    let file_path = if high_ispartial {
+                        wal_dir.join(file_name.clone() + ".partial")
+                    } else {
+                        wal_dir.join(file_name.clone())
+                    };
+                    if let Err(e) = fs::remove_file(&file_path) {
+                        info!("Failed to remove file {:?}: {}", &file_path, e);
+                    }
+                    high_ispartial = false; // previous segment should not be partial
+                    high_segno = first_segno;
+                    high_offs = first_offs;
+                }
+            } else {
+                // failed to locate previous segment
+                assert!(prev_segno <= 1);
+                high_offs = find_end_of_wal_segment(
+                    data_dir,
+                    high_segno,
+                    high_tli,
+                    wal_seg_size,
+                    high_ispartial,
+                    &mut rec_offs,
+                    &mut rec_hdr,
+                    &mut crc,
+                    false,
+                );
+            }
+            // If the last segment is not marked as partial, it means that the next segment
+            // was not written. Let's make this segment partial once again.
+            if !high_ispartial {
+                let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
+                if let Err(e) = fs::rename(
+                    wal_dir.join(file_name.clone()),
+                    wal_dir.join(file_name.clone() + ".partial"),
+                ) {
+                    info!(
+                        "Failed to rename {} to {}.partial: {}",
+                        &file_name, &file_name, e
+                    );
+                }
+            }
+        } else {
+            /*
+             * Move the starting pointer to the start of the next segment, if the
+             * highest one we saw was completed.
+             */
+            if !high_ispartial {
+                high_segno += 1;
+            }
         }
         let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
         return (high_ptr, high_tli);
     }
-    (0, 0)
+    (0, 1) // First timeline is 1
 }

 pub fn main() {
@@ -469,7 +615,7 @@ mod tests {
         let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true);
         let wal_end = Lsn(wal_end);
         println!("wal_end={}, tli={}", wal_end, tli);
-        assert_eq!(wal_end, "0/2000000".parse::<Lsn>().unwrap());
+        assert_eq!(wal_end, "0/1699D10".parse::<Lsn>().unwrap());

         // 4. Get the actual end of WAL by pg_waldump
         let waldump_path = top_path.join("tmp_install/bin/pg_waldump");
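Note: XLogSegNoOffsetToRecPtr is plain arithmetic (segno * wal_seg_size + offset), so the new expected value corresponds to an end of WAL inside segment 1 rather than at a segment boundary. A quick check with the default 16 MiB segment size:

    // 0/1699D10 = segment 1 (16 MiB = 0x1000000) plus offset 0x699D10.
    let wal_seg_size: u64 = 16 * 1024 * 1024;
    assert_eq!(wal_seg_size + 0x0069_9D10, 0x0169_9D10);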

View File

@@ -9,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
 #
 # Test restarting and recreating a postgres instance
 #
-@pytest.mark.parametrize('with_wal_acceptors', [False, True])
+@pytest.mark.parametrize('with_wal_acceptors', [True, False])
 def test_restart_compute(
         zenith_cli,
         pageserver: ZenithPageserver,
@@ -31,31 +31,56 @@ def test_restart_compute(
     with closing(pg.connect()) as conn:
         with conn.cursor() as cur:
-            # Create table, and insert a row
-            cur.execute('CREATE TABLE foo (t text)')
-            cur.execute("INSERT INTO foo VALUES ('bar')")
+            cur.execute('CREATE TABLE t(key int primary key, value text)')
+            cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
+            cur.execute('SELECT sum(key) FROM t')
+            r = cur.fetchone()
+            assert r == (5000050000, )
+            print("res = ", r)

-    # Stop and restart the Postgres instance
+    # Remove data directory and restart
     pg.stop_and_destroy().create_start('test_restart_compute',
                                        wal_acceptors=wal_acceptor_connstrs)

     with closing(pg.connect()) as conn:
         with conn.cursor() as cur:
             # We can still see the row
-            cur.execute('SELECT count(*) FROM foo')
-            assert cur.fetchone() == (1, )
+            cur.execute('SELECT sum(key) FROM t')
+            r = cur.fetchone()
+            assert r == (5000050000, )
+            print("res = ", r)

             # Insert another row
-            cur.execute("INSERT INTO foo VALUES ('bar2')")
-            cur.execute('SELECT count(*) FROM foo')
-            assert cur.fetchone() == (2, )
+            cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
+            cur.execute('SELECT count(*) FROM t')
+            r = cur.fetchone()
+            assert r == (100001, )
+            print("res = ", r)

-    # Stop, and destroy the Postgres instance. Then recreate and restart it.
+    # Again remove data directory and restart
     pg.stop_and_destroy().create_start('test_restart_compute',
                                        wal_acceptors=wal_acceptor_connstrs)

     with closing(pg.connect()) as conn:
         with conn.cursor() as cur:
             # We can still see the rows
-            cur.execute('SELECT count(*) FROM foo')
-            assert cur.fetchone() == (2, )
+            cur.execute('SELECT count(*) FROM t')
+            r = cur.fetchone()
+            assert r == (100001, )
+            print("res = ", r)
+
+    # And again remove data directory and restart
+    pg.stop_and_destroy().create_start('test_restart_compute',
+                                       wal_acceptors=wal_acceptor_connstrs)
+
+    with closing(pg.connect()) as conn:
+        with conn.cursor() as cur:
+            # We can still see the rows
+            cur.execute('SELECT count(*) FROM t')
+            r = cur.fetchone()
+            assert r == (100001, )
+            print("res = ", r)

View File

@@ -249,10 +249,68 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:
     print('Starting pageserver cleanup')
     ps.stop()

+class PgBin:
+    """ A helper class for executing postgres binaries """
+    def __init__(self, log_dir: str, pg_distrib_dir: str):
+        self.log_dir = log_dir
+        self.pg_install_path = pg_distrib_dir
+        self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
+        self.env = os.environ.copy()
+        self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
+
+    def _fixpath(self, command: List[str]) -> None:
+        if '/' not in command[0]:
+            command[0] = os.path.join(self.pg_bin_path, command[0])
+
+    def _build_env(self, env_add: Optional[Env]) -> Env:
+        if env_add is None:
+            return self.env
+        env = self.env.copy()
+        env.update(env_add)
+        return env
+
+    def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
+        """
+        Run one of the postgres binaries.
+
+        The command should be in list form, e.g. ['pgbench', '-p', '55432']
+
+        All the necessary environment variables will be set.
+
+        If the first argument (the command name) doesn't include a path (no '/'
+        characters present), then it will be edited to include the correct path.
+
+        If you want stdout/stderr captured to files, use `run_capture` instead.
+        """
+        self._fixpath(command)
+        print('Running command "{}"'.format(' '.join(command)))
+        env = self._build_env(env)
+        subprocess.run(command, env=env, cwd=cwd, check=True)
+
+    def run_capture(self,
+                    command: List[str],
+                    env: Optional[Env] = None,
+                    cwd: Optional[str] = None) -> None:
+        """
+        Run one of the postgres binaries, with stderr and stdout redirected to a file.
+
+        This is just like `run`, but for chatty programs.
+        """
+        self._fixpath(command)
+        print('Running command "{}"'.format(' '.join(command)))
+        env = self._build_env(env)
+        subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
+
+@zenfixture
+def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
+    return PgBin(test_output_dir, pg_distrib_dir)
+
 class Postgres(PgProtocol):
     """ An object representing a running postgres daemon. """
-    def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int):
+    def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, tenant_id: str, port: int):
         super().__init__(host='localhost', port=port)

         self.zenith_cli = zenith_cli
@@ -260,6 +318,7 @@ class Postgres(PgProtocol):
         self.repo_dir = repo_dir
         self.branch: Optional[str] = None  # dubious, see asserts below
         self.tenant_id = tenant_id
+        self.pg_bin = pg_bin
         # path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<branch_name>/postgresql.conf

     def create(
@@ -299,27 +358,32 @@ class Postgres(PgProtocol):
         """
         assert self.branch is not None

-        print(f"Starting postgres on brach {self.branch}")
-
         self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
         self.running = True
+        self.pg_bin.run(['pg_controldata', self.pg_data_dir_path()])

         return self

+    def pg_data_dir_path(self) -> str:
+        """ Path to data directory """
+        path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch
+        return os.path.join(self.repo_dir, path)
+
     def pg_xact_dir_path(self) -> str:
         """ Path to pg_xact dir """
-        path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_xact'
-        return os.path.join(self.repo_dir, path)
+        return os.path.join(self.pg_data_dir_path(), 'pg_xact')

     def pg_twophase_dir_path(self) -> str:
         """ Path to pg_twophase dir """
-        print(self.tenant_id)
-        print(self.branch)
-        path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_twophase'
-        return os.path.join(self.repo_dir, path)
+        return os.path.join(self.pg_data_dir_path(), 'pg_twophase')

     def config_file_path(self) -> str:
         """ Path to postgresql.conf """
-        filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf'
-        return os.path.join(self.repo_dir, filename)
+        return os.path.join(self.pg_data_dir_path(), 'postgresql.conf')

     def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
         """
@@ -411,13 +475,14 @@ class Postgres(PgProtocol):

 class PostgresFactory:
     """ An object representing multiple running postgres daemons. """
-    def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431):
+    def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431):
         self.zenith_cli = zenith_cli
         self.repo_dir = repo_dir
         self.num_instances = 0
         self.instances: List[Postgres] = []
         self.initial_tenant: str = initial_tenant
         self.base_port = base_port
+        self.pg_bin = pg_bin

     def create_start(
             self,
@@ -430,6 +495,7 @@ class PostgresFactory:
         pg = Postgres(
             zenith_cli=self.zenith_cli,
             repo_dir=self.repo_dir,
+            pg_bin=self.pg_bin,
             tenant_id=tenant_id or self.initial_tenant,
             port=self.base_port + self.num_instances + 1,
         )
@@ -503,8 +569,8 @@ def initial_tenant(pageserver: ZenithPageserver):

 @zenfixture
-def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]:
-    pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant)
+def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]:
+    pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant)

     yield pgfactory

@@ -512,67 +578,6 @@ def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Itera
     print('Starting postgres cleanup')
     pgfactory.stop_all()

-class PgBin:
-    """ A helper class for executing postgres binaries """
-    def __init__(self, log_dir: str, pg_distrib_dir: str):
-        self.log_dir = log_dir
-        self.pg_install_path = pg_distrib_dir
-        self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
-        self.env = os.environ.copy()
-        self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
-
-    def _fixpath(self, command: List[str]) -> None:
-        if '/' not in command[0]:
-            command[0] = os.path.join(self.pg_bin_path, command[0])
-
-    def _build_env(self, env_add: Optional[Env]) -> Env:
-        if env_add is None:
-            return self.env
-        env = self.env.copy()
-        env.update(env_add)
-        return env
-
-    def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
-        """
-        Run one of the postgres binaries.
-
-        The command should be in list form, e.g. ['pgbench', '-p', '55432']
-
-        All the necessary environment variables will be set.
-
-        If the first argument (the command name) doesn't include a path (no '/'
-        characters present), then it will be edited to include the correct path.
-
-        If you want stdout/stderr captured to files, use `run_capture` instead.
-        """
-        self._fixpath(command)
-        print('Running command "{}"'.format(' '.join(command)))
-        env = self._build_env(env)
-        subprocess.run(command, env=env, cwd=cwd, check=True)
-
-    def run_capture(self,
-                    command: List[str],
-                    env: Optional[Env] = None,
-                    cwd: Optional[str] = None) -> None:
-        """
-        Run one of the postgres binaries, with stderr and stdout redirected to a file.
-
-        This is just like `run`, but for chatty programs.
-        """
-        self._fixpath(command)
-        print('Running command "{}"'.format(' '.join(command)))
-        env = self._build_env(env)
-        subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
-
-@zenfixture
-def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
-    return PgBin(test_output_dir, pg_distrib_dir)
-
 def read_pid(path: Path):
     """ Read content of file into number """
     return int(path.read_text())

View File

@@ -27,6 +27,7 @@ use crate::replication::HotStandbyFeedback;
 use crate::send_wal::SendWalHandler;
 use crate::timeline::{Timeline, TimelineTools};
 use crate::WalAcceptorConf;
+use pageserver::waldecoder::WalStreamDecoder;
 use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};

 pub const SK_MAGIC: u32 = 0xcafeceefu32;
@@ -300,8 +301,8 @@ impl<'pg> ReceiveWalConn<'pg> {
             );
         }
         my_info.server.node_id = prop.node_id;
-        this_timeline.get().set_info(&my_info);
         /* Need to persist our vote first */
+        this_timeline.get().set_info(&my_info);
         this_timeline.get().save_control_file(true)?;

         let mut flushed_restart_lsn = Lsn(0);
@@ -322,9 +323,11 @@ impl<'pg> ReceiveWalConn<'pg> {
         }

         info!(
-            "Start streaming from timeline {} tenant {} address {:?} flush_lsn={}",
-            server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn
+            "Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}",
+            server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn,
         );
+        let mut last_rec_lsn = Lsn(0);
+        let mut decoder = WalStreamDecoder::new(last_rec_lsn, false);

         // Main loop
         loop {
@@ -347,27 +350,52 @@ impl<'pg> ReceiveWalConn<'pg> {
             let end_pos = req.end_lsn;
             let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
             assert!(rec_size <= MAX_SEND_SIZE);
+            if rec_size != 0 {
+                debug!(
+                    "received for {} bytes between {} and {}",
+                    rec_size, start_pos, end_pos,
+                );

-            debug!(
-                "received for {} bytes between {} and {}",
-                rec_size, start_pos, end_pos,
-            );
+                /* Receive message body (from the rest of the message) */
+                let mut buf = Vec::with_capacity(rec_size);
+                msg_reader.read_to_end(&mut buf)?;
+                assert_eq!(buf.len(), rec_size);

-            /* Receive message body (from the rest of the message) */
-            let mut buf = Vec::with_capacity(rec_size);
-            msg_reader.read_to_end(&mut buf)?;
-            assert_eq!(buf.len(), rec_size);
+                if decoder.available() != start_pos {
+                    info!(
+                        "Restart decoder from {} to {}",
+                        decoder.available(),
+                        start_pos
+                    );
+                    decoder = WalStreamDecoder::new(start_pos, false);
+                }
+                decoder.feed_bytes(&buf);
+                loop {
+                    match decoder.poll_decode() {
+                        Err(e) => info!("Decode error {}", e),
+                        Ok(None) => {}
+                        Ok(Some((lsn, _rec))) => {
+                            last_rec_lsn = lsn;
+                            continue;
+                        }
+                    }
+                    break;
+                }
+                info!(
+                    "Receive WAL {}..{} last_rec_lsn={}",
+                    start_pos, end_pos, last_rec_lsn
+                );

-            /* Save message in file */
-            Self::write_wal_file(
-                swh,
-                start_pos,
-                timeline_id,
-                this_timeline.get(),
-                wal_seg_size,
-                &buf,
-            )?;
+                /* Save message in file */
+                Self::write_wal_file(
+                    swh,
+                    start_pos,
+                    timeline_id,
+                    this_timeline.get(),
+                    wal_seg_size,
+                    &buf,
+                )?;
+            }

             my_info.restart_lsn = req.restart_lsn;
             my_info.commit_lsn = req.commit_lsn;
@@ -377,13 +405,13 @@ impl<'pg> ReceiveWalConn<'pg> {
              * maximum (vcl) determined by WAL proposer during handshake.
              * Switching epoch means that node completes recovery and start writing in the WAL new data.
              */
-            if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) {
+            if my_info.epoch < prop.epoch && end_pos >= max(my_info.flush_lsn, prop.vcl) {
                 info!("Switch to new epoch {}", prop.epoch);
                 my_info.epoch = prop.epoch; /* bump epoch */
                 sync_control_file = true;
             }
-            if end_pos > my_info.flush_lsn {
-                my_info.flush_lsn = end_pos;
+            if last_rec_lsn > my_info.flush_lsn {
+                my_info.flush_lsn = last_rec_lsn;
             }
             /*
              * Update restart LSN in control file.
@@ -391,6 +419,7 @@ impl<'pg> ReceiveWalConn<'pg> {
              * when restart_lsn delta exceeds WAL segment size.
             */
            sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
+            this_timeline.get().set_info(&my_info);
             this_timeline.get().save_control_file(sync_control_file)?;

             if sync_control_file {
@@ -401,7 +430,7 @@ impl<'pg> ReceiveWalConn<'pg> {
             //info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32);
             let resp = SafeKeeperResponse {
                 epoch: my_info.epoch,
-                flush_lsn: end_pos,
+                flush_lsn: my_info.flush_lsn,
                 hs_feedback: this_timeline.get().get_hs_feedback(),
             };
             self.write_msg(&resp)?;
@@ -410,9 +439,15 @@ impl<'pg> ReceiveWalConn<'pg> {
              * Ping wal sender that new data is available.
              * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
             */
+            trace!(
+                "Notify WAL senders min({}, {})={}",
+                req.commit_lsn,
+                my_info.flush_lsn,
+                min(req.commit_lsn, my_info.flush_lsn)
+            );
             this_timeline
                 .get()
-                .notify_wal_senders(min(req.commit_lsn, end_pos));
+                .notify_wal_senders(min(req.commit_lsn, my_info.flush_lsn));
         }

         Ok(())
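Note: the `>` to `>=` change matters when a compute node restarts without producing new WAL: the safekeeper's flush LSN, the proposer's VCL, and the stream position can all coincide. A small illustration with made-up positions, using plain u64 in place of Lsn:

    use std::cmp::max;

    // After a compute restart with no new WAL, all three positions are equal.
    let (flush_lsn, vcl, end_pos) = (0x16B_9718u64, 0x16B_9718u64, 0x16B_9718u64);
    assert!(end_pos >= max(flush_lsn, vcl));    // epoch switch now allowed
    assert!(!(end_pos > max(flush_lsn, vcl)));  // the old strict test would refuse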

View File

@@ -76,8 +76,10 @@ impl ReplicationConn {
                         let feedback = HotStandbyFeedback::des(&m)?;
                         subscriber.add_hs_feedback(feedback);
                     }
+                    FeMessage::Sync => {}
+                    FeMessage::CopyFailed => return Err(anyhow!("Copy failed")),
                     _ => {
-                        // We only handle `CopyData` messages. Anything else is ignored.
+                        // We only handle `CopyData`, `Sync`, `CopyFailed` messages. Anything else is ignored.
                         info!("unexpected message {:?}", msg);
                     }
                 }
@@ -215,9 +217,14 @@ impl ReplicationConn {
                     data: &file_buf,
                 }))?;

-                start_pos += send_size as u64;
+                debug!(
+                    "Sent WAL to page server {}..{}, end_pos={}",
+                    start_pos,
+                    start_pos + send_size as u64,
+                    end_pos
+                );

-                debug!("Sent WAL to page server up to {}", end_pos);
+                start_pos += send_size as u64;

                 // Decide whether to reuse this file. If we don't set wal_file here
                 // a new file will be opened next time.
View File

@@ -175,7 +175,7 @@ impl Timeline {
         }
     }

-    fn _stop_wal_senders(&self) {
+    pub fn stop_wal_senders(&self) {
         self.notify_wal_senders(END_REPLICATION_MARKER);
     }

View File

@@ -24,6 +24,11 @@ impl Lsn {
     /// Maximum possible value for an LSN
     pub const MAX: Lsn = Lsn(u64::MAX);

+    /// Align LSN on 8-byte boundary (alignment of WAL records).
+    pub fn align(&self) -> Lsn {
+        Lsn((self.0 + 7) & !7)
+    }
+
     /// Subtract a number, returning None on overflow.
     pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
         let other: u64 = other.into();
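Note: align() rounds up to the next multiple of 8, the alignment PostgreSQL uses for WAL records; it replaces the open-coded (old.0 + 7) & !7 seen earlier in this diff. For example:

    assert_eq!(Lsn(9).align(), Lsn(16));  // rounds up to the next 8-byte boundary
    assert_eq!(Lsn(16).align(), Lsn(16)); // already-aligned values are unchanged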

View File

@@ -341,7 +341,7 @@ impl PostgresBackend {

                 // We prefer explicit pattern matching to wildcards, because
                 // this helps us spot the places where new variants are missing
-                FeMessage::CopyData(_) | FeMessage::CopyDone => {
+                FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => {
                     bail!("unexpected message type: {:?}", msg);
                 }
             }

View File

@@ -31,6 +31,7 @@ pub enum FeMessage {
     Terminate,
     CopyData(Bytes),
     CopyDone,
+    CopyFailed,
     PasswordMessage(Bytes),
 }

@@ -138,6 +139,7 @@ impl FeMessage {
             b'X' => Ok(Some(FeMessage::Terminate)),
             b'd' => Ok(Some(FeMessage::CopyData(body))),
             b'c' => Ok(Some(FeMessage::CopyDone)),
+            b'f' => Ok(Some(FeMessage::CopyFailed)),
             b'p' => Ok(Some(FeMessage::PasswordMessage(body))),
             tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
         }
@@ -338,6 +340,7 @@ pub enum BeMessage<'a> {
     ControlFile,
     CopyData(&'a [u8]),
     CopyDone,
+    CopyFailed,
     CopyInResponse,
     CopyOutResponse,
     CopyBothResponse,
@@ -546,6 +549,11 @@ impl<'a> BeMessage<'a> {
                 write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
             }

+            BeMessage::CopyFailed => {
+                buf.put_u8(b'f');
+                write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
+            }
+
            BeMessage::CopyInResponse => {
                 buf.put_u8(b'G');
                 write_body(buf, |buf| {
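Note: in the PostgreSQL wire protocol, 'f' is the frontend CopyFail message and normally carries an error string; here it is parsed with the body ignored and also emitted backend-side with an empty body. Assuming write_body follows the usual framing (a 4-byte big-endian length that counts itself but not the tag), the empty CopyFailed frame is five bytes:

    // Hypothetical byte-level view of an empty-body CopyFailed frame:
    // tag 'f', then length 4 (the length field includes itself, excludes the tag).
    let frame: [u8; 5] = [b'f', 0x00, 0x00, 0x00, 0x04];
    assert_eq!(&frame[1..], 4u32.to_be_bytes().as_slice());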