Optimize WAL storage in safekeeper (#1318)

When several AppendRequest's can be read from socket without blocking,
they are processed together and fsync() to segment file is only called
once. Segment file is no longer opened for every write request, now
last opened file is cached inside PhysicalStorage. New metric for WAL
flushes was added to the storage, FLUSH_WAL_SECONDS. More errors were
added to storage for non-sequential WAL writes, now write_lsn can be
moved only with calls to truncate_lsn(new_lsn).

New messages have been added to ProposerAcceptorMessage enum. They
can't be deserialized directly and now are used only for optimizing
flushes. Existing protocol wasn't changed and flush will be called for
every AppendRequest, as it was before.
This commit is contained in:
Arthur Petukhovsky
2022-02-25 18:52:21 +03:00
committed by GitHub
parent 137d616e76
commit c8a1192b53
4 changed files with 392 additions and 206 deletions

View File

@@ -3,6 +3,7 @@
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
use crate::receive_wal::ReceiveWalConn;
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage};
use crate::send_wal::ReplicationConn;
use crate::timeline::{Timeline, TimelineTools};
use crate::SafeKeeperConf;
@@ -160,6 +161,17 @@ impl SafekeeperPostgresHandler {
}
}
/// Shortcut for calling `process_msg` in the timeline.
pub fn process_safekeeper_msg(
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
self.timeline
.get()
.process_msg(msg)
.context("failed to process ProposerAcceptorMessage")
}
///
/// Handle IDENTIFY_SYSTEM replication command
///

View File

@@ -2,15 +2,21 @@
//! Gets messages from the network, passes them down to consensus module and
//! sends replies back.
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use anyhow::{anyhow, bail, Result};
use bytes::BytesMut;
use tokio::sync::mpsc::UnboundedSender;
use tracing::*;
use zenith_utils::sock_split::ReadStream;
use crate::timeline::Timeline;
use std::net::SocketAddr;
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::thread;
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
@@ -46,21 +52,6 @@ impl<'pg> ReceiveWalConn<'pg> {
}
}
// Read and extract the bytes of a `CopyData` message from the postgres instance
fn read_msg_bytes(&mut self) -> Result<Bytes> {
match self.pg_backend.read_message()? {
Some(FeMessage::CopyData(bytes)) => Ok(bytes),
Some(msg) => bail!("expected `CopyData` message, found {:?}", msg),
None => bail!("connection closed unexpectedly"),
}
}
// Read and parse message sent from the postgres instance
fn read_msg(&mut self) -> Result<ProposerAcceptorMessage> {
let data = self.read_msg_bytes()?;
ProposerAcceptorMessage::parse(data)
}
// Send message to the postgres
fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> {
let mut buf = BytesMut::with_capacity(128);
@@ -77,18 +68,22 @@ impl<'pg> ReceiveWalConn<'pg> {
self.pg_backend
.write_message(&BeMessage::CopyBothResponse)?;
let r = self
.pg_backend
.take_stream_in()
.ok_or_else(|| anyhow!("failed to take read stream from pgbackend"))?;
let mut poll_reader = ProposerPollStream::new(r)?;
// Receive information about server
let mut msg = self
.read_msg()
.context("failed to receive proposer greeting")?;
match msg {
let next_msg = poll_reader.recv_msg()?;
match next_msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
info!(
"start handshake with wal proposer {} sysid {} timeline {}",
self.peer_addr, greeting.system_id, greeting.tli,
);
}
_ => bail!("unexpected message {:?} instead of greeting", msg),
_ => bail!("unexpected message {:?} instead of greeting", next_msg),
}
// Register the connection and defer unregister.
@@ -100,16 +95,97 @@ impl<'pg> ReceiveWalConn<'pg> {
callmemaybe_tx: spg.tx.clone(),
};
let mut next_msg = Some(next_msg);
loop {
let reply = spg
.timeline
.get()
.process_msg(&msg)
.context("failed to process ProposerAcceptorMessage")?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
if matches!(next_msg, Some(ProposerAcceptorMessage::AppendRequest(_))) {
// poll AppendRequest's without blocking and write WAL to disk without flushing,
// while it's readily available
while let Some(ProposerAcceptorMessage::AppendRequest(append_request)) = next_msg {
let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
let reply = spg.process_safekeeper_msg(&msg)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
next_msg = poll_reader.poll_msg();
}
// flush all written WAL to the disk
let reply = spg.process_safekeeper_msg(&ProposerAcceptorMessage::FlushWAL)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
} else if let Some(msg) = next_msg.take() {
// process other message
let reply = spg.process_safekeeper_msg(&msg)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
}
msg = self.read_msg()?;
// blocking wait for the next message
if next_msg.is_none() {
next_msg = Some(poll_reader.recv_msg()?);
}
}
}
}
struct ProposerPollStream {
msg_rx: Receiver<ProposerAcceptorMessage>,
read_thread: Option<thread::JoinHandle<Result<()>>>,
}
impl ProposerPollStream {
fn new(mut r: ReadStream) -> Result<Self> {
let (msg_tx, msg_rx) = channel();
let read_thread = thread::Builder::new()
.name("Read WAL thread".into())
.spawn(move || -> Result<()> {
loop {
let copy_data = match FeMessage::read(&mut r)? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(msg) => bail!("expected `CopyData` message, found {:?}", msg),
None => bail!("connection closed unexpectedly"),
};
let msg = ProposerAcceptorMessage::parse(copy_data)?;
msg_tx.send(msg)?;
}
// msg_tx will be dropped here, this will also close msg_rx
})?;
Ok(Self {
msg_rx,
read_thread: Some(read_thread),
})
}
fn recv_msg(&mut self) -> Result<ProposerAcceptorMessage> {
self.msg_rx.recv().map_err(|_| {
// return error from the read thread
let res = match self.read_thread.take() {
Some(thread) => thread.join(),
None => return anyhow!("read thread is gone"),
};
match res {
Ok(Ok(())) => anyhow!("unexpected result from read thread"),
Err(err) => anyhow!("read thread panicked: {:?}", err),
Ok(Err(err)) => err,
}
})
}
fn poll_msg(&mut self) -> Option<ProposerAcceptorMessage> {
let res = self.msg_rx.try_recv();
match res {
Err(_) => None,
Ok(msg) => Some(msg),
}
}
}

View File

@@ -301,6 +301,8 @@ pub enum ProposerAcceptorMessage {
VoteRequest(VoteRequest),
Elected(ProposerElected),
AppendRequest(AppendRequest),
NoFlushAppendRequest(AppendRequest),
FlushWAL,
}
impl ProposerAcceptorMessage {
@@ -499,7 +501,11 @@ where
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg),
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg),
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg),
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg),
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg, true),
ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
self.handle_append_request(msg, false)
}
ProposerAcceptorMessage::FlushWAL => self.handle_flush(),
}
}
@@ -605,7 +611,10 @@ where
return Ok(None);
}
// truncate wal, update the lsns
// TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
// intersection of our history and history from msg
// truncate wal, update the LSNs
self.wal_store.truncate_wal(msg.start_streaming_at)?;
// and now adopt term history from proposer
@@ -622,6 +631,7 @@ where
fn handle_append_request(
&mut self,
msg: &AppendRequest,
mut require_flush: bool,
) -> Result<Option<AcceptorProposerMessage>> {
if self.s.acceptor_state.term < msg.h.term {
bail!("got AppendRequest before ProposerElected");
@@ -650,9 +660,15 @@ where
if self.s.wal_start_lsn == Lsn(0) {
self.s.wal_start_lsn = msg.h.begin_lsn;
sync_control_file = true;
require_flush = true;
}
}
// flush wal to the disk, if required
if require_flush {
self.wal_store.flush_wal()?;
}
// Advance commit_lsn taking into account what we have locally.
// commit_lsn can be 0, being unknown to new walproposer while he hasn't
// collected majority of its epoch acks yet, ignore it in this case.
@@ -670,11 +686,9 @@ where
}
self.truncate_lsn = msg.h.truncate_lsn;
/*
* Update truncate and commit LSN in control file.
* To avoid negative impact on performance of extra fsync, do it only
* when truncate_lsn delta exceeds WAL segment size.
*/
// Update truncate and commit LSN in control file.
// To avoid negative impact on performance of extra fsync, do it only
// when truncate_lsn delta exceeds WAL segment size.
sync_control_file |=
self.s.truncate_lsn + (self.s.server.wal_seg_size as u64) < self.truncate_lsn;
if sync_control_file {
@@ -686,6 +700,11 @@ where
self.control_store.persist(&self.s)?;
}
// If flush_lsn hasn't updated, AppendResponse is not very useful.
if !require_flush {
return Ok(None);
}
let resp = self.append_response();
trace!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}",
@@ -697,6 +716,14 @@ where
);
Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
}
/// Flush WAL to disk. Return AppendResponse with latest LSNs.
fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
self.wal_store.flush_wal()?;
Ok(Some(AcceptorProposerMessage::AppendResponse(
self.append_response(),
)))
}
}
#[cfg(test)]
@@ -738,6 +765,10 @@ mod tests {
self.lsn = end_pos;
Ok(())
}
fn flush_wal(&mut self) -> Result<()> {
Ok(())
}
}
#[test]

View File

@@ -7,7 +7,7 @@
//!
//! Note that last file has `.partial` suffix, that's different from postgres.
use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use std::io::{Read, Seek, SeekFrom};
use lazy_static::lazy_static;
@@ -58,12 +58,20 @@ lazy_static! {
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_write_wal_seconds histogram vec");
static ref FLUSH_WAL_SECONDS: HistogramVec = register_histogram_vec!(
"safekeeper_flush_wal_seconds",
"Seconds spent syncing WAL to a disk, grouped by timeline",
&["tenant_id", "timeline_id"],
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_flush_wal_seconds histogram vec");
}
struct WalStorageMetrics {
flush_lsn: Gauge,
write_wal_bytes: Histogram,
write_wal_seconds: Histogram,
flush_wal_seconds: Histogram,
}
impl WalStorageMetrics {
@@ -74,24 +82,38 @@ impl WalStorageMetrics {
flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]),
write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]),
write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
flush_wal_seconds: FLUSH_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
}
}
}
pub trait Storage {
/// lsn of last durably stored WAL record.
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
/// Init storage with wal_seg_size and read WAL from disk to get latest lsn.
/// Init storage with wal_seg_size and read WAL from disk to get latest LSN.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>;
/// Write piece of wal in buf to disk and sync it.
/// Write piece of WAL from buf to disk, but not necessarily sync it.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
// Truncate WAL at specified LSN.
/// Truncate WAL at specified LSN, which must be the end of WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
/// Durably store WAL on disk, up to the last written WAL record.
fn flush_wal(&mut self) -> Result<()>;
}
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
/// for better performance. Storage must be initialized before use.
///
/// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
/// its filename and may be not fully flushed.
///
/// Relationship of LSNs:
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
///
/// When storage is just created, all LSNs are zeroes and there are no segments on disk.
pub struct PhysicalStorage {
metrics: WalStorageMetrics,
zttid: ZTenantTimelineId,
@@ -99,27 +121,29 @@ pub struct PhysicalStorage {
conf: SafeKeeperConf,
// fields below are filled upon initialization
// None if unitialized, Some(lsn) if storage is initialized
/// None if unitialized, Some(usize) if storage is initialized.
wal_seg_size: Option<usize>,
// Relationship of lsns:
// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
//
// All lsns are zeroes, if storage is just created, and there are no segments on disk.
// Written to disk, but possibly still in the cache and not fully persisted.
// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
/// Written to disk, but possibly still in the cache and not fully persisted.
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
write_lsn: Lsn,
// The LSN of the last WAL record written to disk. Still can be not fully flushed.
/// The LSN of the last WAL record written to disk. Still can be not fully flushed.
write_record_lsn: Lsn,
// The LSN of the last WAL record flushed to disk.
/// The LSN of the last WAL record flushed to disk.
flush_record_lsn: Lsn,
// Decoder is required for detecting boundaries of WAL records.
/// Decoder is required for detecting boundaries of WAL records.
decoder: WalStreamDecoder,
/// Cached open file for the last segment.
///
/// If Some(file) is open, then it always:
/// - has ".partial" suffix
/// - points to write_lsn, so no seek is needed for writing
/// - doesn't point to the end of the segment
file: Option<File>,
}
impl PhysicalStorage {
@@ -135,128 +159,146 @@ impl PhysicalStorage {
write_record_lsn: Lsn(0),
flush_record_lsn: Lsn(0),
decoder: WalStreamDecoder::new(Lsn(0)),
file: None,
}
}
// wrapper for flush_lsn updates that also updates metrics
/// Wrapper for flush_lsn updates that also updates metrics.
fn update_flush_lsn(&mut self) {
self.flush_record_lsn = self.write_record_lsn;
self.metrics.flush_lsn.set(self.flush_record_lsn.0 as f64);
}
/// Helper returning full path to WAL segment file and its .partial brother.
fn wal_file_paths(&self, segno: XLogSegNo) -> Result<(PathBuf, PathBuf)> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self.timeline_dir.join(wal_file_name.clone());
let wal_file_partial_path = self.timeline_dir.join(wal_file_name + ".partial");
Ok((wal_file_path, wal_file_partial_path))
/// Call fdatasync if config requires so.
fn fdatasync_file(&self, file: &mut File) -> Result<()> {
if !self.conf.no_sync {
self.metrics
.flush_wal_seconds
.observe_closure_duration(|| file.sync_data())?;
}
Ok(())
}
// TODO: this function is going to be refactored soon, what will change:
// - flush will be called separately from write_wal, this function
// will only write bytes to disk
// - File will be cached in PhysicalStorage, to remove extra syscalls,
// such as open(), seek(), close()
fn write_and_flush(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
/// Call fsync if config requires so.
fn fsync_file(&self, file: &mut File) -> Result<()> {
if !self.conf.no_sync {
self.metrics
.flush_wal_seconds
.observe_closure_duration(|| file.sync_all())?;
}
Ok(())
}
/// Open or create WAL segment file. Caller must call seek to the wanted position.
/// Returns `file` and `is_partial`.
fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool)> {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
// Try to open already completed segment
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
Ok((file, false))
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
// Try to open existing partial file
Ok((file, true))
} else {
// Create and fill new partial file
let mut file = OpenOptions::new()
.create(true)
.write(true)
.open(&wal_file_partial_path)
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
write_zeroes(&mut file, wal_seg_size)?;
self.fsync_file(&mut file)?;
Ok((file, true))
}
}
/// Write WAL bytes, which are known to be located in a single WAL segment.
fn write_in_segment(
&mut self,
segno: u64,
xlogoff: usize,
buf: &[u8],
wal_seg_size: usize,
) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
} else {
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64))?;
file
};
file.write_all(buf)?;
if xlogoff + buf.len() == wal_seg_size {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&mut file)?;
// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
fs::rename(&wal_file_partial_path, &wal_file_path)?;
} else {
// otherwise, file can be reused later
self.file = Some(file);
}
Ok(())
}
/// Writes WAL to the segment files, until everything is writed. If some segments
/// are fully written, they are flushed to disk. The last (partial) segment can
/// be flushed separately later.
///
/// Updates `write_lsn`.
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
let mut bytes_left: usize = buf.len();
let mut bytes_written: usize = 0;
let mut partial;
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
while bytes_left != 0 {
let bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach wal
* segment size.
*/
if xlogoff + bytes_left > wal_seg_size {
bytes_to_write = wal_seg_size - xlogoff;
} else {
bytes_to_write = bytes_left;
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(mut file) = self.file.take() {
self.fdatasync_file(&mut file)?;
}
/* Open file */
let segno = start_pos.segment_number(wal_seg_size);
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?;
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path)
{
/* Try to open existed partial file */
wal_file = file;
partial = true;
} else {
/* Create and fill new partial file */
partial = true;
match OpenOptions::new()
.create(true)
.write(true)
.open(&wal_file_partial_path)
{
Ok(mut file) => {
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
file.write_all(ZERO_BLOCK)?;
}
wal_file = file;
}
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e.into());
}
}
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
// Flush file, if not said otherwise
if !self.conf.no_sync {
wal_file.sync_all()?;
}
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
start_pos += bytes_to_write as u64;
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if start_pos.segment_offset(wal_seg_size) == 0 {
xlogoff = 0;
if partial {
fs::rename(&wal_file_partial_path, &wal_file_path)?;
}
}
self.write_lsn = pos;
}
while !buf.is_empty() {
// Extract WAL location for this block
let xlogoff = self.write_lsn.segment_offset(wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(wal_seg_size);
// If crossing a WAL boundary, only write up until we reach wal segment size.
let bytes_write = if xlogoff + buf.len() > wal_seg_size {
wal_seg_size - xlogoff
} else {
buf.len()
};
self.write_in_segment(segno, xlogoff, &buf[..bytes_write], wal_seg_size)?;
self.write_lsn += bytes_write as u64;
buf = &buf[bytes_write..];
}
Ok(())
}
}
impl Storage for PhysicalStorage {
// flush_lsn returns lsn of last durably stored WAL record.
/// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn
}
// Storage needs to know wal_seg_size to know which segment to read/write, but
// wal_seg_size is not always known at the moment of storage creation. This method
// allows to postpone its initialization.
/// Storage needs to know wal_seg_size to know which segment to read/write, but
/// wal_seg_size is not always known at the moment of storage creation. This method
/// allows to postpone its initialization.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> {
if state.server.wal_seg_size == 0 {
// wal_seg_size is still unknown
@@ -294,29 +336,31 @@ impl Storage for PhysicalStorage {
Ok(())
}
// Write and flush WAL to disk.
/// Write WAL to disk.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
// Disallow any non-sequential writes, which can result in gaps or overwrites.
// If we need to move the pointer, use truncate_wal() instead.
if self.write_lsn > startpos {
warn!(
bail!(
"write_wal rewrites WAL written before, write_lsn={}, startpos={}",
self.write_lsn, startpos
self.write_lsn,
startpos
);
}
if self.write_lsn < startpos {
warn!(
if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
bail!(
"write_wal creates gap in written WAL, write_lsn={}, startpos={}",
self.write_lsn, startpos
self.write_lsn,
startpos
);
// TODO: return error if write_lsn is not zero
}
{
let _timer = self.metrics.write_wal_seconds.start_timer();
self.write_and_flush(startpos, buf)?;
self.write_exact(startpos, buf)?;
}
// WAL is written and flushed, updating lsns
self.write_lsn = startpos + buf.len() as u64;
// WAL is written, updating write metrics
self.metrics.write_wal_bytes.observe(buf.len() as f64);
// figure out last record's end lsn for reporting (if we got the
@@ -339,69 +383,67 @@ impl Storage for PhysicalStorage {
}
}
Ok(())
}
fn flush_wal(&mut self) -> Result<()> {
if self.flush_record_lsn == self.write_record_lsn {
// no need to do extra flush
return Ok(());
}
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;
self.file = Some(unflushed_file);
} else {
// We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if last file was fully written and flushed,
// but haven't updated flush_lsn yet.
assert!(self.write_lsn.segment_offset(self.wal_seg_size.unwrap()) == 0);
}
// everything is flushed now, let's update flush_lsn
self.update_flush_lsn();
Ok(())
}
// Truncate written WAL by removing all WAL segments after the given LSN.
// end_pos must point to the end of the WAL record.
/// Truncate written WAL by removing all WAL segments after the given LSN.
/// end_pos must point to the end of the WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
// TODO: cross check divergence point
// nothing to truncate
if self.write_lsn == Lsn(0) {
return Ok(());
}
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
assert!(self.write_lsn >= end_pos);
assert!(self.write_lsn == Lsn(0) || self.write_lsn >= end_pos);
// open segment files and delete or fill end with zeroes
let partial;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
/* Extract WAL location for this block */
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
/* Open file */
let mut segno = end_pos.segment_number(wal_seg_size);
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?;
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else {
wal_file = OpenOptions::new()
.write(true)
.open(&wal_file_partial_path)?;
partial = true;
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
while xlogoff < wal_seg_size {
let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff);
wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?;
xlogoff += bytes_to_write;
}
// Flush file, if not said otherwise
if !self.conf.no_sync {
wal_file.sync_all()?;
}
// Close previously opened file, if any
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;
}
if !partial {
let xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
let segno = end_pos.segment_number(wal_seg_size);
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64))?;
write_zeroes(&mut file, wal_seg_size - xlogoff)?;
self.fdatasync_file(&mut file)?;
if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
fs::rename(&wal_file_path, &wal_file_partial_path)?;
}
// Remove all subsequent segments
let mut segno = segno;
loop {
segno += 1;
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?;
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
// TODO: better use fs::try_exists which is currenty avaialble only in nightly build
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
@@ -412,7 +454,7 @@ impl Storage for PhysicalStorage {
}
}
// Update lsns
// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.update_flush_lsn();
@@ -491,3 +533,28 @@ impl WalReader {
})
}
}
/// Zero block for filling created WAL segments.
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
/// Helper for filling file with zeroes.
fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
while count >= XLOG_BLCKSZ {
file.write_all(ZERO_BLOCK)?;
count -= XLOG_BLCKSZ;
}
file.write_all(&ZERO_BLOCK[0..count])?;
Ok(())
}
/// Helper returning full path to WAL segment file and its .partial brother.
fn wal_file_paths(
timeline_dir: &Path,
segno: XLogSegNo,
wal_seg_size: usize,
) -> Result<(PathBuf, PathBuf)> {
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = timeline_dir.join(wal_file_name.clone());
let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
Ok((wal_file_path, wal_file_partial_path))
}