mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
Basic safekeeper refactoring and bug fixing.
1) Extract consensus logic to safekeeper.rs. 2) Change the voting flow so that acceptor tells his epoch along with giving the vote, not before it; otherwise it might get immediately stale. #294 3) Process messages from compute atomically and sync state properly. #270 4) Use separate structs for disk and network. ref #315
This commit is contained in:
@@ -121,6 +121,7 @@ fn find_end_of_wal_segment(
|
||||
let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];
|
||||
|
||||
while offs < wal_seg_size {
|
||||
// we are at the beginning of the page; read it in
|
||||
if offs % XLOG_BLCKSZ == 0 {
|
||||
if let Ok(bytes_read) = file.read(&mut buf) {
|
||||
if bytes_read != buf.len() {
|
||||
@@ -144,11 +145,12 @@ fn find_end_of_wal_segment(
|
||||
} else {
|
||||
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
|
||||
}
|
||||
// beginning of the next record
|
||||
} else if contlen == 0 {
|
||||
let page_offs = offs % XLOG_BLCKSZ;
|
||||
let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs + 4]) as usize;
|
||||
if xl_tot_len == 0 {
|
||||
break;
|
||||
break; // zeros, reached the end
|
||||
}
|
||||
last_valid_rec_pos = offs;
|
||||
offs += 4;
|
||||
@@ -156,12 +158,13 @@ fn find_end_of_wal_segment(
|
||||
contlen = xl_tot_len - 4;
|
||||
rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
|
||||
} else {
|
||||
let page_offs = offs % XLOG_BLCKSZ;
|
||||
// we're continuing a record, possibly from previous page.
|
||||
let page_offs = offs % XLOG_BLCKSZ;
|
||||
let pageleft = XLOG_BLCKSZ - page_offs;
|
||||
|
||||
// read the rest of the record, or as much as fits on this page.
|
||||
let n = min(contlen, pageleft);
|
||||
// fill rec_hdr (header up to (but not including) xl_crc field)
|
||||
if rec_offs < XLOG_RECORD_CRC_OFFS {
|
||||
let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
|
||||
rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]);
|
||||
@@ -185,6 +188,8 @@ fn find_end_of_wal_segment(
|
||||
crc = crc32c_append(crc, &rec_hdr);
|
||||
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
|
||||
if crc == wal_crc {
|
||||
// record is valid, advance the result to its end (with
|
||||
// alignment to the next record taken into account)
|
||||
last_valid_rec_pos = offs;
|
||||
} else {
|
||||
info!(
|
||||
@@ -201,6 +206,9 @@ fn find_end_of_wal_segment(
|
||||
|
||||
///
|
||||
/// Scan a directory that contains PostgreSQL WAL files, for the end of WAL.
|
||||
/// If precise, returns end LSN (next insertion point, basically);
|
||||
/// otherwise, start of the last segment.
|
||||
/// Returns (0, 0) if there is no WAL.
|
||||
///
|
||||
pub fn find_end_of_wal(
|
||||
data_dir: &Path,
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::time::Duration;
|
||||
pub mod receive_wal;
|
||||
pub mod replication;
|
||||
pub mod s3_offload;
|
||||
pub mod safekeeper;
|
||||
pub mod send_wal;
|
||||
pub mod timeline;
|
||||
pub mod wal_service;
|
||||
|
||||
@@ -1,142 +1,26 @@
|
||||
//! This implements the Safekeeper protocol, picking up immediately after the "START_WAL_PUSH" message
|
||||
//!
|
||||
//! FIXME: better description needed here
|
||||
//! Safekeeper communication endpoint to wal proposer (compute node).
|
||||
//! Gets messages from the network, passes them down to consensus module and
|
||||
//! sends replies back.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bincode::config::Options;
|
||||
use bytes::{Buf, Bytes};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use postgres::{Client, Config, NoTls};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::{max, min};
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Read, Seek, SeekFrom, Write};
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::str;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::thread::sleep;
|
||||
use zenith_utils::bin_ser::{self, le_coder, LeSer};
|
||||
use zenith_utils::connstring::connection_host_port;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::postgres_backend::PostgresBackend;
|
||||
use zenith_utils::pq_proto::{BeMessage, FeMessage, SystemId};
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
use crate::replication::HotStandbyFeedback;
|
||||
use crate::safekeeper::AcceptorProposerMessage;
|
||||
use crate::safekeeper::ProposerAcceptorMessage;
|
||||
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::{Timeline, TimelineTools};
|
||||
use crate::timeline::TimelineTools;
|
||||
use crate::WalAcceptorConf;
|
||||
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 1;
|
||||
const SK_PROTOCOL_VERSION: u32 = 1;
|
||||
const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
const END_OF_STREAM: Lsn = Lsn(0);
|
||||
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
|
||||
|
||||
/// Unique node identifier used by Paxos
|
||||
#[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct NodeId {
|
||||
term: u64,
|
||||
uuid: [u8; 16],
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct ServerInfo {
|
||||
/// proposer-safekeeper protocol version
|
||||
pub protocol_version: u32,
|
||||
/// Postgres server version
|
||||
pub pg_version: u32,
|
||||
pub node_id: NodeId,
|
||||
pub system_id: SystemId,
|
||||
/// Zenith timelineid
|
||||
pub timeline_id: ZTimelineId,
|
||||
pub wal_end: Lsn,
|
||||
pub timeline: TimeLineID,
|
||||
pub wal_seg_size: u32,
|
||||
pub tenant_id: ZTenantId,
|
||||
}
|
||||
|
||||
/// Vote request sent from proposer to safekeepers
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
struct RequestVote {
|
||||
node_id: NodeId,
|
||||
/// volume commit LSN
|
||||
vcl: Lsn,
|
||||
/// new epoch when safekeeper reaches vcl
|
||||
epoch: u64,
|
||||
}
|
||||
|
||||
/// Information of about storage node
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct SafeKeeperInfo {
|
||||
/// magic for verifying content the control file
|
||||
pub magic: u32,
|
||||
/// safekeeper format version
|
||||
pub format_version: u32,
|
||||
/// safekeeper's epoch
|
||||
pub epoch: u64,
|
||||
/// information about server
|
||||
pub server: ServerInfo,
|
||||
/// part of WAL acknowledged by quorum
|
||||
pub commit_lsn: Lsn,
|
||||
/// locally flushed part of WAL
|
||||
pub flush_lsn: Lsn,
|
||||
/// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers
|
||||
pub restart_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl SafeKeeperInfo {
|
||||
pub fn new() -> SafeKeeperInfo {
|
||||
SafeKeeperInfo {
|
||||
magic: SK_MAGIC,
|
||||
format_version: SK_FORMAT_VERSION,
|
||||
epoch: 0,
|
||||
server: ServerInfo {
|
||||
protocol_version: SK_PROTOCOL_VERSION, /* proposer-safekeeper protocol version */
|
||||
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
|
||||
node_id: NodeId {
|
||||
term: 0,
|
||||
uuid: [0; 16],
|
||||
},
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
timeline_id: ZTimelineId::from([0u8; 16]),
|
||||
wal_end: Lsn(0),
|
||||
timeline: 0,
|
||||
wal_seg_size: 0,
|
||||
tenant_id: ZTenantId::from([0u8; 16]),
|
||||
},
|
||||
commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */
|
||||
flush_lsn: Lsn(0), /* locally flushed part of WAL */
|
||||
restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Request with WAL message sent from proposer to safekeeper.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
struct SafeKeeperRequest {
|
||||
/// Sender's node identifier (looks like we do not need it for TCP streaming connection)
|
||||
sender_id: NodeId,
|
||||
/// start position of message in WAL
|
||||
begin_lsn: Lsn,
|
||||
/// end position of message in WAL
|
||||
end_lsn: Lsn,
|
||||
/// restart LSN position (minimal LSN which may be needed by proposer to perform recovery)
|
||||
restart_lsn: Lsn,
|
||||
/// LSN committed by quorum of safekeepers
|
||||
commit_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Report safekeeper state to proposer
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
struct SafeKeeperResponse {
|
||||
epoch: u64,
|
||||
flush_lsn: Lsn,
|
||||
hs_feedback: HotStandbyFeedback,
|
||||
}
|
||||
use zenith_utils::connstring::connection_host_port;
|
||||
use zenith_utils::postgres_backend::PostgresBackend;
|
||||
use zenith_utils::pq_proto::{BeMessage, FeMessage};
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
pub struct ReceiveWalConn<'pg> {
|
||||
/// Postgres connection
|
||||
@@ -206,315 +90,58 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
}
|
||||
}
|
||||
|
||||
// Read the result of a `CopyData` message sent from the postgres instance
|
||||
//
|
||||
// As the trait bound implies, this always encodes little-endian.
|
||||
fn read_msg<T: LeSer>(&mut self) -> Result<T> {
|
||||
// Read and parse message sent from the postgres instance
|
||||
fn read_msg(&mut self) -> Result<ProposerAcceptorMessage> {
|
||||
let data = self.read_msg_bytes()?;
|
||||
// Taken directly from `LeSer::des`:
|
||||
let value = le_coder()
|
||||
.reject_trailing_bytes()
|
||||
.deserialize(&data)
|
||||
.or(Err(bin_ser::DeserializeError::BadInput))?;
|
||||
Ok(value)
|
||||
ProposerAcceptorMessage::parse(data)
|
||||
}
|
||||
|
||||
// Writes the value into a `CopyData` message sent to the postgres instance
|
||||
fn write_msg<T: LeSer>(&mut self, value: &T) -> Result<()> {
|
||||
// Send message to the postgres
|
||||
fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> {
|
||||
let mut buf = Vec::new();
|
||||
value.ser_into(&mut buf)?;
|
||||
msg.serialize(&mut buf)?;
|
||||
self.pg_backend.write_message(&BeMessage::CopyData(&buf))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Receive WAL from wal_proposer
|
||||
pub fn run(&mut self, swh: &mut SendWalHandler) -> Result<()> {
|
||||
let mut this_timeline: Option<Arc<Timeline>> = None;
|
||||
|
||||
// Notify the libpq client that it's allowed to send `CopyData` messages
|
||||
self.pg_backend
|
||||
.write_message(&BeMessage::CopyBothResponse)?;
|
||||
|
||||
// Receive information about server
|
||||
let server_info = self
|
||||
.read_msg::<ServerInfo>()
|
||||
.context("Failed to receive server info")?;
|
||||
info!(
|
||||
"Start handshake with wal_proposer {} sysid {} timeline {} tenant {}",
|
||||
self.peer_addr, server_info.system_id, server_info.timeline_id, server_info.tenant_id,
|
||||
);
|
||||
// FIXME: also check that the system identifier matches
|
||||
this_timeline.set(server_info.timeline_id)?;
|
||||
this_timeline.get().load_control_file(&swh.conf)?;
|
||||
|
||||
let mut my_info = this_timeline.get().get_info();
|
||||
|
||||
/* Check protocol compatibility */
|
||||
if server_info.protocol_version != SK_PROTOCOL_VERSION {
|
||||
bail!(
|
||||
"Incompatible protocol version {}, expected {}",
|
||||
server_info.protocol_version,
|
||||
SK_PROTOCOL_VERSION
|
||||
);
|
||||
}
|
||||
/* Postgres upgrade is not treated as fatal error */
|
||||
if server_info.pg_version != my_info.server.pg_version
|
||||
&& my_info.server.pg_version != UNKNOWN_SERVER_VERSION
|
||||
{
|
||||
info!(
|
||||
"Incompatible server version {}, expected {}",
|
||||
server_info.pg_version, my_info.server.pg_version
|
||||
);
|
||||
let mut msg = self
|
||||
.read_msg()
|
||||
.context("failed to receive proposer greeting")?;
|
||||
let tenant_id: ZTenantId;
|
||||
match msg {
|
||||
ProposerAcceptorMessage::Greeting(ref greeting) => {
|
||||
info!(
|
||||
"start handshake with wal proposer {} sysid {} timeline {}",
|
||||
self.peer_addr, greeting.system_id, greeting.tli,
|
||||
);
|
||||
tenant_id = greeting.tenant_id;
|
||||
}
|
||||
_ => bail!("unexpected message {:?} instead of greeting", msg),
|
||||
}
|
||||
|
||||
/* Update information about server, but preserve locally stored node_id */
|
||||
let node_id = my_info.server.node_id;
|
||||
my_info.server = server_info.clone();
|
||||
my_info.server.node_id = node_id;
|
||||
|
||||
/* Calculate WAL end based on local data */
|
||||
let (flush_lsn, timeline_id) = this_timeline.find_end_of_wal(&swh.conf.data_dir, true);
|
||||
my_info.flush_lsn = flush_lsn;
|
||||
my_info.server.timeline = timeline_id;
|
||||
|
||||
info!(
|
||||
"find_end_of_wal in {:?}: timeline={} flush_lsn={}",
|
||||
&swh.conf.data_dir, timeline_id, flush_lsn
|
||||
);
|
||||
|
||||
/* Report my identifier to proposer */
|
||||
self.write_msg(&my_info)?;
|
||||
|
||||
/* Wait for vote request */
|
||||
let prop = self
|
||||
.read_msg::<RequestVote>()
|
||||
.context("Failed to read vote request")?;
|
||||
/* This is Paxos check which should ensure that only one master can perform commits */
|
||||
if prop.node_id < my_info.server.node_id {
|
||||
/* Send my node-id to inform proposer that it's candidate was rejected */
|
||||
self.write_msg(&my_info.server.node_id)?;
|
||||
bail!(
|
||||
"Reject connection attempt with term {} because my term is {}",
|
||||
prop.node_id.term,
|
||||
my_info.server.node_id.term,
|
||||
);
|
||||
}
|
||||
my_info.server.node_id = prop.node_id;
|
||||
this_timeline.get().set_info(&my_info);
|
||||
/* Need to persist our vote first */
|
||||
this_timeline.get().save_control_file(true)?;
|
||||
|
||||
let mut flushed_restart_lsn = Lsn(0);
|
||||
let wal_seg_size = server_info.wal_seg_size as usize;
|
||||
|
||||
/* Acknowledge the proposed candidate by returning it to the proposer */
|
||||
self.write_msg(&prop.node_id)?;
|
||||
|
||||
// if requested, ask pageserver to fetch wal from us
|
||||
// xxx: this place seems not really fitting
|
||||
if swh.conf.pageserver_addr.is_some() {
|
||||
// Need to establish replication channel with page server.
|
||||
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
|
||||
let conf = swh.conf.clone();
|
||||
let timelineid = this_timeline.get().timelineid;
|
||||
let tenantid = server_info.tenant_id;
|
||||
let timelineid = swh.timeline.get().timelineid;
|
||||
thread::spawn(move || {
|
||||
request_callback(conf, timelineid, tenantid);
|
||||
request_callback(conf, timelineid, tenant_id);
|
||||
});
|
||||
}
|
||||
|
||||
info!(
|
||||
"Start streaming from timeline {} tenant {} address {:?} flush_lsn={}",
|
||||
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn
|
||||
);
|
||||
|
||||
// Main loop
|
||||
loop {
|
||||
let mut sync_control_file = false;
|
||||
|
||||
/* Receive message header */
|
||||
let msg_bytes = self.read_msg_bytes()?;
|
||||
let mut msg_reader = msg_bytes.reader();
|
||||
|
||||
let req = SafeKeeperRequest::des_from(&mut msg_reader)
|
||||
.context("Failed to get WAL message header")?;
|
||||
if req.sender_id != my_info.server.node_id {
|
||||
bail!("Sender NodeId is changed");
|
||||
}
|
||||
if req.begin_lsn == END_OF_STREAM {
|
||||
info!("Server stops streaming");
|
||||
break;
|
||||
}
|
||||
let start_pos = req.begin_lsn;
|
||||
let end_pos = req.end_lsn;
|
||||
let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
|
||||
assert!(rec_size <= MAX_SEND_SIZE);
|
||||
|
||||
debug!(
|
||||
"received for {} bytes between {} and {}",
|
||||
rec_size, start_pos, end_pos,
|
||||
);
|
||||
|
||||
/* Receive message body (from the rest of the message) */
|
||||
let mut buf = Vec::with_capacity(rec_size);
|
||||
msg_reader.read_to_end(&mut buf)?;
|
||||
assert_eq!(buf.len(), rec_size);
|
||||
|
||||
/* Save message in file */
|
||||
Self::write_wal_file(
|
||||
swh,
|
||||
start_pos,
|
||||
timeline_id,
|
||||
this_timeline.get(),
|
||||
wal_seg_size,
|
||||
&buf,
|
||||
)?;
|
||||
|
||||
my_info.restart_lsn = req.restart_lsn;
|
||||
my_info.commit_lsn = req.commit_lsn;
|
||||
|
||||
/*
|
||||
* Epoch switch happen when written WAL record cross the boundary.
|
||||
* The boundary is maximum of last WAL position at this node (FlushLSN) and global
|
||||
* maximum (vcl) determined by WAL proposer during handshake.
|
||||
* Switching epoch means that node completes recovery and start writing in the WAL new data.
|
||||
*/
|
||||
if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) {
|
||||
info!("Switch to new epoch {}", prop.epoch);
|
||||
my_info.epoch = prop.epoch; /* bump epoch */
|
||||
sync_control_file = true;
|
||||
}
|
||||
if end_pos > my_info.flush_lsn {
|
||||
my_info.flush_lsn = end_pos;
|
||||
}
|
||||
/*
|
||||
* Update restart LSN in control file.
|
||||
* To avoid negative impact on performance of extra fsync, do it only
|
||||
* when restart_lsn delta exceeds WAL segment size.
|
||||
*/
|
||||
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
|
||||
this_timeline.get().save_control_file(sync_control_file)?;
|
||||
|
||||
if sync_control_file {
|
||||
flushed_restart_lsn = my_info.restart_lsn;
|
||||
}
|
||||
|
||||
/* Report flush position */
|
||||
//info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32);
|
||||
let resp = SafeKeeperResponse {
|
||||
epoch: my_info.epoch,
|
||||
flush_lsn: end_pos,
|
||||
hs_feedback: this_timeline.get().get_hs_feedback(),
|
||||
};
|
||||
self.write_msg(&resp)?;
|
||||
|
||||
/*
|
||||
* Ping wal sender that new data is available.
|
||||
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
|
||||
*/
|
||||
this_timeline
|
||||
.get()
|
||||
.notify_wal_senders(min(req.commit_lsn, end_pos));
|
||||
let reply = swh.timeline.get().process_msg(&msg)?;
|
||||
self.write_msg(&reply)?;
|
||||
msg = self.read_msg()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_wal_file(
|
||||
swh: &SendWalHandler,
|
||||
startpos: Lsn,
|
||||
timeline_id: TimeLineID,
|
||||
timeline: &Arc<Timeline>,
|
||||
wal_seg_size: usize,
|
||||
buf: &[u8],
|
||||
) -> Result<()> {
|
||||
let mut bytes_left: usize = buf.len();
|
||||
let mut bytes_written: usize = 0;
|
||||
let mut partial;
|
||||
let mut start_pos = startpos;
|
||||
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
|
||||
|
||||
/* Extract WAL location for this block */
|
||||
let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
|
||||
|
||||
while bytes_left != 0 {
|
||||
let bytes_to_write;
|
||||
|
||||
/*
|
||||
* If crossing a WAL boundary, only write up until we reach wal
|
||||
* segment size.
|
||||
*/
|
||||
if xlogoff + bytes_left > wal_seg_size {
|
||||
bytes_to_write = wal_seg_size - xlogoff;
|
||||
} else {
|
||||
bytes_to_write = bytes_left;
|
||||
}
|
||||
|
||||
/* Open file */
|
||||
let segno = start_pos.segment_number(wal_seg_size);
|
||||
let wal_file_name = XLogFileName(timeline_id, segno, wal_seg_size);
|
||||
let wal_file_path = swh
|
||||
.conf
|
||||
.data_dir
|
||||
.join(timeline.timelineid.to_string())
|
||||
.join(wal_file_name.clone());
|
||||
let wal_file_partial_path = swh
|
||||
.conf
|
||||
.data_dir
|
||||
.join(timeline.timelineid.to_string())
|
||||
.join(wal_file_name.clone() + ".partial");
|
||||
|
||||
{
|
||||
let mut wal_file: File;
|
||||
/* Try to open already completed segment */
|
||||
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
|
||||
wal_file = file;
|
||||
partial = false;
|
||||
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path)
|
||||
{
|
||||
/* Try to open existed partial file */
|
||||
wal_file = file;
|
||||
partial = true;
|
||||
} else {
|
||||
/* Create and fill new partial file */
|
||||
partial = true;
|
||||
match OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.open(&wal_file_partial_path)
|
||||
{
|
||||
Ok(mut file) => {
|
||||
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
|
||||
file.write_all(&ZERO_BLOCK)?;
|
||||
}
|
||||
wal_file = file;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
|
||||
|
||||
// Flush file is not prohibited
|
||||
if !swh.conf.no_sync {
|
||||
wal_file.sync_all()?;
|
||||
}
|
||||
}
|
||||
/* Write was successful, advance our position */
|
||||
bytes_written += bytes_to_write;
|
||||
bytes_left -= bytes_to_write;
|
||||
start_pos += bytes_to_write as u64;
|
||||
xlogoff += bytes_to_write;
|
||||
|
||||
/* Did we reach the end of a WAL segment? */
|
||||
if start_pos.segment_offset(wal_seg_size) == 0 {
|
||||
xlogoff = 0;
|
||||
if partial {
|
||||
fs::rename(&wal_file_partial_path, &wal_file_path)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,16 @@ pub struct HotStandbyFeedback {
|
||||
pub catalog_xmin: FullTransactionId,
|
||||
}
|
||||
|
||||
impl HotStandbyFeedback {
|
||||
pub fn empty() -> HotStandbyFeedback {
|
||||
HotStandbyFeedback {
|
||||
ts: 0,
|
||||
xmin: 0,
|
||||
catalog_xmin: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A network connection that's speaking the replication protocol.
|
||||
pub struct ReplicationConn {
|
||||
/// This is an `Option` because we will spawn a background thread that will
|
||||
@@ -150,7 +160,7 @@ impl ReplicationConn {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let (wal_end, timeline) = swh.timeline.find_end_of_wal(&swh.conf.data_dir, true);
|
||||
let (wal_end, timeline) = swh.timeline.get().get_end_of_wal();
|
||||
if start_pos == Lsn(0) {
|
||||
start_pos = wal_end;
|
||||
}
|
||||
|
||||
548
walkeeper/src/safekeeper.rs
Normal file
548
walkeeper/src/safekeeper.rs
Normal file
@@ -0,0 +1,548 @@
|
||||
//! Acceptor part of proposer-acceptor consensus algorithm.
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use byteorder::LittleEndian;
|
||||
use byteorder::ReadBytesExt;
|
||||
use byteorder::WriteBytesExt;
|
||||
use bytes::Buf;
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use postgres_ffi::xlog_utils::TimeLineID;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::max;
|
||||
use std::io;
|
||||
use std::io::Read;
|
||||
|
||||
use crate::replication::HotStandbyFeedback;
|
||||
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
|
||||
use zenith_utils::bin_ser::LeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::pq_proto::SystemId;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 1;
|
||||
const SK_PROTOCOL_VERSION: u32 = 1;
|
||||
const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
|
||||
/// Consensus logical timestamp.
|
||||
type Term = u64;
|
||||
|
||||
/// Unique id of proposer. Not needed for correctness, used for monitoring.
|
||||
type PgUuid = [u8; 16];
|
||||
|
||||
/// Persistent consensus state of the acceptor.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AcceptorState {
|
||||
/// acceptor's last term it voted for (advanced in 1 phase)
|
||||
pub term: Term,
|
||||
/// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached).
|
||||
pub epoch: Term,
|
||||
}
|
||||
|
||||
/// Information about Postgres. Safekeeper gets it once and then verifies
|
||||
/// all further connections from computes match.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct ServerInfo {
|
||||
/// Postgres server version
|
||||
pub pg_version: u32,
|
||||
pub system_id: SystemId,
|
||||
/// Zenith timelineid
|
||||
pub ztli: ZTimelineId,
|
||||
pub tli: TimeLineID,
|
||||
pub wal_seg_size: u32,
|
||||
}
|
||||
|
||||
/// Persistent information stored on safekeeper node
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SafeKeeperState {
|
||||
/// magic for verifying content the control file
|
||||
pub magic: u32,
|
||||
/// safekeeper format version
|
||||
pub format_version: u32,
|
||||
/// persistent acceptor state
|
||||
pub acceptor_state: AcceptorState,
|
||||
/// information about server
|
||||
pub server: ServerInfo,
|
||||
/// Unique id of the last *elected* proposer we dealed with. Not needed
|
||||
/// correctness, exists for monitoring purposes.
|
||||
pub proposer_uuid: PgUuid,
|
||||
/// part of WAL acknowledged by quorum
|
||||
pub commit_lsn: Lsn,
|
||||
/// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers
|
||||
pub restart_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl SafeKeeperState {
|
||||
pub fn new() -> SafeKeeperState {
|
||||
SafeKeeperState {
|
||||
magic: SK_MAGIC,
|
||||
format_version: SK_FORMAT_VERSION,
|
||||
acceptor_state: AcceptorState { term: 0, epoch: 0 },
|
||||
server: ServerInfo {
|
||||
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
ztli: ZTimelineId::from([0u8; 16]),
|
||||
tli: 0,
|
||||
wal_seg_size: 0,
|
||||
},
|
||||
proposer_uuid: [0; 16],
|
||||
commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */
|
||||
restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// protocol messages
|
||||
|
||||
/// Initial Proposer -> Acceptor message
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ProposerGreeting {
|
||||
/// proposer-acceptor protocol version
|
||||
pub protocol_version: u32,
|
||||
/// Postgres server version
|
||||
pub pg_version: u32,
|
||||
pub proposer_id: PgUuid,
|
||||
pub system_id: SystemId,
|
||||
/// Zenith timelineid
|
||||
pub ztli: ZTimelineId,
|
||||
pub tenant_id: ZTenantId,
|
||||
pub tli: TimeLineID,
|
||||
pub wal_seg_size: u32,
|
||||
}
|
||||
|
||||
/// Acceptor -> Proposer initial response: the highest term known to me
|
||||
/// (acceptor voted for).
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AcceptorGreeting {
|
||||
term: u64,
|
||||
}
|
||||
|
||||
/// Vote request sent from proposer to safekeepers
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct VoteRequest {
|
||||
term: Term,
|
||||
}
|
||||
|
||||
/// Vote itself, sent from safekeeper to proposer
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct VoteResponse {
|
||||
term: Term, // not really needed, just a sanity check
|
||||
vote_given: u64, // fixme u64 due to padding
|
||||
/// Safekeeper's log position, to let proposer choose the most advanced one
|
||||
epoch: Term,
|
||||
flush_lsn: Lsn,
|
||||
restart_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Request with WAL message sent from proposer to safekeeper. Along the way it
|
||||
/// announces 1) successful election (with VCL); 2) commit_lsn.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AppendRequest {
|
||||
h: AppendRequestHeader,
|
||||
wal_data: Bytes,
|
||||
}
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AppendRequestHeader {
|
||||
term: Term,
|
||||
/// volume commit LSN
|
||||
vcl: Lsn,
|
||||
/// start position of message in WAL
|
||||
begin_lsn: Lsn,
|
||||
/// end position of message in WAL
|
||||
end_lsn: Lsn,
|
||||
/// LSN committed by quorum of safekeepers
|
||||
commit_lsn: Lsn,
|
||||
/// restart LSN position (minimal LSN which may be needed by proposer to perform recovery)
|
||||
restart_lsn: Lsn,
|
||||
// only for logging/debugging
|
||||
proposer_uuid: PgUuid,
|
||||
}
|
||||
|
||||
/// Report safekeeper state to proposer
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct AppendResponse {
|
||||
// Current term of the safekeeper; if it is higher than proposer's, the
|
||||
// compute is out of date.
|
||||
pub term: Term,
|
||||
pub epoch: Term,
|
||||
// NOTE: this is physical end of wal on safekeeper; currently it doesn't
|
||||
// make much sense without taking epoch into account, as history can be
|
||||
// diverged.
|
||||
pub flush_lsn: Lsn,
|
||||
pub hs_feedback: HotStandbyFeedback,
|
||||
}
|
||||
|
||||
/// Proposer -> Acceptor messages
|
||||
#[derive(Debug)]
|
||||
pub enum ProposerAcceptorMessage {
|
||||
Greeting(ProposerGreeting),
|
||||
VoteRequest(VoteRequest),
|
||||
AppendRequest(AppendRequest),
|
||||
}
|
||||
|
||||
impl ProposerAcceptorMessage {
|
||||
/// Parse proposer message.
|
||||
pub fn parse(msg: Bytes) -> Result<ProposerAcceptorMessage> {
|
||||
// xxx using Reader is inefficient but easy to work with bincode
|
||||
let mut stream = msg.reader();
|
||||
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
|
||||
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
|
||||
match tag {
|
||||
'g' => {
|
||||
let msg = ProposerGreeting::des_from(&mut stream)?;
|
||||
Ok(ProposerAcceptorMessage::Greeting(msg))
|
||||
}
|
||||
'v' => {
|
||||
let msg = VoteRequest::des_from(&mut stream)?;
|
||||
Ok(ProposerAcceptorMessage::VoteRequest(msg))
|
||||
}
|
||||
'a' => {
|
||||
// read header followed by wal data
|
||||
let hdr = AppendRequestHeader::des_from(&mut stream)?;
|
||||
let rec_size = hdr
|
||||
.end_lsn
|
||||
.checked_sub(hdr.begin_lsn)
|
||||
.ok_or(anyhow!("begin_lsn > end_lsn in AppendRequest"))?
|
||||
.0 as usize;
|
||||
if rec_size > MAX_SEND_SIZE {
|
||||
bail!(
|
||||
"AppendRequest is longer than MAX_SEND_SIZE ({})",
|
||||
MAX_SEND_SIZE
|
||||
);
|
||||
}
|
||||
|
||||
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
|
||||
stream.read_exact(&mut wal_data_vec)?;
|
||||
let wal_data = Bytes::from(wal_data_vec);
|
||||
let msg = AppendRequest {
|
||||
h: hdr,
|
||||
wal_data: wal_data,
|
||||
};
|
||||
|
||||
Ok(ProposerAcceptorMessage::AppendRequest(msg))
|
||||
}
|
||||
_ => Err(anyhow!("unknown proposer-acceptor message tag: {}", tag,)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Acceptor -> Proposer messages
|
||||
#[derive(Debug)]
|
||||
pub enum AcceptorProposerMessage {
|
||||
Greeting(AcceptorGreeting),
|
||||
VoteResponse(VoteResponse),
|
||||
AppendResponse(AppendResponse),
|
||||
}
|
||||
|
||||
impl AcceptorProposerMessage {
|
||||
/// Serialize acceptor -> proposer message.
|
||||
pub fn serialize(&self, stream: &mut impl io::Write) -> Result<()> {
|
||||
match self {
|
||||
AcceptorProposerMessage::Greeting(msg) => {
|
||||
stream.write_u64::<LittleEndian>('g' as u64)?;
|
||||
msg.ser_into(stream)?;
|
||||
}
|
||||
AcceptorProposerMessage::VoteResponse(msg) => {
|
||||
stream.write_u64::<LittleEndian>('v' as u64)?;
|
||||
msg.ser_into(stream)?;
|
||||
}
|
||||
AcceptorProposerMessage::AppendResponse(msg) => {
|
||||
stream.write_u64::<LittleEndian>('a' as u64)?;
|
||||
msg.ser_into(stream)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Storage {
|
||||
/// Persist safekeeper state on disk, optionally syncing it.
|
||||
fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>;
|
||||
/// Write piece of wal in buf to disk.
|
||||
fn write_wal(&mut self, s: &SafeKeeperState, startpos: Lsn, buf: &[u8]) -> Result<()>;
|
||||
}
|
||||
|
||||
/// SafeKeeper which consumes events (messages from compute) and provides
|
||||
/// replies.
|
||||
#[derive(Debug)]
|
||||
pub struct SafeKeeper<ST: Storage> {
|
||||
/// Locally flushed part of WAL (end_lsn of last record). Established by
|
||||
/// reading wal.
|
||||
pub flush_lsn: Lsn,
|
||||
pub tli: u32,
|
||||
pub flushed_restart_lsn: Lsn,
|
||||
pub storage: ST,
|
||||
pub s: SafeKeeperState, // persistent part
|
||||
pub elected_proposer_term: Term, // for monitoring/debugging
|
||||
}
|
||||
|
||||
impl<ST> SafeKeeper<ST>
|
||||
where
|
||||
ST: Storage,
|
||||
{
|
||||
// constructor
|
||||
pub fn new(flush_lsn: Lsn, tli: u32, storage: ST, state: SafeKeeperState) -> SafeKeeper<ST> {
|
||||
SafeKeeper {
|
||||
flush_lsn,
|
||||
tli,
|
||||
flushed_restart_lsn: Lsn(0),
|
||||
storage,
|
||||
s: state,
|
||||
elected_proposer_term: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Process message from proposer and possibly form reply. Concurrent
|
||||
/// callers must exclude each other.
|
||||
pub fn process_msg(
|
||||
&mut self,
|
||||
msg: &ProposerAcceptorMessage,
|
||||
) -> Result<AcceptorProposerMessage> {
|
||||
match msg {
|
||||
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg),
|
||||
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg),
|
||||
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle initial message from proposer: check its sanity and send my
|
||||
/// current term.
|
||||
fn handle_greeting(&mut self, msg: &ProposerGreeting) -> Result<AcceptorProposerMessage> {
|
||||
/* Check protocol compatibility */
|
||||
if msg.protocol_version != SK_PROTOCOL_VERSION {
|
||||
bail!(
|
||||
"incompatible protocol version {}, expected {}",
|
||||
msg.protocol_version,
|
||||
SK_PROTOCOL_VERSION
|
||||
);
|
||||
}
|
||||
if self.s.server.system_id != 0 && self.s.server.system_id != msg.system_id {
|
||||
bail!(
|
||||
"system identifier changed: got {}, expected {}",
|
||||
msg.system_id,
|
||||
self.s.server.system_id,
|
||||
);
|
||||
}
|
||||
/* Postgres upgrade is not treated as fatal error */
|
||||
if msg.pg_version != self.s.server.pg_version
|
||||
&& self.s.server.pg_version != UNKNOWN_SERVER_VERSION
|
||||
{
|
||||
info!(
|
||||
"incompatible server version {}, expected {}",
|
||||
msg.pg_version, self.s.server.pg_version
|
||||
);
|
||||
}
|
||||
|
||||
// set basic info about server, if not yet
|
||||
self.s.server.system_id = msg.system_id;
|
||||
self.s.server.ztli = msg.ztli;
|
||||
self.s.server.tli = msg.tli;
|
||||
self.s.server.wal_seg_size = msg.wal_seg_size;
|
||||
self.s.proposer_uuid = msg.proposer_id;
|
||||
self.storage.persist(&self.s, true)?;
|
||||
|
||||
info!(
|
||||
"processed greeting from proposer {:?}, sending term {:?}",
|
||||
msg.proposer_id, self.s.acceptor_state.term
|
||||
);
|
||||
Ok(AcceptorProposerMessage::Greeting(AcceptorGreeting {
|
||||
term: self.s.acceptor_state.term,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Give vote for the given term, if we haven't done that previously.
|
||||
fn handle_vote_request(&mut self, msg: &VoteRequest) -> Result<AcceptorProposerMessage> {
|
||||
// initialize with refusal
|
||||
let mut resp = VoteResponse {
|
||||
term: msg.term,
|
||||
vote_given: false as u64,
|
||||
epoch: 0,
|
||||
flush_lsn: Lsn(0),
|
||||
restart_lsn: Lsn(0),
|
||||
};
|
||||
if self.s.acceptor_state.term < msg.term {
|
||||
self.s.acceptor_state.term = msg.term;
|
||||
// persist vote before sending it out
|
||||
self.storage.persist(&self.s, true)?;
|
||||
resp.vote_given = true as u64;
|
||||
resp.epoch = self.s.acceptor_state.epoch;
|
||||
resp.flush_lsn = self.flush_lsn;
|
||||
resp.restart_lsn = self.s.restart_lsn;
|
||||
}
|
||||
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
|
||||
Ok(AcceptorProposerMessage::VoteResponse(resp))
|
||||
}
|
||||
|
||||
/// Handle request to append WAL.
|
||||
fn handle_append_request(&mut self, msg: &AppendRequest) -> Result<AcceptorProposerMessage> {
|
||||
// log first AppendRequest from this proposer
|
||||
if self.elected_proposer_term < msg.h.term {
|
||||
info!(
|
||||
"start receiving WAL from timeline {} term {}",
|
||||
self.s.server.ztli, msg.h.term,
|
||||
);
|
||||
self.elected_proposer_term = msg.h.term;
|
||||
}
|
||||
|
||||
// If our term is lower than elected proposer one, bump it.
|
||||
if self.s.acceptor_state.term < msg.h.term {
|
||||
self.s.acceptor_state.term = msg.h.term;
|
||||
self.storage.persist(&self.s, true)?;
|
||||
}
|
||||
// OTOH, if it is higher, immediately refuse the message.
|
||||
else if self.s.acceptor_state.term > msg.h.term {
|
||||
let resp = AppendResponse {
|
||||
term: self.s.acceptor_state.term,
|
||||
epoch: self.s.acceptor_state.epoch,
|
||||
flush_lsn: Lsn(0),
|
||||
hs_feedback: HotStandbyFeedback::empty(),
|
||||
};
|
||||
return Ok(AcceptorProposerMessage::AppendResponse(resp));
|
||||
}
|
||||
|
||||
// do the job
|
||||
self.storage
|
||||
.write_wal(&self.s, msg.h.begin_lsn, &msg.wal_data)?;
|
||||
let mut sync_control_file = false;
|
||||
/*
|
||||
* Epoch switch happen when written WAL record cross the boundary.
|
||||
* The boundary is maximum of last WAL position at this node (FlushLSN) and global
|
||||
* maximum (vcl) determined by WAL proposer during handshake.
|
||||
* Switching epoch means that node completes recovery and start writing in the WAL new data.
|
||||
* XXX: this is wrong, we must actively truncate not matching part of log.
|
||||
*/
|
||||
if self.s.acceptor_state.epoch < msg.h.term
|
||||
&& msg.h.end_lsn > max(self.flush_lsn, msg.h.vcl)
|
||||
{
|
||||
info!("switched to new epoch {}", msg.h.term);
|
||||
self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */
|
||||
sync_control_file = true;
|
||||
}
|
||||
if msg.h.end_lsn > self.flush_lsn {
|
||||
self.flush_lsn = msg.h.end_lsn;
|
||||
}
|
||||
|
||||
self.s.proposer_uuid = msg.h.proposer_uuid;
|
||||
self.s.commit_lsn = msg.h.commit_lsn;
|
||||
self.s.restart_lsn = msg.h.restart_lsn;
|
||||
|
||||
/*
|
||||
* Update restart LSN in control file.
|
||||
* To avoid negative impact on performance of extra fsync, do it only
|
||||
* when restart_lsn delta exceeds WAL segment size.
|
||||
*/
|
||||
sync_control_file |=
|
||||
self.flushed_restart_lsn + (self.s.server.wal_seg_size as u64) < self.s.restart_lsn;
|
||||
self.storage.persist(&self.s, sync_control_file)?;
|
||||
if sync_control_file {
|
||||
self.flushed_restart_lsn = self.s.restart_lsn;
|
||||
}
|
||||
|
||||
let resp = AppendResponse {
|
||||
term: self.s.acceptor_state.term,
|
||||
epoch: self.s.acceptor_state.epoch,
|
||||
flush_lsn: self.flush_lsn,
|
||||
// will be filled by caller code to avoid bothering safekeeper
|
||||
hs_feedback: HotStandbyFeedback::empty(),
|
||||
};
|
||||
trace!(
|
||||
"processed AppendRequest of len {}, flush_lsn={:X}/{:>08X}, resp {:?}",
|
||||
msg.wal_data.len(),
|
||||
(self.flush_lsn.0 >> 32) as u32,
|
||||
self.flush_lsn.0 as u32,
|
||||
&resp,
|
||||
);
|
||||
Ok(AcceptorProposerMessage::AppendResponse(resp))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
// fake storage for tests
|
||||
struct InMemoryStorage {
|
||||
persisted_state: SafeKeeperState,
|
||||
}
|
||||
|
||||
impl Storage for InMemoryStorage {
|
||||
fn persist(&mut self, s: &SafeKeeperState, _sync: bool) -> Result<()> {
|
||||
self.persisted_state = s.clone();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_wal(&mut self, _s: &SafeKeeperState, _startpos: Lsn, _buf: &[u8]) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_voting() {
|
||||
let storage = InMemoryStorage {
|
||||
persisted_state: SafeKeeperState::new(),
|
||||
};
|
||||
let mut sk = SafeKeeper::new(Lsn(0), 0, storage, SafeKeeperState::new());
|
||||
|
||||
// check voting for 1 is ok
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
|
||||
let mut vote_resp = sk.process_msg(&vote_request);
|
||||
match vote_resp.unwrap() {
|
||||
AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given != 0),
|
||||
_ => assert!(false),
|
||||
}
|
||||
|
||||
// reboot...
|
||||
let state = sk.storage.persisted_state.clone();
|
||||
let storage = InMemoryStorage {
|
||||
persisted_state: state.clone(),
|
||||
};
|
||||
sk = SafeKeeper::new(Lsn(0), 0, storage, state);
|
||||
|
||||
// and ensure voting second time for 1 is not ok
|
||||
vote_resp = sk.process_msg(&vote_request);
|
||||
match vote_resp.unwrap() {
|
||||
AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given == 0),
|
||||
_ => assert!(false),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_epoch_switch() {
|
||||
let storage = InMemoryStorage {
|
||||
persisted_state: SafeKeeperState::new(),
|
||||
};
|
||||
let mut sk = SafeKeeper::new(Lsn(0), 0, storage, SafeKeeperState::new());
|
||||
|
||||
let mut ar_hdr = AppendRequestHeader {
|
||||
term: 1,
|
||||
vcl: Lsn(2),
|
||||
begin_lsn: Lsn(1),
|
||||
end_lsn: Lsn(2),
|
||||
commit_lsn: Lsn(0),
|
||||
restart_lsn: Lsn(0),
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let mut append_request = AppendRequest {
|
||||
h: ar_hdr.clone(),
|
||||
wal_data: Bytes::from_static(b"b"),
|
||||
};
|
||||
|
||||
// check that AppendRequest before VCL doesn't switch epoch
|
||||
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
|
||||
assert!(resp.is_ok());
|
||||
assert!(sk.storage.persisted_state.acceptor_state.epoch == 0);
|
||||
|
||||
// but record after VCL does the switch
|
||||
ar_hdr.begin_lsn = Lsn(2);
|
||||
ar_hdr.end_lsn = Lsn(3);
|
||||
append_request = AppendRequest {
|
||||
h: ar_hdr.clone(),
|
||||
wal_data: Bytes::from_static(b"b"),
|
||||
};
|
||||
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
|
||||
assert!(resp.is_ok());
|
||||
assert!(sk.storage.persisted_state.acceptor_state.epoch == 1);
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,7 @@ pub struct SendWalHandler {
|
||||
pub conf: WalAcceptorConf,
|
||||
/// assigned application name
|
||||
pub appname: Option<String>,
|
||||
pub timelineid: Option<ZTimelineId>,
|
||||
pub timeline: Option<Arc<Timeline>>,
|
||||
}
|
||||
|
||||
@@ -29,7 +30,7 @@ impl postgres_backend::Handler for SendWalHandler {
|
||||
match sm.params.get("ztimelineid") {
|
||||
Some(ref ztimelineid) => {
|
||||
let ztlid = ZTimelineId::from_str(ztimelineid)?;
|
||||
self.timeline.set(ztlid)?;
|
||||
self.timelineid = Some(ztlid);
|
||||
}
|
||||
_ => bail!("timelineid is required"),
|
||||
}
|
||||
@@ -40,6 +41,16 @@ impl postgres_backend::Handler for SendWalHandler {
|
||||
}
|
||||
|
||||
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
|
||||
// START_WAL_PUSH is the only command that initializes the timeline
|
||||
if self.timeline.is_none() {
|
||||
if query_string.starts_with(b"START_WAL_PUSH") {
|
||||
self.timeline
|
||||
.set(&self.conf, self.timelineid.unwrap(), true)?;
|
||||
} else {
|
||||
self.timeline
|
||||
.set(&self.conf, self.timelineid.unwrap(), false)?;
|
||||
}
|
||||
}
|
||||
if query_string.starts_with(b"IDENTIFY_SYSTEM") {
|
||||
self.handle_identify_system(pgb)?;
|
||||
Ok(())
|
||||
@@ -60,6 +71,7 @@ impl SendWalHandler {
|
||||
SendWalHandler {
|
||||
conf,
|
||||
appname: None,
|
||||
timelineid: None,
|
||||
timeline: None,
|
||||
}
|
||||
}
|
||||
@@ -68,7 +80,7 @@ impl SendWalHandler {
|
||||
/// Handle IDENTIFY_SYSTEM replication command
|
||||
///
|
||||
fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
|
||||
let (start_pos, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false);
|
||||
let (start_pos, timeline) = self.timeline.get().get_end_of_wal();
|
||||
let lsn = start_pos.to_string();
|
||||
let tli = timeline.to_string();
|
||||
let sysid = self.timeline.get().get_info().server.system_id.to_string();
|
||||
|
||||
@@ -1,50 +1,77 @@
|
||||
//! This module contains tools for managing timelines.
|
||||
//!
|
||||
//! This module contains timeline id -> safekeeper state map with file-backed
|
||||
//! persistence and support for interaction between sending and receiving wal.
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use fs2::FileExt;
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use postgres_ffi::xlog_utils::{find_end_of_wal, TimeLineID};
|
||||
use postgres_ffi::xlog_utils::find_end_of_wal;
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::HashMap;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Seek, SeekFrom};
|
||||
use std::path::Path;
|
||||
use std::io::{Seek, SeekFrom, Write};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use zenith_utils::bin_ser::LeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
use zenith_utils::zid::ZTimelineId;
|
||||
|
||||
use crate::receive_wal::{SafeKeeperInfo, CONTROL_FILE_NAME, SK_FORMAT_VERSION, SK_MAGIC};
|
||||
use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER};
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, Storage,
|
||||
SK_FORMAT_VERSION, SK_MAGIC,
|
||||
};
|
||||
use crate::WalAcceptorConf;
|
||||
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
|
||||
|
||||
const CONTROL_FILE_NAME: &str = "safekeeper.control";
|
||||
|
||||
/// Shared state associated with database instance (tenant)
|
||||
#[derive(Debug)]
|
||||
struct SharedState {
|
||||
/// quorum commit LSN
|
||||
/// Safekeeper object
|
||||
sk: SafeKeeper<FileStorage>,
|
||||
/// opened file control file handle (needed to hold exlusive file lock)
|
||||
control_file: File,
|
||||
/// For receiving-sending wal cooperation
|
||||
/// quorum commit LSN we've notified walsenders about
|
||||
commit_lsn: Lsn,
|
||||
/// information about this safekeeper
|
||||
info: SafeKeeperInfo,
|
||||
/// opened file control file handle (needed to hold exlusive file lock
|
||||
control_file: Option<File>,
|
||||
/// combined hot standby feedback from all replicas
|
||||
hs_feedback: HotStandbyFeedback,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
/// Restore SharedState from control file. Locks the control file along the
|
||||
/// way to prevent running more than one instance of safekeeper on the same
|
||||
/// data dir.
|
||||
/// If create=false and file doesn't exist, bails out.
|
||||
fn create_restore(
|
||||
conf: &WalAcceptorConf,
|
||||
timelineid: ZTimelineId,
|
||||
create: bool,
|
||||
) -> Result<Self> {
|
||||
let (cf, state) = SharedState::load_control_file(conf, timelineid, create)?;
|
||||
let storage = FileStorage {
|
||||
control_file: cf.try_clone()?,
|
||||
conf: conf.clone(),
|
||||
};
|
||||
let (flush_lsn, tli) = if state.server.wal_seg_size != 0 {
|
||||
let wal_dir = conf.data_dir.join(format!("{}", timelineid));
|
||||
find_end_of_wal(&wal_dir, state.server.wal_seg_size as usize, true)
|
||||
} else {
|
||||
(0, 0)
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
commit_lsn: Lsn(0),
|
||||
info: SafeKeeperInfo::new(),
|
||||
control_file: None,
|
||||
sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state),
|
||||
control_file: cf,
|
||||
hs_feedback: HotStandbyFeedback {
|
||||
ts: 0,
|
||||
xmin: u64::MAX,
|
||||
catalog_xmin: u64::MAX,
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Accumulate hot standby feedbacks from replicas
|
||||
@@ -54,80 +81,75 @@ impl SharedState {
|
||||
self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts);
|
||||
}
|
||||
|
||||
/// Load and lock control file (prevent running more than one instance of safekeeper)
|
||||
pub fn load_control_file(
|
||||
&mut self,
|
||||
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
|
||||
/// If create=false and file doesn't exist, bails out.
|
||||
fn load_control_file(
|
||||
conf: &WalAcceptorConf,
|
||||
timelineid: ZTimelineId,
|
||||
) -> Result<()> {
|
||||
if self.control_file.is_some() {
|
||||
info!("control file for timeline {} is already open", timelineid);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
create: bool,
|
||||
) -> Result<(File, SafeKeeperState)> {
|
||||
let control_file_path = conf
|
||||
.data_dir
|
||||
.join(timelineid.to_string())
|
||||
.join(CONTROL_FILE_NAME);
|
||||
info!("loading control file {}", control_file_path.display());
|
||||
match OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(&control_file_path)
|
||||
{
|
||||
Ok(file) => {
|
||||
info!(
|
||||
"loading control file {}, create={}",
|
||||
control_file_path.display(),
|
||||
create
|
||||
);
|
||||
let mut opts = OpenOptions::new();
|
||||
opts.read(true).write(true);
|
||||
if create {
|
||||
opts.create(true);
|
||||
}
|
||||
match opts.open(&control_file_path) {
|
||||
Ok(mut file) => {
|
||||
// Lock file to prevent two or more active wal_acceptors
|
||||
match file.try_lock_exclusive() {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
bail!(
|
||||
"Control file {:?} is locked by some other process: {}",
|
||||
"control file {:?} is locked by some other process: {}",
|
||||
&control_file_path,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
self.control_file = Some(file);
|
||||
|
||||
let cfile_ref = self.control_file.as_mut().unwrap();
|
||||
match SafeKeeperInfo::des_from(cfile_ref) {
|
||||
Err(e) => {
|
||||
warn!("read from {:?} failed: {}", control_file_path, e);
|
||||
// Empty file is legit on 'create', don't try to deser from it.
|
||||
if file.metadata().unwrap().len() == 0 {
|
||||
if !create {
|
||||
bail!("control file is empty");
|
||||
}
|
||||
Ok(info) => {
|
||||
if info.magic != SK_MAGIC {
|
||||
bail!("Invalid control file magic: {}", info.magic);
|
||||
return Ok((file, SafeKeeperState::new()));
|
||||
} else {
|
||||
match SafeKeeperState::des_from(&mut file) {
|
||||
Err(e) => {
|
||||
bail!("failed to read control file {:?}: {}", control_file_path, e);
|
||||
}
|
||||
if info.format_version != SK_FORMAT_VERSION {
|
||||
bail!(
|
||||
"Incompatible format version: {} vs. {}",
|
||||
info.format_version,
|
||||
SK_FORMAT_VERSION
|
||||
);
|
||||
Ok(s) => {
|
||||
if s.magic != SK_MAGIC {
|
||||
bail!("bad control file magic: {}", s.magic);
|
||||
}
|
||||
if s.format_version != SK_FORMAT_VERSION {
|
||||
bail!(
|
||||
"incompatible format version: {} vs. {}",
|
||||
s.format_version,
|
||||
SK_FORMAT_VERSION
|
||||
);
|
||||
}
|
||||
return Ok((file, s));
|
||||
}
|
||||
self.info = info;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
panic!(
|
||||
"Failed to open control file {:?}: {}",
|
||||
&control_file_path, e
|
||||
bail!(
|
||||
"failed to open control file {:?}: {}",
|
||||
&control_file_path,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn save_control_file(&mut self, sync: bool) -> Result<()> {
|
||||
let file = self.control_file.as_mut().unwrap();
|
||||
file.seek(SeekFrom::Start(0))?;
|
||||
self.info.ser_into(file)?;
|
||||
if sync {
|
||||
file.sync_all()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,12 +201,32 @@ impl Timeline {
|
||||
self.notify_wal_senders(END_REPLICATION_MARKER);
|
||||
}
|
||||
|
||||
pub fn get_info(&self) -> SafeKeeperInfo {
|
||||
return self.mutex.lock().unwrap().info.clone();
|
||||
/// Pass arrived message to the safekeeper.
|
||||
pub fn process_msg(&self, msg: &ProposerAcceptorMessage) -> Result<AcceptorProposerMessage> {
|
||||
let mut rmsg: AcceptorProposerMessage;
|
||||
let commit_lsn: Lsn;
|
||||
{
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
rmsg = shared_state.sk.process_msg(msg)?;
|
||||
// locally available commit lsn. flush_lsn can be smaller than
|
||||
// commit_lsn if we are catching up safekeeper.
|
||||
commit_lsn = min(shared_state.sk.flush_lsn, shared_state.sk.s.commit_lsn);
|
||||
|
||||
// if this is AppendResponse, fill in proper hot standby feedback
|
||||
match rmsg {
|
||||
AcceptorProposerMessage::AppendResponse(ref mut resp) => {
|
||||
resp.hs_feedback = shared_state.hs_feedback.clone();
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
// Ping wal sender that new data might be available.
|
||||
self.notify_wal_senders(commit_lsn);
|
||||
Ok(rmsg)
|
||||
}
|
||||
|
||||
pub fn set_info(&self, info: &SafeKeeperInfo) {
|
||||
self.mutex.lock().unwrap().info = info.clone();
|
||||
pub fn get_info(&self) -> SafeKeeperState {
|
||||
self.mutex.lock().unwrap().sk.s.clone()
|
||||
}
|
||||
|
||||
// Accumulate hot standby feedbacks from replicas
|
||||
@@ -198,44 +240,36 @@ impl Timeline {
|
||||
shared_state.hs_feedback.clone()
|
||||
}
|
||||
|
||||
pub fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.load_control_file(conf, self.timelineid)
|
||||
}
|
||||
|
||||
pub fn save_control_file(&self, sync: bool) -> Result<()> {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.save_control_file(sync)
|
||||
pub fn get_end_of_wal(&self) -> (Lsn, u32) {
|
||||
let shared_state = self.mutex.lock().unwrap();
|
||||
(shared_state.sk.flush_lsn, shared_state.sk.tli)
|
||||
}
|
||||
}
|
||||
|
||||
// Utilities needed by various Connection-like objects
|
||||
pub trait TimelineTools {
|
||||
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>;
|
||||
fn set(&mut self, conf: &WalAcceptorConf, timeline_id: ZTimelineId, create: bool)
|
||||
-> Result<()>;
|
||||
fn get(&self) -> &Arc<Timeline>;
|
||||
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID);
|
||||
}
|
||||
|
||||
impl TimelineTools for Option<Arc<Timeline>> {
|
||||
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()> {
|
||||
fn set(
|
||||
&mut self,
|
||||
conf: &WalAcceptorConf,
|
||||
timeline_id: ZTimelineId,
|
||||
create: bool,
|
||||
) -> Result<()> {
|
||||
// We will only set the timeline once. If it were to ever change,
|
||||
// anyone who cloned the Arc would be out of date.
|
||||
assert!(self.is_none());
|
||||
*self = Some(GlobalTimelines::store(timeline_id)?);
|
||||
*self = Some(GlobalTimelines::get(conf, timeline_id, create)?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get(&self) -> &Arc<Timeline> {
|
||||
self.as_ref().unwrap()
|
||||
}
|
||||
|
||||
/// Find last WAL record. If "precise" is false then just locate last partial segment
|
||||
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) {
|
||||
let seg_size = self.get().get_info().server.wal_seg_size as usize;
|
||||
let wal_dir = data_dir.join(format!("{}", self.get().timelineid));
|
||||
let (lsn, timeline) = find_end_of_wal(&wal_dir, seg_size, precise);
|
||||
(Lsn(lsn), timeline)
|
||||
}
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
@@ -247,22 +281,143 @@ lazy_static! {
|
||||
struct GlobalTimelines;
|
||||
|
||||
impl GlobalTimelines {
|
||||
/// Store a new timeline into the global TIMELINES map.
|
||||
fn store(timeline_id: ZTimelineId) -> Result<Arc<Timeline>> {
|
||||
/// Get a timeline with control file loaded from the global TIMELINES map.
|
||||
/// If control file doesn't exist and create=false, bails out.
|
||||
pub fn get(
|
||||
conf: &WalAcceptorConf,
|
||||
timeline_id: ZTimelineId,
|
||||
create: bool,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let mut timelines = TIMELINES.lock().unwrap();
|
||||
|
||||
match timelines.get(&timeline_id) {
|
||||
Some(result) => Ok(Arc::clone(result)),
|
||||
None => {
|
||||
info!("creating timeline dir {}", timeline_id);
|
||||
info!(
|
||||
"creating timeline dir {}, create is {}",
|
||||
timeline_id, create
|
||||
);
|
||||
fs::create_dir_all(timeline_id.to_string())?;
|
||||
|
||||
let shared_state = SharedState::new();
|
||||
let shared_state = SharedState::create_restore(conf, timeline_id, create)?;
|
||||
|
||||
let new_tid = Arc::new(Timeline::new(timeline_id, shared_state));
|
||||
timelines.insert(timeline_id, Arc::clone(&new_tid));
|
||||
Ok(new_tid)
|
||||
let new_tli = Arc::new(Timeline::new(timeline_id, shared_state));
|
||||
timelines.insert(timeline_id, Arc::clone(&new_tli));
|
||||
Ok(new_tli)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FileStorage {
|
||||
control_file: File,
|
||||
conf: WalAcceptorConf,
|
||||
}
|
||||
|
||||
impl Storage for FileStorage {
|
||||
fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()> {
|
||||
self.control_file.seek(SeekFrom::Start(0))?;
|
||||
s.ser_into(&mut self.control_file)?;
|
||||
if sync {
|
||||
self.control_file.sync_all()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_wal(&mut self, s: &SafeKeeperState, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
let mut bytes_left: usize = buf.len();
|
||||
let mut bytes_written: usize = 0;
|
||||
let mut partial;
|
||||
let mut start_pos = startpos;
|
||||
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
|
||||
let wal_seg_size = s.server.wal_seg_size as usize;
|
||||
let ztli = s.server.ztli;
|
||||
|
||||
/* Extract WAL location for this block */
|
||||
let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
|
||||
|
||||
while bytes_left != 0 {
|
||||
let bytes_to_write;
|
||||
|
||||
/*
|
||||
* If crossing a WAL boundary, only write up until we reach wal
|
||||
* segment size.
|
||||
*/
|
||||
if xlogoff + bytes_left > wal_seg_size {
|
||||
bytes_to_write = wal_seg_size - xlogoff;
|
||||
} else {
|
||||
bytes_to_write = bytes_left;
|
||||
}
|
||||
|
||||
/* Open file */
|
||||
let segno = start_pos.segment_number(wal_seg_size);
|
||||
// note: we basically don't support changing pg timeline
|
||||
let wal_file_name = XLogFileName(s.server.tli, segno, wal_seg_size);
|
||||
let wal_file_path = self
|
||||
.conf
|
||||
.data_dir
|
||||
.join(ztli.to_string())
|
||||
.join(wal_file_name.clone());
|
||||
let wal_file_partial_path = self
|
||||
.conf
|
||||
.data_dir
|
||||
.join(ztli.to_string())
|
||||
.join(wal_file_name.clone() + ".partial");
|
||||
|
||||
{
|
||||
let mut wal_file: File;
|
||||
/* Try to open already completed segment */
|
||||
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
|
||||
wal_file = file;
|
||||
partial = false;
|
||||
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path)
|
||||
{
|
||||
/* Try to open existed partial file */
|
||||
wal_file = file;
|
||||
partial = true;
|
||||
} else {
|
||||
/* Create and fill new partial file */
|
||||
partial = true;
|
||||
match OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.open(&wal_file_partial_path)
|
||||
{
|
||||
Ok(mut file) => {
|
||||
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
|
||||
file.write_all(&ZERO_BLOCK)?;
|
||||
}
|
||||
wal_file = file;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
|
||||
|
||||
// Flush file, if not said otherwise
|
||||
if !self.conf.no_sync {
|
||||
wal_file.sync_all()?;
|
||||
}
|
||||
}
|
||||
/* Write was successful, advance our position */
|
||||
bytes_written += bytes_to_write;
|
||||
bytes_left -= bytes_to_write;
|
||||
start_pos += bytes_to_write as u64;
|
||||
xlogoff += bytes_to_write;
|
||||
|
||||
/* Did we reach the end of a WAL segment? */
|
||||
if start_pos.segment_offset(wal_seg_size) == 0 {
|
||||
xlogoff = 0;
|
||||
if partial {
|
||||
fs::rename(&wal_file_partial_path, &wal_file_path)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user