diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index c263b5bd45..e8cd040833 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -151,7 +151,7 @@ fn test_simple_schedule() -> anyhow::Result<()> { test.run_schedule(&schedule)?; info!("Test finished, stopping all threads"); - test.world.stop_all(); + test.world.deallocate(); Ok(()) } @@ -209,7 +209,7 @@ fn test_random_schedules() -> anyhow::Result<()> { let schedule = generate_schedule(seed); test.run_schedule(&schedule)?; - test.world.stop_all(); + test.world.deallocate(); } Ok(()) @@ -229,7 +229,7 @@ fn test_one_schedule() -> anyhow::Result<()> { // let schedule = generate_schedule(seed); // info!("schedule: {:?}", schedule); // test.run_schedule(&schedule)?; - // test.world.stop_all(); + // test.world.deallocate(); let seed = 11245530003696902397; config.network = generate_network_opts(seed); @@ -240,7 +240,32 @@ fn test_one_schedule() -> anyhow::Result<()> { let schedule = generate_schedule(seed); info!("schedule: {:?}", schedule); test.run_schedule(&schedule).unwrap(); - test.world.stop_all(); + test.world.deallocate(); + + Ok(()) +} + +#[test] +fn test_res_dealloc() -> anyhow::Result<()> { + // enable_debug(); + let clock = init_logger(); + let mut config = TestConfig::new(Some(clock)); + + let seed = 123456; + config.network = generate_network_opts(seed); + let test = config.start(seed); + warn!("Running test with seed {}", seed); + + let schedule = generate_schedule(seed); + info!("schedule: {:?}", schedule); + test.run_schedule(&schedule).unwrap(); + test.world.stop_all(); + + let world = test.world.clone(); + drop(test); + info!("world strong count: {}", Arc::strong_count(&world)); + world.deallocate(); + info!("world strong count: {}", Arc::strong_count(&world)); Ok(()) } diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index d68332555b..235e443808 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -323,6 +323,15 @@ nwp_shmem_startup_hook(void) void WalProposerCleanup() { + for (int i = 0; i < n_safekeepers; i++) + { + if (safekeeper[i].xlogreader) + { + XLogReaderFree(safekeeper[i].xlogreader); + safekeeper[i].xlogreader = NULL; + } + } + n_safekeepers = 0; quorum = 0; lastSentCommitLsn = 0; diff --git a/safekeeper/src/simlib/chan.rs b/safekeeper/src/simlib/chan.rs index 21505d0dbe..4e4e0a282c 100644 --- a/safekeeper/src/simlib/chan.rs +++ b/safekeeper/src/simlib/chan.rs @@ -70,4 +70,9 @@ impl Chan { let queue = self.shared.queue.lock(); queue.front().cloned() } + + pub fn clear(&self) { + let mut queue = self.shared.queue.lock(); + queue.clear(); + } } diff --git a/safekeeper/src/simlib/network.rs b/safekeeper/src/simlib/network.rs index d197a94142..8a7d245717 100644 --- a/safekeeper/src/simlib/network.rs +++ b/safekeeper/src/simlib/network.rs @@ -107,6 +107,8 @@ impl VirtualConnection { options, }); + conn.world.add_conn(conn.clone()); + conn.schedule_timeout(); conn.send_connect(); @@ -316,6 +318,11 @@ impl VirtualConnection { fn as_event(self: &Arc) -> Box { Box::new(NetworkEvent(self.clone())) } + + pub fn deallocate(&self) { + self.dst_sockets[0].clear(); + self.dst_sockets[1].clear(); + } } struct NetworkBuffer { diff --git a/safekeeper/src/simlib/time.rs b/safekeeper/src/simlib/time.rs index a4127580bd..6c5502e2c9 100644 --- a/safekeeper/src/simlib/time.rs +++ b/safekeeper/src/simlib/time.rs @@ -59,6 +59,10 @@ impl Timing { .peek() .map_or(false, |x| x.time <= self.current_time) } + + pub fn clear(&mut self) { + self.timers.clear(); + } } pub struct Pending { diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index 4472a97c12..b8d4bf0460 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -5,7 +5,7 @@ use std::{ panic::AssertUnwindSafe, sync::{atomic::AtomicU64, Arc}, }; -use tracing::debug; +use tracing::{debug, info}; use super::{ chan::Chan, @@ -47,6 +47,9 @@ pub struct World { /// Internal event log. events: Mutex>, + + /// Connections. + connections: Mutex>>, } impl World { @@ -65,6 +68,7 @@ impl World { network_options, nodes_init, events: Mutex::new(Vec::new()), + connections: Mutex::new(Vec::new()), } } @@ -279,6 +283,45 @@ impl World { std::mem::swap(&mut res, &mut events); res } + + pub fn add_conn(&self, conn: Arc) { + self.connections.lock().push(conn); + } + + pub fn deallocate(&self) { + self.stop_all(); + + self.timing.lock().clear(); + self.unconditional_parking.lock().clear(); + + let mut connections = Vec::new(); + std::mem::swap(&mut connections, &mut self.connections.lock()); + for conn in connections { + conn.deallocate(); + debug!("conn strong count: {}", Arc::strong_count(&conn)); + } + + let mut nodes = Vec::new(); + std::mem::swap(&mut nodes, &mut self.nodes.lock()); + + let mut weak_ptrs = Vec::new(); + for node in nodes { + node.deallocate(); + weak_ptrs.push(Arc::downgrade(&node)); + } + + for weak_ptr in weak_ptrs { + let node = weak_ptr.upgrade(); + if node.is_none() { + debug!("node is already deallocated"); + continue; + } + let node = node.unwrap(); + debug!("node strong count: {}", Arc::strong_count(&node)); + } + + self.events.lock().clear(); + } } thread_local! { @@ -468,6 +511,10 @@ impl Node { // self.world.debug_print_state(); self.world.wait_group.wait(); } + + pub fn deallocate(&self) { + self.network.lock().clear(); + } } /// Network events and timers.