Cleanup resources better

This commit is contained in:
Arthur Petukhovsky
2023-09-16 23:10:53 +00:00
parent 10ad3ae4eb
commit 44c7d96ed0
6 changed files with 102 additions and 5 deletions

View File

@@ -151,7 +151,7 @@ fn test_simple_schedule() -> anyhow::Result<()> {
test.run_schedule(&schedule)?;
info!("Test finished, stopping all threads");
test.world.stop_all();
test.world.deallocate();
Ok(())
}
@@ -209,7 +209,7 @@ fn test_random_schedules() -> anyhow::Result<()> {
let schedule = generate_schedule(seed);
test.run_schedule(&schedule)?;
test.world.stop_all();
test.world.deallocate();
}
Ok(())
@@ -229,7 +229,7 @@ fn test_one_schedule() -> anyhow::Result<()> {
// let schedule = generate_schedule(seed);
// info!("schedule: {:?}", schedule);
// test.run_schedule(&schedule)?;
// test.world.stop_all();
// test.world.deallocate();
let seed = 11245530003696902397;
config.network = generate_network_opts(seed);
@@ -240,7 +240,32 @@ fn test_one_schedule() -> anyhow::Result<()> {
let schedule = generate_schedule(seed);
info!("schedule: {:?}", schedule);
test.run_schedule(&schedule).unwrap();
test.world.stop_all();
test.world.deallocate();
Ok(())
}
#[test]
fn test_res_dealloc() -> anyhow::Result<()> {
// enable_debug();
let clock = init_logger();
let mut config = TestConfig::new(Some(clock));
let seed = 123456;
config.network = generate_network_opts(seed);
let test = config.start(seed);
warn!("Running test with seed {}", seed);
let schedule = generate_schedule(seed);
info!("schedule: {:?}", schedule);
test.run_schedule(&schedule).unwrap();
test.world.stop_all();
let world = test.world.clone();
drop(test);
info!("world strong count: {}", Arc::strong_count(&world));
world.deallocate();
info!("world strong count: {}", Arc::strong_count(&world));
Ok(())
}

View File

@@ -323,6 +323,15 @@ nwp_shmem_startup_hook(void)
void WalProposerCleanup()
{
for (int i = 0; i < n_safekeepers; i++)
{
if (safekeeper[i].xlogreader)
{
XLogReaderFree(safekeeper[i].xlogreader);
safekeeper[i].xlogreader = NULL;
}
}
n_safekeepers = 0;
quorum = 0;
lastSentCommitLsn = 0;

View File

@@ -70,4 +70,9 @@ impl<T: Clone> Chan<T> {
let queue = self.shared.queue.lock();
queue.front().cloned()
}
pub fn clear(&self) {
let mut queue = self.shared.queue.lock();
queue.clear();
}
}

View File

@@ -107,6 +107,8 @@ impl VirtualConnection {
options,
});
conn.world.add_conn(conn.clone());
conn.schedule_timeout();
conn.send_connect();
@@ -316,6 +318,11 @@ impl VirtualConnection {
fn as_event(self: &Arc<Self>) -> Box<NetworkEvent> {
Box::new(NetworkEvent(self.clone()))
}
pub fn deallocate(&self) {
self.dst_sockets[0].clear();
self.dst_sockets[1].clear();
}
}
struct NetworkBuffer {

View File

@@ -59,6 +59,10 @@ impl Timing {
.peek()
.map_or(false, |x| x.time <= self.current_time)
}
pub fn clear(&mut self) {
self.timers.clear();
}
}
pub struct Pending {

View File

@@ -5,7 +5,7 @@ use std::{
panic::AssertUnwindSafe,
sync::{atomic::AtomicU64, Arc},
};
use tracing::debug;
use tracing::{debug, info};
use super::{
chan::Chan,
@@ -47,6 +47,9 @@ pub struct World {
/// Internal event log.
events: Mutex<Vec<SEvent>>,
/// Connections.
connections: Mutex<Vec<Arc<VirtualConnection>>>,
}
impl World {
@@ -65,6 +68,7 @@ impl World {
network_options,
nodes_init,
events: Mutex::new(Vec::new()),
connections: Mutex::new(Vec::new()),
}
}
@@ -279,6 +283,45 @@ impl World {
std::mem::swap(&mut res, &mut events);
res
}
pub fn add_conn(&self, conn: Arc<VirtualConnection>) {
self.connections.lock().push(conn);
}
pub fn deallocate(&self) {
self.stop_all();
self.timing.lock().clear();
self.unconditional_parking.lock().clear();
let mut connections = Vec::new();
std::mem::swap(&mut connections, &mut self.connections.lock());
for conn in connections {
conn.deallocate();
debug!("conn strong count: {}", Arc::strong_count(&conn));
}
let mut nodes = Vec::new();
std::mem::swap(&mut nodes, &mut self.nodes.lock());
let mut weak_ptrs = Vec::new();
for node in nodes {
node.deallocate();
weak_ptrs.push(Arc::downgrade(&node));
}
for weak_ptr in weak_ptrs {
let node = weak_ptr.upgrade();
if node.is_none() {
debug!("node is already deallocated");
continue;
}
let node = node.unwrap();
debug!("node strong count: {}", Arc::strong_count(&node));
}
self.events.lock().clear();
}
}
thread_local! {
@@ -468,6 +511,10 @@ impl Node {
// self.world.debug_print_state();
self.world.wait_group.wait();
}
pub fn deallocate(&self) {
self.network.lock().clear();
}
}
/// Network events and timers.