Implement full simlib C API

This commit is contained in:
Arthur Petukhovsky
2023-05-31 20:25:25 +00:00
parent aa0763d49d
commit 0d4f987fc8
10 changed files with 204 additions and 41 deletions

View File

@@ -7,5 +7,10 @@
// #include "c.h"
// #include "walproposer.h"
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
int TestFunc(int a, int b);
void RunClientC();
void RunClientC(uint32_t serverId);

View File

@@ -3,6 +3,31 @@
#include <stdint.h>
#include <stdlib.h>
typedef struct ReplCell {
uint32_t value;
uint32_t client_id;
uint32_t seqno;
} ReplCell;
typedef struct Event {
int64_t tcp;
uint32_t value;
uint32_t tag;
} Event;
void rust_function(uint32_t a);
/**
* C API for the node os.
*/
void sim_sleep(uint64_t ms);
uint64_t sim_random(uint64_t max);
uint32_t sim_id(void);
int64_t sim_open_tcp(uint32_t dst);
void sim_tcp_send(int64_t tcp, struct ReplCell value);
struct Event sim_epoll_rcv(void);

View File

@@ -1,9 +1,10 @@
use std::cell::RefCell;
use std::{cell::RefCell, collections::HashMap};
use safekeeper::simlib::node_os::NodeOs;
use safekeeper::simlib::{node_os::NodeOs, network::TCP, proto::AnyMessage, world::NodeEvent, self};
thread_local! {
pub static CURRENT_NODE_OS: RefCell<Option<NodeOs>> = RefCell::new(None);
static CURRENT_NODE_OS: RefCell<Option<NodeOs>> = RefCell::new(None);
static TCP_CACHE: RefCell<HashMap<i64, TCP>> = RefCell::new(HashMap::new());
}
/// Get the current node os.
@@ -13,15 +14,101 @@ fn os() -> NodeOs {
})
}
fn tcp_save(tcp: TCP) -> i64 {
TCP_CACHE.with(|cell| {
let mut cache = cell.borrow_mut();
let id = tcp.id();
cache.insert(id, tcp);
id
})
}
fn tcp_load(id: i64) -> TCP {
TCP_CACHE.with(|cell| {
let cache = cell.borrow();
cache.get(&id).expect("unknown TCP id").clone()
})
}
/// Should be called before calling any of the C functions.
pub fn c_attach_node_os(os: NodeOs) {
CURRENT_NODE_OS.with(|cell| {
*cell.borrow_mut() = Some(os);
});
TCP_CACHE.with(|cell| {
*cell.borrow_mut() = HashMap::new();
});
}
/// C API for the node os.
#[no_mangle]
pub extern "C" fn sim_sleep(ms: u64) {
println!("got a call to sleep for {} ms", ms);
os().sleep(ms);
}
#[no_mangle]
pub extern "C" fn sim_random(max: u64) -> u64 {
os().random(max)
}
#[no_mangle]
pub extern "C" fn sim_id() -> u32 {
os().id().into()
}
#[no_mangle]
pub extern "C" fn sim_open_tcp(dst: u32) -> i64 {
tcp_save(os().open_tcp(dst.into()))
}
#[no_mangle]
// TODO: custom types!!
pub extern "C" fn sim_tcp_send(tcp: i64, value: ReplCell) {
tcp_load(tcp).send(AnyMessage::ReplCell(simlib::proto::ReplCell {
value: value.value,
client_id: value.client_id,
seqno: value.seqno,
}));
}
#[no_mangle]
pub extern "C" fn sim_epoll_rcv() -> Event {
let event = os().epoll().recv();
match event {
NodeEvent::Accept(tcp) => Event {
tcp: tcp_save(tcp),
value: 0,
tag: 1,
},
NodeEvent::Closed(tcp) => Event {
tcp: tcp_save(tcp),
value: 0,
tag: 2,
},
NodeEvent::Message((message, tcp)) => Event {
tcp: tcp_save(tcp),
value: match message {
AnyMessage::Just32(value) => value.into(),
AnyMessage::ReplCell(cell) => cell.value,
_ => 0,
},
tag: 3,
},
}
}
#[repr(C)]
pub struct Event {
pub tcp: i64,
// TODO: !!!
pub value: u32,
pub tag: u32,
}
#[repr(C)]
pub struct ReplCell {
pub value: u32,
pub client_id: u32,
pub seqno: u32,
}

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use safekeeper::simlib::{network::{NetworkOptions, Delay}, world::World};
use safekeeper::{simlib::{network::{NetworkOptions, Delay}, world::World}, simtest::{Options, start_simulation}};
use crate::{bindings::RunClientC, sim::c_attach_node_os};
@@ -8,8 +8,8 @@ use crate::{bindings::RunClientC, sim::c_attach_node_os};
fn run_rust_c_test() {
let delay = Delay {
min: 1,
max: 60,
fail_prob: 0.4,
max: 5,
fail_prob: 0.0,
};
let network = NetworkOptions {
@@ -19,23 +19,16 @@ fn run_rust_c_test() {
};
let seed = 1337;
start_simulation_2(seed, network.clone(), 1_000_000);
}
let u32_data: [u32; 5] = [1, 2, 3, 4, 5];
fn start_simulation_2(seed: u64, network: NetworkOptions, time_limit: u64) {
let network = Arc::new(network);
let world = Arc::new(World::new(seed, network));
world.register_world();
let client_node = world.new_node();
client_node.launch(move |os| {
c_attach_node_os(os);
unsafe { RunClientC() }
start_simulation(Options {
seed,
network: network.clone(),
time_limit: 1_000_000,
client_fn: Box::new(move |os, server_id| {
c_attach_node_os(os);
unsafe { RunClientC(server_id); }
}),
u32_data,
});
world.await_all();
while world.step() && world.now() < time_limit {
println!("made a step");
}
}

View File

@@ -8,5 +8,5 @@ fn test_rust_c_calls() {
#[test]
fn test_sim_bindings() {
unsafe { RunClientC(); }
unsafe { RunClientC(0); }
}

View File

@@ -18,12 +18,35 @@ int TestFunc(int a, int b) {
}
// This is a quick experiment with rewriting existing Rust code in C.
void RunClientC() {
void RunClientC(uint32_t serverId) {
MemoryContextInit();
uint32_t clientId = sim_id();
elog(LOG, "started client");
for (int i = 0; i < 10; i++) {
sim_sleep(100);
int data_len = 5;
int delivered = 0;
int tcp = sim_open_tcp(serverId);
while (delivered < data_len) {
ReplCell cell = {
.value = delivered+1,
.client_id = clientId,
.seqno = delivered,
};
sim_tcp_send(tcp, cell);
Event event = sim_epoll_rcv();
if (event.tag == 2) {
// closed
elog(LOG, "connection closed");
tcp = sim_open_tcp(serverId);
} else if (event.tag == 3) {
// got message
if (event.value == delivered + 1) {
delivered += 1;
}
}
}
}

View File

@@ -20,7 +20,6 @@ pub mod remove_wal;
pub mod safekeeper;
pub mod send_wal;
pub mod simlib;
#[cfg(test)]
pub mod simtest;
pub mod timeline;
pub mod wal_backup;

View File

@@ -66,7 +66,7 @@ type MessageDirection = u8;
/// Node 0 is the creator of the connection (client),
/// and node 1 is the acceptor (server).
pub struct VirtualConnection {
/// Connection id, used for logging and debugging.
/// Connection id, used for logging and debugging and C API.
pub connection_id: u64,
pub world: Arc<World>,
pub nodes: [Arc<Node>; 2],
@@ -370,4 +370,13 @@ impl TCP {
pub fn send(&self, msg: AnyMessage) {
self.conn.send(self.dir, msg);
}
pub fn id(&self) -> i64 {
let positive: i64 = (self.conn.connection_id + 1) as i64;
if self.dir == 0 {
positive
} else {
-positive
}
}
}

View File

@@ -1,6 +1,7 @@
/// All possible flavours of messages.
/// Grouped by the receiver node.
#[derive(Clone, Debug)]
#[repr(C)]
pub enum AnyMessage {
/// Used internally for notifying node about new incoming connection.
InternalConnect,
@@ -9,6 +10,7 @@ pub enum AnyMessage {
}
#[derive(Clone, Debug)]
#[repr(C)]
pub struct ReplCell {
pub value: u32,
pub client_id: u32,

View File

@@ -8,7 +8,7 @@ use crate::{
simlib::{
network::{Delay, NetworkOptions},
proto::ReplCell,
world::World,
world::World, node_os::NodeOs,
},
simtest::{client::run_client, disk::SharedStorage, server::run_server},
};
@@ -28,13 +28,32 @@ fn run_pure_rust_test() {
};
for seed in 0..2000 {
start_simulation(seed, network.clone(), 1_000_000);
let u32_data: [u32; 5] = [1, 2, 3, 4, 5];
let data = u32_to_cells(&u32_data, 1);
start_simulation(Options {
seed,
network: network.clone(),
time_limit: 1_000_000,
client_fn: Box::new(move |os, server_id| {
run_client(os, &data, server_id)
}),
u32_data,
});
}
}
fn start_simulation(seed: u64, network: NetworkOptions, time_limit: u64) {
let network = Arc::new(network);
let world = Arc::new(World::new(seed, network));
pub struct Options {
pub seed: u64,
pub network: NetworkOptions,
pub time_limit: u64,
pub u32_data: [u32; 5],
pub client_fn: Box<dyn FnOnce(NodeOs, u32) + Send + 'static>,
}
pub fn start_simulation(options: Options) {
let network = Arc::new(options.network);
let world = Arc::new(World::new(options.seed, network));
world.register_world();
let client_node = world.new_node();
@@ -42,9 +61,10 @@ fn start_simulation(seed: u64, network: NetworkOptions, time_limit: u64) {
let server_id = server_node.id;
// start the client thread
let u32_data = &[1, 2, 3, 4, 5];
let data = u32_to_cells(u32_data, 1);
client_node.launch(move |os| run_client(os, &data, server_id));
client_node.launch(move |os| {
let client_fn = options.client_fn;
client_fn(os, server_id);
});
// start the server thread
let shared_storage = SharedStorage::new();
@@ -53,10 +73,10 @@ fn start_simulation(seed: u64, network: NetworkOptions, time_limit: u64) {
world.await_all();
while world.step() && world.now() < time_limit {}
while world.step() && world.now() < options.time_limit {}
let disk_data = shared_storage.state.lock().data.clone();
assert!(verify_data(&disk_data, &u32_data[..]));
assert!(verify_data(&disk_data, &options.u32_data[..]));
}
fn u32_to_cells(data: &[u32], client_id: u32) -> Vec<ReplCell> {