From dd4c8fb5686283319e66f4306ef2d7ba17e4da06 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 9 Mar 2023 00:51:14 +0200 Subject: [PATCH] WIP --- Cargo.lock | 27 ++++++++ safekeeper/Cargo.toml | 3 + safekeeper/src/sim/disk.rs | 54 +++++++++++++++ safekeeper/src/sim/mod.rs | 1 + safekeeper/src/sim/node_os.rs | 7 +- safekeeper/src/sim/start_test.rs | 25 ++++++- safekeeper/src/sim/sync.rs | 30 ++++++++- safekeeper/src/sim/wait_group.rs | 54 +++++++++++++++ safekeeper/src/sim/world.rs | 111 +++++++++++++++++++++++++++---- 9 files changed, 295 insertions(+), 17 deletions(-) create mode 100644 safekeeper/src/sim/wait_group.rs diff --git a/Cargo.lock b/Cargo.lock index fe5aae6ae8..54f4e95c94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1014,6 +1014,20 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -1048,6 +1062,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.14" @@ -3311,6 +3335,7 @@ dependencies = [ "clap 4.1.4", "const_format", "crc32c", + "crossbeam", "fs2", "git-version", "hex", @@ -3324,9 +3349,11 @@ dependencies = [ "postgres-protocol", "postgres_ffi", "pq_proto", + "rand", "regex", "remote_storage", "safekeeper_api", + "scopeguard", "serde", "serde_json", "serde_with", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 2424509477..91281cd597 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -42,8 +42,11 @@ remote_storage.workspace = true safekeeper_api.workspace = true storage_broker.workspace = true utils.workspace = true +scopeguard.workspace = true workspace_hack.workspace = true +crossbeam = "0.8.2" +rand.workspace = true [dev-dependencies] tempfile.workspace = true diff --git a/safekeeper/src/sim/disk.rs b/safekeeper/src/sim/disk.rs index a9d712ecc8..fea35efa14 100644 --- a/safekeeper/src/sim/disk.rs +++ b/safekeeper/src/sim/disk.rs @@ -1,7 +1,61 @@ +use std::sync::Arc; + use anyhow::Result; +use super::sync::Mutex; + pub trait Storage { fn flush_pos(&self) -> u32; fn flush(&mut self) -> Result<()>; fn write(&mut self, t: T); } + +#[derive(Clone)] +pub struct SharedStorage { + state: Arc>>, +} + +impl SharedStorage { + pub fn new() -> Self { + Self { + state: Arc::new(Mutex::new(InMemoryStorage::new())), + } + } +} + +impl Storage for SharedStorage { + fn flush_pos(&self) -> u32 { + self.state.lock().flush_pos + } + + fn flush(&mut self) -> Result<()> { + self.state.lock().flush() + } + + fn write(&mut self, t: T) { + self.state.lock().write(t); + } +} + +pub struct InMemoryStorage { + data: Vec, + flush_pos: u32, +} + +impl InMemoryStorage { + pub fn new() -> Self { + Self { + data: Vec::new(), + flush_pos: 0, + } + } + + pub fn flush(&mut self) -> Result<()> { + self.flush_pos = self.data.len() as u32; + Ok(()) + } + + pub fn write(&mut self, t: T) { + self.data.push(t); + } +} diff --git a/safekeeper/src/sim/mod.rs b/safekeeper/src/sim/mod.rs index 015a78aaaf..ac3b9b8817 100644 --- a/safekeeper/src/sim/mod.rs +++ b/safekeeper/src/sim/mod.rs @@ -8,3 +8,4 @@ pub mod tcp; pub mod chan; pub mod sync; pub mod start_test; +pub mod wait_group; diff --git a/safekeeper/src/sim/node_os.rs b/safekeeper/src/sim/node_os.rs index aa7f50fdea..3cc3ad4120 100644 --- a/safekeeper/src/sim/node_os.rs +++ b/safekeeper/src/sim/node_os.rs @@ -16,10 +16,15 @@ impl NodeOs { } } + /// Get the node id. + pub fn id(&self) -> NodeId { + self.internal.id + } + /// Returns a writable pipe. All incoming messages should be polled /// with [`network_epoll`]. Always successful. pub fn open_tcp(&self, dst: NodeId) -> Tcp { - self.world.open_tcp(self.internal, dst) + self.world.open_tcp(&self.internal, dst) } /// Returns a channel to receive events from the network. diff --git a/safekeeper/src/sim/start_test.rs b/safekeeper/src/sim/start_test.rs index c74b988024..f5a7f87080 100644 --- a/safekeeper/src/sim/start_test.rs +++ b/safekeeper/src/sim/start_test.rs @@ -1,9 +1,32 @@ use std::sync::Arc; -use super::world::World; +use super::{world::World, client::run_client, disklog::run_server, disk::SharedStorage}; #[test] fn start_simulation() { let world = Arc::new(World::new()); let client_node = world.new_node(); + let server_node = world.new_node(); + let server_id = server_node.id; + + // start the client thread + let data = [1, 2, 3, 4, 5]; + client_node.launch(move |os| { + run_client(os, &data, server_id) + }); + + // start the server thread + let shared_storage = SharedStorage::new(); + let server_storage = shared_storage.clone(); + server_node.launch(move |os| { + run_server(os, Box::new(server_storage)) + }); + + world.await_all(); + world.debug_print_state(); + + while world.step() { + println!("made a step!"); + world.debug_print_state(); + } } diff --git a/safekeeper/src/sim/sync.rs b/safekeeper/src/sim/sync.rs index 7a92e56713..6a25492869 100644 --- a/safekeeper/src/sim/sync.rs +++ b/safekeeper/src/sim/sync.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, backtrace::Backtrace, io::{self, Write}}; pub type Mutex = parking_lot::Mutex; @@ -60,13 +60,17 @@ pub struct Park { struct ParkState { finished: bool, + dbg_signal: u8, } impl Park { pub fn new() -> Arc { Arc::new( Park { - lock: Mutex::new(ParkState { finished: false }), + lock: Mutex::new(ParkState { + finished: false, + dbg_signal: 0, + }), cvar: parking_lot::Condvar::new(), } ) @@ -80,6 +84,17 @@ impl Park { let mut state = self.lock.lock(); while !state.finished { self.cvar.wait(&mut state); + + // check if debug info was requested + if state.dbg_signal != 0 { + let bt = Backtrace::capture(); + println!("DEBUG: thread {:?} is parked at {:?}", std::thread::current().id(), bt); + // TODO: fix bad ordering of output + io::stdout().flush().unwrap(); + state.dbg_signal = 0; + // trigger a notification to wake up the caller thread + self.cvar.notify_all(); + } } // finish parking } @@ -90,4 +105,15 @@ impl Park { state.finished = true; self.cvar.notify_all(); } + + /// Send a signal to the thread that is currently parked to print debug info. + pub fn debug_print(&self) { + let mut state = self.lock.lock(); + state.dbg_signal = 1; + self.cvar.notify_all(); + + while !state.dbg_signal == 0 && !state.finished { + self.cvar.wait(&mut state); + } + } } diff --git a/safekeeper/src/sim/wait_group.rs b/safekeeper/src/sim/wait_group.rs new file mode 100644 index 0000000000..363e59095f --- /dev/null +++ b/safekeeper/src/sim/wait_group.rs @@ -0,0 +1,54 @@ +use std::sync::{Arc, Condvar, Mutex}; + +/// This is a custom waitgroup for internal use, shouldn't be used by the custom code. +#[derive(Clone)] +pub struct WaitGroup { + inner: Arc, +} + +/// Inner state of a `WaitGroup`. +struct Inner { + // using std convar + cvar: Condvar, + count: Mutex, +} + +impl Default for WaitGroup { + fn default() -> Self { + Self { + inner: Arc::new(Inner { + cvar: Condvar::new(), + count: Mutex::new(0), + }), + } + } +} + +impl WaitGroup { + pub fn new() -> Self { + Self::default() + } + + pub fn wait(&self) { + if *self.inner.count.lock().unwrap() <= 0 { + return; + } + + let mut count = self.inner.count.lock().unwrap(); + while *count > 0 { + count = self.inner.cvar.wait(count).unwrap(); + } + } + + pub fn add(&self, delta: i32) { + let mut count = self.inner.count.lock().unwrap(); + *count += delta; + if *count <= 0 { + self.inner.cvar.notify_all(); + } + } + + pub fn done(&self) { + self.add(-1); + } +} diff --git a/safekeeper/src/sim/world.rs b/safekeeper/src/sim/world.rs index d5e726a746..c62032cd3f 100644 --- a/safekeeper/src/sim/world.rs +++ b/safekeeper/src/sim/world.rs @@ -1,30 +1,41 @@ use std::sync::{atomic::AtomicI32, Arc}; +use rand::{rngs::StdRng, SeedableRng, Rng}; -use super::{tcp::Tcp, sync::Mutex, chan::Chan, proto::AnyMessage}; +use super::{tcp::Tcp, sync::{Mutex, Park}, chan::Chan, proto::AnyMessage, node_os::NodeOs, wait_group::WaitGroup}; pub type NodeId = u32; /// Full world simulation state, shared between all nodes. pub struct World { nodes: Mutex>>, + + /// List of parked threads, to be woken up by the world simulation. + unconditional_parking: Mutex>>, + + /// Counter for running threads. Generally should not be more than 1, if you want + /// to get a deterministic simulation. 0 means that all threads are parked or finished. + wait_group: WaitGroup, + + /// Random number generator. + rng: Mutex, } impl World { pub fn new() -> World { World{ nodes: Mutex::new(Vec::new()), + unconditional_parking: Mutex::new(Vec::new()), + wait_group: WaitGroup::new(), + rng: Mutex::new(StdRng::seed_from_u64(1337)), } } /// Create a new node. - pub fn new_node(&self) -> Arc { + pub fn new_node(self: &Arc) -> Arc { // TODO: verify let mut nodes = self.nodes.lock(); let id = nodes.len() as NodeId; - let node = Arc::new(Node{ - id, - network: Chan::new(), - }); + let node = Arc::new(Node::new(id, self.clone())); nodes.push(node.clone()); node } @@ -41,12 +52,50 @@ impl World { } /// Returns a writable end of a TCP connection, to send src->dst messages. - pub fn open_tcp(&self, src: Arc, dst: NodeId) -> Tcp { + pub fn open_tcp(&self, src: &Arc, dst: NodeId) -> Tcp { // TODO: replace unwrap() with /dev/null socket. let dst = self.get_node(dst).unwrap(); Tcp::new(dst) } + + /// Blocks the current thread until all nodes will park or finish. + pub fn await_all(&self) { + self.wait_group.wait(); + } + + pub fn step(&self) -> bool { + self.await_all(); + + let mut parking = self.unconditional_parking.lock(); + if parking.is_empty() { + // nothing to do, all threads have finished + return false; + } + + let chosen_one = self.rng.lock().gen_range(0..parking.len()); + let park = parking.swap_remove(chosen_one); + drop(parking); + + // wake up the chosen thread + park.wake(); + + // to have a clean state after each step, wait for all threads to finish + self.await_all(); + return true; + } + + /// Print full world state to stdout. + pub fn debug_print_state(&self) { + println!("[DEBUG] World state, nodes.len()={:?}, parking.len()={:?}", self.nodes.lock().len(), self.unconditional_parking.lock().len()); + for node in self.nodes.lock().iter() { + println!("[DEBUG] node id={:?} status={:?}", node.id, node.status.lock()); + } + // for park in self.unconditional_parking.lock().iter() { + // println!("[DEBUG] parked thread, stacktrace:"); + // park.debug_print(); + // } + } } /// Internal node state. @@ -55,6 +104,7 @@ pub struct Node { network: Chan, status: Mutex, world: Arc, + join_handle: Mutex>>, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -71,29 +121,64 @@ impl Node { Node{ id, network: Chan::new(), - status: Mutex::new(NodeStatus::NotStarted), + status: Mutex::new(NodeStatus::NotStarted), + world: world.clone(), + join_handle: Mutex::new(None), } } - pub fn launch(&self, f: impl FnOnce() + Send + 'static) { + /// Set a code to run in this node thread. + pub fn launch(self: &Arc, f: impl FnOnce(NodeOs) + Send + 'static) { let node = self.clone(); - std::thread::spawn(move || { - let status = node.status.lock(); + let world = self.world.clone(); + world.wait_group.add(1); + let join_handle = std::thread::spawn(move || { + let wg = world.wait_group.clone(); + scopeguard::defer! { + wg.done(); + } + + let mut status = node.status.lock(); if *status != NodeStatus::NotStarted { - // unhandled panic, clearly a caller bug, should never happen + // clearly a caller bug, should never happen panic!("node {} is already running", node.id); } *status = NodeStatus::Running; drop(status); - // TODO: + node.park_me(); + // TODO: recover from panic (update state, log the error) + f(NodeOs::new(world, node.clone())); + + let mut status = node.status.lock(); + *status = NodeStatus::Finished; + // TODO: log the thread is finished }); + *self.join_handle.lock() = Some(join_handle); } /// Returns a channel to receive events from the network. pub fn network_chan(&self) -> Chan { self.network.clone() } + + /// Park the node current thread until world simulation will decide to continue. + pub fn park_me(&self) { + // TODO: try to rewrite this function + let park = Park::new(); + let mut parking = self.world.unconditional_parking.lock(); + parking.push(park.clone()); + drop(parking); + + // decrease the running threads counter, because current thread is parked + self.world.wait_group.done(); + // and increase it once it will wake up + scopeguard::defer!(self.world.wait_group.add(1)); + + *self.status.lock() = NodeStatus::Parked; + park.park(); + *self.status.lock() = NodeStatus::Running; + } } #[derive(Clone)]