mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-06 04:00:37 +00:00
Compare commits
14 Commits
split-prox
...
safe_flush
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a8a2f62bc3 | ||
|
|
26060dd68e | ||
|
|
73d823e53c | ||
|
|
112909c5e4 | ||
|
|
07adc9dbda | ||
|
|
c05cedc626 | ||
|
|
815528e0ce | ||
|
|
a2e135b404 | ||
|
|
72de70a8cc | ||
|
|
4051c5d4ff | ||
|
|
f86bf26466 | ||
|
|
3ca4b638ac | ||
|
|
d61699b0f8 | ||
|
|
ead94feb05 |
@@ -320,7 +320,7 @@ impl PostgresNode {
|
|||||||
|
|
||||||
// Never clean up old WAL. TODO: We should use a replication
|
// Never clean up old WAL. TODO: We should use a replication
|
||||||
// slot or something proper, to prevent the compute node
|
// slot or something proper, to prevent the compute node
|
||||||
// from removing WAL that hasn't been streamed to the safekeepr or
|
// from removing WAL that hasn't been streamed to the safekeeper or
|
||||||
// page server yet. (gh issue #349)
|
// page server yet. (gh issue #349)
|
||||||
self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;
|
self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;
|
||||||
|
|
||||||
|
|||||||
@@ -842,6 +842,34 @@ impl Timeline for LayeredTimeline {
|
|||||||
fn get_prev_record_lsn(&self) -> Lsn {
|
fn get_prev_record_lsn(&self) -> Lsn {
|
||||||
self.prev_record_lsn.load()
|
self.prev_record_lsn.load()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///
|
||||||
|
/// Wait until WAL has been received up to the given LSN.
|
||||||
|
///
|
||||||
|
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
|
||||||
|
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||||
|
// This is necessary for bootstrap.
|
||||||
|
if lsn == Lsn(0) {
|
||||||
|
let last_valid_lsn = self.last_valid_lsn.load();
|
||||||
|
trace!(
|
||||||
|
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||||
|
last_valid_lsn,
|
||||||
|
lsn
|
||||||
|
);
|
||||||
|
lsn = last_valid_lsn;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.last_valid_lsn
|
||||||
|
.wait_for_timeout(lsn, TIMEOUT)
|
||||||
|
.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"Timed out while waiting for WAL record at LSN {} to arrive",
|
||||||
|
lsn
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(lsn)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LayeredTimeline {
|
impl LayeredTimeline {
|
||||||
@@ -1055,34 +1083,6 @@ impl LayeredTimeline {
|
|||||||
Ok(layer_rc)
|
Ok(layer_rc)
|
||||||
}
|
}
|
||||||
|
|
||||||
///
|
|
||||||
/// Wait until WAL has been received up to the given LSN.
|
|
||||||
///
|
|
||||||
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
|
|
||||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
|
||||||
// This is necessary for bootstrap.
|
|
||||||
if lsn == Lsn(0) {
|
|
||||||
let last_valid_lsn = self.last_valid_lsn.load();
|
|
||||||
trace!(
|
|
||||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
|
||||||
last_valid_lsn,
|
|
||||||
lsn
|
|
||||||
);
|
|
||||||
lsn = last_valid_lsn;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.last_valid_lsn
|
|
||||||
.wait_for_timeout(lsn, TIMEOUT)
|
|
||||||
.with_context(|| {
|
|
||||||
format!(
|
|
||||||
"Timed out while waiting for WAL record at LSN {} to arrive",
|
|
||||||
lsn
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(lsn)
|
|
||||||
}
|
|
||||||
|
|
||||||
///
|
///
|
||||||
/// Flush to disk all data that was written with the put_* functions
|
/// Flush to disk all data that was written with the put_* functions
|
||||||
///
|
///
|
||||||
|
|||||||
@@ -663,7 +663,7 @@ impl Timeline for ObjectTimeline {
|
|||||||
assert!(old <= lsn);
|
assert!(old <= lsn);
|
||||||
|
|
||||||
// Use old value of last_record_lsn as prev_record_lsn
|
// Use old value of last_record_lsn as prev_record_lsn
|
||||||
self.prev_record_lsn.fetch_max(Lsn((old.0 + 7) & !7));
|
self.prev_record_lsn.fetch_max(old.align());
|
||||||
|
|
||||||
// Also advance last_valid_lsn
|
// Also advance last_valid_lsn
|
||||||
let old = self.last_valid_lsn.advance(lsn);
|
let old = self.last_valid_lsn.advance(lsn);
|
||||||
@@ -712,6 +712,41 @@ impl Timeline for ObjectTimeline {
|
|||||||
let iter = self.obj_store.objects(self.timelineid, lsn)?;
|
let iter = self.obj_store.objects(self.timelineid, lsn)?;
|
||||||
Ok(Box::new(ObjectHistory { lsn, iter }))
|
Ok(Box::new(ObjectHistory { lsn, iter }))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Wait until WAL has been received up to the given LSN.
|
||||||
|
//
|
||||||
|
fn wait_lsn(&self, req_lsn: Lsn) -> Result<Lsn> {
|
||||||
|
let mut lsn = req_lsn;
|
||||||
|
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||||
|
// This is necessary for bootstrap.
|
||||||
|
if lsn == Lsn(0) {
|
||||||
|
let last_valid_lsn = self.last_valid_lsn.load();
|
||||||
|
trace!(
|
||||||
|
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||||
|
last_valid_lsn,
|
||||||
|
lsn
|
||||||
|
);
|
||||||
|
lsn = last_valid_lsn;
|
||||||
|
}
|
||||||
|
trace!(
|
||||||
|
"Start waiting for LSN {}, valid LSN is {}",
|
||||||
|
lsn,
|
||||||
|
self.last_valid_lsn.load()
|
||||||
|
);
|
||||||
|
self.last_valid_lsn
|
||||||
|
.wait_for_timeout(lsn, TIMEOUT)
|
||||||
|
.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
|
||||||
|
lsn,
|
||||||
|
self.last_valid_lsn.load(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
|
||||||
|
|
||||||
|
Ok(lsn)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ObjectTimeline {
|
impl ObjectTimeline {
|
||||||
@@ -820,40 +855,6 @@ impl ObjectTimeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// Wait until WAL has been received up to the given LSN.
|
|
||||||
//
|
|
||||||
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
|
|
||||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
|
||||||
// This is necessary for bootstrap.
|
|
||||||
if lsn == Lsn(0) {
|
|
||||||
let last_valid_lsn = self.last_valid_lsn.load();
|
|
||||||
trace!(
|
|
||||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
|
||||||
last_valid_lsn,
|
|
||||||
lsn
|
|
||||||
);
|
|
||||||
lsn = last_valid_lsn;
|
|
||||||
}
|
|
||||||
trace!(
|
|
||||||
"Start waiting for LSN {}, valid LSN is {}",
|
|
||||||
lsn,
|
|
||||||
self.last_valid_lsn.load()
|
|
||||||
);
|
|
||||||
self.last_valid_lsn
|
|
||||||
.wait_for_timeout(lsn, TIMEOUT)
|
|
||||||
.with_context(|| {
|
|
||||||
format!(
|
|
||||||
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
|
|
||||||
lsn,
|
|
||||||
self.last_valid_lsn.load(),
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
|
|
||||||
|
|
||||||
Ok(lsn)
|
|
||||||
}
|
|
||||||
|
|
||||||
///
|
///
|
||||||
/// Iterate through object versions with given key, in reverse LSN order.
|
/// Iterate through object versions with given key, in reverse LSN order.
|
||||||
///
|
///
|
||||||
|
|||||||
@@ -357,8 +357,13 @@ impl PageServerHandler {
|
|||||||
|
|
||||||
/* Send a tarball of the latest snapshot on the timeline */
|
/* Send a tarball of the latest snapshot on the timeline */
|
||||||
|
|
||||||
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
|
let req_lsn = match lsn {
|
||||||
|
Some(lsn) => {
|
||||||
|
timeline.wait_lsn(lsn)?;
|
||||||
|
lsn
|
||||||
|
}
|
||||||
|
None => timeline.get_last_record_lsn(),
|
||||||
|
};
|
||||||
{
|
{
|
||||||
let mut writer = CopyDataSink { pgb };
|
let mut writer = CopyDataSink { pgb };
|
||||||
let mut basebackup = basebackup::Basebackup::new(
|
let mut basebackup = basebackup::Basebackup::new(
|
||||||
@@ -469,7 +474,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
|||||||
let (_, params_raw) = query_string.split_at("basebackup ".len());
|
let (_, params_raw) = query_string.split_at("basebackup ".len());
|
||||||
let params = params_raw.split(" ").collect::<Vec<_>>();
|
let params = params_raw.split(" ").collect::<Vec<_>>();
|
||||||
ensure!(
|
ensure!(
|
||||||
params.len() == 2,
|
params.len() >= 2,
|
||||||
"invalid param number for basebackup command"
|
"invalid param number for basebackup command"
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -479,7 +484,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
|||||||
self.check_permission(Some(tenantid))?;
|
self.check_permission(Some(tenantid))?;
|
||||||
|
|
||||||
// TODO are there any tests with lsn option?
|
// TODO are there any tests with lsn option?
|
||||||
let lsn = if params.len() == 3 {
|
let lsn = if params.len() == 3 && params[2].len() != 0 {
|
||||||
Some(Lsn::from_str(params[2])?)
|
Some(Lsn::from_str(params[2])?)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
@@ -575,6 +580,10 @@ impl postgres_backend::Handler for PageServerHandler {
|
|||||||
timeline.advance_last_valid_lsn(last_lsn);
|
timeline.advance_last_valid_lsn(last_lsn);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
FeMessage::CopyFailed => {
|
||||||
|
info!("Copy failed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
FeMessage::Sync => {}
|
FeMessage::Sync => {}
|
||||||
_ => bail!("unexpected message {:?}", msg),
|
_ => bail!("unexpected message {:?}", msg),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -203,6 +203,11 @@ pub trait Timeline: Send + Sync {
|
|||||||
/// Relation size is increased implicitly and decreased with Truncate updates.
|
/// Relation size is increased implicitly and decreased with Truncate updates.
|
||||||
// TODO ordering guarantee?
|
// TODO ordering guarantee?
|
||||||
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
|
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
|
||||||
|
|
||||||
|
//
|
||||||
|
// Wait until WAL has been received up to the given LSN.
|
||||||
|
//
|
||||||
|
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait History: Iterator<Item = Result<Modification>> {
|
pub trait History: Iterator<Item = Result<Modification>> {
|
||||||
|
|||||||
@@ -264,7 +264,7 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa
|
|||||||
/// Scan PostgreSQL WAL files in given directory
|
/// Scan PostgreSQL WAL files in given directory
|
||||||
/// and load all records >= 'startpoint' into the repository.
|
/// and load all records >= 'startpoint' into the repository.
|
||||||
pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
|
pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
|
||||||
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
|
||||||
|
|
||||||
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
|
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
|
||||||
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
|
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
|
||||||
@@ -427,7 +427,9 @@ pub fn save_decoded_record(
|
|||||||
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
|
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
|
||||||
trace!(
|
trace!(
|
||||||
"unlink twophaseFile for xid {} parsed_xact.xid {} here at {}",
|
"unlink twophaseFile for xid {} parsed_xact.xid {} here at {}",
|
||||||
decoded.xl_xid, parsed_xact.xid, lsn
|
decoded.xl_xid,
|
||||||
|
parsed_xact.xid,
|
||||||
|
lsn
|
||||||
);
|
);
|
||||||
timeline.put_unlink(
|
timeline.put_unlink(
|
||||||
RelishTag::TwoPhase {
|
RelishTag::TwoPhase {
|
||||||
|
|||||||
@@ -25,13 +25,13 @@ pub type MultiXactStatus = u32;
|
|||||||
pub struct WalStreamDecoder {
|
pub struct WalStreamDecoder {
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
|
|
||||||
startlsn: Lsn, // LSN where this record starts
|
|
||||||
contlen: u32,
|
contlen: u32,
|
||||||
padlen: u32,
|
padlen: u32,
|
||||||
|
|
||||||
inputbuf: BytesMut,
|
inputbuf: BytesMut,
|
||||||
|
|
||||||
recordbuf: BytesMut,
|
recordbuf: BytesMut,
|
||||||
|
|
||||||
|
crc_check: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Error, Debug, Clone)]
|
#[derive(Error, Debug, Clone)]
|
||||||
@@ -46,19 +46,24 @@ pub struct WalDecodeError {
|
|||||||
// FIXME: This isn't a proper rust stream
|
// FIXME: This isn't a proper rust stream
|
||||||
//
|
//
|
||||||
impl WalStreamDecoder {
|
impl WalStreamDecoder {
|
||||||
pub fn new(lsn: Lsn) -> WalStreamDecoder {
|
pub fn new(lsn: Lsn, crc_check: bool) -> WalStreamDecoder {
|
||||||
WalStreamDecoder {
|
WalStreamDecoder {
|
||||||
lsn,
|
lsn,
|
||||||
|
|
||||||
startlsn: Lsn(0),
|
|
||||||
contlen: 0,
|
contlen: 0,
|
||||||
padlen: 0,
|
padlen: 0,
|
||||||
|
|
||||||
inputbuf: BytesMut::new(),
|
inputbuf: BytesMut::new(),
|
||||||
recordbuf: BytesMut::new(),
|
recordbuf: BytesMut::new(),
|
||||||
|
|
||||||
|
crc_check,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn available(&self) -> Lsn {
|
||||||
|
self.lsn + self.inputbuf.remaining() as u64
|
||||||
|
}
|
||||||
|
|
||||||
pub fn feed_bytes(&mut self, buf: &[u8]) {
|
pub fn feed_bytes(&mut self, buf: &[u8]) {
|
||||||
self.inputbuf.extend_from_slice(buf);
|
self.inputbuf.extend_from_slice(buf);
|
||||||
}
|
}
|
||||||
@@ -92,7 +97,9 @@ impl WalStreamDecoder {
|
|||||||
// TODO: verify the remaining fields in the header
|
// TODO: verify the remaining fields in the header
|
||||||
|
|
||||||
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
|
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
|
||||||
continue;
|
if !self.crc_check && self.contlen != hdr.std.xlp_rem_len {
|
||||||
|
self.contlen = hdr.std.xlp_rem_len; // skip continuation record
|
||||||
|
}
|
||||||
} else if self.lsn.block_offset() == 0 {
|
} else if self.lsn.block_offset() == 0 {
|
||||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
|
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
@@ -102,14 +109,19 @@ impl WalStreamDecoder {
|
|||||||
|
|
||||||
if hdr.xlp_pageaddr != self.lsn.0 {
|
if hdr.xlp_pageaddr != self.lsn.0 {
|
||||||
return Err(WalDecodeError {
|
return Err(WalDecodeError {
|
||||||
msg: "invalid xlog page header".into(),
|
msg: format!(
|
||||||
|
"invalid xlog page header: xlp_pageaddr={} vs. lsn={}",
|
||||||
|
hdr.xlp_pageaddr, self.lsn
|
||||||
|
),
|
||||||
lsn: self.lsn,
|
lsn: self.lsn,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// TODO: verify the remaining fields in the header
|
// TODO: verify the remaining fields in the header
|
||||||
|
|
||||||
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
|
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
|
||||||
continue;
|
if !self.crc_check && self.contlen != hdr.xlp_rem_len {
|
||||||
|
self.contlen = hdr.xlp_rem_len; // skip continuation record
|
||||||
|
}
|
||||||
} else if self.padlen > 0 {
|
} else if self.padlen > 0 {
|
||||||
if self.inputbuf.remaining() < self.padlen as usize {
|
if self.inputbuf.remaining() < self.padlen as usize {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
@@ -127,7 +139,6 @@ impl WalStreamDecoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// read xl_tot_len FIXME: assumes little-endian
|
// read xl_tot_len FIXME: assumes little-endian
|
||||||
self.startlsn = self.lsn;
|
|
||||||
let xl_tot_len = self.inputbuf.get_u32_le();
|
let xl_tot_len = self.inputbuf.get_u32_le();
|
||||||
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
|
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
|
||||||
return Err(WalDecodeError {
|
return Err(WalDecodeError {
|
||||||
@@ -142,7 +153,6 @@ impl WalStreamDecoder {
|
|||||||
self.recordbuf.put_u32_le(xl_tot_len);
|
self.recordbuf.put_u32_le(xl_tot_len);
|
||||||
|
|
||||||
self.contlen = xl_tot_len - 4;
|
self.contlen = xl_tot_len - 4;
|
||||||
continue;
|
|
||||||
} else {
|
} else {
|
||||||
// we're continuing a record, possibly from previous page.
|
// we're continuing a record, possibly from previous page.
|
||||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||||
@@ -164,17 +174,10 @@ impl WalStreamDecoder {
|
|||||||
let recordbuf = recordbuf.freeze();
|
let recordbuf = recordbuf.freeze();
|
||||||
let mut buf = recordbuf.clone();
|
let mut buf = recordbuf.clone();
|
||||||
|
|
||||||
|
let xlogrec = XLogRecord::from_bytes(&mut buf);
|
||||||
|
|
||||||
// XLOG_SWITCH records are special. If we see one, we need to skip
|
// XLOG_SWITCH records are special. If we see one, we need to skip
|
||||||
// to the next WAL segment.
|
// to the next WAL segment.
|
||||||
let xlogrec = XLogRecord::from_bytes(&mut buf);
|
|
||||||
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
|
||||||
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
|
||||||
if crc != xlogrec.xl_crc {
|
|
||||||
return Err(WalDecodeError {
|
|
||||||
msg: "WAL record crc mismatch".into(),
|
|
||||||
lsn: self.lsn,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if xlogrec.is_xlog_switch_record() {
|
if xlogrec.is_xlog_switch_record() {
|
||||||
trace!("saw xlog switch record at {}", self.lsn);
|
trace!("saw xlog switch record at {}", self.lsn);
|
||||||
self.padlen =
|
self.padlen =
|
||||||
@@ -184,10 +187,29 @@ impl WalStreamDecoder {
|
|||||||
self.padlen = self.lsn.calc_padding(8u32) as u32;
|
self.padlen = self.lsn.calc_padding(8u32) as u32;
|
||||||
}
|
}
|
||||||
|
|
||||||
let result = (self.lsn, recordbuf);
|
// Check record CRC
|
||||||
|
if self.crc_check {
|
||||||
|
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
||||||
|
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
||||||
|
if crc != xlogrec.xl_crc {
|
||||||
|
info!("WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}, recordbuf={:?}",
|
||||||
|
n, recordbuf.len(), self.lsn, xlogrec, recordbuf);
|
||||||
|
return Err(WalDecodeError {
|
||||||
|
msg: format!(
|
||||||
|
"WAL record crc mismatch n={}, buf.len()={}, lsn={}, rec={:?}",
|
||||||
|
n,
|
||||||
|
buf.len(),
|
||||||
|
self.lsn,
|
||||||
|
xlogrec
|
||||||
|
),
|
||||||
|
lsn: self.lsn,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let result = (self.lsn.align(), recordbuf);
|
||||||
return Ok(Some(result));
|
return Ok(Some(result));
|
||||||
}
|
}
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// check record boundaries
|
// check record boundaries
|
||||||
|
|||||||
@@ -22,8 +22,6 @@ use postgres_types::PgLsn;
|
|||||||
use std::cmp::{max, min};
|
use std::cmp::{max, min};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::fs::{File, OpenOptions};
|
|
||||||
use std::io::{Seek, SeekFrom, Write};
|
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
@@ -178,7 +176,7 @@ fn walreceiver_main(
|
|||||||
let copy_stream = rclient.copy_both_simple(&query)?;
|
let copy_stream = rclient.copy_both_simple(&query)?;
|
||||||
let mut physical_stream = ReplicationIter::new(copy_stream);
|
let mut physical_stream = ReplicationIter::new(copy_stream);
|
||||||
|
|
||||||
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
let mut waldecoder = WalStreamDecoder::new(startpoint, true);
|
||||||
|
|
||||||
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?;
|
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?;
|
||||||
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
|
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
|
||||||
@@ -194,45 +192,51 @@ fn walreceiver_main(
|
|||||||
let endlsn = startlsn + data.len() as u64;
|
let endlsn = startlsn + data.len() as u64;
|
||||||
let prev_last_rec_lsn = last_rec_lsn;
|
let prev_last_rec_lsn = last_rec_lsn;
|
||||||
|
|
||||||
write_wal_file(
|
|
||||||
conf,
|
|
||||||
startlsn,
|
|
||||||
&timelineid,
|
|
||||||
pg_constants::WAL_SEGMENT_SIZE,
|
|
||||||
data,
|
|
||||||
tenantid,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
trace!("received XLogData between {} and {}", startlsn, endlsn);
|
trace!("received XLogData between {} and {}", startlsn, endlsn);
|
||||||
|
|
||||||
waldecoder.feed_bytes(data);
|
waldecoder.feed_bytes(data);
|
||||||
|
|
||||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
loop {
|
||||||
// Save old checkpoint value to compare with it after decoding WAL record
|
match waldecoder.poll_decode() {
|
||||||
let old_checkpoint_bytes = checkpoint.encode();
|
Ok(Some((lsn, recdata))) => {
|
||||||
let decoded = decode_wal_record(recdata.clone());
|
// Save old checkpoint value to compare with it after decoding WAL record
|
||||||
restore_local_repo::save_decoded_record(
|
let old_checkpoint_bytes = checkpoint.encode();
|
||||||
&mut checkpoint,
|
let decoded = decode_wal_record(recdata.clone());
|
||||||
&*timeline,
|
restore_local_repo::save_decoded_record(
|
||||||
&decoded,
|
&mut checkpoint,
|
||||||
recdata,
|
&*timeline,
|
||||||
lsn,
|
&decoded,
|
||||||
)?;
|
recdata,
|
||||||
last_rec_lsn = lsn;
|
lsn,
|
||||||
|
)?;
|
||||||
|
last_rec_lsn = lsn;
|
||||||
|
|
||||||
let new_checkpoint_bytes = checkpoint.encode();
|
let new_checkpoint_bytes = checkpoint.encode();
|
||||||
// Check if checkpoint data was updated by save_decoded_record
|
// Check if checkpoint data was updated by save_decoded_record
|
||||||
if new_checkpoint_bytes != old_checkpoint_bytes {
|
if new_checkpoint_bytes != old_checkpoint_bytes {
|
||||||
timeline.put_page_image(
|
timeline.put_page_image(
|
||||||
RelishTag::Checkpoint,
|
RelishTag::Checkpoint,
|
||||||
0,
|
0,
|
||||||
lsn,
|
lsn,
|
||||||
new_checkpoint_bytes,
|
new_checkpoint_bytes,
|
||||||
false,
|
false,
|
||||||
)?;
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
trace!(
|
||||||
|
"End of replication stream {}..{} at {}",
|
||||||
|
startlsn,
|
||||||
|
endlsn,
|
||||||
|
last_rec_lsn
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
info!("Decode error {}", e);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the last_valid LSN value in the page cache one more time. We updated
|
// Update the last_valid LSN value in the page cache one more time. We updated
|
||||||
// it in the loop above, between each WAL record, but we might have received
|
// it in the loop above, between each WAL record, but we might have received
|
||||||
// a partial record after the last completed record. Our page cache's value
|
// a partial record after the last completed record. Our page cache's value
|
||||||
@@ -407,98 +411,3 @@ pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
|
|||||||
Err(IdentifyError.into())
|
Err(IdentifyError.into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_wal_file(
|
|
||||||
conf: &PageServerConf,
|
|
||||||
startpos: Lsn,
|
|
||||||
timelineid: &ZTimelineId,
|
|
||||||
wal_seg_size: usize,
|
|
||||||
buf: &[u8],
|
|
||||||
tenantid: &ZTenantId,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let mut bytes_left: usize = buf.len();
|
|
||||||
let mut bytes_written: usize = 0;
|
|
||||||
let mut partial;
|
|
||||||
let mut start_pos = startpos;
|
|
||||||
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
|
|
||||||
|
|
||||||
let wal_dir = conf.wal_dir_path(timelineid, tenantid);
|
|
||||||
|
|
||||||
/* Extract WAL location for this block */
|
|
||||||
let mut xlogoff = start_pos.segment_offset(wal_seg_size);
|
|
||||||
|
|
||||||
while bytes_left != 0 {
|
|
||||||
let bytes_to_write;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If crossing a WAL boundary, only write up until we reach wal
|
|
||||||
* segment size.
|
|
||||||
*/
|
|
||||||
if xlogoff + bytes_left > wal_seg_size {
|
|
||||||
bytes_to_write = wal_seg_size - xlogoff;
|
|
||||||
} else {
|
|
||||||
bytes_to_write = bytes_left;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Open file */
|
|
||||||
let segno = start_pos.segment_number(wal_seg_size);
|
|
||||||
let wal_file_name = XLogFileName(
|
|
||||||
1, // FIXME: always use Postgres timeline 1
|
|
||||||
segno,
|
|
||||||
wal_seg_size,
|
|
||||||
);
|
|
||||||
let wal_file_path = wal_dir.join(wal_file_name.clone());
|
|
||||||
let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial");
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut wal_file: File;
|
|
||||||
/* Try to open already completed segment */
|
|
||||||
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
|
|
||||||
wal_file = file;
|
|
||||||
partial = false;
|
|
||||||
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
|
|
||||||
/* Try to open existed partial file */
|
|
||||||
wal_file = file;
|
|
||||||
partial = true;
|
|
||||||
} else {
|
|
||||||
/* Create and fill new partial file */
|
|
||||||
partial = true;
|
|
||||||
match OpenOptions::new()
|
|
||||||
.create(true)
|
|
||||||
.write(true)
|
|
||||||
.open(&wal_file_partial_path)
|
|
||||||
{
|
|
||||||
Ok(mut file) => {
|
|
||||||
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
|
|
||||||
file.write_all(&ZERO_BLOCK)?;
|
|
||||||
}
|
|
||||||
wal_file = file;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
|
|
||||||
return Err(e.into());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
|
|
||||||
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
|
|
||||||
|
|
||||||
// FIXME: Flush the file
|
|
||||||
//wal_file.sync_all()?;
|
|
||||||
}
|
|
||||||
/* Write was successful, advance our position */
|
|
||||||
bytes_written += bytes_to_write;
|
|
||||||
bytes_left -= bytes_to_write;
|
|
||||||
start_pos += bytes_to_write as u64;
|
|
||||||
xlogoff += bytes_to_write;
|
|
||||||
|
|
||||||
/* Did we reach the end of a WAL segment? */
|
|
||||||
if start_pos.segment_offset(wal_seg_size) == 0 {
|
|
||||||
xlogoff = 0;
|
|
||||||
if partial {
|
|
||||||
fs::rename(&wal_file_partial_path, &wal_file_path)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -108,17 +108,23 @@ fn find_end_of_wal_segment(
|
|||||||
segno: XLogSegNo,
|
segno: XLogSegNo,
|
||||||
tli: TimeLineID,
|
tli: TimeLineID,
|
||||||
wal_seg_size: usize,
|
wal_seg_size: usize,
|
||||||
|
is_partial: bool,
|
||||||
|
rec_offs: &mut usize,
|
||||||
|
rec_hdr: &mut [u8; XLOG_SIZE_OF_XLOG_RECORD],
|
||||||
|
crc: &mut u32,
|
||||||
|
check_contrec: bool,
|
||||||
) -> u32 {
|
) -> u32 {
|
||||||
let mut offs: usize = 0;
|
let mut offs: usize = 0;
|
||||||
let mut contlen: usize = 0;
|
let mut contlen: usize = 0;
|
||||||
let mut wal_crc: u32 = 0;
|
|
||||||
let mut crc: u32 = 0;
|
|
||||||
let mut rec_offs: usize = 0;
|
|
||||||
let mut buf = [0u8; XLOG_BLCKSZ];
|
let mut buf = [0u8; XLOG_BLCKSZ];
|
||||||
let file_name = XLogFileName(tli, segno, wal_seg_size);
|
let file_name = XLogFileName(tli, segno, wal_seg_size);
|
||||||
let mut last_valid_rec_pos: usize = 0;
|
let mut last_valid_rec_pos: usize = 0;
|
||||||
let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
|
let file_path = data_dir.join(if is_partial {
|
||||||
let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];
|
file_name.clone() + ".partial"
|
||||||
|
} else {
|
||||||
|
file_name
|
||||||
|
});
|
||||||
|
let mut file = File::open(&file_path).unwrap();
|
||||||
|
|
||||||
while offs < wal_seg_size {
|
while offs < wal_seg_size {
|
||||||
if offs % XLOG_BLCKSZ == 0 {
|
if offs % XLOG_BLCKSZ == 0 {
|
||||||
@@ -133,13 +139,33 @@ fn find_end_of_wal_segment(
|
|||||||
let xlp_info = LittleEndian::read_u16(&buf[2..4]);
|
let xlp_info = LittleEndian::read_u16(&buf[2..4]);
|
||||||
let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
|
let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
|
||||||
if xlp_magic != XLOG_PAGE_MAGIC as u16 {
|
if xlp_magic != XLOG_PAGE_MAGIC as u16 {
|
||||||
info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic);
|
info!("Invalid WAL file {:?} magic {}", &file_path, xlp_magic);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if offs == 0 {
|
if offs == 0 {
|
||||||
offs = XLOG_SIZE_OF_XLOG_LONG_PHD;
|
offs = XLOG_SIZE_OF_XLOG_LONG_PHD;
|
||||||
if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
|
if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
|
||||||
offs += ((xlp_rem_len + 7) & !7) as usize;
|
if check_contrec {
|
||||||
|
let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize;
|
||||||
|
contlen = xlp_rem_len as usize;
|
||||||
|
if *rec_offs + contlen < xl_tot_len
|
||||||
|
|| (*rec_offs + contlen != xl_tot_len
|
||||||
|
&& contlen != XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_LONG_PHD)
|
||||||
|
{
|
||||||
|
info!(
|
||||||
|
"Corrupted continuation record: offs={}, contlen={}, xl_tot_len={}",
|
||||||
|
*rec_offs, contlen, xl_tot_len
|
||||||
|
);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
offs += ((xlp_rem_len + 7) & !7) as usize;
|
||||||
|
}
|
||||||
|
} else if *rec_offs != 0 {
|
||||||
|
// There is incompleted page at previous segment but no cont record:
|
||||||
|
// it means that current segment is not valid and we have to return back.
|
||||||
|
info!("CONTRECORD flag is missed in page header");
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
|
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
|
||||||
@@ -150,9 +176,8 @@ fn find_end_of_wal_segment(
|
|||||||
if xl_tot_len == 0 {
|
if xl_tot_len == 0 {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
last_valid_rec_pos = offs;
|
|
||||||
offs += 4;
|
offs += 4;
|
||||||
rec_offs = 4;
|
*rec_offs = 4;
|
||||||
contlen = xl_tot_len - 4;
|
contlen = xl_tot_len - 4;
|
||||||
rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
|
rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
|
||||||
} else {
|
} else {
|
||||||
@@ -162,34 +187,33 @@ fn find_end_of_wal_segment(
|
|||||||
|
|
||||||
// read the rest of the record, or as much as fits on this page.
|
// read the rest of the record, or as much as fits on this page.
|
||||||
let n = min(contlen, pageleft);
|
let n = min(contlen, pageleft);
|
||||||
if rec_offs < XLOG_RECORD_CRC_OFFS {
|
let mut hdr_len: usize = 0;
|
||||||
let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
|
if *rec_offs < XLOG_SIZE_OF_XLOG_RECORD {
|
||||||
rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]);
|
// copy header
|
||||||
|
hdr_len = min(XLOG_SIZE_OF_XLOG_RECORD - *rec_offs, n);
|
||||||
|
rec_hdr[*rec_offs..*rec_offs + hdr_len]
|
||||||
|
.copy_from_slice(&buf[page_offs..page_offs + hdr_len]);
|
||||||
}
|
}
|
||||||
if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
|
*crc = crc32c_append(*crc, &buf[page_offs + hdr_len..page_offs + n]);
|
||||||
let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
|
*rec_offs += n;
|
||||||
wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
|
|
||||||
crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
|
|
||||||
crc = !crc;
|
|
||||||
} else {
|
|
||||||
crc ^= 0xFFFFFFFFu32;
|
|
||||||
crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
|
|
||||||
crc = !crc;
|
|
||||||
}
|
|
||||||
rec_offs += n;
|
|
||||||
offs += n;
|
offs += n;
|
||||||
contlen -= n;
|
contlen -= n;
|
||||||
|
|
||||||
if contlen == 0 {
|
if contlen == 0 {
|
||||||
crc = !crc;
|
*crc = crc32c_append(*crc, &rec_hdr[0..XLOG_RECORD_CRC_OFFS]);
|
||||||
crc = crc32c_append(crc, &rec_hdr);
|
|
||||||
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
|
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
|
||||||
if crc == wal_crc {
|
let wal_crc = LittleEndian::read_u32(
|
||||||
|
&rec_hdr[XLOG_RECORD_CRC_OFFS..XLOG_RECORD_CRC_OFFS + 4],
|
||||||
|
);
|
||||||
|
if *crc == wal_crc {
|
||||||
last_valid_rec_pos = offs;
|
last_valid_rec_pos = offs;
|
||||||
|
// Reset rec_offs and crc for start of new record
|
||||||
|
*rec_offs = 0;
|
||||||
|
*crc = 0;
|
||||||
} else {
|
} else {
|
||||||
info!(
|
info!(
|
||||||
"CRC mismatch {} vs {} at {}",
|
"CRC mismatch {} vs {} at offset {} lsn {}",
|
||||||
crc, wal_crc, last_valid_rec_pos
|
*crc, wal_crc, offs, last_valid_rec_pos
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -240,20 +264,142 @@ pub fn find_end_of_wal(
|
|||||||
}
|
}
|
||||||
if high_segno > 0 {
|
if high_segno > 0 {
|
||||||
let mut high_offs = 0;
|
let mut high_offs = 0;
|
||||||
/*
|
if precise {
|
||||||
* Move the starting pointer to the start of the next segment, if the
|
let mut crc: u32 = 0;
|
||||||
* highest one we saw was completed.
|
let mut rec_offs: usize = 0;
|
||||||
*/
|
let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD];
|
||||||
if !high_ispartial {
|
let wal_dir = data_dir.join("pg_wal");
|
||||||
high_segno += 1;
|
|
||||||
} else if precise {
|
/*
|
||||||
/* otherwise locate last record in last partial segment */
|
* To be able to calculate CRC of records crossing segment boundary,
|
||||||
high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size);
|
* we need to parse previous segments.
|
||||||
|
* So first traverse segments in backward direction to locate record start
|
||||||
|
* and then traverse forward, accumulating CRC.
|
||||||
|
*/
|
||||||
|
let mut prev_segno = high_segno - 1;
|
||||||
|
let mut prev_offs: u32 = 0;
|
||||||
|
while prev_segno > 1 {
|
||||||
|
// TOFO: first segment constains dummy checkpoint record at the beginning
|
||||||
|
prev_offs = find_end_of_wal_segment(
|
||||||
|
data_dir,
|
||||||
|
prev_segno,
|
||||||
|
high_tli,
|
||||||
|
wal_seg_size,
|
||||||
|
false,
|
||||||
|
&mut rec_offs,
|
||||||
|
&mut rec_hdr,
|
||||||
|
&mut crc,
|
||||||
|
false,
|
||||||
|
);
|
||||||
|
if prev_offs != 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
prev_segno -= 1;
|
||||||
|
}
|
||||||
|
if prev_offs != 0 {
|
||||||
|
// found start of WAL record
|
||||||
|
let first_segno = prev_segno;
|
||||||
|
let first_offs = prev_offs;
|
||||||
|
while prev_segno + 1 < high_segno {
|
||||||
|
// now traverse record in forward direction, accumulating CRC
|
||||||
|
prev_segno += 1;
|
||||||
|
prev_offs = find_end_of_wal_segment(
|
||||||
|
data_dir,
|
||||||
|
prev_segno,
|
||||||
|
high_tli,
|
||||||
|
wal_seg_size,
|
||||||
|
false,
|
||||||
|
&mut rec_offs,
|
||||||
|
&mut rec_hdr,
|
||||||
|
&mut crc,
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
if prev_offs == 0 {
|
||||||
|
info!("Segment {} is corrupted", prev_segno,);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if prev_offs != 0 {
|
||||||
|
high_offs = find_end_of_wal_segment(
|
||||||
|
data_dir,
|
||||||
|
high_segno,
|
||||||
|
high_tli,
|
||||||
|
wal_seg_size,
|
||||||
|
high_ispartial,
|
||||||
|
&mut rec_offs,
|
||||||
|
&mut rec_hdr,
|
||||||
|
&mut crc,
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if high_offs == 0 {
|
||||||
|
// If last segment contais no valid records, then return back
|
||||||
|
info!("Last WAL segment {} contains no valid record, truncate WAL till {} segment",
|
||||||
|
high_segno, first_segno);
|
||||||
|
// Remove last segments containing corrupted WAL record
|
||||||
|
for segno in first_segno + 1..high_segno {
|
||||||
|
let file_name = XLogFileName(high_tli, segno, wal_seg_size);
|
||||||
|
let file_path = wal_dir.join(file_name);
|
||||||
|
if let Err(e) = fs::remove_file(&file_path) {
|
||||||
|
info!("Failed to remove file {:?}: {}", &file_path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
|
||||||
|
let file_path = if high_ispartial {
|
||||||
|
wal_dir.join(file_name.clone() + ".partial")
|
||||||
|
} else {
|
||||||
|
wal_dir.join(file_name.clone())
|
||||||
|
};
|
||||||
|
if let Err(e) = fs::remove_file(&file_path) {
|
||||||
|
info!("Failed to remove file {:?}: {}", &file_path, e);
|
||||||
|
}
|
||||||
|
high_ispartial = false; // previous segment should not be partial
|
||||||
|
high_segno = first_segno;
|
||||||
|
high_offs = first_offs;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// failed to locate previous segment
|
||||||
|
assert!(prev_segno <= 1);
|
||||||
|
high_offs = find_end_of_wal_segment(
|
||||||
|
data_dir,
|
||||||
|
high_segno,
|
||||||
|
high_tli,
|
||||||
|
wal_seg_size,
|
||||||
|
high_ispartial,
|
||||||
|
&mut rec_offs,
|
||||||
|
&mut rec_hdr,
|
||||||
|
&mut crc,
|
||||||
|
false,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If last segment is not marked as partial, it means that next segment
|
||||||
|
// was not written. Let's make this segment partial once again.
|
||||||
|
if !high_ispartial {
|
||||||
|
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
|
||||||
|
if let Err(e) = fs::rename(
|
||||||
|
wal_dir.join(file_name.clone()),
|
||||||
|
wal_dir.join(file_name.clone() + ".partial"),
|
||||||
|
) {
|
||||||
|
info!(
|
||||||
|
"Failed to rename {} to {}.partial: {}",
|
||||||
|
&file_name, &file_name, e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/*
|
||||||
|
* Move the starting pointer to the start of the next segment, if the
|
||||||
|
* highest one we saw was completed.
|
||||||
|
*/
|
||||||
|
if !high_ispartial {
|
||||||
|
high_segno += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
|
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
|
||||||
return (high_ptr, high_tli);
|
return (high_ptr, high_tli);
|
||||||
}
|
}
|
||||||
(0, 0)
|
(0, 1) // First timeline is 1
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn main() {
|
pub fn main() {
|
||||||
@@ -469,7 +615,7 @@ mod tests {
|
|||||||
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true);
|
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true);
|
||||||
let wal_end = Lsn(wal_end);
|
let wal_end = Lsn(wal_end);
|
||||||
println!("wal_end={}, tli={}", wal_end, tli);
|
println!("wal_end={}, tli={}", wal_end, tli);
|
||||||
assert_eq!(wal_end, "0/2000000".parse::<Lsn>().unwrap());
|
assert_eq!(wal_end, "0/1699D10".parse::<Lsn>().unwrap());
|
||||||
|
|
||||||
// 4. Get the actual end of WAL by pg_waldump
|
// 4. Get the actual end of WAL by pg_waldump
|
||||||
let waldump_path = top_path.join("tmp_install/bin/pg_waldump");
|
let waldump_path = top_path.join("tmp_install/bin/pg_waldump");
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
|
|||||||
#
|
#
|
||||||
# Test restarting and recreating a postgres instance
|
# Test restarting and recreating a postgres instance
|
||||||
#
|
#
|
||||||
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
|
@pytest.mark.parametrize('with_wal_acceptors', [True, False])
|
||||||
def test_restart_compute(
|
def test_restart_compute(
|
||||||
zenith_cli,
|
zenith_cli,
|
||||||
pageserver: ZenithPageserver,
|
pageserver: ZenithPageserver,
|
||||||
@@ -31,31 +31,56 @@ def test_restart_compute(
|
|||||||
|
|
||||||
with closing(pg.connect()) as conn:
|
with closing(pg.connect()) as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
# Create table, and insert a row
|
cur.execute('CREATE TABLE t(key int primary key, value text)')
|
||||||
cur.execute('CREATE TABLE foo (t text)')
|
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
|
||||||
cur.execute("INSERT INTO foo VALUES ('bar')")
|
cur.execute('SELECT sum(key) FROM t')
|
||||||
|
r = cur.fetchone()
|
||||||
|
assert r == (5000050000, )
|
||||||
|
print("res = ", r)
|
||||||
|
|
||||||
# Stop and restart the Postgres instance
|
# Remove data directory and restart
|
||||||
pg.stop_and_destroy().create_start('test_restart_compute',
|
pg.stop_and_destroy().create_start('test_restart_compute',
|
||||||
wal_acceptors=wal_acceptor_connstrs)
|
wal_acceptors=wal_acceptor_connstrs)
|
||||||
|
|
||||||
|
|
||||||
with closing(pg.connect()) as conn:
|
with closing(pg.connect()) as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
# We can still see the row
|
# We can still see the row
|
||||||
cur.execute('SELECT count(*) FROM foo')
|
cur.execute('SELECT sum(key) FROM t')
|
||||||
assert cur.fetchone() == (1, )
|
r = cur.fetchone()
|
||||||
|
assert r == (5000050000, )
|
||||||
|
print("res = ", r)
|
||||||
|
|
||||||
# Insert another row
|
# Insert another row
|
||||||
cur.execute("INSERT INTO foo VALUES ('bar2')")
|
cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
|
||||||
cur.execute('SELECT count(*) FROM foo')
|
cur.execute('SELECT count(*) FROM t')
|
||||||
assert cur.fetchone() == (2, )
|
|
||||||
|
|
||||||
# Stop, and destroy the Postgres instance. Then recreate and restart it.
|
r = cur.fetchone()
|
||||||
|
assert r == (100001, )
|
||||||
|
print("res = ", r)
|
||||||
|
|
||||||
|
# Again remove data directory and restart
|
||||||
pg.stop_and_destroy().create_start('test_restart_compute',
|
pg.stop_and_destroy().create_start('test_restart_compute',
|
||||||
wal_acceptors=wal_acceptor_connstrs)
|
wal_acceptors=wal_acceptor_connstrs)
|
||||||
|
|
||||||
with closing(pg.connect()) as conn:
|
with closing(pg.connect()) as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
# We can still see the rows
|
# We can still see the rows
|
||||||
cur.execute('SELECT count(*) FROM foo')
|
cur.execute('SELECT count(*) FROM t')
|
||||||
assert cur.fetchone() == (2, )
|
|
||||||
|
r = cur.fetchone()
|
||||||
|
assert r == (100001, )
|
||||||
|
print("res = ", r)
|
||||||
|
|
||||||
|
# And again remove data directory and restart
|
||||||
|
pg.stop_and_destroy().create_start('test_restart_compute',
|
||||||
|
wal_acceptors=wal_acceptor_connstrs)
|
||||||
|
|
||||||
|
with closing(pg.connect()) as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
# We can still see the rows
|
||||||
|
cur.execute('SELECT count(*) FROM t')
|
||||||
|
|
||||||
|
r = cur.fetchone()
|
||||||
|
assert r == (100001, )
|
||||||
|
print("res = ", r)
|
||||||
|
|||||||
@@ -249,10 +249,68 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:
|
|||||||
print('Starting pageserver cleanup')
|
print('Starting pageserver cleanup')
|
||||||
ps.stop()
|
ps.stop()
|
||||||
|
|
||||||
|
class PgBin:
|
||||||
|
""" A helper class for executing postgres binaries """
|
||||||
|
def __init__(self, log_dir: str, pg_distrib_dir: str):
|
||||||
|
self.log_dir = log_dir
|
||||||
|
self.pg_install_path = pg_distrib_dir
|
||||||
|
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
|
||||||
|
self.env = os.environ.copy()
|
||||||
|
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
|
||||||
|
|
||||||
|
def _fixpath(self, command: List[str]) -> None:
|
||||||
|
if '/' not in command[0]:
|
||||||
|
command[0] = os.path.join(self.pg_bin_path, command[0])
|
||||||
|
|
||||||
|
def _build_env(self, env_add: Optional[Env]) -> Env:
|
||||||
|
if env_add is None:
|
||||||
|
return self.env
|
||||||
|
env = self.env.copy()
|
||||||
|
env.update(env_add)
|
||||||
|
return env
|
||||||
|
|
||||||
|
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
|
||||||
|
"""
|
||||||
|
Run one of the postgres binaries.
|
||||||
|
|
||||||
|
The command should be in list form, e.g. ['pgbench', '-p', '55432']
|
||||||
|
|
||||||
|
All the necessary environment variables will be set.
|
||||||
|
|
||||||
|
If the first argument (the command name) doesn't include a path (no '/'
|
||||||
|
characters present), then it will be edited to include the correct path.
|
||||||
|
|
||||||
|
If you want stdout/stderr captured to files, use `run_capture` instead.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._fixpath(command)
|
||||||
|
print('Running command "{}"'.format(' '.join(command)))
|
||||||
|
env = self._build_env(env)
|
||||||
|
subprocess.run(command, env=env, cwd=cwd, check=True)
|
||||||
|
|
||||||
|
def run_capture(self,
|
||||||
|
command: List[str],
|
||||||
|
env: Optional[Env] = None,
|
||||||
|
cwd: Optional[str] = None) -> None:
|
||||||
|
"""
|
||||||
|
Run one of the postgres binaries, with stderr and stdout redirected to a file.
|
||||||
|
|
||||||
|
This is just like `run`, but for chatty programs.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._fixpath(command)
|
||||||
|
print('Running command "{}"'.format(' '.join(command)))
|
||||||
|
env = self._build_env(env)
|
||||||
|
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
|
||||||
|
|
||||||
|
|
||||||
|
@zenfixture
|
||||||
|
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
|
||||||
|
return PgBin(test_output_dir, pg_distrib_dir)
|
||||||
|
|
||||||
class Postgres(PgProtocol):
|
class Postgres(PgProtocol):
|
||||||
""" An object representing a running postgres daemon. """
|
""" An object representing a running postgres daemon. """
|
||||||
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int):
|
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, tenant_id: str, port: int):
|
||||||
super().__init__(host='localhost', port=port)
|
super().__init__(host='localhost', port=port)
|
||||||
|
|
||||||
self.zenith_cli = zenith_cli
|
self.zenith_cli = zenith_cli
|
||||||
@@ -260,6 +318,7 @@ class Postgres(PgProtocol):
|
|||||||
self.repo_dir = repo_dir
|
self.repo_dir = repo_dir
|
||||||
self.branch: Optional[str] = None # dubious, see asserts below
|
self.branch: Optional[str] = None # dubious, see asserts below
|
||||||
self.tenant_id = tenant_id
|
self.tenant_id = tenant_id
|
||||||
|
self.pg_bin = pg_bin
|
||||||
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<branch_name>/postgresql.conf
|
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<branch_name>/postgresql.conf
|
||||||
|
|
||||||
def create(
|
def create(
|
||||||
@@ -299,27 +358,32 @@ class Postgres(PgProtocol):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
assert self.branch is not None
|
assert self.branch is not None
|
||||||
|
|
||||||
|
print(f"Starting postgres on brach {self.branch}")
|
||||||
|
|
||||||
self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
|
self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
|
||||||
self.running = True
|
self.running = True
|
||||||
|
|
||||||
|
self.pg_bin.run(['pg_controldata', self.pg_data_dir_path()])
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def pg_data_dir_path(self) -> str:
|
||||||
|
""" Path to data directory """
|
||||||
|
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch
|
||||||
|
return os.path.join(self.repo_dir, path)
|
||||||
|
|
||||||
def pg_xact_dir_path(self) -> str:
|
def pg_xact_dir_path(self) -> str:
|
||||||
""" Path to pg_xact dir """
|
""" Path to pg_xact dir """
|
||||||
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_xact'
|
return os.path.join(self.pg_data_dir_path(), 'pg_xact')
|
||||||
return os.path.join(self.repo_dir, path)
|
|
||||||
|
|
||||||
def pg_twophase_dir_path(self) -> str:
|
def pg_twophase_dir_path(self) -> str:
|
||||||
""" Path to pg_twophase dir """
|
""" Path to pg_twophase dir """
|
||||||
print(self.tenant_id)
|
return os.path.join(self.pg_data_dir_path(), 'pg_twophase')
|
||||||
print(self.branch)
|
|
||||||
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_twophase'
|
|
||||||
return os.path.join(self.repo_dir, path)
|
|
||||||
|
|
||||||
def config_file_path(self) -> str:
|
def config_file_path(self) -> str:
|
||||||
""" Path to postgresql.conf """
|
""" Path to postgresql.conf """
|
||||||
filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf'
|
return os.path.join(self.pg_data_dir_path(), 'postgresql.conf')
|
||||||
return os.path.join(self.repo_dir, filename)
|
|
||||||
|
|
||||||
def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
|
def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
|
||||||
"""
|
"""
|
||||||
@@ -411,13 +475,14 @@ class Postgres(PgProtocol):
|
|||||||
|
|
||||||
class PostgresFactory:
|
class PostgresFactory:
|
||||||
""" An object representing multiple running postgres daemons. """
|
""" An object representing multiple running postgres daemons. """
|
||||||
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431):
|
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431):
|
||||||
self.zenith_cli = zenith_cli
|
self.zenith_cli = zenith_cli
|
||||||
self.repo_dir = repo_dir
|
self.repo_dir = repo_dir
|
||||||
self.num_instances = 0
|
self.num_instances = 0
|
||||||
self.instances: List[Postgres] = []
|
self.instances: List[Postgres] = []
|
||||||
self.initial_tenant: str = initial_tenant
|
self.initial_tenant: str = initial_tenant
|
||||||
self.base_port = base_port
|
self.base_port = base_port
|
||||||
|
self.pg_bin = pg_bin
|
||||||
|
|
||||||
def create_start(
|
def create_start(
|
||||||
self,
|
self,
|
||||||
@@ -430,6 +495,7 @@ class PostgresFactory:
|
|||||||
pg = Postgres(
|
pg = Postgres(
|
||||||
zenith_cli=self.zenith_cli,
|
zenith_cli=self.zenith_cli,
|
||||||
repo_dir=self.repo_dir,
|
repo_dir=self.repo_dir,
|
||||||
|
pg_bin=self.pg_bin,
|
||||||
tenant_id=tenant_id or self.initial_tenant,
|
tenant_id=tenant_id or self.initial_tenant,
|
||||||
port=self.base_port + self.num_instances + 1,
|
port=self.base_port + self.num_instances + 1,
|
||||||
)
|
)
|
||||||
@@ -503,8 +569,8 @@ def initial_tenant(pageserver: ZenithPageserver):
|
|||||||
|
|
||||||
|
|
||||||
@zenfixture
|
@zenfixture
|
||||||
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]:
|
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]:
|
||||||
pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant)
|
pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant)
|
||||||
|
|
||||||
yield pgfactory
|
yield pgfactory
|
||||||
|
|
||||||
@@ -512,67 +578,6 @@ def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Itera
|
|||||||
print('Starting postgres cleanup')
|
print('Starting postgres cleanup')
|
||||||
pgfactory.stop_all()
|
pgfactory.stop_all()
|
||||||
|
|
||||||
|
|
||||||
class PgBin:
|
|
||||||
""" A helper class for executing postgres binaries """
|
|
||||||
def __init__(self, log_dir: str, pg_distrib_dir: str):
|
|
||||||
self.log_dir = log_dir
|
|
||||||
self.pg_install_path = pg_distrib_dir
|
|
||||||
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
|
|
||||||
self.env = os.environ.copy()
|
|
||||||
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
|
|
||||||
|
|
||||||
def _fixpath(self, command: List[str]) -> None:
|
|
||||||
if '/' not in command[0]:
|
|
||||||
command[0] = os.path.join(self.pg_bin_path, command[0])
|
|
||||||
|
|
||||||
def _build_env(self, env_add: Optional[Env]) -> Env:
|
|
||||||
if env_add is None:
|
|
||||||
return self.env
|
|
||||||
env = self.env.copy()
|
|
||||||
env.update(env_add)
|
|
||||||
return env
|
|
||||||
|
|
||||||
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
|
|
||||||
"""
|
|
||||||
Run one of the postgres binaries.
|
|
||||||
|
|
||||||
The command should be in list form, e.g. ['pgbench', '-p', '55432']
|
|
||||||
|
|
||||||
All the necessary environment variables will be set.
|
|
||||||
|
|
||||||
If the first argument (the command name) doesn't include a path (no '/'
|
|
||||||
characters present), then it will be edited to include the correct path.
|
|
||||||
|
|
||||||
If you want stdout/stderr captured to files, use `run_capture` instead.
|
|
||||||
"""
|
|
||||||
|
|
||||||
self._fixpath(command)
|
|
||||||
print('Running command "{}"'.format(' '.join(command)))
|
|
||||||
env = self._build_env(env)
|
|
||||||
subprocess.run(command, env=env, cwd=cwd, check=True)
|
|
||||||
|
|
||||||
def run_capture(self,
|
|
||||||
command: List[str],
|
|
||||||
env: Optional[Env] = None,
|
|
||||||
cwd: Optional[str] = None) -> None:
|
|
||||||
"""
|
|
||||||
Run one of the postgres binaries, with stderr and stdout redirected to a file.
|
|
||||||
|
|
||||||
This is just like `run`, but for chatty programs.
|
|
||||||
"""
|
|
||||||
|
|
||||||
self._fixpath(command)
|
|
||||||
print('Running command "{}"'.format(' '.join(command)))
|
|
||||||
env = self._build_env(env)
|
|
||||||
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
|
|
||||||
|
|
||||||
|
|
||||||
@zenfixture
|
|
||||||
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
|
|
||||||
return PgBin(test_output_dir, pg_distrib_dir)
|
|
||||||
|
|
||||||
|
|
||||||
def read_pid(path: Path):
|
def read_pid(path: Path):
|
||||||
""" Read content of file into number """
|
""" Read content of file into number """
|
||||||
return int(path.read_text())
|
return int(path.read_text())
|
||||||
|
|||||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: e3175fe60a...9932d259be
@@ -27,6 +27,7 @@ use crate::replication::HotStandbyFeedback;
|
|||||||
use crate::send_wal::SendWalHandler;
|
use crate::send_wal::SendWalHandler;
|
||||||
use crate::timeline::{Timeline, TimelineTools};
|
use crate::timeline::{Timeline, TimelineTools};
|
||||||
use crate::WalAcceptorConf;
|
use crate::WalAcceptorConf;
|
||||||
|
use pageserver::waldecoder::WalStreamDecoder;
|
||||||
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
|
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
|
||||||
|
|
||||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||||
@@ -300,8 +301,8 @@ impl<'pg> ReceiveWalConn<'pg> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
my_info.server.node_id = prop.node_id;
|
my_info.server.node_id = prop.node_id;
|
||||||
this_timeline.get().set_info(&my_info);
|
|
||||||
/* Need to persist our vote first */
|
/* Need to persist our vote first */
|
||||||
|
this_timeline.get().set_info(&my_info);
|
||||||
this_timeline.get().save_control_file(true)?;
|
this_timeline.get().save_control_file(true)?;
|
||||||
|
|
||||||
let mut flushed_restart_lsn = Lsn(0);
|
let mut flushed_restart_lsn = Lsn(0);
|
||||||
@@ -322,9 +323,11 @@ impl<'pg> ReceiveWalConn<'pg> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Start streaming from timeline {} tenant {} address {:?} flush_lsn={}",
|
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}",
|
||||||
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn
|
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn,
|
||||||
);
|
);
|
||||||
|
let mut last_rec_lsn = Lsn(0);
|
||||||
|
let mut decoder = WalStreamDecoder::new(last_rec_lsn, false);
|
||||||
|
|
||||||
// Main loop
|
// Main loop
|
||||||
loop {
|
loop {
|
||||||
@@ -347,27 +350,52 @@ impl<'pg> ReceiveWalConn<'pg> {
|
|||||||
let end_pos = req.end_lsn;
|
let end_pos = req.end_lsn;
|
||||||
let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
|
let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
|
||||||
assert!(rec_size <= MAX_SEND_SIZE);
|
assert!(rec_size <= MAX_SEND_SIZE);
|
||||||
|
if rec_size != 0 {
|
||||||
|
debug!(
|
||||||
|
"received for {} bytes between {} and {}",
|
||||||
|
rec_size, start_pos, end_pos,
|
||||||
|
);
|
||||||
|
|
||||||
debug!(
|
/* Receive message body (from the rest of the message) */
|
||||||
"received for {} bytes between {} and {}",
|
let mut buf = Vec::with_capacity(rec_size);
|
||||||
rec_size, start_pos, end_pos,
|
msg_reader.read_to_end(&mut buf)?;
|
||||||
);
|
assert_eq!(buf.len(), rec_size);
|
||||||
|
|
||||||
/* Receive message body (from the rest of the message) */
|
if decoder.available() != start_pos {
|
||||||
let mut buf = Vec::with_capacity(rec_size);
|
info!(
|
||||||
msg_reader.read_to_end(&mut buf)?;
|
"Restart decoder from {} to {}",
|
||||||
assert_eq!(buf.len(), rec_size);
|
decoder.available(),
|
||||||
|
start_pos
|
||||||
/* Save message in file */
|
);
|
||||||
Self::write_wal_file(
|
decoder = WalStreamDecoder::new(start_pos, false);
|
||||||
swh,
|
}
|
||||||
start_pos,
|
decoder.feed_bytes(&buf);
|
||||||
timeline_id,
|
loop {
|
||||||
this_timeline.get(),
|
match decoder.poll_decode() {
|
||||||
wal_seg_size,
|
Err(e) => info!("Decode error {}", e),
|
||||||
&buf,
|
Ok(None) => {},
|
||||||
)?;
|
Ok(Some((lsn, _rec))) => {
|
||||||
|
last_rec_lsn = lsn;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
info!(
|
||||||
|
"Receive WAL {}..{} last_rec_lsn={}",
|
||||||
|
start_pos, end_pos, last_rec_lsn
|
||||||
|
);
|
||||||
|
|
||||||
|
/* Save message in file */
|
||||||
|
Self::write_wal_file(
|
||||||
|
swh,
|
||||||
|
start_pos,
|
||||||
|
timeline_id,
|
||||||
|
this_timeline.get(),
|
||||||
|
wal_seg_size,
|
||||||
|
&buf,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
my_info.restart_lsn = req.restart_lsn;
|
my_info.restart_lsn = req.restart_lsn;
|
||||||
my_info.commit_lsn = req.commit_lsn;
|
my_info.commit_lsn = req.commit_lsn;
|
||||||
|
|
||||||
@@ -377,13 +405,13 @@ impl<'pg> ReceiveWalConn<'pg> {
|
|||||||
* maximum (vcl) determined by WAL proposer during handshake.
|
* maximum (vcl) determined by WAL proposer during handshake.
|
||||||
* Switching epoch means that node completes recovery and start writing in the WAL new data.
|
* Switching epoch means that node completes recovery and start writing in the WAL new data.
|
||||||
*/
|
*/
|
||||||
if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) {
|
if my_info.epoch < prop.epoch && end_pos >= max(my_info.flush_lsn, prop.vcl) {
|
||||||
info!("Switch to new epoch {}", prop.epoch);
|
info!("Switch to new epoch {}", prop.epoch);
|
||||||
my_info.epoch = prop.epoch; /* bump epoch */
|
my_info.epoch = prop.epoch; /* bump epoch */
|
||||||
sync_control_file = true;
|
sync_control_file = true;
|
||||||
}
|
}
|
||||||
if end_pos > my_info.flush_lsn {
|
if last_rec_lsn > my_info.flush_lsn {
|
||||||
my_info.flush_lsn = end_pos;
|
my_info.flush_lsn = last_rec_lsn;
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
* Update restart LSN in control file.
|
* Update restart LSN in control file.
|
||||||
@@ -391,6 +419,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
|||||||
* when restart_lsn delta exceeds WAL segment size.
|
* when restart_lsn delta exceeds WAL segment size.
|
||||||
*/
|
*/
|
||||||
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
|
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
|
||||||
|
this_timeline.get().set_info(&my_info);
|
||||||
this_timeline.get().save_control_file(sync_control_file)?;
|
this_timeline.get().save_control_file(sync_control_file)?;
|
||||||
|
|
||||||
if sync_control_file {
|
if sync_control_file {
|
||||||
@@ -401,7 +430,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
|||||||
//info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32);
|
//info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32);
|
||||||
let resp = SafeKeeperResponse {
|
let resp = SafeKeeperResponse {
|
||||||
epoch: my_info.epoch,
|
epoch: my_info.epoch,
|
||||||
flush_lsn: end_pos,
|
flush_lsn: my_info.flush_lsn,
|
||||||
hs_feedback: this_timeline.get().get_hs_feedback(),
|
hs_feedback: this_timeline.get().get_hs_feedback(),
|
||||||
};
|
};
|
||||||
self.write_msg(&resp)?;
|
self.write_msg(&resp)?;
|
||||||
@@ -410,9 +439,15 @@ impl<'pg> ReceiveWalConn<'pg> {
|
|||||||
* Ping wal sender that new data is available.
|
* Ping wal sender that new data is available.
|
||||||
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
|
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
|
||||||
*/
|
*/
|
||||||
|
trace!(
|
||||||
|
"Notify WAL senders min({}, {})={}",
|
||||||
|
req.commit_lsn,
|
||||||
|
my_info.flush_lsn,
|
||||||
|
min(req.commit_lsn, my_info.flush_lsn)
|
||||||
|
);
|
||||||
this_timeline
|
this_timeline
|
||||||
.get()
|
.get()
|
||||||
.notify_wal_senders(min(req.commit_lsn, end_pos));
|
.notify_wal_senders(min(req.commit_lsn, my_info.flush_lsn));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -76,8 +76,10 @@ impl ReplicationConn {
|
|||||||
let feedback = HotStandbyFeedback::des(&m)?;
|
let feedback = HotStandbyFeedback::des(&m)?;
|
||||||
subscriber.add_hs_feedback(feedback);
|
subscriber.add_hs_feedback(feedback);
|
||||||
}
|
}
|
||||||
|
FeMessage::Sync => {}
|
||||||
|
FeMessage::CopyFailed => return Err(anyhow!("Copy failed")),
|
||||||
_ => {
|
_ => {
|
||||||
// We only handle `CopyData` messages. Anything else is ignored.
|
// We only handle `CopyData`, 'Sync', 'CopyFailed' messages. Anything else is ignored.
|
||||||
info!("unexpected message {:?}", msg);
|
info!("unexpected message {:?}", msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -215,9 +217,14 @@ impl ReplicationConn {
|
|||||||
data: &file_buf,
|
data: &file_buf,
|
||||||
}))?;
|
}))?;
|
||||||
|
|
||||||
start_pos += send_size as u64;
|
debug!(
|
||||||
|
"Sent WAL to page server {}..{}, end_pos={}",
|
||||||
|
start_pos,
|
||||||
|
start_pos + send_size as u64,
|
||||||
|
end_pos
|
||||||
|
);
|
||||||
|
|
||||||
debug!("Sent WAL to page server up to {}", end_pos);
|
start_pos += send_size as u64;
|
||||||
|
|
||||||
// Decide whether to reuse this file. If we don't set wal_file here
|
// Decide whether to reuse this file. If we don't set wal_file here
|
||||||
// a new file will be opened next time.
|
// a new file will be opened next time.
|
||||||
|
|||||||
@@ -175,7 +175,7 @@ impl Timeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn _stop_wal_senders(&self) {
|
pub fn stop_wal_senders(&self) {
|
||||||
self.notify_wal_senders(END_REPLICATION_MARKER);
|
self.notify_wal_senders(END_REPLICATION_MARKER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -24,6 +24,11 @@ impl Lsn {
|
|||||||
/// Maximum possible value for an LSN
|
/// Maximum possible value for an LSN
|
||||||
pub const MAX: Lsn = Lsn(u64::MAX);
|
pub const MAX: Lsn = Lsn(u64::MAX);
|
||||||
|
|
||||||
|
/// Align LSN on 8-byte boundary (alignment of WAL records).
|
||||||
|
pub fn align(&self) -> Lsn {
|
||||||
|
Lsn((self.0 + 7) & !7)
|
||||||
|
}
|
||||||
|
|
||||||
/// Subtract a number, returning None on overflow.
|
/// Subtract a number, returning None on overflow.
|
||||||
pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
|
pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
|
||||||
let other: u64 = other.into();
|
let other: u64 = other.into();
|
||||||
|
|||||||
@@ -341,7 +341,7 @@ impl PostgresBackend {
|
|||||||
|
|
||||||
// We prefer explicit pattern matching to wildcards, because
|
// We prefer explicit pattern matching to wildcards, because
|
||||||
// this helps us spot the places where new variants are missing
|
// this helps us spot the places where new variants are missing
|
||||||
FeMessage::CopyData(_) | FeMessage::CopyDone => {
|
FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => {
|
||||||
bail!("unexpected message type: {:?}", msg);
|
bail!("unexpected message type: {:?}", msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ pub enum FeMessage {
|
|||||||
Terminate,
|
Terminate,
|
||||||
CopyData(Bytes),
|
CopyData(Bytes),
|
||||||
CopyDone,
|
CopyDone,
|
||||||
|
CopyFailed,
|
||||||
PasswordMessage(Bytes),
|
PasswordMessage(Bytes),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,6 +139,7 @@ impl FeMessage {
|
|||||||
b'X' => Ok(Some(FeMessage::Terminate)),
|
b'X' => Ok(Some(FeMessage::Terminate)),
|
||||||
b'd' => Ok(Some(FeMessage::CopyData(body))),
|
b'd' => Ok(Some(FeMessage::CopyData(body))),
|
||||||
b'c' => Ok(Some(FeMessage::CopyDone)),
|
b'c' => Ok(Some(FeMessage::CopyDone)),
|
||||||
|
b'f' => Ok(Some(FeMessage::CopyFailed)),
|
||||||
b'p' => Ok(Some(FeMessage::PasswordMessage(body))),
|
b'p' => Ok(Some(FeMessage::PasswordMessage(body))),
|
||||||
tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
|
tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
|
||||||
}
|
}
|
||||||
@@ -338,6 +340,7 @@ pub enum BeMessage<'a> {
|
|||||||
ControlFile,
|
ControlFile,
|
||||||
CopyData(&'a [u8]),
|
CopyData(&'a [u8]),
|
||||||
CopyDone,
|
CopyDone,
|
||||||
|
CopyFailed,
|
||||||
CopyInResponse,
|
CopyInResponse,
|
||||||
CopyOutResponse,
|
CopyOutResponse,
|
||||||
CopyBothResponse,
|
CopyBothResponse,
|
||||||
@@ -546,6 +549,11 @@ impl<'a> BeMessage<'a> {
|
|||||||
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BeMessage::CopyFailed => {
|
||||||
|
buf.put_u8(b'f');
|
||||||
|
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
BeMessage::CopyInResponse => {
|
BeMessage::CopyInResponse => {
|
||||||
buf.put_u8(b'G');
|
buf.put_u8(b'G');
|
||||||
write_body(buf, |buf| {
|
write_body(buf, |buf| {
|
||||||
|
|||||||
Reference in New Issue
Block a user