Compare commits

...

2 Commits

Author         SHA1        Message                                            Date
Erik Grinaker  a81df96db5  safekeeper: flush WAL on transaction commit        2024-11-11 17:48:32 +01:00
Erik Grinaker  71f04f91c2  safekeeper: send AppendResponse on segment flush   2024-11-11 11:24:43 +01:00
4 changed files with 75 additions and 27 deletions

View File

@@ -14,7 +14,7 @@ use super::bindings::{
 };
 use super::wal_generator::LogicalMessageGenerator;
 use super::PG_MAJORVERSION;
-use crate::pg_constants;
+use crate::pg_constants::{self, XLOG_XACT_COMMIT, XLOG_XACT_COMMIT_PREPARED};
 use crate::PG_TLI;
 use crate::{uint32, uint64, Oid};
 use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
@@ -296,6 +296,13 @@ impl XLogRecord {
     pub fn is_xlog_switch_record(&self) -> bool {
         self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
     }
+
+    // Is this record a transaction commit?
+    pub fn is_xact_commit(&self) -> bool {
+        self.xl_rmid == pg_constants::RM_XACT_ID
+            && (self.xl_info & pg_constants::XLOG_XACT_OPMASK == XLOG_XACT_COMMIT
+                || self.xl_info & pg_constants::XLOG_XACT_OPMASK == XLOG_XACT_COMMIT_PREPARED)
+    }
 }
 
 impl XLogPageHeaderData {
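
The check masks the opcode bits out of xl_info before comparing, since xl_info also carries flag bits. A standalone sketch of the same logic follows; the constant values mirror PostgreSQL's xact.h and rmgr list and are assumptions here, not taken from this diff (the real code reads them from pg_constants):

// Standalone sketch of the commit-detection bitmask above. Constant
// values are assumed from PostgreSQL's xact.h / rmgr list.
const RM_XACT_ID: u8 = 1;
const XLOG_XACT_OPMASK: u8 = 0x70; // opcode bits of xl_info
const XLOG_XACT_COMMIT: u8 = 0x00;
const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;

fn is_xact_commit(xl_rmid: u8, xl_info: u8) -> bool {
    // xl_info also carries flag bits (e.g. XLOG_XACT_HAS_INFO = 0x80),
    // so the opcode must be masked out before comparison.
    xl_rmid == RM_XACT_ID
        && matches!(
            xl_info & XLOG_XACT_OPMASK,
            XLOG_XACT_COMMIT | XLOG_XACT_COMMIT_PREPARED
        )
}

fn main() {
    assert!(is_xact_commit(1, 0x80)); // commit with XLOG_XACT_HAS_INFO set
    assert!(!is_xact_commit(1, 0x20)); // abort record
    assert!(!is_xact_commit(0, 0x00)); // different resource manager
}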

View File

@@ -562,6 +562,9 @@ impl WalAcceptor {
             // Don't flush the WAL on every append, only periodically via flush_ticker.
             // This batches multiple appends per fsync. If the channel is empty after
             // sending the reply, we'll schedule an immediate flush.
+            //
+            // Note that a flush can still happen on segment bounds, which will result
+            // in an AppendResponse.
             if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
                 msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
                 dirty = true;
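
For context, the surrounding receive loop (only partially shown above) is what makes the NoFlush conversion safe: appends are written without fsync while more input is queued, and a flush is forced when the queue drains or the ticker fires. A hedged sketch of that shape, with assumed message types, channel, and flush period, not the actual WalAcceptor code:

// Hedged sketch of the batching loop described above; types and the
// 1-second period are assumptions, not the safekeeper's real API.
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::interval;

enum Msg {
    Append(Vec<u8>),
}

async fn acceptor_loop(mut rx: mpsc::Receiver<Msg>) {
    let mut flush_ticker = interval(Duration::from_secs(1));
    let mut dirty = false; // unflushed appends pending
    loop {
        tokio::select! {
            maybe_msg = rx.recv() => {
                let Some(Msg::Append(buf)) = maybe_msg else { break };
                write_no_flush(&buf).await; // append without fsync
                dirty = true;
                // Queue drained: flush now rather than waiting for the
                // ticker, so the proposer gets a prompt AppendResponse.
                if rx.is_empty() {
                    flush().await;
                    dirty = false;
                }
            }
            _ = flush_ticker.tick(), if dirty => {
                flush().await; // one fsync covers many appends
                dirty = false;
            }
        }
    }
}

async fn write_no_flush(_buf: &[u8]) { /* write to WAL storage, no fsync */ }
async fn flush() { /* fsync the partial segment */ }

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(64);
    tx.send(Msg::Append(b"wal bytes".to_vec())).await.unwrap();
    drop(tx); // close the channel so the loop exits
    acceptor_loop(rx).await;
}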

View File

@@ -947,6 +947,7 @@ where
         // while first connection still gets some packets later. It might be
         // better to not log this as error! above.
         let write_lsn = self.wal_store.write_lsn();
+        let flush_lsn = self.wal_store.flush_lsn();
         if write_lsn > msg.h.begin_lsn {
             bail!(
                 "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
@@ -1012,7 +1013,9 @@ where
         );
 
         // If flush_lsn hasn't updated, AppendResponse is not very useful.
-        if !require_flush {
+        // This is the common case for !require_flush, but a flush can still
+        // happen on segment bounds.
+        if flush_lsn == self.flush_lsn() {
             return Ok(None);
         }
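
The new condition snapshots flush_lsn before the append is applied and replies only when a flush actually advanced it. A minimal sketch of the rule with stand-in types (the real Safekeeper state machine is more involved):

// Minimal sketch, with assumed types, of the reply-suppression rule:
// answer an append only when flush_lsn actually advanced, since an
// AppendResponse reporting nothing newly durable tells the proposer
// nothing useful.
struct WalStore {
    flush_lsn: u64,
}

impl WalStore {
    fn flush_lsn(&self) -> u64 {
        self.flush_lsn
    }
    fn write(&mut self, buf: &[u8], closes_segment: bool) {
        // A write that closes a segment implies a flush, even when the
        // proposer didn't ask for one.
        if closes_segment {
            self.flush_lsn += buf.len() as u64;
        }
    }
}

fn maybe_respond(store: &mut WalStore, buf: &[u8], closes_segment: bool) -> Option<u64> {
    let flush_lsn_before = store.flush_lsn();
    store.write(buf, closes_segment);
    if store.flush_lsn() == flush_lsn_before {
        return None; // nothing newly durable: suppress the response
    }
    Some(store.flush_lsn())
}

fn main() {
    let mut store = WalStore { flush_lsn: 0 };
    assert_eq!(maybe_respond(&mut store, b"wal", false), None);
    assert_eq!(maybe_respond(&mut store, b"wal", true), Some(3));
}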

View File

@@ -12,7 +12,7 @@ use bytes::Bytes;
 use camino::{Utf8Path, Utf8PathBuf};
 use futures::future::BoxFuture;
 use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
-use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
+use postgres_ffi::{dispatch_pgversion, XLogRecord, XLogSegNo, PG_TLI, XLOG_SIZE_OF_XLOG_RECORD};
 use remote_storage::RemotePath;
 use std::cmp::{max, min};
 use std::future::Future;
@@ -113,6 +113,13 @@ pub struct PhysicalStorage {
     /// non-aligned chunks of data.
     write_record_lsn: Lsn,
 
+    /// The last LSN flushed to disk. May be in the middle of a record.
+    ///
+    /// NB: when the rest of the system refers to `flush_lsn`, it usually
+    /// actually refers to `flush_record_lsn`. This ambiguity can be dangerous
+    /// and should be resolved.
+    flush_lsn: Lsn,
+
     /// The LSN of the last WAL record flushed to disk.
     flush_record_lsn: Lsn,
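
The distinction matters because durability is tracked in bytes while consumers care about whole records. A toy illustration (invented numbers) of how the two cursors relate:

// Toy illustration of the two flush cursors: flush_lsn is a byte
// position and may land mid-record, while flush_record_lsn only ever
// points at a record boundary.
fn main() {
    // End LSNs of records decoded so far.
    let record_ends = [100u64, 250, 400];
    // Bytes durable on disk up to here; 300 is mid-record.
    let flush_lsn = 300u64;

    // flush_record_lsn = end of the last record that is fully durable.
    let flush_record_lsn = record_ends
        .iter()
        .copied()
        .take_while(|&end| end <= flush_lsn)
        .last()
        .unwrap_or(0);
    assert_eq!(flush_record_lsn, 250);
}
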
@@ -205,6 +212,7 @@ impl PhysicalStorage {
             system_id: state.server.system_id,
             write_lsn,
             write_record_lsn: write_lsn,
+            flush_lsn,
             flush_record_lsn: flush_lsn,
             decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
             file: None,
@@ -288,8 +296,9 @@
         }
     }
 
-    /// Write WAL bytes, which are known to be located in a single WAL segment.
-    async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
+    /// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the
+    /// segment was completed, closed, and flushed to disk.
+    async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<bool> {
         let mut file = if let Some(file) = self.file.take() {
             file
         } else {
@@ -313,20 +322,24 @@
             let (wal_file_path, wal_file_partial_path) =
                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
             fs::rename(wal_file_partial_path, wal_file_path).await?;
+            Ok(true)
         } else {
             // otherwise, file can be reused later
             self.file = Some(file);
+            Ok(false)
         }
-
-        Ok(())
     }
 
     /// Writes WAL to the segment files, until everything is written. If some segments
     /// are fully written, they are flushed to disk. The last (partial) segment can
     /// be flushed separately later.
     ///
-    /// Updates `write_lsn`.
+    /// Updates `write_lsn` and `flush_lsn`.
     async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
+        // TODO: this shouldn't be possible, except possibly with write_lsn == 0.
+        // Rename this method to `append_exact`, and make it append-only, removing
+        // the `pos` parameter and this check. For this reason, we don't update
+        // `flush_lsn` here.
         if self.write_lsn != pos {
             // need to flush the file before discarding it
             if let Some(file) = self.file.take() {
@@ -348,9 +361,13 @@
                 buf.len()
             };
 
-            self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
+            let flushed = self
+                .write_in_segment(segno, xlogoff, &buf[..bytes_write])
                 .await?;
             self.write_lsn += bytes_write as u64;
+            if flushed {
+                self.flush_lsn = self.write_lsn;
+            }
             buf = &buf[bytes_write..];
         }
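
Putting the pieces together, write_exact walks the buffer segment by segment, and only a write that closes a segment advances the flushed position. A toy walkthrough with an invented 16-byte segment size (the real code fsyncs and renames the .partial file at that point):

// Toy walkthrough of the segment-splitting loop, with an invented
// 16-byte segment size: each iteration writes at most up to the next
// segment boundary, and a boundary-closing write advances flush_lsn.
fn main() {
    const SEG_SIZE: u64 = 16;
    let mut write_lsn: u64 = 12;
    let mut flush_lsn: u64 = 0;
    let mut buf: &[u8] = &[0u8; 10]; // spans the boundary at 16

    while !buf.is_empty() {
        let xlogoff = (write_lsn % SEG_SIZE) as usize;
        // Write either the rest of the buffer or up to segment end.
        let n = buf.len().min(SEG_SIZE as usize - xlogoff);
        let closes_segment = xlogoff + n == SEG_SIZE as usize;
        // ... write buf[..n] to the segment file here ...
        write_lsn += n as u64;
        if closes_segment {
            flush_lsn = write_lsn; // segment flushed on close
        }
        buf = &buf[n..];
    }
    assert_eq!(write_lsn, 22);
    assert_eq!(flush_lsn, 16); // only the closed segment is durable
}
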
@@ -364,6 +381,9 @@ impl Storage for PhysicalStorage {
         self.write_lsn
     }
     /// flush_lsn returns LSN of last durably stored WAL record.
+    ///
+    /// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing.
+    #[allow(clippy::misnamed_getters)]
     fn flush_lsn(&self) -> Lsn {
         self.flush_record_lsn
     }
@@ -410,8 +430,9 @@
         self.metrics.observe_write_seconds(write_seconds);
         self.metrics.observe_write_bytes(buf.len());
 
-        // figure out last record's end lsn for reporting (if we got the
-        // whole record)
+        // Figure out the last record's end LSN and update `write_record_lsn`
+        // (if we got a whole record). The write may also have closed and
+        // flushed a segment, so update `flush_record_lsn` as well.
         if self.decoder.available() != startpos {
             info!(
                 "restart decoder from {} to {}",
@@ -422,13 +443,28 @@
             self.decoder = WalStreamDecoder::new(startpos, pg_version);
         }
         self.decoder.feed_bytes(buf);
-        loop {
-            match self.decoder.poll_decode()? {
-                None => break, // no full record yet
-                Some((lsn, _rec)) => {
-                    self.write_record_lsn = lsn;
-                }
-            }
-        }
+
+        if self.write_record_lsn <= self.flush_lsn {
+            // We may have flushed a previously written record.
+            self.flush_record_lsn = self.write_record_lsn;
+        }
+        let mut xact_commit = false;
+        while let Some((lsn, rec)) = self.decoder.poll_decode()? {
+            self.write_record_lsn = lsn;
+            if lsn <= self.flush_lsn {
+                self.flush_record_lsn = lsn;
+            }
+            // TODO: the decoder already has the record header, make it return it.
+            let header = XLogRecord::from_slice(&rec[0..XLOG_SIZE_OF_XLOG_RECORD])
+                .expect("invalid record header");
+            xact_commit = xact_commit || header.is_xact_commit();
+        }
+
+        // If a transaction committed, flush the WAL. This will emit an AppendResponse to the
+        // compute. Otherwise, with pipelined ingestion, the txn may have to wait until the next
+        // periodic flush in 1 second, causing commit latency.
+        if xact_commit {
+            self.flush_wal().await?;
+        }
 
         Ok(())
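
The net effect: one pass over the newly complete records updates the record-boundary bookkeeping and decides whether a commit forces an early flush. A condensed sketch of that decision with assumed types (the real decoder API differs):

// Condensed sketch (assumed types, not the real decoder API) of the
// decode pass above: track the last complete record boundary, remember
// whether any record was a commit, and flush once at the end so the
// commit's ack isn't delayed until the next periodic flush.
struct Decoded {
    end_lsn: u64,
    is_commit: bool,
}

fn decode_pass(records: &[Decoded], write_record_lsn: &mut u64) -> bool {
    let mut xact_commit = false;
    for rec in records {
        *write_record_lsn = rec.end_lsn;
        xact_commit |= rec.is_commit;
    }
    // Caller flushes the WAL iff xact_commit, emitting an immediate
    // AppendResponse instead of waiting up to ~1s for the ticker.
    xact_commit
}

fn main() {
    let mut write_record_lsn = 0;
    let recs = [
        Decoded { end_lsn: 100, is_commit: false },
        Decoded { end_lsn: 180, is_commit: true },
    ];
    assert!(decode_pass(&recs, &mut write_record_lsn));
    assert_eq!(write_record_lsn, 180);
}
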
@@ -444,19 +480,17 @@
             self.fdatasync_file(&unflushed_file).await?;
             self.file = Some(unflushed_file);
         } else {
-            // We have unflushed data (write_lsn != flush_lsn), but no file.
-            // This should only happen if last file was fully written and flushed,
-            // but haven't updated flush_lsn yet.
-            if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
-                bail!(
-                    "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
-                    self.write_lsn,
-                    self.flush_record_lsn
-                );
-            }
+            // We have unflushed data (write_lsn != flush_lsn), but no file. This
+            // shouldn't happen, since the segment is flushed on close.
+            bail!(
+                "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
+                self.write_lsn,
+                self.flush_record_lsn
+            );
         }
 
         // everything is flushed now, let's update flush_lsn
+        self.flush_lsn = self.write_lsn;
         self.flush_record_lsn = self.write_record_lsn;
         Ok(())
     }
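
The invariant behind the now-unconditional bail!: unflushed bytes can only live in the currently open .partial file, because a segment is fsynced when it is closed. A simplified sketch of that reasoning with assumed state (not the real PhysicalStorage):

// Simplified sketch (assumed state) of the flush_wal invariant: any
// unflushed bytes must be in the open .partial file, since a segment
// is already fsynced when it is closed. Unflushed data with no open
// file is therefore a bug, not a recoverable state.
fn flush_wal(open_file: Option<&str>, write_lsn: u64, flush_lsn: u64) -> Result<u64, String> {
    if write_lsn == flush_lsn {
        return Ok(flush_lsn); // nothing to flush
    }
    match open_file {
        Some(_path) => {
            // Real code: fdatasync the partial segment file here.
            Ok(write_lsn) // flush_lsn catches up to write_lsn
        }
        None => Err(format!(
            "unexpected unflushed data with no open file, write_lsn={write_lsn}, flush_lsn={flush_lsn}"
        )),
    }
}

fn main() {
    assert_eq!(flush_wal(Some("segment.partial"), 200, 100), Ok(200));
    assert!(flush_wal(None, 200, 100).is_err());
}
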
@@ -515,6 +549,7 @@
         // Update LSNs
         self.write_lsn = end_pos;
         self.write_record_lsn = end_pos;
+        self.flush_lsn = end_pos;
         self.flush_record_lsn = end_pos;
         self.is_truncated_after_restart = true;
         Ok(())