diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs
index 7fe4f40158..caf1940a9c 100644
--- a/postgres_ffi/src/xlog_utils.rs
+++ b/postgres_ffi/src/xlog_utils.rs
@@ -51,6 +51,13 @@ pub type TimeLineID = u32;
 pub type TimestampTz = i64;
 pub type XLogSegNo = u64;
 
+/// Interval of checkpointing the metadata file. We must store the metadata file to
+/// enforce the invariant that checkpoint.nextXid is larger than any XID in the WAL.
+/// But flushing the checkpoint file for each transaction would be too expensive, so
+/// XID_CHECKPOINT_INTERVAL is used to forward-align nextXid, performing the
+/// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
+/// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE,
+/// so that the CLOG_TRUNCATE mechanism can correctly extend the CLOG.
 const XID_CHECKPOINT_INTERVAL: u32 = 1024;
 
 #[allow(non_snake_case)]
@@ -400,9 +407,13 @@ impl CheckPoint {
     ///
     /// Returns 'true' if the XID was updated.
     pub fn update_next_xid(&mut self, xid: u32) -> bool {
-        let xid = xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
+        // nextXid must be greater than any XID in the WAL, so increment the provided XID and check for wraparound.
+        let mut new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
+        // To reduce the number of metadata checkpoints, forward-align the XID on XID_CHECKPOINT_INTERVAL.
+        // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE.
+        new_xid =
+            new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
         let full_xid = self.nextXid.value;
-        let new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
         let old_xid = full_xid as u32;
         if new_xid.wrapping_sub(old_xid) as i32 > 0 {
             let mut epoch = full_xid >> 32;
@@ -520,4 +531,34 @@ mod tests {
         println!("wal_end={}, tli={}", wal_end, tli);
         assert_eq!(wal_end, waldump_wal_end);
     }
+
+    /// Check the math in update_next_xid
+    ///
+    /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
+    /// currently 1024.
+    #[test]
+    pub fn test_update_next_xid() {
+        let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
+        let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
+
+        checkpoint.nextXid = FullTransactionId { value: 10 };
+        assert_eq!(checkpoint.nextXid.value, 10);
+
+        // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
+        // boundary.
+        checkpoint.update_next_xid(100);
+        assert_eq!(checkpoint.nextXid.value, 1024);
+
+        // No change
+        checkpoint.update_next_xid(500);
+        assert_eq!(checkpoint.nextXid.value, 1024);
+        checkpoint.update_next_xid(1023);
+        assert_eq!(checkpoint.nextXid.value, 1024);
+
+        // The function returns the *next* XID, given the highest XID seen so
+        // far. So when we pass 1024, the nextXid gets bumped up to the next
+        // XID_CHECKPOINT_INTERVAL boundary.
+        checkpoint.update_next_xid(1024);
+        assert_eq!(checkpoint.nextXid.value, 2048);
+    }
 }
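
The forward alignment relies on XID_CHECKPOINT_INTERVAL being a power of two: adding `interval - 1` and masking with `!(interval - 1)` rounds up to the next multiple, and interpreting `new_xid.wrapping_sub(old_xid)` as a signed value keeps the comparison correct across the 2^32 wraparound. The following standalone sketch of that math may help review it in isolation; `align_next_xid`, `main`, and the constant values are illustrative only and not code from this patch (the sketch uses `wrapping_add(1)` where the patch uses plain `xid + 1`):

```rust
// Standalone sketch of the rounding math in update_next_xid (not patch code).
const XID_CHECKPOINT_INTERVAL: u32 = 1024; // must be a power of two
const FIRST_NORMAL_TRANSACTION_ID: u32 = 3; // PostgreSQL's FirstNormalTransactionId

/// Round `xid + 1` up to the next XID_CHECKPOINT_INTERVAL boundary, using
/// wrapping arithmetic so the math stays valid at the 2^32 wraparound.
fn align_next_xid(xid: u32) -> u32 {
    let next = std::cmp::max(xid.wrapping_add(1), FIRST_NORMAL_TRANSACTION_ID);
    next.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1)
}

fn main() {
    assert_eq!(align_next_xid(100), 1024); // rounded up to the first boundary
    assert_eq!(align_next_xid(1023), 1024); // 1023 + 1 is already aligned
    assert_eq!(align_next_xid(1024), 2048); // nextXid must exceed 1024
    // The signed-difference test treats XIDs as a circular space, mirroring
    // `new_xid.wrapping_sub(old_xid) as i32 > 0` in the patch: an old XID
    // near u32::MAX still compares as "smaller" than a small new XID.
    let (old_xid, new_xid) = (u32::MAX & !(XID_CHECKPOINT_INTERVAL - 1), 1024u32);
    assert!(new_xid.wrapping_sub(old_xid) as i32 > 0);
}
```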
diff --git a/test_runner/batch_others/test_next_xid.py b/test_runner/batch_others/test_next_xid.py
new file mode 100644
index 0000000000..8a92cec749
--- /dev/null
+++ b/test_runner/batch_others/test_next_xid.py
@@ -0,0 +1,61 @@
+import pytest
+import random
+import time
+
+from fixtures.zenith_fixtures import ZenithEnvBuilder
+from fixtures.log_helper import log
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+
+# Test restarting the compute node and page server, while the safekeeper
+# keeps running.
+def test_next_xid(zenith_env_builder: ZenithEnvBuilder):
+    # One safekeeper is enough for this test.
+    zenith_env_builder.num_safekeepers = 1
+    env = zenith_env_builder.init()
+
+    pg = env.postgres.create_start('main')
+
+    conn = pg.connect()
+    cur = conn.cursor()
+    cur.execute('CREATE TABLE t(x integer)')
+
+    iterations = 32
+    for i in range(1, iterations + 1):
+        print(f'iteration {i} / {iterations}')
+
+        # Kill and restart the pageserver; the compute node is restarted too.
+        pg.stop()
+        env.pageserver.stop(immediate=True)
+        env.pageserver.start()
+        pg.start()
+
+        retry_sleep = 0.5
+        max_retries = 200
+        retries = 0
+        while True:
+            try:
+                conn = pg.connect()
+                cur = conn.cursor()
+                cur.execute(f"INSERT INTO t values({i})")
+                conn.close()
+
+            except Exception as error:
+                # It's normal that it takes some time for the pageserver to
+                # restart, and for the connection to fail until it does. It
+                # should eventually recover, so retry until it succeeds.
+                print(f'failed: {error}')
+                if retries < max_retries:
+                    retries += 1
+                    print(f'retry {retries} / {max_retries}')
+                    time.sleep(retry_sleep)
+                    continue
+                else:
+                    raise
+            break
+
+    conn = pg.connect()
+    cur = conn.cursor()
+    cur.execute("SELECT count(*) FROM t")
+    assert cur.fetchone() == (iterations, )
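
The retry loop in the test could be factored into a reusable helper. The following is a minimal sketch under the assumption that `pg.connect()` (or the statement) raises until the pageserver is reachable again; `execute_with_retries` is a hypothetical name, not part of the zenith_fixtures API:

```python
import time


def execute_with_retries(pg, sql, max_retries=200, retry_sleep=0.5):
    """Execute `sql` on a fresh connection, retrying while the connection
    (or the statement) fails, e.g. while the pageserver is restarting."""
    for attempt in range(max_retries + 1):
        try:
            conn = pg.connect()
            try:
                conn.cursor().execute(sql)
            finally:
                conn.close()
            return
        except Exception as error:
            if attempt == max_retries:
                raise
            print(f'failed: {error}, retry {attempt + 1} / {max_retries}')
            time.sleep(retry_sleep)


# Usage inside the loop above would then reduce to:
#     execute_with_retries(pg, f"INSERT INTO t values({i})")
```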