Make flush_lsn reported by safekeepers point to a record boundary.

Otherwise we can end up with corrupted-record holes in WAL during compute node
restart when the old compute left an unfinished record, because these reports
advance commit_lsn -- the reliably persisted part of WAL.

ref #549.

Mostly by @knizhnik. I adjusted it to make sure the proposer always starts
streaming from a record boundary, so we don't need special decoding quirks in
the safekeeper.
Arseny Sher, 2021-09-09 17:56:33 +03:00 (committed by arssher)
parent 7c62a57e54
commit 0aec60938a
4 changed files with 93 additions and 51 deletions
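
To make the failure mode concrete, here is a small worked example of the scenario described above; the numbers are invented and the code is only an illustration of the commit message's reasoning, not anything from the tree:

// A record spans [0x0100, 0x0180), but the old compute died after streaming
// only the bytes up to 0x0150 to this safekeeper.
const RECORD_START: u64 = 0x0100;
const RECORD_END: u64 = 0x0180;
const RECEIVED_UP_TO: u64 = 0x0150;

fn main() {
    // Before this commit: flush_lsn reported everything received, even
    // mid-record, so commit_lsn (the "reliably persisted" watermark) could
    // reach 0x0150 -- inside a record nobody holds in full. The restarted
    // proposer streams again from the record's beginning and rewrites WAL
    // below commit_lsn: a corrupted-record hole.
    let old_flush_report = RECEIVED_UP_TO;
    assert!(RECORD_START < old_flush_report && old_flush_report < RECORD_END);

    // After this commit: flush_lsn is held at the last complete record's end
    // (here, the start of the unfinished record), so commit_lsn can never
    // cover the torn record.
    let new_flush_report = RECORD_START;
    assert_eq!(new_flush_report, RECORD_START);
}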

View File

@@ -3,7 +3,7 @@ use std::io::Write;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::os::unix::fs::PermissionsExt;
use std::process::Command;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -11,7 +11,6 @@ use std::{collections::BTreeMap, path::PathBuf};
use anyhow::{Context, Result};
use lazy_static::lazy_static;
use postgres_ffi::pg_constants;
use regex::Regex;
use zenith_utils::connstring::connection_host_port;
use zenith_utils::lsn::Lsn;
@@ -229,18 +228,24 @@ impl PostgresNode {
fn sync_walkeepers(&self) -> Result<Lsn> {
let pg_path = self.env.pg_bin_dir().join("postgres");
let sync_output = Command::new(pg_path)
let sync_handle = Command::new(pg_path)
.arg("--sync-safekeepers")
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGDATA", self.pgdata().to_str().unwrap())
.output()
.with_context(|| "sync-walkeepers failed")?;
.stdout(Stdio::piped())
// Comment this to avoid capturing stderr (useful if command hangs)
.stderr(Stdio::piped())
.spawn()
.expect("postgres --sync-safekeepers failed to start");
let sync_output = sync_handle
.wait_with_output()
.expect("postgres --sync-safekeepers failed");
if !sync_output.status.success() {
anyhow::bail!(
"sync-walkeepers failed: '{}'",
"sync-safekeepers failed: '{}'",
String::from_utf8_lossy(&sync_output.stderr)
);
}
@@ -373,12 +378,12 @@ impl PostgresNode {
fn load_basebackup(&self) -> Result<()> {
let lsn = if self.uses_wal_proposer {
// LSN WAL_SEGMENT_SIZE means that it is bootstrap and we need to download just
// LSN 0 means that it is bootstrap and we need to download just
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things would be more stable (TODO).
let lsn = self.sync_walkeepers()?;
if lsn == Lsn(pg_constants::WAL_SEGMENT_SIZE as u64) {
if lsn == Lsn(0 as u64) {
None
} else {
Some(lsn)
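
The hunk above replaces Command::output() with spawn() plus wait_with_output(), keeping stdout and stderr piped while holding a Child handle (the in-diff comment notes that stderr capture can then be dropped when debugging hangs). A minimal standalone sketch of that std::process pattern -- the binary name, argument and error handling here are placeholders, not the control plane's code:

use std::process::{Command, Stdio};

// Spawn a child with piped stdout/stderr, then wait and collect both streams.
// wait_with_output() also reaps the child, so no zombie is left behind.
fn run_and_capture(bin: &str, arg: &str) -> std::io::Result<String> {
    let child = Command::new(bin)
        .arg(arg)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;
    let out = child.wait_with_output()?;
    if !out.status.success() {
        // Surface stderr so failures of the child are diagnosable.
        return Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            String::from_utf8_lossy(&out.stderr).into_owned(),
        ));
    }
    Ok(String::from_utf8_lossy(&out.stdout).into_owned())
}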

View File

@@ -54,6 +54,11 @@ impl WalStreamDecoder {
}
}
// The latest LSN position fed to the decoder.
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
}
@@ -159,17 +164,10 @@ impl WalStreamDecoder {
let recordbuf = recordbuf.freeze();
let mut buf = recordbuf.clone();
let xlogrec = XLogRecord::from_bytes(&mut buf);
// XLOG_SWITCH records are special. If we see one, we need to skip
// to the next WAL segment.
let xlogrec = XLogRecord::from_bytes(&mut buf);
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
return Err(WalDecodeError {
msg: "WAL record crc mismatch".into(),
lsn: self.lsn,
});
}
if xlogrec.is_xlog_switch_record() {
trace!("saw xlog switch record at {}", self.lsn);
self.padlen =
@@ -179,6 +177,15 @@ impl WalStreamDecoder {
self.padlen = self.lsn.calc_padding(8u32) as u32;
}
let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
if crc != xlogrec.xl_crc {
return Err(WalDecodeError {
msg: "WAL record crc mismatch".into(),
lsn: self.lsn,
});
}
// Always align resulting LSN on 0x8 boundary -- that is important for getPage()
// and WalReceiver integration. Since this code is used both for WalReceiver and
// initial WAL import let's force alignment right here.
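
The new available() method returns the LSN up to which bytes have been fed (self.lsn plus whatever is still buffered), which lets a caller detect a non-contiguous chunk and rebuild the decoder rather than decode across a gap. A sketch of that caller-side contract, reusing names that appear in the diffs; the helper itself and the use lines are illustrative, not part of the patch:

use pageserver::waldecoder::{WalDecodeError, WalStreamDecoder};
use zenith_utils::lsn::Lsn;

// Feed one WAL chunk starting at begin_lsn and return the end LSN of the last
// complete record it contained, if any. Illustrative helper only.
fn feed_and_find_last_record(
    decoder: &mut WalStreamDecoder,
    begin_lsn: Lsn,
    wal: &[u8],
) -> Result<Option<Lsn>, WalDecodeError> {
    // If the chunk does not start exactly where previously fed bytes ended,
    // any partially buffered record in the decoder is stale: start over.
    if decoder.available() != begin_lsn {
        *decoder = WalStreamDecoder::new(begin_lsn);
    }
    decoder.feed_bytes(wal);
    let mut last_end = None;
    // poll_decode() yields complete records; None means only a partial record
    // (or nothing) remains buffered.
    while let Some((end_lsn, _rec)) = decoder.poll_decode()? {
        last_end = Some(end_lsn);
    }
    Ok(last_end)
}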

View File

@@ -7,6 +7,7 @@ use byteorder::WriteBytesExt;
use bytes::Buf;
use bytes::Bytes;
use log::*;
use pageserver::waldecoder::WalStreamDecoder;
use postgres_ffi::xlog_utils::TimeLineID;
use serde::{Deserialize, Serialize};
use std::cmp::max;
@@ -71,8 +72,8 @@ pub struct SafeKeeperState {
pub proposer_uuid: PgUuid,
/// part of WAL acknowledged by quorum and available locally
pub commit_lsn: Lsn,
/// minimal LSN which may be needed for recovery of some safekeeper (end lsn
/// + 1 of last record streamed to everyone)
/// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of last record streamed to everyone)
pub truncate_lsn: Lsn,
}
@@ -142,7 +143,7 @@ pub struct VoteResponse {
/// Safekeeper's log position, to let proposer choose the most advanced one
epoch: Term,
flush_lsn: Lsn,
restart_lsn: Lsn,
truncate_lsn: Lsn,
}
/// Request with WAL message sent from proposer to safekeeper. Along the way it
@@ -163,8 +164,8 @@ pub struct AppendRequestHeader {
end_lsn: Lsn,
/// LSN committed by quorum of safekeepers
commit_lsn: Lsn,
/// restart LSN position (minimal LSN which may be needed by proposer to perform recovery)
restart_lsn: Lsn,
/// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
truncate_lsn: Lsn,
// only for logging/debugging
proposer_uuid: PgUuid,
}
@@ -276,10 +277,9 @@ pub trait Storage {
/// SafeKeeper which consumes events (messages from compute) and provides
/// replies.
#[derive(Debug)]
pub struct SafeKeeper<ST: Storage> {
/// Locally flushed part of WAL (end_lsn of last record). Established by
/// reading wal.
/// Locally flushed part of WAL with full records (end_lsn of last record).
/// Established by reading wal.
pub flush_lsn: Lsn,
pub tli: u32,
/// not-yet-flushed pairs of same named fields in s.*
@@ -288,6 +288,7 @@ pub struct SafeKeeper<ST: Storage> {
pub storage: ST,
pub s: SafeKeeperState, // persistent part
pub elected_proposer_term: Term, // for monitoring/debugging
decoder: WalStreamDecoder,
}
impl<ST> SafeKeeper<ST>
@@ -304,6 +305,7 @@ where
storage,
s: state,
elected_proposer_term: 0,
decoder: WalStreamDecoder::new(Lsn(0)),
}
}
@@ -347,7 +349,6 @@ where
self.s.server.ztli = msg.ztli;
self.s.server.tli = msg.tli;
self.s.server.wal_seg_size = msg.wal_seg_size;
self.s.proposer_uuid = msg.proposer_id;
self.storage.persist(&self.s, true)?;
info!(
@@ -367,7 +368,7 @@ where
vote_given: false as u64,
epoch: 0,
flush_lsn: Lsn(0),
restart_lsn: Lsn(0),
truncate_lsn: Lsn(0),
};
if self.s.acceptor_state.term < msg.term {
self.s.acceptor_state.term = msg.term;
@@ -376,7 +377,7 @@ where
resp.vote_given = true as u64;
resp.epoch = self.s.acceptor_state.epoch;
resp.flush_lsn = self.flush_lsn;
resp.restart_lsn = self.s.truncate_lsn;
resp.truncate_lsn = self.s.truncate_lsn;
}
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
Ok(AcceptorProposerMessage::VoteResponse(resp))
@@ -411,9 +412,35 @@ where
return Ok(AcceptorProposerMessage::AppendResponse(resp));
}
self.s.proposer_uuid = msg.h.proposer_uuid;
// do the job
self.storage
.write_wal(&self.s, msg.h.begin_lsn, &msg.wal_data)?;
let mut last_rec_lsn = Lsn(0);
if msg.wal_data.len() > 0 {
self.storage
.write_wal(&self.s, msg.h.begin_lsn, &msg.wal_data)?;
// figure out last record's end lsn for reporting (if we got the
// whole record)
if self.decoder.available() != msg.h.begin_lsn {
info!(
"restart decoder from {} to {}",
self.decoder.available(),
msg.h.begin_lsn,
);
self.decoder = WalStreamDecoder::new(msg.h.begin_lsn);
}
self.decoder.feed_bytes(&msg.wal_data);
loop {
match self.decoder.poll_decode()? {
None => break, // no full record yet
Some((lsn, _rec)) => {
last_rec_lsn = lsn;
}
}
}
}
let mut sync_control_file = false;
/*
* Epoch switch happens when a written WAL record crosses the boundary.
@@ -429,21 +456,28 @@ where
if self.s.acceptor_state.epoch < msg.h.term
&& msg.h.end_lsn >= max(self.flush_lsn, msg.h.epoch_start_lsn)
{
info!("switched to new epoch {}", msg.h.term);
info!(
"switched to new epoch {} on receival of request end_lsn={:?}, len={:?}",
msg.h.term,
msg.h.end_lsn,
msg.wal_data.len(),
);
self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */
sync_control_file = true;
}
if msg.h.end_lsn > self.flush_lsn {
self.flush_lsn = msg.h.end_lsn;
if last_rec_lsn > self.flush_lsn {
self.flush_lsn = last_rec_lsn;
}
self.s.proposer_uuid = msg.h.proposer_uuid;
// Advance commit_lsn taking into account what we have locally.
// xxx this is wrapped into epoch check because we overwrite wal
// instead of truncating it, so without it commit_lsn might include
// wrong part. Anyway, nobody is much interested in our commit_lsn while
// epoch switch hasn't happened, right?
if self.s.acceptor_state.epoch == msg.h.term {
// Advance commit_lsn taking into account what we have locally. xxx this
// is wrapped into epoch check because we overwrite wal instead of
// truncating it, so without it commit_lsn might include wrong part.
// Anyway, nobody is much interested in our commit_lsn while epoch
// switch hasn't happened, right?
//
// commit_lsn can be 0, being unknown to new walproposer while he hasn't
// collected majority of its epoch acks yet, ignore it in this case.
if self.s.acceptor_state.epoch == msg.h.term && msg.h.commit_lsn != Lsn(0) {
let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn);
// If new commit_lsn reached epoch switch, force sync of control file:
// walproposer in sync mode is very interested when this happens.
@@ -451,10 +485,10 @@ where
commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn;
self.commit_lsn = commit_lsn;
}
self.truncate_lsn = msg.h.restart_lsn;
self.truncate_lsn = msg.h.truncate_lsn;
/*
* Update restart LSN in control file.
* Update truncate and commit LSN in control file.
* To avoid negative impact on performance of extra fsync, do it only
* when restart_lsn delta exceeds WAL segment size.
*/
@@ -474,11 +508,12 @@ where
// will be filled by caller code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
};
debug!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, resp {:?}",
info!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}",
msg.wal_data.len(),
msg.h.end_lsn,
msg.h.commit_lsn,
msg.h.truncate_lsn,
&resp,
);
Ok(AcceptorProposerMessage::AppendResponse(resp))
@@ -548,7 +583,7 @@ mod tests {
begin_lsn: Lsn(1),
end_lsn: Lsn(2),
commit_lsn: Lsn(0),
restart_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
};
let mut append_request = AppendRequest {
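
Taken together, the safekeeper-side changes boil down to three pieces of bookkeeping per AppendRequest. The snippet below restates them with plain integers as a summary of the diff's logic, not the literal process_append_request code:

use std::cmp::min;

// Illustrative summary only.
// 1. flush_lsn advances only to the end of the last fully decoded record.
// 2. commit_lsn advances only when the epoch matches the request's term and
//    the proposer actually knows a commit_lsn (0 means "unknown yet"), and it
//    is capped by the local flush_lsn.
// 3. truncate_lsn is taken verbatim from the proposer.
struct Positions {
    flush_lsn: u64,
    commit_lsn: u64,
    truncate_lsn: u64,
}

fn apply_append(p: &mut Positions, epoch: u64, req_term: u64,
                last_rec_lsn: u64, req_commit_lsn: u64, req_truncate_lsn: u64) {
    if last_rec_lsn > p.flush_lsn {
        p.flush_lsn = last_rec_lsn;
    }
    if epoch == req_term && req_commit_lsn != 0 {
        p.commit_lsn = min(req_commit_lsn, p.flush_lsn);
    }
    p.truncate_lsn = req_truncate_lsn;
}

The epoch gate matters because, as the diff's comment explains, WAL from an older term is overwritten rather than truncated, so a commit_lsn derived from it could cover the wrong bytes.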

View File

@@ -27,12 +27,9 @@ use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
const CONTROL_FILE_NAME: &str = "safekeeper.control";
/// Shared state associated with database instance (tenant)
#[derive(Debug)]
struct SharedState {
/// Safekeeper object
sk: SafeKeeper<FileStorage>,
/// opened control file handle (needed to hold exclusive file lock)
control_file: File,
/// For receiving-sending wal cooperation
/// quorum commit LSN we've notified walsenders about
commit_lsn: Lsn,
@@ -59,7 +56,7 @@ impl SharedState {
) -> Result<Self> {
let (cf, state) = SharedState::load_control_file(conf, timelineid, create)?;
let storage = FileStorage {
control_file: cf.try_clone()?,
control_file: cf,
conf: conf.clone(),
};
let (flush_lsn, tli) = if state.server.wal_seg_size != 0 {
@@ -72,7 +69,6 @@ impl SharedState {
Ok(Self {
commit_lsn: Lsn(0),
sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state),
control_file: cf,
hs_feedback: HotStandbyFeedback {
ts: 0,
xmin: u64::MAX,
@@ -161,7 +157,6 @@ impl SharedState {
}
/// Database instance (tenant)
#[derive(Debug)]
pub struct Timeline {
pub timelineid: ZTimelineId,
mutex: Mutex<SharedState>,