diff --git a/Cargo.lock b/Cargo.lock index 1f4cb8f3d7..6924c0c74a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3762,6 +3762,7 @@ dependencies = [ "log", "once_cell", "postgres", + "postgres_ffi", "tempfile", ] diff --git a/libs/postgres_ffi/src/waldecoder.rs b/libs/postgres_ffi/src/waldecoder.rs index f7bd70653c..7a69f471d9 100644 --- a/libs/postgres_ffi/src/waldecoder.rs +++ b/libs/postgres_ffi/src/waldecoder.rs @@ -82,7 +82,17 @@ impl WalStreamDecoder { // that cross page boundaries. loop { // parse and verify page boundaries as we go - if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 { + if self.padlen > 0 { + // We should first skip padding, as we may have to skip some page headers if we're processing the XLOG_SWITCH record. + if self.inputbuf.remaining() < self.padlen as usize { + return Ok(None); + } + + // skip padding + self.inputbuf.advance(self.padlen as usize); + self.lsn += self.padlen as u64; + self.padlen = 0; + } else if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 { // parse long header if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD { @@ -128,15 +138,6 @@ impl WalStreamDecoder { self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64; continue; - } else if self.padlen > 0 { - if self.inputbuf.remaining() < self.padlen as usize { - return Ok(None); - } - - // skip padding - self.inputbuf.advance(self.padlen as usize); - self.lsn += self.padlen as u64; - self.padlen = 0; } else if self.contlen == 0 { assert!(self.recordbuf.is_empty()); diff --git a/libs/postgres_ffi/wal_generate/Cargo.toml b/libs/postgres_ffi/wal_generate/Cargo.toml index 7edb36937d..ce1a60c4f8 100644 --- a/libs/postgres_ffi/wal_generate/Cargo.toml +++ b/libs/postgres_ffi/wal_generate/Cargo.toml @@ -12,4 +12,5 @@ env_logger = "0.9" log = "0.4" once_cell = "1.8.0" postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +postgres_ffi = { path = "../" } tempfile = "3.2" diff --git a/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs b/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs index 6ed34caf28..1549bfb505 100644 --- a/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs +++ b/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs @@ -14,6 +14,7 @@ fn main() -> Result<()> { .possible_values([ "simple", "last_wal_record_xlog_switch", + "last_wal_record_xlog_switch_ends_on_page_boundary", "last_wal_record_crossing_segment", "wal_record_crossing_segment_followed_by_small_one", ]) @@ -59,6 +60,9 @@ fn main() -> Result<()> { let lsn = match arg_matches.value_of("type").unwrap() { "simple" => generate_simple(client)?, "last_wal_record_xlog_switch" => generate_last_wal_record_xlog_switch(client)?, + "last_wal_record_xlog_switch_ends_on_page_boundary" => { + generate_last_wal_record_xlog_switch_ends_on_page_boundary(client)? + } "last_wal_record_crossing_segment" => { generate_last_wal_record_crossing_segment(client)? } diff --git a/libs/postgres_ffi/wal_generate/src/lib.rs b/libs/postgres_ffi/wal_generate/src/lib.rs index 01639ccfff..ac6fcc441d 100644 --- a/libs/postgres_ffi/wal_generate/src/lib.rs +++ b/libs/postgres_ffi/wal_generate/src/lib.rs @@ -4,6 +4,9 @@ use log::*; use once_cell::sync::Lazy; use postgres::types::PgLsn; use postgres::Client; +use postgres_ffi::xlog_utils::{ + XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD, +}; use std::cmp::Ordering; use std::fs; use std::path::{Path, PathBuf}; @@ -269,6 +272,70 @@ pub fn generate_last_wal_record_xlog_switch( Ok(next_segment) } +pub fn generate_last_wal_record_xlog_switch_ends_on_page_boundary( + client: &mut impl postgres::GenericClient, +) -> Result { + // Do not use generate_internal because here we end up with flush_lsn exactly on + // the segment boundary and insert_lsn after the initial page header, which is unusual. + ensure_server_config(client)?; + + client.execute("CREATE table t(x int)", &[])?; + + // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary. + // We will use logical message as the padding. We start with detecting how much WAL + // it takes for one logical message, considering all alignments and headers. + let base_wal_advance = { + let before_lsn = client.pg_current_wal_insert_lsn()?; + // Small non-empty message bigger than few bytes is more likely than an empty + // message to have the same format as the big padding message. + client.execute( + "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))", + &[], + )?; + // The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD. + (u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize + + XLOG_SIZE_OF_XLOG_RECORD + }; + let mut remaining_lsn = + XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ; + if remaining_lsn < base_wal_advance { + remaining_lsn += XLOG_BLCKSZ; + } + let repeats = 10 + remaining_lsn - base_wal_advance; + info!( + "current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}", + client.pg_current_wal_insert_lsn()?, + remaining_lsn, + base_wal_advance, + repeats + ); + client.execute( + "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))", + &[&(repeats as i32)], + )?; + info!( + "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}", + client.pg_current_wal_insert_lsn()?, + XLOG_SIZE_OF_XLOG_RECORD + ); + + // Emit the XLOG_SWITCH + let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0); + let next_segment = PgLsn::from(0x0200_0000); + ensure!( + after_xlog_switch < next_segment, + "XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}", + after_xlog_switch, + next_segment + ); + ensure!( + u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD, + "XLOG_SWITCH message ended not on page boundary: {}", + after_xlog_switch + ); + Ok(next_segment) +} + fn generate_single_logical_message( client: &mut impl postgres::GenericClient, transactional: bool, diff --git a/test_runner/batch_others/test_crafted_wal_end.py b/test_runner/batch_others/test_crafted_wal_end.py index c4674f802e..945dfffe4f 100644 --- a/test_runner/batch_others/test_crafted_wal_end.py +++ b/test_runner/batch_others/test_crafted_wal_end.py @@ -10,6 +10,7 @@ import pytest [ 'simple', 'last_wal_record_xlog_switch', + 'last_wal_record_xlog_switch_ends_on_page_boundary', 'last_wal_record_crossing_segment', 'wal_record_crossing_segment_followed_by_small_one', ])