From 0d4f987fc85ab367171c03f86782416b8291b2cf Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Wed, 31 May 2023 20:25:25 +0000 Subject: [PATCH] Implement full simlib C API --- libs/walproposer/bindgen_deps.h | 7 ++- libs/walproposer/rust_bindings.h | 25 ++++++++ libs/walproposer/src/sim.rs | 95 +++++++++++++++++++++++++++-- libs/walproposer/src/simtest/mod.rs | 33 ++++------ libs/walproposer/src/test.rs | 2 +- libs/walproposer/test.c | 29 ++++++++- safekeeper/src/lib.rs | 1 - safekeeper/src/simlib/network.rs | 11 +++- safekeeper/src/simlib/proto.rs | 2 + safekeeper/src/simtest/mod.rs | 40 +++++++++--- 10 files changed, 204 insertions(+), 41 deletions(-) diff --git a/libs/walproposer/bindgen_deps.h b/libs/walproposer/bindgen_deps.h index a10ca230e4..43fa15f38e 100644 --- a/libs/walproposer/bindgen_deps.h +++ b/libs/walproposer/bindgen_deps.h @@ -7,5 +7,10 @@ // #include "c.h" // #include "walproposer.h" +#include +#include +#include +#include + int TestFunc(int a, int b); -void RunClientC(); +void RunClientC(uint32_t serverId); diff --git a/libs/walproposer/rust_bindings.h b/libs/walproposer/rust_bindings.h index 2f8611a03e..f0cbdbc8c4 100644 --- a/libs/walproposer/rust_bindings.h +++ b/libs/walproposer/rust_bindings.h @@ -3,6 +3,31 @@ #include #include +typedef struct ReplCell { + uint32_t value; + uint32_t client_id; + uint32_t seqno; +} ReplCell; + +typedef struct Event { + int64_t tcp; + uint32_t value; + uint32_t tag; +} Event; + void rust_function(uint32_t a); +/** + * C API for the node os. + */ void sim_sleep(uint64_t ms); + +uint64_t sim_random(uint64_t max); + +uint32_t sim_id(void); + +int64_t sim_open_tcp(uint32_t dst); + +void sim_tcp_send(int64_t tcp, struct ReplCell value); + +struct Event sim_epoll_rcv(void); diff --git a/libs/walproposer/src/sim.rs b/libs/walproposer/src/sim.rs index be5eacec33..4dfcaa7d39 100644 --- a/libs/walproposer/src/sim.rs +++ b/libs/walproposer/src/sim.rs @@ -1,9 +1,10 @@ -use std::cell::RefCell; +use std::{cell::RefCell, collections::HashMap}; -use safekeeper::simlib::node_os::NodeOs; +use safekeeper::simlib::{node_os::NodeOs, network::TCP, proto::AnyMessage, world::NodeEvent, self}; thread_local! { - pub static CURRENT_NODE_OS: RefCell> = RefCell::new(None); + static CURRENT_NODE_OS: RefCell> = RefCell::new(None); + static TCP_CACHE: RefCell> = RefCell::new(HashMap::new()); } /// Get the current node os. @@ -13,15 +14,101 @@ fn os() -> NodeOs { }) } +fn tcp_save(tcp: TCP) -> i64 { + TCP_CACHE.with(|cell| { + let mut cache = cell.borrow_mut(); + let id = tcp.id(); + cache.insert(id, tcp); + id + }) +} + +fn tcp_load(id: i64) -> TCP { + TCP_CACHE.with(|cell| { + let cache = cell.borrow(); + cache.get(&id).expect("unknown TCP id").clone() + }) +} + /// Should be called before calling any of the C functions. pub fn c_attach_node_os(os: NodeOs) { CURRENT_NODE_OS.with(|cell| { *cell.borrow_mut() = Some(os); }); + TCP_CACHE.with(|cell| { + *cell.borrow_mut() = HashMap::new(); + }); } +/// C API for the node os. + #[no_mangle] pub extern "C" fn sim_sleep(ms: u64) { - println!("got a call to sleep for {} ms", ms); os().sleep(ms); } + +#[no_mangle] +pub extern "C" fn sim_random(max: u64) -> u64 { + os().random(max) +} + +#[no_mangle] +pub extern "C" fn sim_id() -> u32 { + os().id().into() +} + +#[no_mangle] +pub extern "C" fn sim_open_tcp(dst: u32) -> i64 { + tcp_save(os().open_tcp(dst.into())) +} + +#[no_mangle] +// TODO: custom types!! +pub extern "C" fn sim_tcp_send(tcp: i64, value: ReplCell) { + tcp_load(tcp).send(AnyMessage::ReplCell(simlib::proto::ReplCell { + value: value.value, + client_id: value.client_id, + seqno: value.seqno, + })); +} + +#[no_mangle] +pub extern "C" fn sim_epoll_rcv() -> Event { + let event = os().epoll().recv(); + match event { + NodeEvent::Accept(tcp) => Event { + tcp: tcp_save(tcp), + value: 0, + tag: 1, + }, + NodeEvent::Closed(tcp) => Event { + tcp: tcp_save(tcp), + value: 0, + tag: 2, + }, + NodeEvent::Message((message, tcp)) => Event { + tcp: tcp_save(tcp), + value: match message { + AnyMessage::Just32(value) => value.into(), + AnyMessage::ReplCell(cell) => cell.value, + _ => 0, + }, + tag: 3, + }, + } +} + +#[repr(C)] +pub struct Event { + pub tcp: i64, + // TODO: !!! + pub value: u32, + pub tag: u32, +} + +#[repr(C)] +pub struct ReplCell { + pub value: u32, + pub client_id: u32, + pub seqno: u32, +} diff --git a/libs/walproposer/src/simtest/mod.rs b/libs/walproposer/src/simtest/mod.rs index c9c0f39aef..52d6673f33 100644 --- a/libs/walproposer/src/simtest/mod.rs +++ b/libs/walproposer/src/simtest/mod.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use safekeeper::simlib::{network::{NetworkOptions, Delay}, world::World}; +use safekeeper::{simlib::{network::{NetworkOptions, Delay}, world::World}, simtest::{Options, start_simulation}}; use crate::{bindings::RunClientC, sim::c_attach_node_os}; @@ -8,8 +8,8 @@ use crate::{bindings::RunClientC, sim::c_attach_node_os}; fn run_rust_c_test() { let delay = Delay { min: 1, - max: 60, - fail_prob: 0.4, + max: 5, + fail_prob: 0.0, }; let network = NetworkOptions { @@ -19,23 +19,16 @@ fn run_rust_c_test() { }; let seed = 1337; - start_simulation_2(seed, network.clone(), 1_000_000); -} + let u32_data: [u32; 5] = [1, 2, 3, 4, 5]; -fn start_simulation_2(seed: u64, network: NetworkOptions, time_limit: u64) { - let network = Arc::new(network); - let world = Arc::new(World::new(seed, network)); - world.register_world(); - - let client_node = world.new_node(); - client_node.launch(move |os| { - c_attach_node_os(os); - unsafe { RunClientC() } + start_simulation(Options { + seed, + network: network.clone(), + time_limit: 1_000_000, + client_fn: Box::new(move |os, server_id| { + c_attach_node_os(os); + unsafe { RunClientC(server_id); } + }), + u32_data, }); - - world.await_all(); - - while world.step() && world.now() < time_limit { - println!("made a step"); - } } diff --git a/libs/walproposer/src/test.rs b/libs/walproposer/src/test.rs index 9e70c928d8..7fd2a58ac1 100644 --- a/libs/walproposer/src/test.rs +++ b/libs/walproposer/src/test.rs @@ -8,5 +8,5 @@ fn test_rust_c_calls() { #[test] fn test_sim_bindings() { - unsafe { RunClientC(); } + unsafe { RunClientC(0); } } diff --git a/libs/walproposer/test.c b/libs/walproposer/test.c index 61f8c7945b..92ccd5b615 100644 --- a/libs/walproposer/test.c +++ b/libs/walproposer/test.c @@ -18,12 +18,35 @@ int TestFunc(int a, int b) { } // This is a quick experiment with rewriting existing Rust code in C. -void RunClientC() { +void RunClientC(uint32_t serverId) { MemoryContextInit(); + uint32_t clientId = sim_id(); + elog(LOG, "started client"); - for (int i = 0; i < 10; i++) { - sim_sleep(100); + int data_len = 5; + + int delivered = 0; + int tcp = sim_open_tcp(serverId); + while (delivered < data_len) { + ReplCell cell = { + .value = delivered+1, + .client_id = clientId, + .seqno = delivered, + }; + sim_tcp_send(tcp, cell); + + Event event = sim_epoll_rcv(); + if (event.tag == 2) { + // closed + elog(LOG, "connection closed"); + tcp = sim_open_tcp(serverId); + } else if (event.tag == 3) { + // got message + if (event.value == delivered + 1) { + delivered += 1; + } + } } } diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 757b1dc851..07d2fb671f 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -20,7 +20,6 @@ pub mod remove_wal; pub mod safekeeper; pub mod send_wal; pub mod simlib; -#[cfg(test)] pub mod simtest; pub mod timeline; pub mod wal_backup; diff --git a/safekeeper/src/simlib/network.rs b/safekeeper/src/simlib/network.rs index e69ce98593..93ad3743eb 100644 --- a/safekeeper/src/simlib/network.rs +++ b/safekeeper/src/simlib/network.rs @@ -66,7 +66,7 @@ type MessageDirection = u8; /// Node 0 is the creator of the connection (client), /// and node 1 is the acceptor (server). pub struct VirtualConnection { - /// Connection id, used for logging and debugging. + /// Connection id, used for logging and debugging and C API. pub connection_id: u64, pub world: Arc, pub nodes: [Arc; 2], @@ -370,4 +370,13 @@ impl TCP { pub fn send(&self, msg: AnyMessage) { self.conn.send(self.dir, msg); } + + pub fn id(&self) -> i64 { + let positive: i64 = (self.conn.connection_id + 1) as i64; + if self.dir == 0 { + positive + } else { + -positive + } + } } diff --git a/safekeeper/src/simlib/proto.rs b/safekeeper/src/simlib/proto.rs index b3d05641b8..a67f907944 100644 --- a/safekeeper/src/simlib/proto.rs +++ b/safekeeper/src/simlib/proto.rs @@ -1,6 +1,7 @@ /// All possible flavours of messages. /// Grouped by the receiver node. #[derive(Clone, Debug)] +#[repr(C)] pub enum AnyMessage { /// Used internally for notifying node about new incoming connection. InternalConnect, @@ -9,6 +10,7 @@ pub enum AnyMessage { } #[derive(Clone, Debug)] +#[repr(C)] pub struct ReplCell { pub value: u32, pub client_id: u32, diff --git a/safekeeper/src/simtest/mod.rs b/safekeeper/src/simtest/mod.rs index 22e8d5e2af..823fb0aca8 100644 --- a/safekeeper/src/simtest/mod.rs +++ b/safekeeper/src/simtest/mod.rs @@ -8,7 +8,7 @@ use crate::{ simlib::{ network::{Delay, NetworkOptions}, proto::ReplCell, - world::World, + world::World, node_os::NodeOs, }, simtest::{client::run_client, disk::SharedStorage, server::run_server}, }; @@ -28,13 +28,32 @@ fn run_pure_rust_test() { }; for seed in 0..2000 { - start_simulation(seed, network.clone(), 1_000_000); + let u32_data: [u32; 5] = [1, 2, 3, 4, 5]; + let data = u32_to_cells(&u32_data, 1); + + start_simulation(Options { + seed, + network: network.clone(), + time_limit: 1_000_000, + client_fn: Box::new(move |os, server_id| { + run_client(os, &data, server_id) + }), + u32_data, + }); } } -fn start_simulation(seed: u64, network: NetworkOptions, time_limit: u64) { - let network = Arc::new(network); - let world = Arc::new(World::new(seed, network)); +pub struct Options { + pub seed: u64, + pub network: NetworkOptions, + pub time_limit: u64, + pub u32_data: [u32; 5], + pub client_fn: Box, +} + +pub fn start_simulation(options: Options) { + let network = Arc::new(options.network); + let world = Arc::new(World::new(options.seed, network)); world.register_world(); let client_node = world.new_node(); @@ -42,9 +61,10 @@ fn start_simulation(seed: u64, network: NetworkOptions, time_limit: u64) { let server_id = server_node.id; // start the client thread - let u32_data = &[1, 2, 3, 4, 5]; - let data = u32_to_cells(u32_data, 1); - client_node.launch(move |os| run_client(os, &data, server_id)); + client_node.launch(move |os| { + let client_fn = options.client_fn; + client_fn(os, server_id); + }); // start the server thread let shared_storage = SharedStorage::new(); @@ -53,10 +73,10 @@ fn start_simulation(seed: u64, network: NetworkOptions, time_limit: u64) { world.await_all(); - while world.step() && world.now() < time_limit {} + while world.step() && world.now() < options.time_limit {} let disk_data = shared_storage.state.lock().data.clone(); - assert!(verify_data(&disk_data, &u32_data[..])); + assert!(verify_data(&disk_data, &options.u32_data[..])); } fn u32_to_cells(data: &[u32], client_id: u32) -> Vec {