diff --git a/Cargo.lock b/Cargo.lock index ab26a84e9d..63766ed1cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4660,6 +4660,7 @@ dependencies = [ "rand", "regex", "safekeeper", + "scopeguard", "serde", "thiserror", "tracing", diff --git a/libs/walproposer/Cargo.toml b/libs/walproposer/Cargo.toml index 3ac7080260..47a814b697 100644 --- a/libs/walproposer/Cargo.toml +++ b/libs/walproposer/Cargo.toml @@ -21,6 +21,7 @@ thiserror.workspace = true tracing.workspace = true tracing-subscriber = { workspace = true, features = ["json"] } serde.workspace = true +scopeguard.workspace = true utils.workspace = true safekeeper.workspace = true postgres_ffi.workspace = true diff --git a/libs/walproposer/libpqwalproposer.c b/libs/walproposer/libpqwalproposer.c index 5a50e51d6b..38fd463611 100644 --- a/libs/walproposer/libpqwalproposer.c +++ b/libs/walproposer/libpqwalproposer.c @@ -278,6 +278,7 @@ void InitMyInsert() { CurrBytePos = sim_redo_start_lsn; PrevBytePos = InvalidXLogRecPtr; + sim_latest_available_lsn = sim_redo_start_lsn; } static void MyBeginInsert() @@ -419,7 +420,6 @@ MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags) // Now write it to disk. MyCopyXLogRecordToWAL(rechdr->xl_tot_len, &hdr_rdt, StartPos, EndPos); - return EndPos; } diff --git a/libs/walproposer/src/simtest/safekeeper.rs b/libs/walproposer/src/simtest/safekeeper.rs index 4daf69e804..4a86d04474 100644 --- a/libs/walproposer/src/simtest/safekeeper.rs +++ b/libs/walproposer/src/simtest/safekeeper.rs @@ -14,7 +14,7 @@ use safekeeper::{ }, simlib::{network::TCP, node_os::NodeOs, proto::AnyMessage, world::NodeEvent}, timeline::TimelineError, - SafeKeeperConf, + SafeKeeperConf, wal_storage::Storage, }; use tracing::{debug, info_span}; use utils::{ @@ -149,6 +149,7 @@ impl GlobalMap { pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { let _enter = info_span!("safekeeper", id = os.id()).entered(); debug!("started server"); + os.log_event("started;safekeeper".to_owned()); let conf = SafeKeeperConf { workdir: PathBuf::from("."), my_id: NodeId(os.id() as u64), @@ -168,6 +169,12 @@ pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { let mut global = GlobalMap::new(disk, conf.clone())?; let mut conns: HashMap = HashMap::new(); + for (&ttid, shared_state) in global.timelines.iter_mut() { + let flush_lsn = shared_state.sk.wal_store.flush_lsn(); + let commit_lsn = shared_state.sk.state.commit_lsn; + os.log_event(format!("tli_loaded;{};{}", flush_lsn.0, commit_lsn.0)); + } + let epoll = os.epoll(); loop { // waiting for the next message diff --git a/libs/walproposer/src/simtest/util.rs b/libs/walproposer/src/simtest/util.rs index 69b56885b2..2123515c3b 100644 --- a/libs/walproposer/src/simtest/util.rs +++ b/libs/walproposer/src/simtest/util.rs @@ -1,4 +1,4 @@ -use std::{ffi::CString, path::Path, str::FromStr, sync::Arc}; +use std::{ffi::CString, path::Path, str::FromStr, sync::Arc, collections::HashMap}; use rand::{Rng, SeedableRng}; use safekeeper::simlib::{ @@ -6,7 +6,7 @@ use safekeeper::simlib::{ proto::AnyMessage, time::EmptyEvent, world::World, - world::{Node, NodeEvent}, + world::{Node, NodeEvent, SEvent, NodeId}, }; use tracing::{debug, error, info, warn}; use utils::{id::TenantTimelineId, lsn::Lsn}; @@ -253,9 +253,6 @@ impl Test { WalProposer { node: client_node, - txes: Vec::new(), - last_committed_tx: 0, - commit_lsn: Lsn(0), } } @@ -281,23 +278,15 @@ impl Test { // fake walproposer let mut wp = WalProposer { node: wait_node.clone(), - txes: Vec::new(), - last_committed_tx: 0, - commit_lsn: Lsn(0), }; let mut sync_in_progress = true; let mut skipped_tx = 0; let mut started_tx = 0; - let mut finished_tx = 0; let mut schedule_ptr = 0; loop { - if !sync_in_progress { - finished_tx += wp.update(); - } - if sync_in_progress && wait_node.is_finished() { let res = wait_node.result.lock().clone(); if res.0 != 0 { @@ -324,10 +313,8 @@ impl Test { match action { TestAction::WriteTx(size) => { if !sync_in_progress && !wait_node.is_finished() { - for _ in 0..*size { - started_tx += 1; - wp.write_tx(); - } + started_tx += *size; + wp.write_tx(*size); debug!("written {} transactions", size); } else { skipped_tx += size; @@ -367,7 +354,6 @@ impl Test { debug!("finished schedule"); debug!("skipped_tx: {}", skipped_tx); debug!("started_tx: {}", started_tx); - debug!("finished_tx: {}", finished_tx); Ok(()) } @@ -375,59 +361,13 @@ impl Test { pub struct WalProposer { pub node: Arc, - pub txes: Vec, - pub last_committed_tx: usize, - pub commit_lsn: Lsn, } impl WalProposer { - pub fn write_tx(&mut self) -> usize { - let new_ptr = unsafe { MyInsertRecord() }; - + pub fn write_tx(&mut self, cnt: usize) { self.node .network_chan() - .send(NodeEvent::Internal(AnyMessage::LSN(new_ptr as u64))); - - let tx_id = self.txes.len(); - self.txes.push(Lsn(new_ptr as u64)); - - tx_id - } - - /// Updates committed status. - pub fn update(&mut self) -> u64 { - let last_result = self.node.result.lock().clone(); - if last_result.0 != 1 { - // not an LSN update - return 0; - } - - let mut commited_now = 0; - - let lsn_str = last_result.1; - let lsn = Lsn::from_str(&lsn_str); - match lsn { - Ok(lsn) => { - self.commit_lsn = lsn; - debug!("commit_lsn: {}", lsn); - - while self.last_committed_tx < self.txes.len() - && self.txes[self.last_committed_tx] <= lsn - { - debug!( - "Tx #{} was commited at {}, last_commit_lsn={}", - self.last_committed_tx, self.txes[self.last_committed_tx], self.commit_lsn - ); - commited_now += 1; - self.last_committed_tx += 1; - } - } - Err(e) => { - error!("failed to parse LSN: {:?}", e); - } - } - - commited_now + .send(NodeEvent::Internal(AnyMessage::Just32(cnt as u32))); } pub fn stop(&self) { @@ -490,3 +430,181 @@ pub fn generate_network_opts(seed: u64) -> NetworkOptions { }, } } + +#[derive(Debug,Clone,PartialEq,Eq)] +enum NodeKind { + Unknown, + Safekeeper, + WalProposer, +} + +impl Default for NodeKind { + fn default() -> Self { + Self::Unknown + } +} + +#[derive(Clone, Debug, Default)] +struct NodeInfo { + kind: NodeKind, + + // walproposer + is_sync: bool, + term: u64, + epoch_lsn: u64, + + // safekeeper + commit_lsn: u64, + flush_lsn: u64, +} + +impl NodeInfo { + fn init_kind(&mut self, kind: NodeKind) { + if self.kind == NodeKind::Unknown { + self.kind = kind; + } else { + assert!(self.kind == kind); + } + } + + fn started(&mut self, data: &str) { + let mut parts = data.split(';'); + assert!(parts.next().unwrap() == "started"); + match parts.next().unwrap() { + "safekeeper" => { + self.init_kind(NodeKind::Safekeeper); + } + "walproposer" => { + self.init_kind(NodeKind::WalProposer); + let is_sync: u8 = parts.next().unwrap().parse().unwrap(); + self.is_sync = is_sync != 0; + } + _ => unreachable!(), + } + } +} + +#[derive(Debug,Default)] +struct GlobalState { + nodes: Vec, + commit_lsn: u64, + write_lsn: u64, + max_write_lsn: u64, + + written_wal: u64, + written_records: u64, +} + +impl GlobalState { + fn new() -> Self { + Default::default() + } + + fn get(&mut self, id: u32) -> &mut NodeInfo { + let id = id as usize; + if id >= self.nodes.len() { + self.nodes.resize(id + 1, NodeInfo::default()); + } + &mut self.nodes[id] + } +} + +pub fn validate_events(events: Vec) { + const INITDB_LSN: u64 = 21623024; + + let hook = std::panic::take_hook(); + scopeguard::defer_on_success! { + std::panic::set_hook(hook); + }; + + let mut state = GlobalState::new(); + state.max_write_lsn = INITDB_LSN; + + for event in events { + debug!("{:?}", event); + + let node = state.get(event.node); + if event.data.starts_with("started;") { + node.started(&event.data); + continue; + } + assert!(node.kind != NodeKind::Unknown); + + // drop reference to unlock state + let mut node = node.clone(); + + let mut parts = event.data.split(';'); + match node.kind { + NodeKind::Safekeeper => { + match parts.next().unwrap() { + "tli_loaded" => { + let flush_lsn: u64 = parts.next().unwrap().parse().unwrap(); + let commit_lsn: u64 = parts.next().unwrap().parse().unwrap(); + node.flush_lsn = flush_lsn; + node.commit_lsn = commit_lsn; + } + _ => unreachable!(), + } + } + NodeKind::WalProposer => { + match parts.next().unwrap() { + "prop_elected" => { + let prop_lsn: u64 = parts.next().unwrap().parse().unwrap(); + let prop_term: u64 = parts.next().unwrap().parse().unwrap(); + let prev_lsn: u64 = parts.next().unwrap().parse().unwrap(); + let prev_term: u64 = parts.next().unwrap().parse().unwrap(); + + assert!(prop_lsn >= prev_lsn); + assert!(prop_term >= prev_term); + + assert!(prop_lsn >= state.commit_lsn); + + if prop_lsn > state.write_lsn { + assert!(prop_lsn <= state.max_write_lsn); + debug!("moving write_lsn up from {} to {}", state.write_lsn, prop_lsn); + state.write_lsn = prop_lsn; + } + if prop_lsn < state.write_lsn { + debug!("moving write_lsn down from {} to {}", state.write_lsn, prop_lsn); + state.write_lsn = prop_lsn; + } + + node.epoch_lsn = prop_lsn; + node.term = prop_term; + } + "write_wal" => { + assert!(!node.is_sync); + let start_lsn: u64 = parts.next().unwrap().parse().unwrap(); + let end_lsn: u64 = parts.next().unwrap().parse().unwrap(); + let cnt: u64 = parts.next().unwrap().parse().unwrap(); + + let size = end_lsn - start_lsn; + state.written_wal += size; + state.written_records += cnt; + + // TODO: If we allow writing WAL before winning the election + + assert!(start_lsn >= state.commit_lsn); + assert!(end_lsn >= start_lsn); + assert!(start_lsn == state.write_lsn); + state.write_lsn = end_lsn; + + if end_lsn > state.max_write_lsn { + state.max_write_lsn = end_lsn; + } + } + "commit_lsn" => { + let lsn: u64 = parts.next().unwrap().parse().unwrap(); + assert!(lsn >= state.commit_lsn); + state.commit_lsn = lsn; + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } + + // update the node in the state struct + *state.get(event.node) = node; + } +} diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index 953d843563..996e03aef6 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -20,7 +20,7 @@ use crate::{ simtest::{ log::{init_logger, SimClock}, safekeeper::run_server, - util::{generate_schedule, TestConfig, generate_network_opts}, + util::{generate_schedule, TestConfig, generate_network_opts, validate_events}, }, enable_debug, }; @@ -60,9 +60,8 @@ fn run_walproposer_generate_wal() { test.poll_for_duration(30); for i in 0..100 { - wp.write_tx(); + wp.write_tx(1); test.poll_for_duration(5); - wp.update(); } } @@ -80,19 +79,13 @@ fn crash_safekeeper() { let mut wp = test.launch_walproposer(lsn); test.poll_for_duration(30); - wp.update(); - wp.write_tx(); - wp.write_tx(); - wp.write_tx(); + wp.write_tx(3); test.servers[0].restart(); test.poll_for_duration(100); - wp.update(); - test.poll_for_duration(1000); - wp.update(); } #[test] @@ -109,13 +102,9 @@ fn test_simple_restart() { let mut wp = test.launch_walproposer(lsn); test.poll_for_duration(30); - wp.update(); - wp.write_tx(); - wp.write_tx(); - wp.write_tx(); + wp.write_tx(3); test.poll_for_duration(100); - wp.update(); wp.stop(); drop(wp); @@ -207,8 +196,8 @@ fn test_random_schedules() -> anyhow::Result<()> { warn!("Running test with seed {}", seed); let schedule = generate_schedule(seed); - test.run_schedule(&schedule)?; - + test.run_schedule(&schedule).unwrap(); + validate_events(test.world.take_events()); test.world.deallocate(); } @@ -231,7 +220,7 @@ fn test_one_schedule() -> anyhow::Result<()> { // test.run_schedule(&schedule)?; // test.world.deallocate(); - let seed = 11245530003696902397; + let seed = 3649773280641776194; config.network = generate_network_opts(seed); info!("network: {:?}", config.network); let test = config.start(seed); @@ -240,6 +229,7 @@ fn test_one_schedule() -> anyhow::Result<()> { let schedule = generate_schedule(seed); info!("schedule: {:?}", schedule); test.run_schedule(&schedule).unwrap(); + validate_events(test.world.take_events()); test.world.deallocate(); Ok(()) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 2ec56cdde2..3a9898d99b 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -370,6 +370,8 @@ void WalProposerRust() InitMyInsert(); + sim_log("started;walproposer;%d", (int) syncSafekeepers); + #if PG_VERSION_NUM < 150000 ThisTimeLineID = 1; #endif @@ -459,6 +461,8 @@ WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos) } #ifdef SIMLIB +XLogRecPtr MyInsertRecord(); + int SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events) { @@ -484,9 +488,28 @@ SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events) } } walprop_log(FATAL, "unknown tcp connection"); - } else if (event.tag == Internal && event.any_message == LSN) { + } else if (event.tag == Internal && event.any_message == Just32) { + uint32_t tx_count; + XLogRecPtr start_lsn = sim_latest_available_lsn; + XLogRecPtr finish_lsn = sim_latest_available_lsn; + + Assert(!syncSafekeepers); + sim_epoll_rcv(0); - sim_msg_get_lsn(&sim_latest_available_lsn); + sim_msg_get_just_u32(&tx_count); + + // don't write WAL before winning the election + if (propEpochStartLsn != 0) + { + for (uint32_t i = 0; i < tx_count; i++) + { + finish_lsn = MyInsertRecord(); + } + + sim_log("write_wal;%lu;%lu;%d", start_lsn, finish_lsn, (int) tx_count); + sim_latest_available_lsn = finish_lsn; + } + *occurred_events = (WaitEvent) { .events = WL_LATCH_SET, }; @@ -1548,7 +1571,16 @@ DetermineEpochStartLsn(void) safekeeper[donor].host, safekeeper[donor].port, LSN_FORMAT_ARGS(truncateLsn)); - sim_log("prop_elected;%lu", propEpochStartLsn); + { + XLogRecPtr prev_lsn = 0; + term_t prev_term = 0; + if (propTermHistory.n_entries > 1) + { + prev_lsn = propTermHistory.entries[propTermHistory.n_entries - 2].lsn; + prev_term = propTermHistory.entries[propTermHistory.n_entries - 2].term; + } + sim_log("prop_elected;%lu;%lu;%lu;%lu", propEpochStartLsn, propTerm, prev_lsn, prev_term); + } /* * Ensure the basebackup we are running (at RedoStartLsn) matches LSN diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index b8d4bf0460..ad8f302c5d 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -5,7 +5,7 @@ use std::{ panic::AssertUnwindSafe, sync::{atomic::AtomicU64, Arc}, }; -use tracing::{debug, info}; +use tracing::{debug, info, trace}; use super::{ chan::Chan, @@ -298,7 +298,7 @@ impl World { std::mem::swap(&mut connections, &mut self.connections.lock()); for conn in connections { conn.deallocate(); - debug!("conn strong count: {}", Arc::strong_count(&conn)); + trace!("conn strong count: {}", Arc::strong_count(&conn)); } let mut nodes = Vec::new(); @@ -313,7 +313,7 @@ impl World { for weak_ptr in weak_ptrs { let node = weak_ptr.upgrade(); if node.is_none() { - debug!("node is already deallocated"); + trace!("node is already deallocated"); continue; } let node = node.unwrap();