diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 852b20eace..26f6b8df3b 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -14,7 +14,7 @@ use super::bindings::{ }; use super::wal_generator::LogicalMessageGenerator; use super::PG_MAJORVERSION; -use crate::pg_constants; +use crate::pg_constants::{self, XLOG_XACT_COMMIT, XLOG_XACT_COMMIT_PREPARED}; use crate::PG_TLI; use crate::{uint32, uint64, Oid}; use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; @@ -296,6 +296,13 @@ impl XLogRecord { pub fn is_xlog_switch_record(&self) -> bool { self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID } + + // Is this record a transaction commit? + pub fn is_xact_commit(&self) -> bool { + self.xl_rmid == pg_constants::RM_XACT_ID + && (self.xl_info & pg_constants::XLOG_XACT_OPMASK == XLOG_XACT_COMMIT + || self.xl_info & pg_constants::XLOG_XACT_OPMASK == XLOG_XACT_COMMIT_PREPARED) + } } impl XLogPageHeaderData { diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 47ebc21495..9f9ec63147 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -12,7 +12,7 @@ use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use futures::future::BoxFuture; use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName}; -use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI}; +use postgres_ffi::{dispatch_pgversion, XLogRecord, XLogSegNo, PG_TLI, XLOG_SIZE_OF_XLOG_RECORD}; use remote_storage::RemotePath; use std::cmp::{max, min}; use std::future::Future; @@ -448,11 +448,23 @@ impl Storage for PhysicalStorage { // We may have flushed a previously written record. self.flush_record_lsn = self.write_record_lsn; } - while let Some((lsn, _rec)) = self.decoder.poll_decode()? { + let mut xact_commit = false; + while let Some((lsn, rec)) = self.decoder.poll_decode()? { self.write_record_lsn = lsn; if lsn <= self.flush_lsn { self.flush_record_lsn = lsn; } + // TODO: the decoder already has the record header, make it return it. + let header = XLogRecord::from_slice(&rec[0..XLOG_SIZE_OF_XLOG_RECORD]) + .expect("invalid record header"); + xact_commit = xact_commit || header.is_xact_commit(); + } + + // If a transaction committed, flush the WAL. This will emit an AppendResponse to the + // compute. Otherwise, with pipelined ingestion, the txn may have to wait until the next + // periodic flush in 1 second, causing commit latency. + if xact_commit { + self.flush_wal().await?; } Ok(())