mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 04:52:55 +00:00
Test what happens when XLOG_SWITCH ends on page boundary, fix #1991
This commit is contained in:
committed by
Egor Suvorov
parent
85bda437de
commit
80b7a3b51a
@@ -82,7 +82,17 @@ impl WalStreamDecoder {
|
||||
// that cross page boundaries.
|
||||
loop {
|
||||
// parse and verify page boundaries as we go
|
||||
if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 {
|
||||
if self.padlen > 0 {
|
||||
// We should first skip padding, as we may have to skip some page headers if we're processing the XLOG_SWITCH record.
|
||||
if self.inputbuf.remaining() < self.padlen as usize {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// skip padding
|
||||
self.inputbuf.advance(self.padlen as usize);
|
||||
self.lsn += self.padlen as u64;
|
||||
self.padlen = 0;
|
||||
} else if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 {
|
||||
// parse long header
|
||||
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
|
||||
@@ -128,15 +138,6 @@ impl WalStreamDecoder {
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
|
||||
continue;
|
||||
} else if self.padlen > 0 {
|
||||
if self.inputbuf.remaining() < self.padlen as usize {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// skip padding
|
||||
self.inputbuf.advance(self.padlen as usize);
|
||||
self.lsn += self.padlen as u64;
|
||||
self.padlen = 0;
|
||||
} else if self.contlen == 0 {
|
||||
assert!(self.recordbuf.is_empty());
|
||||
|
||||
|
||||
@@ -12,4 +12,5 @@ env_logger = "0.9"
|
||||
log = "0.4"
|
||||
once_cell = "1.8.0"
|
||||
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
postgres_ffi = { path = "../" }
|
||||
tempfile = "3.2"
|
||||
|
||||
@@ -14,6 +14,7 @@ fn main() -> Result<()> {
|
||||
.possible_values([
|
||||
"simple",
|
||||
"last_wal_record_xlog_switch",
|
||||
"last_wal_record_xlog_switch_ends_on_page_boundary",
|
||||
"last_wal_record_crossing_segment",
|
||||
"wal_record_crossing_segment_followed_by_small_one",
|
||||
])
|
||||
@@ -59,6 +60,9 @@ fn main() -> Result<()> {
|
||||
let lsn = match arg_matches.value_of("type").unwrap() {
|
||||
"simple" => generate_simple(client)?,
|
||||
"last_wal_record_xlog_switch" => generate_last_wal_record_xlog_switch(client)?,
|
||||
"last_wal_record_xlog_switch_ends_on_page_boundary" => {
|
||||
generate_last_wal_record_xlog_switch_ends_on_page_boundary(client)?
|
||||
}
|
||||
"last_wal_record_crossing_segment" => {
|
||||
generate_last_wal_record_crossing_segment(client)?
|
||||
}
|
||||
|
||||
@@ -4,6 +4,9 @@ use log::*;
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres::types::PgLsn;
|
||||
use postgres::Client;
|
||||
use postgres_ffi::xlog_utils::{
|
||||
XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
|
||||
};
|
||||
use std::cmp::Ordering;
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -269,6 +272,70 @@ pub fn generate_last_wal_record_xlog_switch(
|
||||
Ok(next_segment)
|
||||
}
|
||||
|
||||
pub fn generate_last_wal_record_xlog_switch_ends_on_page_boundary(
|
||||
client: &mut impl postgres::GenericClient,
|
||||
) -> Result<PgLsn> {
|
||||
// Do not use generate_internal because here we end up with flush_lsn exactly on
|
||||
// the segment boundary and insert_lsn after the initial page header, which is unusual.
|
||||
ensure_server_config(client)?;
|
||||
|
||||
client.execute("CREATE table t(x int)", &[])?;
|
||||
|
||||
// Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.
|
||||
// We will use logical message as the padding. We start with detecting how much WAL
|
||||
// it takes for one logical message, considering all alignments and headers.
|
||||
let base_wal_advance = {
|
||||
let before_lsn = client.pg_current_wal_insert_lsn()?;
|
||||
// Small non-empty message bigger than few bytes is more likely than an empty
|
||||
// message to have the same format as the big padding message.
|
||||
client.execute(
|
||||
"SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
|
||||
&[],
|
||||
)?;
|
||||
// The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
|
||||
(u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
|
||||
+ XLOG_SIZE_OF_XLOG_RECORD
|
||||
};
|
||||
let mut remaining_lsn =
|
||||
XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
|
||||
if remaining_lsn < base_wal_advance {
|
||||
remaining_lsn += XLOG_BLCKSZ;
|
||||
}
|
||||
let repeats = 10 + remaining_lsn - base_wal_advance;
|
||||
info!(
|
||||
"current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
|
||||
client.pg_current_wal_insert_lsn()?,
|
||||
remaining_lsn,
|
||||
base_wal_advance,
|
||||
repeats
|
||||
);
|
||||
client.execute(
|
||||
"SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
|
||||
&[&(repeats as i32)],
|
||||
)?;
|
||||
info!(
|
||||
"current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
|
||||
client.pg_current_wal_insert_lsn()?,
|
||||
XLOG_SIZE_OF_XLOG_RECORD
|
||||
);
|
||||
|
||||
// Emit the XLOG_SWITCH
|
||||
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
||||
let next_segment = PgLsn::from(0x0200_0000);
|
||||
ensure!(
|
||||
after_xlog_switch < next_segment,
|
||||
"XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}",
|
||||
after_xlog_switch,
|
||||
next_segment
|
||||
);
|
||||
ensure!(
|
||||
u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
|
||||
"XLOG_SWITCH message ended not on page boundary: {}",
|
||||
after_xlog_switch
|
||||
);
|
||||
Ok(next_segment)
|
||||
}
|
||||
|
||||
fn generate_single_logical_message(
|
||||
client: &mut impl postgres::GenericClient,
|
||||
transactional: bool,
|
||||
|
||||
Reference in New Issue
Block a user