Fix checkpoint.nextXid update (#1166)

* Fix checkpoint.nextXid update

* Add test for cehckpoint.nextXid

* Fix indentation of test_next_xid.py

* Fix mypy error in test_next_xid.py

* Tidy up the test case.

* Add a unit test

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>
This commit is contained in:
Konstantin Knizhnik
2022-01-27 18:21:51 +03:00
committed by GitHub
parent f58a22d07e
commit 08135910a5
2 changed files with 104 additions and 2 deletions

View File

@@ -51,6 +51,13 @@ pub type TimeLineID = u32;
pub type TimestampTz = i64;
pub type XLogSegNo = u64;
/// Interval of checkpointing metadata file. We should store metadata file to enforce
/// predicate that checkpoint.nextXid is larger than any XID in WAL.
/// But flushing checkpoint file for each transaction seems to be too expensive,
/// so XID_CHECKPOINT_INTERVAL is used to forward align nextXid and so perform
/// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
/// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
#[allow(non_snake_case)]
@@ -400,9 +407,13 @@ impl CheckPoint {
///
/// Returns 'true' if the XID was updated.
pub fn update_next_xid(&mut self, xid: u32) -> bool {
let xid = xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
// nextXid should nw greate than any XID in WAL, so increment provided XID and check for wraparround.
let mut new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
// To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
new_xid =
new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
let full_xid = self.nextXid.value;
let new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
let old_xid = full_xid as u32;
if new_xid.wrapping_sub(old_xid) as i32 > 0 {
let mut epoch = full_xid >> 32;
@@ -520,4 +531,34 @@ mod tests {
println!("wal_end={}, tli={}", wal_end, tli);
assert_eq!(wal_end, waldump_wal_end);
}
/// Check the math in update_next_xid
///
/// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
/// currently 1024.
#[test]
pub fn test_update_next_xid() {
let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
checkpoint.nextXid = FullTransactionId { value: 10 };
assert_eq!(checkpoint.nextXid.value, 10);
// The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
// boundary
checkpoint.update_next_xid(100);
assert_eq!(checkpoint.nextXid.value, 1024);
// No change
checkpoint.update_next_xid(500);
assert_eq!(checkpoint.nextXid.value, 1024);
checkpoint.update_next_xid(1023);
assert_eq!(checkpoint.nextXid.value, 1024);
// The function returns the *next* XID, given the highest XID seen so
// far. So when we pass 1024, the nextXid gets bumped up to the next
// XID_CHECKPOINT_INTERVAL boundary.
checkpoint.update_next_xid(1024);
assert_eq!(checkpoint.nextXid.value, 2048);
}
}

View File

@@ -0,0 +1,61 @@
import pytest
import random
import time
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_next_xid(zenith_env_builder: ZenithEnvBuilder):
# One safekeeper is enough for this test.
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
pg = env.postgres.create_start('main')
conn = pg.connect()
cur = conn.cursor()
cur.execute('CREATE TABLE t(x integer)')
iterations = 32
for i in range(1, iterations + 1):
print(f'iteration {i} / {iterations}')
# Kill and restart the pageserver.
pg.stop()
env.pageserver.stop(immediate=True)
env.pageserver.start()
pg.start()
retry_sleep = 0.5
max_retries = 200
retries = 0
while True:
try:
conn = pg.connect()
cur = conn.cursor()
cur.execute(f"INSERT INTO t values({i})")
conn.close()
except Exception as error:
# It's normal that it takes some time for the pageserver to
# restart, and for the connection to fail until it does. It
# should eventually recover, so retry until it succeeds.
print(f'failed: {error}')
if retries < max_retries:
retries += 1
print(f'retry {retries} / {max_retries}')
time.sleep(retry_sleep)
continue
else:
raise
break
conn = pg.connect()
cur = conn.cursor()
cur.execute("SELECT count(*) FROM t")
assert cur.fetchone() == (iterations, )