safekeeper: flush WAL on transaction commit

This commit is contained in:
Erik Grinaker
2024-11-08 16:37:46 +01:00
parent 71f04f91c2
commit a81df96db5
2 changed files with 22 additions and 3 deletions

View File

@@ -14,7 +14,7 @@ use super::bindings::{
};
use super::wal_generator::LogicalMessageGenerator;
use super::PG_MAJORVERSION;
use crate::pg_constants;
use crate::pg_constants::{self, XLOG_XACT_COMMIT, XLOG_XACT_COMMIT_PREPARED};
use crate::PG_TLI;
use crate::{uint32, uint64, Oid};
use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
@@ -296,6 +296,13 @@ impl XLogRecord {
pub fn is_xlog_switch_record(&self) -> bool {
self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
}
// Is this record a transaction commit?
pub fn is_xact_commit(&self) -> bool {
self.xl_rmid == pg_constants::RM_XACT_ID
&& (self.xl_info & pg_constants::XLOG_XACT_OPMASK == XLOG_XACT_COMMIT
|| self.xl_info & pg_constants::XLOG_XACT_OPMASK == XLOG_XACT_COMMIT_PREPARED)
}
}
impl XLogPageHeaderData {

View File

@@ -12,7 +12,7 @@ use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use futures::future::BoxFuture;
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
use postgres_ffi::{dispatch_pgversion, XLogRecord, XLogSegNo, PG_TLI, XLOG_SIZE_OF_XLOG_RECORD};
use remote_storage::RemotePath;
use std::cmp::{max, min};
use std::future::Future;
@@ -448,11 +448,23 @@ impl Storage for PhysicalStorage {
// We may have flushed a previously written record.
self.flush_record_lsn = self.write_record_lsn;
}
while let Some((lsn, _rec)) = self.decoder.poll_decode()? {
let mut xact_commit = false;
while let Some((lsn, rec)) = self.decoder.poll_decode()? {
self.write_record_lsn = lsn;
if lsn <= self.flush_lsn {
self.flush_record_lsn = lsn;
}
// TODO: the decoder already has the record header, make it return it.
let header = XLogRecord::from_slice(&rec[0..XLOG_SIZE_OF_XLOG_RECORD])
.expect("invalid record header");
xact_commit = xact_commit || header.is_xact_commit();
}
// If a transaction committed, flush the WAL. This will emit an AppendResponse to the
// compute. Otherwise, with pipelined ingestion, the txn may have to wait until the next
// periodic flush in 1 second, causing commit latency.
if xact_commit {
self.flush_wal().await?;
}
Ok(())