Add simulation schedule

Arthur Petukhovsky
2023-08-24 15:24:38 +00:00
parent 33f7877d1b
commit 420d3bc18f
10 changed files with 489 additions and 304 deletions


@@ -8,3 +8,4 @@ pub mod disk;
pub mod safekeeper;
pub mod storage;
pub mod log;
+pub mod util;


@@ -284,8 +284,8 @@ impl ConnState {
match msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
info!(
"start handshake with walproposer {:?} sysid {} timeline {}",
self.tcp, greeting.system_id, greeting.tli,
"start handshake with walproposer {:?}",
self.tcp,
);
let server_info = ServerInfo {
pg_version: greeting.pg_version,


@@ -19,7 +19,7 @@ fn run_rust_c_test() {
};
let network = NetworkOptions {
-timeout: Some(50),
+keepalive_timeout: Some(50),
connect_delay: delay.clone(),
send_delay: delay.clone(),
};


@@ -0,0 +1,437 @@
use std::{ffi::CString, path::Path, str::FromStr, sync::Arc};
use safekeeper::simlib::{
network::{Delay, NetworkOptions},
proto::AnyMessage,
world::{Node, NodeEvent, World},
time::EmptyEvent,
};
use tracing::{info, error, warn};
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
bindings::{
neon_tenant_walproposer, neon_timeline_walproposer, sim_redo_start_lsn, syncSafekeepers,
wal_acceptor_connection_timeout, wal_acceptor_reconnect_timeout, wal_acceptors_list,
MyInsertRecord, WalProposerCleanup, WalProposerRust,
},
c_context,
simtest::{safekeeper::run_server, log::{SimClock, init_logger}},
};
use super::disk::Disk;
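/// A simulated safekeeper: a world node paired with its persistent disk.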
pub struct SkNode {
pub node: Arc<Node>,
pub id: u32,
pub disk: Arc<Disk>,
}
impl SkNode {
pub fn new(node: Arc<Node>) -> Self {
let disk = Arc::new(Disk::new());
let res = Self {
id: node.id,
node,
disk,
};
res.launch();
res
}
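/// Start the safekeeper server thread on this node.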
pub fn launch(&self) {
let id = self.id;
let disk = self.disk.clone();
// start the server thread
self.node.launch(move |os| {
let res = run_server(os, disk);
println!("server {} finished: {:?}", id, res);
});
}
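/// Crash-stop the node and relaunch the server with the same disk.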
pub fn restart(&self) {
self.node.crash_stop();
self.launch();
}
}
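/// Configuration for a simulation run: network options, a global timeout
/// and an optional simulation clock.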
pub struct TestConfig {
pub network: NetworkOptions,
pub timeout: u64,
pub clock: Option<SimClock>,
}
impl TestConfig {
pub fn new(clock: Option<SimClock>) -> Self {
Self {
network: NetworkOptions {
keepalive_timeout: Some(2000),
connect_delay: Delay {
min: 1,
max: 5,
fail_prob: 0.0,
},
send_delay: Delay {
min: 1,
max: 5,
fail_prob: 0.0,
},
},
timeout: 1_000 * 10,
clock,
}
}
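/// Build a world from the given seed, start three safekeepers, wait for
/// them to initialize and prepare the pgdata directory.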
pub fn start(&self, seed: u64) -> Test {
let world = Arc::new(World::new(
seed,
Arc::new(self.network.clone()),
c_context(),
));
world.register_world();
if let Some(clock) = &self.clock {
clock.set_world(world.clone());
}
let servers = [
SkNode::new(world.new_node()),
SkNode::new(world.new_node()),
SkNode::new(world.new_node()),
];
let server_ids = [servers[0].id, servers[1].id, servers[2].id];
info!("safekeepers: {:?}", server_ids);
let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(",");
let ttid = TenantTimelineId::generate();
// wait for all servers to finish initialization
world.await_all();
// clean up pgdata directory
self.init_pgdata();
Test {
world,
servers,
safekeepers_guc,
ttid,
timeout: self.timeout,
}
}
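/// Recreate the pgdata directory from scratch: empty pg_wal/pg_notify
/// subdirs plus a minimal postgresql.conf.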
pub fn init_pgdata(&self) {
let pgdata = Path::new("/home/admin/simulator/libs/walproposer/pgdata");
if pgdata.exists() {
std::fs::remove_dir_all(pgdata).unwrap();
}
std::fs::create_dir(pgdata).unwrap();
// create empty pg_wal and pg_notify subdirs
std::fs::create_dir(pgdata.join("pg_wal")).unwrap();
std::fs::create_dir(pgdata.join("pg_notify")).unwrap();
// write postgresql.conf
let mut conf = std::fs::File::create(pgdata.join("postgresql.conf")).unwrap();
let content = "
wal_log_hints=off
hot_standby=on
fsync=off
wal_level=replica
restart_after_crash=off
shared_preload_libraries=neon
neon.pageserver_connstring=''
neon.tenant_id=cc6e67313d57283bad411600fbf5c142
neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e
synchronous_standby_names=walproposer
neon.safekeepers='node:1,node:2,node:3'
max_connections=100
";
std::io::Write::write_all(&mut conf, content.as_bytes()).unwrap();
}
}
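/// A started simulation: the world, three safekeepers and the settings
/// client nodes need to connect.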
pub struct Test {
pub world: Arc<World>,
pub servers: [SkNode; 3],
pub safekeepers_guc: String,
pub ttid: TenantTimelineId,
pub timeout: u64,
}
impl Test {
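/// Spawn a client node that runs walproposer in sync-safekeepers mode.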
fn launch_sync(&self) -> Arc<Node> {
let client_node = self.world.new_node();
info!("sync-safekeepers started at node {}", client_node.id);
// start the client thread
let guc = self.safekeepers_guc.clone();
let ttid = self.ttid.clone();
client_node.launch(move |_| {
let list = CString::new(guc).unwrap();
unsafe {
WalProposerCleanup();
syncSafekeepers = true;
wal_acceptors_list = list.into_raw();
wal_acceptor_reconnect_timeout = 1000;
wal_acceptor_connection_timeout = 5000;
neon_tenant_walproposer =
CString::new(ttid.tenant_id.to_string()).unwrap().into_raw();
neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string())
.unwrap()
.into_raw();
WalProposerRust();
}
});
self.world.await_all();
client_node
}
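/// Run sync-safekeepers to completion and return the LSN it reports;
/// fails on timeout or a non-zero exit code.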
pub fn sync_safekeepers(&self) -> anyhow::Result<Lsn> {
let client_node = self.launch_sync();
// poll until exit or timeout
let time_limit = self.timeout;
while self.world.step() && self.world.now() < time_limit && !client_node.is_finished() {}
if !client_node.is_finished() {
anyhow::bail!("timeout or idle stuck");
}
let res = client_node.result.lock().clone();
if res.0 != 0 {
anyhow::bail!("non-zero exitcode: {:?}", res);
}
let lsn = Lsn::from_str(&res.1)?;
Ok(lsn)
}
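/// Spawn a walproposer client starting at the given LSN (a typical
/// post-basebackup LSN is substituted for zero).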
pub fn launch_walproposer(&self, lsn: Lsn) -> WalProposer {
let client_node = self.world.new_node();
let lsn = if lsn.0 == 0 {
// usual LSN after basebackup
Lsn(21623024)
} else {
lsn
};
// start the client thread
let guc = self.safekeepers_guc.clone();
let ttid = self.ttid.clone();
client_node.launch(move |_| {
let list = CString::new(guc).unwrap();
unsafe {
WalProposerCleanup();
sim_redo_start_lsn = lsn.0;
syncSafekeepers = false;
wal_acceptors_list = list.into_raw();
wal_acceptor_reconnect_timeout = 1000;
wal_acceptor_connection_timeout = 5000;
neon_tenant_walproposer =
CString::new(ttid.tenant_id.to_string()).unwrap().into_raw();
neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string())
.unwrap()
.into_raw();
WalProposerRust();
}
});
self.world.await_all();
WalProposer {
node: client_node,
txes: Vec::new(),
last_committed_tx: 0,
commit_lsn: Lsn(0),
}
}
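/// Step the simulation until `duration` time units pass or the global
/// timeout is reached.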
pub fn poll_for_duration(&self, duration: u64) {
let time_limit = std::cmp::min(self.world.now() + duration, self.timeout);
while self.world.step() && self.world.now() < time_limit {}
}
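/// Run a schedule of timed actions: first register an empty wakeup event
/// for every action time (so the world always steps up to them), then run
/// sync-safekeepers, launch walproposer at the reported LSN and execute
/// each action when its time comes.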
pub fn run_schedule(&self, schedule: &Schedule) -> anyhow::Result<()> {
{
let empty_event = Box::new(EmptyEvent);
let now = self.world.now();
for (time, _) in schedule {
if *time < now {
continue;
}
self.world.schedule(*time - now, empty_event.clone());
}
}
let mut wait_node = self.launch_sync();
// placeholder walproposer, replaced once sync-safekeepers finishes
let mut wp = WalProposer {
node: wait_node.clone(),
txes: Vec::new(),
last_committed_tx: 0,
commit_lsn: Lsn(0),
};
let mut sync_in_progress = true;
let mut skipped_tx = 0;
let mut started_tx = 0;
let mut finished_tx = 0;
let mut schedule_ptr = 0;
loop {
if !sync_in_progress {
finished_tx += wp.update();
}
if sync_in_progress && wait_node.is_finished() {
let res = wait_node.result.lock().clone();
if res.0 != 0 {
anyhow::bail!("non-zero exitcode: {:?}", res);
}
let lsn = Lsn::from_str(&res.1)?;
info!("sync-safekeepers finished at LSN {}", lsn);
wp = self.launch_walproposer(lsn);
wait_node = wp.node.clone();
info!("walproposer started at node {}", wait_node.id);
sync_in_progress = false;
}
let now = self.world.now();
while schedule_ptr < schedule.len() && schedule[schedule_ptr].0 <= now {
if now != schedule[schedule_ptr].0 {
warn!("skipped event {:?} at {}", schedule[schedule_ptr], now);
}
let action = &schedule[schedule_ptr].1;
match action {
TestAction::WriteTx(size) => {
if !sync_in_progress && !wait_node.is_finished() {
for _ in 0..*size {
started_tx += 1;
wp.write_tx();
}
info!("written {} transactions", size);
} else {
skipped_tx += size;
info!("skipped {} transactions", size);
}
}
TestAction::RestartSafekeeper(id) => {
info!("restarting safekeeper {}", id);
self.servers[*id as usize].restart();
}
TestAction::RestartWalProposer => {
info!("restarting walproposer");
wait_node.crash_stop();
sync_in_progress = true;
wait_node = self.launch_sync();
}
}
schedule_ptr += 1;
}
if schedule_ptr == schedule.len() {
break;
}
let next_event_time = schedule[schedule_ptr].0;
info!("next event at {}, polling", next_event_time);
// poll until the next event
if wait_node.is_finished() {
while self.world.step() && self.world.now() < next_event_time {}
} else {
while self.world.step() && self.world.now() < next_event_time && !wait_node.is_finished() {}
}
}
info!("finished schedule");
info!("skipped_tx: {}", skipped_tx);
info!("started_tx: {}", started_tx);
info!("finished_tx: {}", finished_tx);
Ok(())
}
}
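/// Handle to a walproposer client node plus the LSNs of the transactions
/// it has written.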
pub struct WalProposer {
pub node: Arc<Node>,
pub txes: Vec<Lsn>,
pub last_committed_tx: usize,
pub commit_lsn: Lsn,
}
impl WalProposer {
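/// Insert a new WAL record, notify the node of its LSN and return the
/// transaction index.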
pub fn write_tx(&mut self) -> usize {
let new_ptr = unsafe { MyInsertRecord() };
self.node
.network_chan()
.send(NodeEvent::Internal(AnyMessage::LSN(new_ptr as u64)));
let tx_id = self.txes.len();
self.txes.push(Lsn(new_ptr as u64));
tx_id
}
/// Updates the committed status and returns the number of newly committed transactions.
pub fn update(&mut self) -> u64 {
let last_result = self.node.result.lock().clone();
if last_result.0 != 1 {
// not an LSN update
return 0;
}
let mut committed_now = 0;
let lsn_str = last_result.1;
let lsn = Lsn::from_str(&lsn_str);
match lsn {
Ok(lsn) => {
self.commit_lsn = lsn;
info!("commit_lsn: {}", lsn);
while self.last_committed_tx < self.txes.len()
&& self.txes[self.last_committed_tx] <= lsn
{
info!(
"Tx #{} was commited at {}, last_commit_lsn={}",
self.last_committed_tx, self.txes[self.last_committed_tx], self.commit_lsn
);
committed_now += 1;
self.last_committed_tx += 1;
}
}
Err(e) => {
error!("failed to parse LSN: {:?}", e);
}
}
committed_now
}
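/// Crash-stop the walproposer node.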
pub fn stop(&self) {
self.node.crash_stop();
}
}
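/// A single scheduled action in a test run.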
#[derive(Debug)]
pub enum TestAction {
WriteTx(usize),
RestartSafekeeper(usize),
RestartWalProposer,
}
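/// A schedule is a list of (time, action) pairs; run_schedule expects
/// them in non-decreasing time order.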
pub type Schedule = Vec<(u64, TestAction)>;


@@ -15,298 +15,10 @@ use crate::{
MyInsertRecord, WalProposerCleanup, WalProposerRust,
},
c_context,
-simtest::{safekeeper::run_server, log::{SimClock, init_logger}},
+simtest::{safekeeper::run_server, log::{SimClock, init_logger}, util::TestConfig},
};
-use super::disk::Disk;
-struct SkNode {
-node: Arc<Node>,
-id: u32,
-disk: Arc<Disk>,
-}
-impl SkNode {
-fn new(node: Arc<Node>) -> Self {
-let disk = Arc::new(Disk::new());
-let res = Self {
-id: node.id,
-node,
-disk,
-};
-res.launch();
-res
-}
-fn launch(&self) {
-let id = self.id;
-let disk = self.disk.clone();
-// start the server thread
-self.node.launch(move |os| {
-let res = run_server(os, disk);
-println!("server {} finished: {:?}", id, res);
-});
-}
-fn restart(&self) {
-self.node.crash_stop();
-self.launch();
-}
-}
-struct TestConfig {
-network: NetworkOptions,
-timeout: u64,
-clock: Option<SimClock>,
-}
-impl TestConfig {
-fn new(clock: Option<SimClock>) -> Self {
-Self {
-network: NetworkOptions {
-timeout: Some(2000),
-connect_delay: Delay {
-min: 1,
-max: 5,
-fail_prob: 0.0,
-},
-send_delay: Delay {
-min: 1,
-max: 5,
-fail_prob: 0.0,
-},
-},
-timeout: 1_000 * 10,
-clock,
-}
-}
-fn start(&self, seed: u64) -> Test {
-let world = Arc::new(World::new(
-seed,
-Arc::new(self.network.clone()),
-c_context(),
-));
-world.register_world();
-if let Some(clock) = &self.clock {
-clock.set_world(world.clone());
-}
-let servers = [
-SkNode::new(world.new_node()),
-SkNode::new(world.new_node()),
-SkNode::new(world.new_node()),
-];
-let server_ids = [servers[0].id, servers[1].id, servers[2].id];
-let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(",");
-let ttid = TenantTimelineId::generate();
-// wait init for all servers
-world.await_all();
-// clean up pgdata directory
-self.init_pgdata();
-Test {
-world,
-servers,
-safekeepers_guc,
-ttid,
-timeout: self.timeout,
-}
-}
-fn init_pgdata(&self) {
-let pgdata = Path::new("/home/admin/simulator/libs/walproposer/pgdata");
-if pgdata.exists() {
-std::fs::remove_dir_all(pgdata).unwrap();
-}
-std::fs::create_dir(pgdata).unwrap();
-// create empty pg_wal and pg_notify subdirs
-std::fs::create_dir(pgdata.join("pg_wal")).unwrap();
-std::fs::create_dir(pgdata.join("pg_notify")).unwrap();
-// write postgresql.conf
-let mut conf = std::fs::File::create(pgdata.join("postgresql.conf")).unwrap();
-let content = "
-wal_log_hints=off
-hot_standby=on
-fsync=off
-wal_level=replica
-restart_after_crash=off
-shared_preload_libraries=neon
-neon.pageserver_connstring=''
-neon.tenant_id=cc6e67313d57283bad411600fbf5c142
-neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e
-synchronous_standby_names=walproposer
-neon.safekeepers='node:1,node:2,node:3'
-max_connections=100
-";
-std::io::Write::write_all(&mut conf, content.as_bytes()).unwrap();
-}
-}
-struct Test {
-world: Arc<World>,
-servers: [SkNode; 3],
-safekeepers_guc: String,
-ttid: TenantTimelineId,
-timeout: u64,
-}
-impl Test {
-fn sync_safekeepers(&self) -> anyhow::Result<Lsn> {
-let client_node = self.world.new_node();
-// start the client thread
-let guc = self.safekeepers_guc.clone();
-let ttid = self.ttid.clone();
-client_node.launch(move |_| {
-let list = CString::new(guc).unwrap();
-unsafe {
-WalProposerCleanup();
-syncSafekeepers = true;
-wal_acceptors_list = list.into_raw();
-wal_acceptor_reconnect_timeout = 1000;
-wal_acceptor_connection_timeout = 5000;
-neon_tenant_walproposer =
-CString::new(ttid.tenant_id.to_string()).unwrap().into_raw();
-neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string())
-.unwrap()
-.into_raw();
-WalProposerRust();
-}
-});
-self.world.await_all();
-// poll until exit or timeout
-let time_limit = self.timeout;
-while self.world.step() && self.world.now() < time_limit && !client_node.is_finished() {}
-if !client_node.is_finished() {
-anyhow::bail!("timeout or idle stuck");
-}
-let res = client_node.result.lock().clone();
-if res.0 != 0 {
-anyhow::bail!("non-zero exitcode: {:?}", res);
-}
-let lsn = Lsn::from_str(&res.1)?;
-Ok(lsn)
-}
-fn launch_walproposer(&self, lsn: Lsn) -> WalProposer {
-let client_node = self.world.new_node();
-let lsn = if lsn.0 == 0 {
-// usual LSN after basebackup
-Lsn(21623024)
-} else {
-lsn
-};
-// start the client thread
-let guc = self.safekeepers_guc.clone();
-let ttid = self.ttid.clone();
-client_node.launch(move |_| {
-let list = CString::new(guc).unwrap();
-unsafe {
-WalProposerCleanup();
-sim_redo_start_lsn = lsn.0;
-syncSafekeepers = false;
-wal_acceptors_list = list.into_raw();
-wal_acceptor_reconnect_timeout = 1000;
-wal_acceptor_connection_timeout = 5000;
-neon_tenant_walproposer =
-CString::new(ttid.tenant_id.to_string()).unwrap().into_raw();
-neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string())
-.unwrap()
-.into_raw();
-WalProposerRust();
-}
-});
-self.world.await_all();
-WalProposer {
-node: client_node,
-txes: Vec::new(),
-last_committed_tx: 0,
-commit_lsn: Lsn(0),
-}
-}
-fn poll_for_duration(&self, duration: u64) {
-let time_limit = std::cmp::min(self.world.now() + duration, self.timeout);
-while self.world.step() && self.world.now() < time_limit {}
-}
-}
-struct WalProposer {
-node: Arc<Node>,
-txes: Vec<Lsn>,
-last_committed_tx: usize,
-commit_lsn: Lsn,
-}
-impl WalProposer {
-fn write_tx(&mut self) -> usize {
-let new_ptr = unsafe { MyInsertRecord() };
-self.node
-.network_chan()
-.send(NodeEvent::Internal(AnyMessage::LSN(new_ptr as u64)));
-let tx_id = self.txes.len();
-self.txes.push(Lsn(new_ptr as u64));
-tx_id
-}
-/// Updates committed status.
-fn update(&mut self) {
-let last_result = self.node.result.lock().clone();
-if last_result.0 != 1 {
-// not an LSN update
-return;
-}
-let lsn_str = last_result.1;
-let lsn = Lsn::from_str(&lsn_str);
-match lsn {
-Ok(lsn) => {
-self.commit_lsn = lsn;
-println!("commit_lsn: {}", lsn);
-while self.last_committed_tx < self.txes.len()
-&& self.txes[self.last_committed_tx] <= lsn
-{
-println!(
-"Tx #{} was commited at {}, last_commit_lsn={}",
-self.last_committed_tx, self.txes[self.last_committed_tx], self.commit_lsn
-);
-self.last_committed_tx += 1;
-}
-}
-Err(e) => {
-println!("failed to parse LSN: {:?}", e);
-}
-}
-}
-fn stop(&self) {
-self.node.crash_stop();
-}
-}
+use super::{disk::Disk, util::{Schedule, TestAction}};
#[test]
fn sync_empty_safekeepers() {
@@ -402,3 +114,31 @@ fn test_simple_restart() {
let lsn = test.sync_safekeepers().unwrap();
println!("Sucessfully synced safekeepers at {}", lsn);
}
+#[test]
+fn test_simple_schedule() {
+let clock = init_logger();
+let mut config = TestConfig::new(Some(clock));
+config.network.keepalive_timeout = Some(100);
+let test = config.start(1337);
+let schedule: Schedule = vec![
+(0, TestAction::RestartWalProposer),
+(50, TestAction::WriteTx(5)),
+(100, TestAction::RestartSafekeeper(0)),
+(100, TestAction::WriteTx(5)),
+(110, TestAction::RestartSafekeeper(1)),
+(110, TestAction::WriteTx(5)),
+(120, TestAction::RestartSafekeeper(2)),
+(120, TestAction::WriteTx(5)),
+(201, TestAction::RestartWalProposer),
+(251, TestAction::RestartSafekeeper(0)),
+(251, TestAction::RestartSafekeeper(1)),
+(251, TestAction::RestartSafekeeper(2)),
+(251, TestAction::WriteTx(5)),
+(255, TestAction::WriteTx(5)),
+(1000, TestAction::WriteTx(5)),
+];
+test.run_schedule(&schedule).unwrap();
+}


@@ -54,7 +54,7 @@ impl Delay {
#[derive(Clone)]
pub struct NetworkOptions {
/// Connection will be automatically closed after this timeout.
-pub timeout: Option<u64>,
+pub keepalive_timeout: Option<u64>,
pub connect_delay: Delay,
pub send_delay: Delay,
}
@@ -117,7 +117,7 @@ impl VirtualConnection {
/// Notify the future about the possible timeout.
fn schedule_timeout(self: &Arc<Self>) {
-if let Some(timeout) = self.options.timeout {
+if let Some(timeout) = self.options.keepalive_timeout {
self.world.schedule(timeout, self.as_event());
}
}
@@ -139,7 +139,7 @@ impl VirtualConnection {
// Close the one side of the connection by timeout if the node
// has not received any messages for a long time.
-if let Some(timeout) = self.options.timeout {
+if let Some(timeout) = self.options.keepalive_timeout {
let mut to_close = [false, false];
for direction in 0..2 {
let node_idx = direction ^ 1;


@@ -163,7 +163,7 @@ impl Park {
}
if state.panic {
panic!("thread was crashed by the simulation");
panic!("node {} was crashed by the simulation", node.id);
}
// println!("PARKING ENDED: node {:?}", node.id);


@@ -143,3 +143,10 @@ impl Debug for NetworkEvent {
.finish()
}
}
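+/// A no-op event, useful for waking up the world at a scheduled point in time.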
+#[derive(Copy, Clone, Debug)]
+pub struct EmptyEvent;
+impl Event for EmptyEvent {
+fn process(&self) {}
+}


@@ -1,4 +1,5 @@
use rand::{rngs::StdRng, Rng, SeedableRng};
+use tracing::debug;
use std::{
cell::RefCell,
ops::DerefMut,
@@ -71,7 +72,6 @@ impl World {
/// Create a new node.
pub fn new_node(self: &Arc<Self>) -> Arc<Node> {
-// TODO: verify
let mut nodes = self.nodes.lock();
let id = nodes.len() as NodeId;
let node = Arc::new(Node::new(id, self.clone(), self.new_rng()));
@@ -202,14 +202,14 @@ impl World {
/// Print full world state to stdout.
pub fn debug_print_state(&self) {
-println!(
-"[DEBUG] World state, nodes.len()={:?}, parking.len()={:?}",
+debug!(
+"World state, nodes.len()={:?}, parking.len()={:?}",
self.nodes.lock().len(),
self.unconditional_parking.lock().len()
);
for node in self.nodes.lock().iter() {
-println!(
-"[DEBUG] node id={:?} status={:?}",
+debug!(
+"node id={:?} status={:?}",
node.id,
node.status.lock()
);
@@ -424,7 +424,7 @@ impl Node {
NodeStatus::Waiting | NodeStatus::Parked => {}
}
println!("Node {} is crashing, status={:?}", self.id, status);
debug!("Node {} is crashing, status={:?}", self.id, status);
self.world.debug_print_state();
let park = self.world.find_parked_node(self);


@@ -27,7 +27,7 @@ mod tests {
};
let network = NetworkOptions {
-timeout: Some(50),
+keepalive_timeout: Some(50),
connect_delay: delay.clone(),
send_delay: delay.clone(),
};