Implement simlib sk server

This commit is contained in:
Arthur Petukhovsky
2023-06-02 14:49:55 +00:00
parent 3840d6b18b
commit 909d7fadb8
22 changed files with 461 additions and 60 deletions

1
Cargo.lock generated
View File

@@ -4653,6 +4653,7 @@ dependencies = [
"memoffset 0.8.0",
"once_cell",
"postgres",
"postgres_ffi",
"rand",
"regex",
"safekeeper",

View File

@@ -19,6 +19,7 @@ thiserror.workspace = true
serde.workspace = true
utils.workspace = true
safekeeper.workspace = true
postgres_ffi.workspace = true
workspace_hack.workspace = true

View File

@@ -15,4 +15,7 @@
int TestFunc(int a, int b);
void RunClientC(uint32_t serverId);
void WalProposerRust();
void WalProposerRust();
// Initialize global variables before calling any Postgres C code.
void MyContextInit();

View File

@@ -103,6 +103,7 @@ fn main() -> anyhow::Result<()> {
.allowlist_function("TestFunc")
.allowlist_function("RunClientC")
.allowlist_function("WalProposerRust")
.allowlist_function("MyContextInit")
// .clang_arg(format!("-I{inc_server_path}"))
// .clang_arg(format!("-I{inc_pgxn_path}"))
// Finish the builder and generate the bindings.

View File

@@ -11,6 +11,7 @@ enum AnyMessageTag {
InternalConnect,
Just32,
ReplCell,
Bytes,
};
typedef uint8_t AnyMessageTag;

View File

@@ -2,6 +2,8 @@
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
use safekeeper::simlib::node_os::NodeOs;
pub mod bindings {
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
@@ -20,3 +22,10 @@ mod test;
#[cfg(test)]
pub mod simtest;
/// Builds the node initialization hook for nodes that call into C code.
///
/// The returned closure hands the node's [`NodeOs`] to
/// `sim::c_attach_node_os` and then calls the C function `MyContextInit`.
/// This function always returns `Some`.
pub fn c_context() -> Option<Box<dyn Fn(NodeOs) + Send + Sync>> {
    let init_node = |os: NodeOs| {
        // The node OS is attached first, then the C side is initialized.
        sim::c_attach_node_os(os);
        unsafe {
            bindings::MyContextInit();
        }
    };
    Some(Box::new(init_node))
}

View File

@@ -31,7 +31,7 @@ fn tcp_load(id: i64) -> TCP {
}
/// Should be called before calling any of the C functions.
pub fn c_attach_node_os(os: NodeOs) {
pub(crate) fn c_attach_node_os(os: NodeOs) {
CURRENT_NODE_OS.with(|cell| {
*cell.borrow_mut() = Some(os);
});
@@ -96,6 +96,7 @@ pub extern "C" fn sim_epoll_rcv() -> Event {
AnyMessage::InternalConnect => AnyMessageTag::InternalConnect,
AnyMessage::Just32(_) => AnyMessageTag::Just32,
AnyMessage::ReplCell(_) => AnyMessageTag::ReplCell,
AnyMessage::Bytes(_) => AnyMessageTag::Bytes,
},
}
}

View File

@@ -51,4 +51,5 @@ pub enum AnyMessageTag {
InternalConnect,
Just32,
ReplCell,
Bytes,
}

View File

@@ -1,37 +1,8 @@
use safekeeper::{
simlib::network::{Delay, NetworkOptions},
simtest::{start_simulation, Options},
};
#[cfg(test)]
pub mod simple_client;
use crate::{bindings::RunClientC, sim::c_attach_node_os};
#[cfg(test)]
pub mod wp_sk;
#[test]
fn run_rust_c_test() {
let delay = Delay {
min: 1,
max: 5,
fail_prob: 0.0,
};
let network = NetworkOptions {
timeout: Some(50),
connect_delay: delay.clone(),
send_delay: delay.clone(),
};
let seed = 1337;
let u32_data: [u32; 5] = [1, 2, 3, 4, 5];
start_simulation(Options {
seed,
network: network.clone(),
time_limit: 1_000_000,
client_fn: Box::new(move |os, server_id| {
c_attach_node_os(os);
unsafe {
RunClientC(server_id);
}
}),
u32_data,
});
}
pub mod safekeeper;
pub mod storage;

View File

@@ -0,0 +1,206 @@
//! Safekeeper communication endpoint to WAL proposer (compute node).
//! Gets messages from the network, passes them down to consensus module and
//! sends replies back.
use std::collections::HashMap;
use bytes::BytesMut;
use log::info;
use safekeeper::{simlib::{node_os::NodeOs, network::TCP, world::NodeEvent, proto::AnyMessage}, safekeeper::{ProposerAcceptorMessage, ServerInfo, SafeKeeperState, UNKNOWN_SERVER_VERSION, SafeKeeper}, timeline::{TimelineError}, SafeKeeperConf};
use utils::{id::TenantTimelineId, lsn::Lsn};
use anyhow::{Result, bail};
use crate::simtest::storage::{InMemoryState, DummyWalStore};
/// State kept by the simulated safekeeper server for one accepted connection.
struct ConnState {
/// Connection handle; replies are sent through it and it is closed on drop.
tcp: TCP,
/// Safekeeper configuration; its `my_id` is passed to `SafeKeeper::new`.
conf: SafeKeeperConf,
/// Set to `true` once the first message on this connection has been seen.
greeting: bool,
/// Timeline id, filled in by `create_timeline`.
ttid: TenantTimelineId,
/// Safekeeper state; `None` until `create_timeline` succeeds.
tli: Option<SharedState>,
/// Set when an `AppendRequest` was processed and cleared by `flush`.
flush_pending: bool,
}
/// Wrapper around the safekeeper instance owned by a connection.
struct SharedState {
/// Safekeeper backed by the in-memory control store and the dummy WAL store.
sk: SafeKeeper<InMemoryState, DummyWalStore>,
}
/// Runs the safekeeper server loop on the given simulated node.
///
/// Each iteration blocks on the node's epoll for one event, then keeps
/// draining events for as long as `try_recv` returns one, and finally calls
/// `flush` on every connection. A connection is removed when processing one
/// of its messages or flushing it returns an error.
///
/// The outer loop has no exit, so this function does not return normally.
pub fn run_server(os: NodeOs) -> Result<()> {
    println!("started server {}", os.id());

    let mut conns: HashMap<i64, ConnState> = HashMap::new();
    let epoll = os.epoll();

    loop {
        // waiting for the next message
        let mut pending = Some(epoll.recv());

        while let Some(event) = pending {
            match event {
                NodeEvent::Accept(tcp) => {
                    let conn_id = tcp.id();
                    let state = ConnState {
                        tcp,
                        conf: SafeKeeperConf::dummy(),
                        greeting: false,
                        ttid: TenantTimelineId::empty(),
                        tli: None,
                        flush_pending: false,
                    };
                    conns.insert(conn_id, state);
                }
                NodeEvent::Message((msg, tcp)) => {
                    let conn_id = tcp.id();
                    if let Some(conn) = conns.get_mut(&conn_id) {
                        let outcome = conn.process_any(msg);
                        if outcome.is_err() {
                            println!("conn {:?} error: {:?}", tcp, outcome);
                            conns.remove(&conn_id);
                        }
                    } else {
                        println!("conn {:?} was closed, dropping msg {:?}", tcp, msg);
                    }
                }
                NodeEvent::Closed(_) => {}
            }

            // TODO: make simulator support multiple events per tick
            pending = epoll.try_recv();
        }

        // Flush every connection; keep only those that flushed successfully.
        conns.retain(|_, conn| {
            let outcome = conn.flush();
            match &outcome {
                Ok(_) => true,
                Err(_) => {
                    println!("conn {:?} error: {:?}", conn.tcp, outcome);
                    false
                }
            }
        });
    }
}
impl ConnState {
    /// Handles one raw simulator message received on this connection.
    ///
    /// Only `AnyMessage::Bytes` is accepted; its payload is parsed as a
    /// `ProposerAcceptorMessage` and passed to [`Self::process`].
    ///
    /// # Errors
    /// Returns an error for any other message kind, when parsing fails, or
    /// when processing the parsed message fails.
    fn process_any(&mut self, any: AnyMessage) -> Result<()> {
        match any {
            AnyMessage::Bytes(copy_data) => {
                let msg = ProposerAcceptorMessage::parse(copy_data)?;
                self.process(msg)
            }
            _ => bail!("unexpected message, expected AnyMessage::Bytes"),
        }
    }

    /// Creates a fresh safekeeper for `ttid` and stores it in `self.tli`.
    ///
    /// The state is built from `server_info` with invalid (zero) LSNs, then
    /// validated before the safekeeper is constructed.
    ///
    /// # Errors
    /// Returns an error when the WAL segment size is zero, the Postgres
    /// version is unknown, `commit_lsn` is below `local_start_lsn`, or
    /// `SafeKeeper::new` fails.
    fn create_timeline(&mut self, ttid: TenantTimelineId, server_info: ServerInfo) -> Result<()> {
        info!("creating new timeline {}", ttid);
        self.ttid = ttid;

        let commit_lsn = Lsn::INVALID;
        let local_start_lsn = Lsn::INVALID;

        // TODO: load state from in-memory storage
        let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);

        if state.server.wal_seg_size == 0 {
            bail!(TimelineError::UninitializedWalSegSize(ttid));
        }

        if state.server.pg_version == UNKNOWN_SERVER_VERSION {
            bail!(TimelineError::UninitialinzedPgVersion(ttid));
        }

        if state.commit_lsn < state.local_start_lsn {
            // The message now matches the condition being checked.
            bail!(
                "commit_lsn {} is smaller than local_start_lsn {}",
                state.commit_lsn,
                state.local_start_lsn
            );
        }

        // TODO: implement "persistent" storage for tests
        // `state` is not used afterwards, so it is moved instead of cloned.
        let control_store = InMemoryState::new(state);

        // TODO: implement "persistent" storage for tests
        let wal_store = DummyWalStore::new();

        let sk = SafeKeeper::new(control_store, wal_store, self.conf.my_id)?;
        self.tli = Some(SharedState { sk });

        Ok(())
    }

    /// Processes one parsed message from the walproposer.
    ///
    /// The first message on a connection must be a `Greeting`: it creates the
    /// timeline and is not forwarded to the safekeeper. Every later message is
    /// forwarded; an `AppendRequest` is forwarded as `NoFlushAppendRequest`
    /// and marks the connection as needing a flush.
    ///
    /// # Errors
    /// Returns an error when the first message is not a greeting, when the
    /// timeline cannot be created, or when the safekeeper rejects a message.
    fn process(&mut self, msg: ProposerAcceptorMessage) -> Result<()> {
        if !self.greeting {
            // Marked before validation; a failed handshake returns an error.
            self.greeting = true;

            return match msg {
                ProposerAcceptorMessage::Greeting(ref greeting) => {
                    info!(
                        "start handshake with walproposer {:?} sysid {} timeline {}",
                        self.tcp, greeting.system_id, greeting.tli,
                    );
                    let server_info = ServerInfo {
                        pg_version: greeting.pg_version,
                        system_id: greeting.system_id,
                        wal_seg_size: greeting.wal_seg_size,
                    };
                    let ttid = TenantTimelineId::new(greeting.tenant_id, greeting.timeline_id);
                    self.create_timeline(ttid, server_info)
                }
                _ => bail!("unexpected message {msg:?} instead of greeting"),
            };
        }

        match msg {
            ProposerAcceptorMessage::AppendRequest(append_request) => {
                // The flush itself is deferred to `flush`.
                self.flush_pending = true;
                self.process_sk_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(append_request))
            }
            other => self.process_sk_msg(&other),
        }
    }

    /// Process FlushWAL if needed.
    ///
    /// Does nothing unless an append was processed since the last flush.
    // TODO: add extra flushes, to verify that extra flushes don't break anything
    fn flush(&mut self) -> Result<()> {
        if !self.flush_pending {
            return Ok(());
        }
        self.flush_pending = false;
        self.process_sk_msg(&ProposerAcceptorMessage::FlushWAL)
    }

    /// Make safekeeper process a message and send a reply to the TCP
    ///
    /// # Errors
    /// Returns an error when no timeline has been created for this connection
    /// yet, when the safekeeper fails to process `msg`, or when the reply
    /// cannot be serialized.
    fn process_sk_msg(&mut self, msg: &ProposerAcceptorMessage) -> Result<()> {
        // Report a missing timeline as an error instead of panicking.
        let shared_state = match self.tli.as_mut() {
            Some(shared_state) => shared_state,
            None => bail!("cannot process safekeeper message: timeline is not created"),
        };

        if let Some(reply) = shared_state.sk.process_msg(msg)? {
            let mut buf = BytesMut::with_capacity(128);
            reply.serialize(&mut buf)?;
            self.tcp.send(AnyMessage::Bytes(buf.into()));
        }
        Ok(())
    }
}
impl Drop for ConnState {
    /// Logs that the connection is going away and closes its TCP handle.
    fn drop(&mut self) {
        let tcp = &self.tcp;
        println!("dropping conn: {:?}", tcp);
        tcp.close();
        // TODO: clean up non-fsynced WAL
    }
}

View File

@@ -0,0 +1,37 @@
use std::sync::Arc;
use safekeeper::{
simlib::{network::{Delay, NetworkOptions}, world::World},
simtest::{start_simulation, Options},
};
use crate::{bindings::RunClientC, c_context};
/// Runs the C client (`RunClientC`) inside the simulation, using a world
/// whose nodes are initialized through `c_context()`.
#[test]
fn run_rust_c_test() {
    let seed = 1337;

    let link_delay = Delay {
        min: 1,
        max: 5,
        fail_prob: 0.5,
    };
    let network = Arc::new(NetworkOptions {
        timeout: Some(50),
        connect_delay: link_delay.clone(),
        send_delay: link_delay,
    });

    let world = Arc::new(World::new(seed, network, c_context()));
    let u32_data: [u32; 5] = [1, 2, 3, 4, 5];

    let options = Options {
        world,
        time_limit: 1_000_000,
        // The node OS argument is unused: the world's init hook receives it.
        client_fn: Box::new(move |_os, server_id| {
            unsafe {
                RunClientC(server_id);
            }
        }),
        u32_data,
    };
    start_simulation(options);
}
}

View File

@@ -0,0 +1,71 @@
use std::ops::Deref;
use safekeeper::{safekeeper::SafeKeeperState, control_file, wal_storage};
use anyhow::Result;
use utils::lsn::Lsn;
use postgres_ffi::XLogSegNo;
/// Control-file storage for tests that keeps the safekeeper state in memory.
pub struct InMemoryState {
/// The most recently persisted state.
persisted_state: SafeKeeperState,
}
impl InMemoryState {
pub fn new(persisted_state: SafeKeeperState) -> Self {
InMemoryState {
persisted_state,
}
}
}
impl control_file::Storage for InMemoryState {
    /// Replaces the stored state with a copy of `s`; never fails.
    fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
        self.persisted_state.clone_from(s);
        Ok(())
    }
}
/// Gives read access to the stored `SafeKeeperState` through the store itself.
impl Deref for InMemoryState {
type Target = SafeKeeperState;
/// Returns a reference to the most recently persisted state.
fn deref(&self) -> &Self::Target {
&self.persisted_state
}
}
/// WAL storage for tests that stores no WAL bytes and only tracks a position.
pub struct DummyWalStore {
/// Position reported by `flush_lsn`; updated by `write_wal` and `truncate_wal`.
lsn: Lsn,
}
impl DummyWalStore {
    /// Creates a store whose position is `Lsn::INVALID`.
    pub fn new() -> Self {
        DummyWalStore { lsn: Lsn::INVALID }
    }
}

/// A public zero-argument `new` should be paired with `Default`; both
/// produce the same value.
impl Default for DummyWalStore {
    fn default() -> Self {
        Self::new()
    }
}
impl wal_storage::Storage for DummyWalStore {
    /// Returns the currently tracked position.
    fn flush_lsn(&self) -> Lsn {
        self.lsn
    }

    /// Discards `buf` and moves the tracked position to the end of the write.
    fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
        let written = buf.len() as u64;
        self.lsn = startpos + written;
        Ok(())
    }

    /// Sets the tracked position to `end_pos`.
    fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
        self.lsn = end_pos;
        Ok(())
    }

    /// Nothing is buffered, so flushing is a no-op.
    fn flush_wal(&mut self) -> Result<()> {
        Ok(())
    }

    /// Returns a remover that ignores the segment number and always succeeds.
    fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
        let noop_remover = move |_segno_up_to: XLogSegNo| Ok(());
        Box::new(noop_remover)
    }

    /// Returns default (empty) WAL storage metrics.
    fn get_metrics(&self) -> safekeeper::metrics::WalStorageMetrics {
        Default::default()
    }
}

View File

@@ -0,0 +1,52 @@
use std::sync::Arc;
use safekeeper::simlib::{network::{Delay, NetworkOptions}, world::World};
use crate::{simtest::safekeeper::run_server, c_context};
/// Starts one (currently empty) client node and three safekeeper server
/// nodes in a simulated world, then steps the world until it stops or the
/// time limit is reached.
#[test]
fn run_walproposer_safekeeper_test() {
    let seed = 1337;
    let time_limit = 1_000_000;

    let delay = Delay {
        min: 1,
        max: 5,
        fail_prob: 0.0,
    };
    let network = Arc::new(NetworkOptions {
        timeout: Some(1000),
        connect_delay: delay.clone(),
        send_delay: delay.clone(),
    });

    let world = Arc::new(World::new(seed, network, c_context()));
    world.register_world();

    let client_node = world.new_node();
    let servers = [world.new_node(), world.new_node(), world.new_node()];

    // start the client thread
    client_node.launch(move |_| {
        // TODO: run sync-safekeepers
    });

    // start server threads
    for node in servers.iter() {
        let server = node.clone();
        let id = server.id;
        server.launch(move |os| {
            let res = run_server(os);
            println!("server {} finished: {:?}", id, res);
        });
    }

    world.await_all();

    // Step until the world reports no progress or the time limit is hit.
    loop {
        if !world.step() || world.now() >= time_limit {
            break;
        }
    }

    // TODO: verify sync_safekeepers LSN
}

View File

@@ -1,15 +1,29 @@
use crate::bindings::{TestFunc, WalProposerRust};
use crate::bindings::{TestFunc, MyContextInit};
#[test]
fn test_rust_c_calls() {
let res = unsafe { TestFunc(1, 2) };
let res = std::thread::spawn(|| {
let res = unsafe {
MyContextInit();
TestFunc(1, 2)
};
res
}).join().unwrap();
println!("res: {}", res);
}
#[test]
fn test_sim_bindings() {
// unsafe { RunClientC(0); }
unsafe {
WalProposerRust();
}
std::thread::spawn(|| {
unsafe {
MyContextInit();
TestFunc(1, 2)
}
}).join().unwrap();
std::thread::spawn(|| {
unsafe {
MyContextInit();
TestFunc(1, 2)
}
}).join().unwrap();
}

View File

@@ -1,6 +1,7 @@
#include "bindgen_deps.h"
#include "rust_bindings.h"
#include <stdio.h>
#include <pthread.h>
#include "postgres.h"
#include "utils/memutils.h"
@@ -8,8 +9,6 @@
const char *progname = "fakepostgres";
int TestFunc(int a, int b) {
MemoryContextInit();
printf("TestFunc: %d + %d = %d\n", a, b, a + b);
rust_function(0);
elog(LOG, "postgres elog test");
@@ -19,8 +18,6 @@ int TestFunc(int a, int b) {
// This is a quick experiment with rewriting existing Rust code in C.
void RunClientC(uint32_t serverId) {
MemoryContextInit();
uint32_t clientId = sim_id();
elog(LOG, "started client");
@@ -55,3 +52,17 @@ void RunClientC(uint32_t serverId) {
}
}
}
// Set once MemoryContextInit() has been called; protected by `lock`.
bool initializedMemoryContext = false;

// Statically initialized so the mutex is valid before its first use; locking
// a mutex that was never initialized is undefined behavior per POSIX.
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

// Runs MemoryContextInit() exactly once per process, no matter how many
// threads call this function. Later calls only take and release the mutex.
void MyContextInit() {
    // initializes global variables, TODO how to make them thread-local?
    pthread_mutex_lock(&lock);
    if (!initializedMemoryContext) {
        initializedMemoryContext = true;
        MemoryContextInit();
    }
    pthread_mutex_unlock(&lock);
}

View File

@@ -318,7 +318,6 @@ nwp_shmem_startup_hook(void)
void WalProposerRust()
{
MemoryContextInit();
elog(LOG, "WalProposerRust");
}

View File

@@ -77,8 +77,7 @@ impl SafeKeeperConf {
}
impl SafeKeeperConf {
#[cfg(test)]
fn dummy() -> Self {
pub fn dummy() -> Self {
SafeKeeperConf {
workdir: PathBuf::from("./"),
no_sync: false,

View File

@@ -44,4 +44,10 @@ impl<T: Clone> Chan<T> {
self.shared.condvar.wait(&mut queue);
}
}
/// Pops the message at the front of the queue without waiting.
/// Returns `None` if the queue is empty at the time of the call.
pub fn try_recv(&self) -> Option<T> {
    self.shared.queue.lock().pop_front()
}
}

View File

@@ -379,4 +379,12 @@ impl TCP {
-positive
}
}
/// Returns the `connection_id` of the underlying connection.
pub fn connection_id(&self) -> u64 {
self.conn.connection_id
}
/// Calls `close` on the underlying connection, passing this handle's `dir`
/// converted to `usize`.
// NOTE(review): `dir` presumably selects which end of the connection is
// closed — confirm against the connection's `close` implementation.
pub fn close(&self) {
self.conn.close(self.dir as usize);
}
}

View File

@@ -1,3 +1,5 @@
use bytes::Bytes;
/// All possible flavours of messages.
/// Grouped by the receiver node.
#[derive(Clone, Debug)]
@@ -8,6 +10,7 @@ pub enum AnyMessage {
InternalConnect,
Just32(u32),
ReplCell(ReplCell),
Bytes(Bytes),
}
#[derive(Clone, Debug)]

View File

@@ -39,10 +39,13 @@ pub struct World {
/// Network options.
network_options: Arc<NetworkOptions>,
/// Optional function to initialize nodes right after thread creation.
nodes_init: Option<Box<dyn Fn(NodeOs) + Send + Sync>>,
}
impl World {
pub fn new(seed: u64, network_options: Arc<NetworkOptions>) -> World {
pub fn new(seed: u64, network_options: Arc<NetworkOptions>, nodes_init: Option<Box<dyn Fn(NodeOs) + Send + Sync>>) -> World {
World {
nodes: Mutex::new(Vec::new()),
unconditional_parking: Mutex::new(Vec::new()),
@@ -51,6 +54,7 @@ impl World {
timing: Mutex::new(Timing::new()),
connection_counter: AtomicU64::new(0),
network_options,
nodes_init,
}
}
@@ -276,6 +280,11 @@ impl Node {
// park the current thread, [`launch`] will wait until it's parked
Park::yield_thread();
if let Some(nodes_init) = world.nodes_init.as_ref() {
nodes_init(NodeOs::new(world.clone(), node.clone()));
}
// TODO: recover from panic (update state, log the error)
f(NodeOs::new(world, node.clone()));

View File

@@ -6,7 +6,6 @@ use std::sync::Arc;
use crate::{
simlib::{
network::{NetworkOptions},
proto::ReplCell,
world::World, node_os::NodeOs,
},
@@ -15,8 +14,8 @@ use crate::{
#[cfg(test)]
mod tests {
use crate::simlib::network::{Delay, NetworkOptions};
use std::sync::Arc;
use crate::simlib::{network::{Delay, NetworkOptions}, world::World};
use super::{u32_to_cells, start_simulation, Options, client::run_client};
#[test]
@@ -36,10 +35,10 @@ mod tests {
for seed in 0..20 {
let u32_data: [u32; 5] = [1, 2, 3, 4, 5];
let data = u32_to_cells(&u32_data, 1);
let world = Arc::new(World::new(seed, Arc::new(network.clone()), None));
start_simulation(Options {
seed,
network: network.clone(),
world,
time_limit: 1_000_000,
client_fn: Box::new(move |os, server_id| {
run_client(os, &data, server_id)
@@ -52,16 +51,14 @@ mod tests {
}
pub struct Options {
pub seed: u64,
pub network: NetworkOptions,
pub world: Arc<World>,
pub time_limit: u64,
pub u32_data: [u32; 5],
pub client_fn: Box<dyn FnOnce(NodeOs, u32) + Send + 'static>,
}
pub fn start_simulation(options: Options) {
let network = Arc::new(options.network);
let world = Arc::new(World::new(options.seed, network));
let world = options.world;
world.register_world();
let client_node = world.new_node();