diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
index a2a86e4736..342b043387 100644
--- a/control_plane/src/compute.rs
+++ b/control_plane/src/compute.rs
@@ -3,7 +3,7 @@ use std::io::Write;
 use std::net::SocketAddr;
 use std::net::TcpStream;
 use std::os::unix::fs::PermissionsExt;
-use std::process::Command;
+use std::process::{Command, Stdio};
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -11,7 +11,6 @@ use std::{collections::BTreeMap, path::PathBuf};
 use anyhow::{Context, Result};
 use lazy_static::lazy_static;
-use postgres_ffi::pg_constants;
 use regex::Regex;
 use zenith_utils::connstring::connection_host_port;
 use zenith_utils::lsn::Lsn;
@@ -229,18 +228,24 @@ impl PostgresNode {
     fn sync_walkeepers(&self) -> Result<Lsn> {
         let pg_path = self.env.pg_bin_dir().join("postgres");
 
-        let sync_output = Command::new(pg_path)
+        let sync_handle = Command::new(pg_path)
             .arg("--sync-safekeepers")
             .env_clear()
             .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
             .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
             .env("PGDATA", self.pgdata().to_str().unwrap())
-            .output()
-            .with_context(|| "sync-walkeepers failed")?;
+            .stdout(Stdio::piped())
+            // Comment this out to avoid capturing stderr (useful if the command hangs)
+            .stderr(Stdio::piped())
+            .spawn()
+            .expect("postgres --sync-safekeepers failed to start");
+        let sync_output = sync_handle
+            .wait_with_output()
+            .expect("postgres --sync-safekeepers failed");
 
         if !sync_output.status.success() {
             anyhow::bail!(
-                "sync-walkeepers failed: '{}'",
+                "sync-safekeepers failed: '{}'",
                 String::from_utf8_lossy(&sync_output.stderr)
             );
         }
@@ -373,12 +378,12 @@ impl PostgresNode {
     fn load_basebackup(&self) -> Result<()> {
         let lsn = if self.uses_wal_proposer {
-            // LSN WAL_SEGMENT_SIZE means that it is bootstrap and we need to download just
+            // LSN 0 means that this is a bootstrap and we need to download just
             // latest data from the pageserver. That is a bit clumsy but whole bootstrap
             // procedure evolves quite actively right now, so let's think about it again
             // when things would be more stable (TODO).
             let lsn = self.sync_walkeepers()?;
-            if lsn == Lsn(pg_constants::WAL_SEGMENT_SIZE as u64) {
+            if lsn == Lsn(0) {
                 None
             } else {
                 Some(lsn)
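Note on the compute.rs hunk above: splitting `.output()` into `spawn()` plus `wait_with_output()` makes the piping of each stream explicit (and easy to disable when debugging a hang) and keeps a `Child` handle between the two calls. A minimal, self-contained sketch of that std pattern, with `echo` standing in for `postgres --sync-safekeepers`:

```rust
use std::process::{Command, Stdio};

fn main() -> std::io::Result<()> {
    // Spawn with explicitly piped stdout/stderr, the same shape the patch
    // uses; `echo` is only a stand-in for the real binary.
    let child = Command::new("echo")
        .arg("hello")
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    // wait_with_output() reads both streams to EOF and reaps the process.
    let output = child.wait_with_output()?;
    if !output.status.success() {
        eprintln!("failed: {}", String::from_utf8_lossy(&output.stderr));
    }
    println!("stdout: {}", String::from_utf8_lossy(&output.stdout));
    Ok(())
}
```

One design caveat worth flagging: the patched code switches from the `?`/`with_context` error path to `expect()`, which panics instead of propagating an `anyhow` error.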
diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs
index 0846c55ced..cb94b9248b 100644
--- a/pageserver/src/waldecoder.rs
+++ b/pageserver/src/waldecoder.rs
@@ -54,6 +54,11 @@ impl WalStreamDecoder {
         }
     }
 
+    // The latest LSN position fed to the decoder.
+    pub fn available(&self) -> Lsn {
+        self.lsn + self.inputbuf.remaining() as u64
+    }
+
     pub fn feed_bytes(&mut self, buf: &[u8]) {
         self.inputbuf.extend_from_slice(buf);
     }
@@ -159,17 +164,10 @@ impl WalStreamDecoder {
                 let recordbuf = recordbuf.freeze();
                 let mut buf = recordbuf.clone();
 
+                let xlogrec = XLogRecord::from_bytes(&mut buf);
+
                 // XLOG_SWITCH records are special. If we see one, we need to skip
                 // to the next WAL segment.
-                let xlogrec = XLogRecord::from_bytes(&mut buf);
-                let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
-                crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
-                if crc != xlogrec.xl_crc {
-                    return Err(WalDecodeError {
-                        msg: "WAL record crc mismatch".into(),
-                        lsn: self.lsn,
-                    });
-                }
                 if xlogrec.is_xlog_switch_record() {
                     trace!("saw xlog switch record at {}", self.lsn);
                     self.padlen =
@@ -179,6 +177,15 @@ impl WalStreamDecoder {
                     self.padlen = self.lsn.calc_padding(8u32) as u32;
                 }
 
+                let mut crc = crc32c_append(0, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
+                crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
+                if crc != xlogrec.xl_crc {
+                    return Err(WalDecodeError {
+                        msg: "WAL record crc mismatch".into(),
+                        lsn: self.lsn,
+                    });
+                }
+
                 // Always align resulting LSN on 0x8 boundary -- that is important for getPage()
                 // and WalReceiver integration. Since this code is used both for WalReceiver and
                 // initial WAL import let's force alignment right here.
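The new `available()` method defines the contract that safekeeper.rs relies on below: a caller can compare where an incoming chunk starts against where the decoder's input currently ends, and restart the decoder on a mismatch. Here is a toy model of that contract over a trivial length-prefixed framing, assuming the `bytes` crate (which the real decoder also uses); `FrameDecoder` is illustrative, not the real `WalStreamDecoder`:

```rust
use bytes::{Buf, BytesMut};

/// Toy model of the decoder contract: `available()` reports the stream
/// position just past everything fed so far, `poll_decode()` yields complete
/// records (here: u32-length-prefixed frames) and returns None while the
/// buffer holds only a partial record.
struct FrameDecoder {
    pos: u64, // stream position of the first unconsumed byte
    inputbuf: BytesMut,
}

impl FrameDecoder {
    fn new(start: u64) -> Self {
        Self { pos: start, inputbuf: BytesMut::new() }
    }

    /// Mirrors WalStreamDecoder::available().
    fn available(&self) -> u64 {
        self.pos + self.inputbuf.remaining() as u64
    }

    fn feed_bytes(&mut self, buf: &[u8]) {
        self.inputbuf.extend_from_slice(buf);
    }

    /// Returns (end_position, payload) of the next complete frame.
    fn poll_decode(&mut self) -> Option<(u64, Vec<u8>)> {
        if self.inputbuf.remaining() < 4 {
            return None;
        }
        let len = u32::from_be_bytes(self.inputbuf[0..4].try_into().unwrap()) as usize;
        if self.inputbuf.remaining() < 4 + len {
            return None; // only a partial record so far
        }
        self.inputbuf.advance(4);
        let payload = self.inputbuf.split_to(len).to_vec();
        self.pos += (4 + len) as u64;
        Some((self.pos, payload))
    }
}

fn main() {
    let mut dec = FrameDecoder::new(0);
    dec.feed_bytes(&[0, 0, 0, 2, b'h']); // half a record arrives
    assert_eq!(dec.poll_decode(), None);  // not complete yet
    dec.feed_bytes(&[b'i']);              // the rest arrives
    assert_eq!(dec.poll_decode(), Some((6, b"hi".to_vec())));
    assert_eq!(dec.available(), 6);
}
```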
diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs
index c6f636d3fe..16301edf98 100644
--- a/walkeeper/src/safekeeper.rs
+++ b/walkeeper/src/safekeeper.rs
@@ -7,6 +7,7 @@ use byteorder::WriteBytesExt;
 use bytes::Buf;
 use bytes::Bytes;
 use log::*;
+use pageserver::waldecoder::WalStreamDecoder;
 use postgres_ffi::xlog_utils::TimeLineID;
 use serde::{Deserialize, Serialize};
 use std::cmp::max;
@@ -71,8 +72,8 @@ pub struct SafeKeeperState {
     pub proposer_uuid: PgUuid,
     /// part of WAL acknowledged by quorum and available locally
     pub commit_lsn: Lsn,
-    /// minimal LSN which may be needed for recovery of some safekeeper (end lsn
-    /// + 1 of last record streamed to everyone)
+    /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
+    /// of last record streamed to everyone)
     pub truncate_lsn: Lsn,
 }
@@ -142,7 +143,7 @@ pub struct VoteResponse {
     /// Safekeeper's log position, to let proposer choose the most advanced one
     epoch: Term,
     flush_lsn: Lsn,
-    restart_lsn: Lsn,
+    truncate_lsn: Lsn,
 }
 
 /// Request with WAL message sent from proposer to safekeeper. Along the way it
@@ -163,8 +164,8 @@ pub struct AppendRequestHeader {
     end_lsn: Lsn,
     /// LSN committed by quorum of safekeepers
     commit_lsn: Lsn,
-    /// restart LSN position (minimal LSN which may be needed by proposer to perform recovery)
-    restart_lsn: Lsn,
+    /// minimal LSN which may be needed by the proposer to perform recovery of some safekeeper
+    truncate_lsn: Lsn,
     // only for logging/debugging
     proposer_uuid: PgUuid,
 }
@@ -276,10 +277,9 @@ pub trait Storage {
 
 /// SafeKeeper which consumes events (messages from compute) and provides
 /// replies.
-#[derive(Debug)]
 pub struct SafeKeeper<ST: Storage> {
-    /// Locally flushed part of WAL (end_lsn of last record). Established by
-    /// reading wal.
+    /// Locally flushed part of WAL with full records (end_lsn of last record).
+    /// Established by reading WAL.
     pub flush_lsn: Lsn,
     pub tli: u32,
     /// not-yet-flushed pairs of same named fields in s.*
@@ -288,6 +288,7 @@ pub struct SafeKeeper {
     pub storage: ST,
     pub s: SafeKeeperState, // persistent part
     pub elected_proposer_term: Term, // for monitoring/debugging
+    decoder: WalStreamDecoder,
 }
 
 impl<ST> SafeKeeper<ST>
 where
@@ -304,6 +305,7 @@ where
             storage,
             s: state,
             elected_proposer_term: 0,
+            decoder: WalStreamDecoder::new(Lsn(0)),
         }
     }
@@ -347,7 +349,6 @@ where
         self.s.server.ztli = msg.ztli;
         self.s.server.tli = msg.tli;
         self.s.server.wal_seg_size = msg.wal_seg_size;
-        self.s.proposer_uuid = msg.proposer_id;
         self.storage.persist(&self.s, true)?;
 
         info!(
@@ -367,7 +368,7 @@ where
             vote_given: false as u64,
             epoch: 0,
             flush_lsn: Lsn(0),
-            restart_lsn: Lsn(0),
+            truncate_lsn: Lsn(0),
         };
         if self.s.acceptor_state.term < msg.term {
             self.s.acceptor_state.term = msg.term;
@@ -376,7 +377,7 @@ where
             resp.vote_given = true as u64;
             resp.epoch = self.s.acceptor_state.epoch;
             resp.flush_lsn = self.flush_lsn;
-            resp.restart_lsn = self.s.truncate_lsn;
+            resp.truncate_lsn = self.s.truncate_lsn;
         }
         info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
         Ok(AcceptorProposerMessage::VoteResponse(resp))
@@ -411,9 +412,35 @@ where
             return Ok(AcceptorProposerMessage::AppendResponse(resp));
         }
 
+        self.s.proposer_uuid = msg.h.proposer_uuid;
+
         // do the job
-        self.storage
-            .write_wal(&self.s, msg.h.begin_lsn, &msg.wal_data)?;
+        let mut last_rec_lsn = Lsn(0);
+        if msg.wal_data.len() > 0 {
+            self.storage
+                .write_wal(&self.s, msg.h.begin_lsn, &msg.wal_data)?;
+
+            // figure out the last record's end lsn for reporting (if we got the
+            // whole record)
+            if self.decoder.available() != msg.h.begin_lsn {
+                info!(
+                    "restart decoder from {} to {}",
+                    self.decoder.available(),
+                    msg.h.begin_lsn,
+                );
+                self.decoder = WalStreamDecoder::new(msg.h.begin_lsn);
+            }
+            self.decoder.feed_bytes(&msg.wal_data);
+            loop {
+                match self.decoder.poll_decode()? {
+                    None => break, // no full record yet
+                    Some((lsn, _rec)) => {
+                        last_rec_lsn = lsn;
+                    }
+                }
+            }
+        }
+
         let mut sync_control_file = false;
         /*
          * Epoch switch happen when written WAL record cross the boundary.
@@ -429,21 +456,28 @@ where
         if self.s.acceptor_state.epoch < msg.h.term
             && msg.h.end_lsn >= max(self.flush_lsn, msg.h.epoch_start_lsn)
         {
-            info!("switched to new epoch {}", msg.h.term);
+            info!(
+                "switched to new epoch {} on receipt of request end_lsn={:?}, len={:?}",
+                msg.h.term,
+                msg.h.end_lsn,
+                msg.wal_data.len(),
+            );
             self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */
             sync_control_file = true;
         }
-        if msg.h.end_lsn > self.flush_lsn {
-            self.flush_lsn = msg.h.end_lsn;
+        if last_rec_lsn > self.flush_lsn {
+            self.flush_lsn = last_rec_lsn;
         }
-        self.s.proposer_uuid = msg.h.proposer_uuid;
-        // Advance commit_lsn taking into account what we have locally.
-        // xxx this is wrapped into epoch check because we overwrite wal
-        // instead of truncating it, so without it commit_lsn might include
-        // wrong part. Anyway, nobody is much interested in our commit_lsn while
-        // epoch switch hasn't happened, right?
-        if self.s.acceptor_state.epoch == msg.h.term {
+        // Advance commit_lsn taking into account what we have locally. xxx this
+        // is wrapped into epoch check because we overwrite wal instead of
+        // truncating it, so without it commit_lsn might include wrong part.
+        // Anyway, nobody is much interested in our commit_lsn while epoch
+        // switch hasn't happened, right?
+        //
+        // commit_lsn can be 0, being unknown to a new walproposer while it hasn't
+        // collected a majority of epoch acks yet; ignore it in this case.
+        if self.s.acceptor_state.epoch == msg.h.term && msg.h.commit_lsn != Lsn(0) {
             let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn);
             // If new commit_lsn reached epoch switch, force sync of control file:
             // walproposer in sync mode is very interested when this happens.
@@ -451,10 +485,10 @@ where
                 commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn;
             self.commit_lsn = commit_lsn;
         }
-        self.truncate_lsn = msg.h.restart_lsn;
+        self.truncate_lsn = msg.h.truncate_lsn;
 
         /*
-         * Update restart LSN in control file.
+         * Update truncate and commit LSN in control file.
          * To avoid negative impact on performance of extra fsync, do it only
         * when restart_lsn delta exceeds WAL segment size.
         */
@@ -474,11 +508,12 @@ where
             // will be filled by caller code to avoid bothering safekeeper
             hs_feedback: HotStandbyFeedback::empty(),
         };
-        debug!(
-            "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, resp {:?}",
+        info!(
+            "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}",
             msg.wal_data.len(),
             msg.h.end_lsn,
             msg.h.commit_lsn,
+            msg.h.truncate_lsn,
             &resp,
         );
         Ok(AcceptorProposerMessage::AppendResponse(resp))
@@ -548,7 +583,7 @@ mod tests {
             begin_lsn: Lsn(1),
             end_lsn: Lsn(2),
             commit_lsn: Lsn(0),
-            restart_lsn: Lsn(0),
+            truncate_lsn: Lsn(0),
             proposer_uuid: [0; 16],
         };
         let mut append_request = AppendRequest {
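The `msg.h.commit_lsn != Lsn(0)` guard above is easy to miss, so here is the advancement rule as a standalone pure function over plain `u64`s (`advance_commit_lsn` is a hypothetical helper for illustration, not part of the patch):

```rust
/// commit_lsn can be 0 when a new walproposer hasn't collected a majority of
/// epoch acks yet; it must be ignored rather than allowed to drag our
/// commit_lsn back to 0. Otherwise it is capped by the locally flushed
/// position, mirroring the hunk above.
fn advance_commit_lsn(current: u64, proposed: u64, flush_lsn: u64, epoch_matches: bool) -> u64 {
    if epoch_matches && proposed != 0 {
        proposed.min(flush_lsn)
    } else {
        current
    }
}

fn main() {
    // Proposer has no quorum position yet: its 0 is ignored.
    assert_eq!(advance_commit_lsn(100, 0, 150, true), 100);
    // Normal case: never acknowledge further than what we hold locally.
    assert_eq!(advance_commit_lsn(100, 200, 150, true), 150);
    // Stale epoch: ignore entirely.
    assert_eq!(advance_commit_lsn(100, 120, 150, false), 100);
}
```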
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index e177e72ba5..197ed58f8c 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -27,12 +27,9 @@ use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
 const CONTROL_FILE_NAME: &str = "safekeeper.control";
 
 /// Shared state associated with database instance (tenant)
-#[derive(Debug)]
 struct SharedState {
     /// Safekeeper object
     sk: SafeKeeper<FileStorage>,
-    /// opened file control file handle (needed to hold exlusive file lock)
-    control_file: File,
     /// For receiving-sending wal cooperation
     /// quorum commit LSN we've notified walsenders about
     commit_lsn: Lsn,
@@ -59,7 +56,7 @@ impl SharedState {
     ) -> Result<Self> {
         let (cf, state) = SharedState::load_control_file(conf, timelineid, create)?;
         let storage = FileStorage {
-            control_file: cf.try_clone()?,
+            control_file: cf,
             conf: conf.clone(),
         };
         let (flush_lsn, tli) = if state.server.wal_seg_size != 0 {
@@ -72,7 +69,6 @@ impl SharedState {
         Ok(Self {
             commit_lsn: Lsn(0),
             sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state),
-            control_file: cf,
             hs_feedback: HotStandbyFeedback {
                 ts: 0,
                 xmin: u64::MAX,
@@ -161,7 +157,6 @@ impl SharedState {
 }
 
 /// Database instance (tenant)
-#[derive(Debug)]
 pub struct Timeline {
     pub timelineid: ZTimelineId,
     mutex: Mutex<SharedState>,
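For the timeline.rs change: dropping the `try_clone()` leaves the control file with exactly one owner, the storage struct, so the handle (and any advisory lock held through it) cannot outlive that owner by accident. A small sketch of the single-owner shape; the names mirror the patch but the bodies are illustrative, not the real zenith types:

```rust
use std::fs::File;
use std::io::Write;

/// Sole owner of the control file handle: the file is moved in, never cloned.
struct FileStorage {
    control_file: File,
}

impl FileStorage {
    fn persist(&mut self, bytes: &[u8]) -> std::io::Result<()> {
        self.control_file.write_all(bytes)?;
        self.control_file.sync_all()
    }
}

fn main() -> std::io::Result<()> {
    let cf = File::create("/tmp/safekeeper.control.example")?;
    // `cf` is moved, not try_clone()d: FileStorage is now the only handle.
    let mut storage = FileStorage { control_file: cf };
    storage.persist(b"demo")?;
    Ok(())
}
```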