From 41b9750e8155bf24106e470530e587e5f5c8b372 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 24 Aug 2023 23:42:11 +0000 Subject: [PATCH] Run many schedules --- libs/walproposer/src/sim.rs | 4 +- libs/walproposer/src/simtest/util.rs | 45 ++++++++++++++++++---- libs/walproposer/src/simtest/wp_sk.rs | 55 +++++++++++++++++++++++---- pgxn/neon/walproposer.c | 5 +++ pgxn/neon/walproposer_utils.c | 4 +- safekeeper/src/simlib/network.rs | 2 + safekeeper/src/simlib/world.rs | 11 +++--- 7 files changed, 103 insertions(+), 23 deletions(-) diff --git a/libs/walproposer/src/sim.rs b/libs/walproposer/src/sim.rs index c2537f2883..4770a97fe9 100644 --- a/libs/walproposer/src/sim.rs +++ b/libs/walproposer/src/sim.rs @@ -5,6 +5,7 @@ use std::{ collections::HashMap, ffi::{CStr, CString}, }; +use tracing::trace; use crate::sim_proto::{anymessage_tag, AnyMessageTag, Event, EventTag, MESSAGE_BUF}; @@ -209,7 +210,7 @@ pub extern "C" fn sim_now() -> i64 { #[no_mangle] pub extern "C" fn sim_exit(code: i32, msg: *const u8) { - debug!("sim_exit({}, {:?})", code, msg); + trace!("sim_exit({}, {:?})", code, msg); sim_set_result(code, msg); // I tried to make use of pthread_exit, but it doesn't work. @@ -226,5 +227,6 @@ pub extern "C" fn sim_exit(code: i32, msg: *const u8) { pub extern "C" fn sim_set_result(code: i32, msg: *const u8) { let msg = unsafe { CStr::from_ptr(msg as *const i8) }; let msg = msg.to_string_lossy().into_owned(); + debug!("sim_set_result({}, {:?})", code, msg); os().set_result(code, msg); } diff --git a/libs/walproposer/src/simtest/util.rs b/libs/walproposer/src/simtest/util.rs index 57df69596b..06ef685ba9 100644 --- a/libs/walproposer/src/simtest/util.rs +++ b/libs/walproposer/src/simtest/util.rs @@ -1,12 +1,14 @@ use std::{ffi::CString, path::Path, str::FromStr, sync::Arc}; +use rand::{Rng, SeedableRng}; use safekeeper::simlib::{ network::{Delay, NetworkOptions}, proto::AnyMessage, + time::EmptyEvent, world::World, - world::{Node, NodeEvent}, time::EmptyEvent, + world::{Node, NodeEvent}, }; -use tracing::{info, error, warn, debug}; +use tracing::{debug, error, info, warn}; use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::{ @@ -16,7 +18,10 @@ use crate::{ MyInsertRecord, WalProposerCleanup, WalProposerRust, }, c_context, - simtest::{safekeeper::run_server, log::{SimClock, init_logger}}, + simtest::{ + log::{init_logger, SimClock}, + safekeeper::run_server, + }, }; use super::disk::Disk; @@ -272,7 +277,6 @@ impl Test { } } - let mut wait_node = self.launch_sync(); // fake walproposer let mut wp = WalProposer { @@ -297,7 +301,10 @@ impl Test { if sync_in_progress && wait_node.is_finished() { let res = wait_node.result.lock().clone(); if res.0 != 0 { - anyhow::bail!("non-zero exitcode: {:?}", res); + warn!("sync non-zero exitcode: {:?}", res); + debug!("restarting walproposer"); + wait_node = self.launch_sync(); + continue; } let lsn = Lsn::from_str(&res.1)?; debug!("sync-safekeepers finished at LSN {}", lsn); @@ -350,7 +357,10 @@ impl Test { if wait_node.is_finished() { while self.world.step() && self.world.now() < next_event_time {} } else { - while self.world.step() && self.world.now() < next_event_time && !wait_node.is_finished() {} + while self.world.step() + && self.world.now() < next_event_time + && !wait_node.is_finished() + {} } } @@ -425,7 +435,7 @@ impl WalProposer { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum TestAction { WriteTx(usize), RestartSafekeeper(usize), @@ -433,3 +443,24 @@ pub enum TestAction { } pub type Schedule = Vec<(u64, TestAction)>; + +pub fn generate_schedule(seed: u64) -> Schedule { + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut schedule = Vec::new(); + let mut time = 0; + + let cnt = rng.gen_range(1..100); + + for _ in 0..cnt { + time += rng.gen_range(0..100); + let action = match rng.gen_range(0..3) { + 0 => TestAction::WriteTx(rng.gen_range(1..10)), + 1 => TestAction::RestartSafekeeper(rng.gen_range(0..3)), + 2 => TestAction::RestartWalProposer, + _ => unreachable!(), + }; + schedule.push((time, action)); + } + + schedule +} diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index 088c2943b9..cd3a1a8db1 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -7,7 +7,7 @@ use safekeeper::simlib::{ world::World, world::{Node, NodeEvent}, }; -use tracing::info; +use tracing::{info, warn}; use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::{ @@ -17,10 +17,17 @@ use crate::{ MyInsertRecord, WalProposerCleanup, WalProposerRust, }, c_context, - simtest::{safekeeper::run_server, log::{SimClock, init_logger}, util::TestConfig}, + simtest::{ + log::{init_logger, SimClock}, + safekeeper::run_server, + util::{generate_schedule, TestConfig}, + }, }; -use super::{disk::Disk, util::{Schedule, TestAction}}; +use super::{ + disk::Disk, + util::{Schedule, TestAction}, +}; #[test] fn sync_empty_safekeepers() { @@ -118,7 +125,7 @@ fn test_simple_restart() { } #[test] -fn test_simple_schedule() { +fn test_simple_schedule() -> anyhow::Result<()> { let clock = init_logger(); let mut config = TestConfig::new(Some(clock)); config.network.keepalive_timeout = Some(100); @@ -142,13 +149,15 @@ fn test_simple_schedule() { (1000, TestAction::WriteTx(5)), ]; - test.run_schedule(&schedule).unwrap(); + test.run_schedule(&schedule)?; info!("Test finished, stopping all threads"); test.world.stop_all(); + + Ok(()) } #[test] -fn test_random_schedules() { +fn test_random_schedules() -> anyhow::Result<()> { let clock = init_logger(); let mut config = TestConfig::new(Some(clock)); config.network.keepalive_timeout = Some(100); @@ -156,8 +165,40 @@ fn test_random_schedules() { for i in 0..1000 { let seed: u64 = rand::thread_rng().gen(); let test = config.start(seed); - info!("Running test with seed {}", seed); + warn!("Running test with seed {}", seed); + + let schedule = generate_schedule(seed); + test.run_schedule(&schedule)?; test.world.stop_all(); } + + Ok(()) +} + +#[test] +fn test_one_schedule() -> anyhow::Result<()> { + let clock = init_logger(); + let mut config = TestConfig::new(Some(clock)); + config.network.keepalive_timeout = Some(100); + + // let seed = 6762900106769428342; + // let test = config.start(seed); + // warn!("Running test with seed {}", seed); + + // let schedule = generate_schedule(seed); + // info!("schedule: {:?}", schedule); + // test.run_schedule(&schedule)?; + // test.world.stop_all(); + + let seed = 14035854184686918762; + let test = config.start(seed); + warn!("Running test with seed {}", seed); + + let schedule = generate_schedule(seed); + info!("schedule: {:?}", schedule); + test.run_schedule(&schedule).unwrap(); + test.world.stop_all(); + + Ok(()) } diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index b5ffd9eaa4..158c1bbc83 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -347,6 +347,8 @@ void WalProposerCleanup() SpinLockInit(&walprop_shared->mutex); pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0); } + + XLogWalPropClose(0); } void InitMyInsert(); @@ -644,6 +646,9 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) { walprop_log(FATAL, "Too many safekeepers"); } + + memset(&safekeeper[n_safekeepers], 0, sizeof(Safekeeper)); + safekeeper[n_safekeepers].host = host; safekeeper[n_safekeepers].port = port; safekeeper[n_safekeepers].state = SS_OFFLINE; diff --git a/pgxn/neon/walproposer_utils.c b/pgxn/neon/walproposer_utils.c index be59cba7f8..d77d5f4540 100644 --- a/pgxn/neon/walproposer_utils.c +++ b/pgxn/neon/walproposer_utils.c @@ -486,9 +486,9 @@ XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr) void XLogWalPropClose(XLogRecPtr recptr) { - Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)); + // Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)); - if (close(walpropFile) != 0) + if (walpropFile >= 0 && close(walpropFile) != 0) { char xlogfname[MAXFNAMELEN]; diff --git a/safekeeper/src/simlib/network.rs b/safekeeper/src/simlib/network.rs index a14df40a61..0229ac0cc6 100644 --- a/safekeeper/src/simlib/network.rs +++ b/safekeeper/src/simlib/network.rs @@ -305,6 +305,8 @@ impl VirtualConnection { let send_buffer = &mut state.buffers[node_idx]; send_buffer.send_closed = true; + drop(state); + // TODO: notify the other side? self.dst_sockets[node_idx].send(NodeEvent::Closed(TCP::new(self.clone(), node_idx as u8))); diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index 30d6583dfe..f679c8410a 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -1,11 +1,11 @@ use rand::{rngs::StdRng, Rng, SeedableRng}; -use tracing::debug; use std::{ cell::RefCell, ops::DerefMut, panic::AssertUnwindSafe, sync::{atomic::AtomicU64, Arc}, }; +use tracing::debug; use super::{ chan::Chan, @@ -215,11 +215,7 @@ impl World { self.unconditional_parking.lock().len() ); for node in self.nodes.lock().iter() { - debug!( - "node id={:?} status={:?}", - node.id, - node.status.lock() - ); + debug!("node id={:?} status={:?}", node.id, node.status.lock()); } for park in self.unconditional_parking.lock().iter() { park.debug_print(); @@ -422,6 +418,8 @@ impl Node { } pub fn crash_stop(self: &Arc) { + self.world.await_all(); + let status = self.status.lock().clone(); match status { NodeStatus::NotStarted | NodeStatus::Finished | NodeStatus::Failed => return, @@ -444,6 +442,7 @@ impl Node { }; park.debug_print(); + // self.world.debug_print_state(); // unplug old network socket, and create a new one *self.network.lock() = Chan::new();