From 731ed3bb6470e3db18190498220d36dd2b93605e Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 17 Aug 2023 13:09:55 +0000 Subject: [PATCH] Support virtual disk in tests --- libs/walproposer/src/simtest/disk.rs | 34 +++ libs/walproposer/src/simtest/mod.rs | 1 + libs/walproposer/src/simtest/safekeeper.rs | 276 +++++++++++------- libs/walproposer/src/simtest/simple_client.rs | 11 +- libs/walproposer/src/simtest/storage.rs | 8 +- libs/walproposer/src/simtest/wp_sk.rs | 13 +- 6 files changed, 232 insertions(+), 111 deletions(-) create mode 100644 libs/walproposer/src/simtest/disk.rs diff --git a/libs/walproposer/src/simtest/disk.rs b/libs/walproposer/src/simtest/disk.rs new file mode 100644 index 0000000000..a7bb9271b0 --- /dev/null +++ b/libs/walproposer/src/simtest/disk.rs @@ -0,0 +1,34 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use safekeeper::safekeeper::SafeKeeperState; +use safekeeper::simlib::sync::Mutex; +use utils::id::TenantTimelineId; + +pub struct Disk { + pub timelines: Mutex>>, +} + +impl Disk { + pub fn new() -> Self { + Disk { + timelines: Mutex::new(HashMap::new()), + } + } + + pub fn put(&self, ttid: &TenantTimelineId, state: SafeKeeperState) -> Arc { + self.timelines + .lock() + .entry(ttid.clone()) + .or_insert_with(|| { + Arc::new(TimelineDisk { + state: Mutex::new(state), + }) + }) + .clone() + } +} + +pub struct TimelineDisk { + pub state: Mutex, +} diff --git a/libs/walproposer/src/simtest/mod.rs b/libs/walproposer/src/simtest/mod.rs index 8d453e23c2..0776a9cdfc 100644 --- a/libs/walproposer/src/simtest/mod.rs +++ b/libs/walproposer/src/simtest/mod.rs @@ -4,5 +4,6 @@ pub mod simple_client; #[cfg(test)] pub mod wp_sk; +pub mod disk; pub mod safekeeper; pub mod storage; diff --git a/libs/walproposer/src/simtest/safekeeper.rs b/libs/walproposer/src/simtest/safekeeper.rs index fb7ce97af1..17e2ccfa41 100644 --- a/libs/walproposer/src/simtest/safekeeper.rs +++ b/libs/walproposer/src/simtest/safekeeper.rs @@ -2,7 +2,7 @@ //! Gets messages from the network, passes them down to consensus module and //! sends replies back. -use std::{collections::HashMap, path::PathBuf, time::Duration}; +use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use anyhow::{bail, Result}; use bytes::BytesMut; @@ -24,13 +24,13 @@ use utils::{ use crate::simtest::storage::{DummyWalStore, InMemoryState}; +use super::disk::Disk; + struct ConnState { tcp: TCP, - conf: SafeKeeperConf, greeting: bool, ttid: TenantTimelineId, - tli: Option, flush_pending: bool, } @@ -38,96 +38,59 @@ struct SharedState { sk: SafeKeeper, } -pub fn run_server(os: NodeOs) -> Result<()> { - println!("started server {}", os.id()); - let conf = SafeKeeperConf { - workdir: PathBuf::from("."), - my_id: NodeId(os.id() as u64), - listen_pg_addr: String::new(), - listen_http_addr: String::new(), - no_sync: false, - broker_endpoint: "/".parse::().unwrap(), - broker_keepalive_interval: Duration::from_secs(0), - heartbeat_timeout: Duration::from_secs(0), - remote_storage: None, - max_offloader_lag_bytes: 0, - backup_runtime_threads: None, - wal_backup_enabled: false, - auth: None, - }; - - let mut conns: HashMap = HashMap::new(); - - let epoll = os.epoll(); - loop { - // waiting for the next message - let mut next_event = Some(epoll.recv()); - - loop { - let event = match next_event { - Some(event) => event, - None => break, - }; - - match event { - NodeEvent::Accept(tcp) => { - conns.insert( - tcp.id(), - ConnState { - tcp, - conf: conf.clone(), - greeting: false, - ttid: TenantTimelineId::empty(), - tli: None, - flush_pending: false, - }, - ); - } - NodeEvent::Message((msg, tcp)) => { - let conn = conns.get_mut(&tcp.id()); - if let Some(conn) = conn { - let res = conn.process_any(msg); - if res.is_err() { - println!("conn {:?} error: {:?}", tcp, res); - conns.remove(&tcp.id()); - } - } else { - println!("conn {:?} was closed, dropping msg {:?}", tcp, msg); - } - } - NodeEvent::Internal(_) => {} - NodeEvent::Closed(_) => {} - NodeEvent::WakeTimeout(_) => {} - } - - // TODO: make simulator support multiple events per tick - next_event = epoll.try_recv(); - } - - conns.retain(|_, conn| { - let res = conn.flush(); - if res.is_err() { - println!("conn {:?} error: {:?}", conn.tcp, res); - } - res.is_ok() - }); - } +struct GlobalMap { + timelines: HashMap, + conf: SafeKeeperConf, + disk: Arc, } -impl ConnState { - fn process_any(&mut self, any: AnyMessage) -> Result<()> { - if let AnyMessage::Bytes(copy_data) = any { - let msg = ProposerAcceptorMessage::parse(copy_data)?; - // println!("got msg: {:?}", msg); - return self.process(msg); - } else { - bail!("unexpected message, expected AnyMessage::Bytes"); +impl GlobalMap { + fn new(disk: Arc, conf: SafeKeeperConf) -> Result { + let mut timelines = HashMap::new(); + + for (&ttid, disk) in disk.timelines.lock().iter() { + info!("loading timeline {}", ttid); + let state = disk.state.lock().clone(); + + if state.server.wal_seg_size == 0 { + bail!(TimelineError::UninitializedWalSegSize(ttid)); + } + + if state.server.pg_version == UNKNOWN_SERVER_VERSION { + bail!(TimelineError::UninitialinzedPgVersion(ttid)); + } + + if state.commit_lsn < state.local_start_lsn { + bail!( + "commit_lsn {} is higher than local_start_lsn {}", + state.commit_lsn, + state.local_start_lsn + ); + } + + // TODO: implement "persistent" storage for tests + let control_store = InMemoryState::new(state.clone()); + + // TODO: implement "persistent" storage for tests + let wal_store = DummyWalStore::new(); + + let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?; + timelines.insert(ttid.clone(), SharedState { sk }); } + + Ok(Self { + timelines, + conf, + disk, + }) } - fn create_timeline(&mut self, ttid: TenantTimelineId, server_info: ServerInfo) -> Result<()> { + fn create(&mut self, ttid: TenantTimelineId, server_info: ServerInfo) -> Result<()> { + if self.timelines.contains_key(&ttid) { + bail!("timeline {} already exists", ttid); + } + info!("creating new timeline {}", ttid); - self.ttid = ttid; let commit_lsn = Lsn::INVALID; let local_start_lsn = Lsn::INVALID; @@ -159,12 +122,120 @@ impl ConnState { let sk = SafeKeeper::new(control_store, wal_store, self.conf.my_id)?; - self.tli = Some(SharedState { sk }); - + self.timelines.insert(ttid.clone(), SharedState { sk }); Ok(()) } - fn process(&mut self, msg: ProposerAcceptorMessage) -> Result<()> { + fn get(&mut self, ttid: &TenantTimelineId) -> &mut SharedState { + self.timelines.get_mut(ttid).expect("timeline must exist") + } + + fn has_tli(&self, ttid: &TenantTimelineId) -> bool { + self.timelines.contains_key(ttid) + } +} + +pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { + println!("started server {}", os.id()); + let conf = SafeKeeperConf { + workdir: PathBuf::from("."), + my_id: NodeId(os.id() as u64), + listen_pg_addr: String::new(), + listen_http_addr: String::new(), + no_sync: false, + broker_endpoint: "/".parse::().unwrap(), + broker_keepalive_interval: Duration::from_secs(0), + heartbeat_timeout: Duration::from_secs(0), + remote_storage: None, + max_offloader_lag_bytes: 0, + backup_runtime_threads: None, + wal_backup_enabled: false, + auth: None, + }; + + let mut global = GlobalMap::new(disk, conf.clone())?; + let mut conns: HashMap = HashMap::new(); + + let epoll = os.epoll(); + loop { + // waiting for the next message + let mut next_event = Some(epoll.recv()); + + loop { + let event = match next_event { + Some(event) => event, + None => break, + }; + + match event { + NodeEvent::Accept(tcp) => { + conns.insert( + tcp.id(), + ConnState { + tcp, + greeting: false, + ttid: TenantTimelineId::empty(), + flush_pending: false, + }, + ); + } + NodeEvent::Message((msg, tcp)) => { + let conn = conns.get_mut(&tcp.id()); + if let Some(conn) = conn { + let res = conn.process_any(msg, &mut global); + if res.is_err() { + println!("conn {:?} error: {:?}", tcp, res); + conns.remove(&tcp.id()); + } + } else { + println!("conn {:?} was closed, dropping msg {:?}", tcp, msg); + } + } + NodeEvent::Internal(_) => {} + NodeEvent::Closed(_) => {} + NodeEvent::WakeTimeout(_) => {} + } + + // TODO: make simulator support multiple events per tick + next_event = epoll.try_recv(); + } + + conns.retain(|_, conn| { + let res = conn.flush(&mut global); + if res.is_err() { + println!("conn {:?} error: {:?}", conn.tcp, res); + } + res.is_ok() + }); + } +} + +impl ConnState { + fn process_any(&mut self, any: AnyMessage, global: &mut GlobalMap) -> Result<()> { + if let AnyMessage::Bytes(copy_data) = any { + let msg = ProposerAcceptorMessage::parse(copy_data)?; + // println!("got msg: {:?}", msg); + return self.process(msg, global); + } else { + bail!("unexpected message, expected AnyMessage::Bytes"); + } + } + + fn init_timeline( + &mut self, + ttid: TenantTimelineId, + server_info: ServerInfo, + global: &mut GlobalMap, + ) -> Result<()> { + self.ttid = ttid; + if global.has_tli(&ttid) { + return Ok(()); + } + + global.create(ttid, server_info) + } + + fn process(&mut self, msg: ProposerAcceptorMessage, global: &mut GlobalMap) -> Result<()> { if !self.greeting { self.greeting = true; @@ -180,7 +251,7 @@ impl ConnState { wal_seg_size: greeting.wal_seg_size, }; let ttid = TenantTimelineId::new(greeting.tenant_id, greeting.timeline_id); - self.create_timeline(ttid, server_info)? + self.init_timeline(ttid, server_info, global)? } _ => { bail!("unexpected message {msg:?} instead of greeting"); @@ -188,15 +259,18 @@ impl ConnState { } } + let tli = global.get(&self.ttid); + match msg { ProposerAcceptorMessage::AppendRequest(append_request) => { self.flush_pending = true; - self.process_sk_msg(&ProposerAcceptorMessage::NoFlushAppendRequest( - append_request, - ))?; + self.process_sk_msg( + tli, + &ProposerAcceptorMessage::NoFlushAppendRequest(append_request), + )?; } other => { - self.process_sk_msg(&other)?; + self.process_sk_msg(tli, &other)?; } } @@ -205,17 +279,21 @@ impl ConnState { /// Process FlushWAL if needed. // TODO: add extra flushes, to verify that extra flushes don't break anything - fn flush(&mut self) -> Result<()> { + fn flush(&mut self, global: &mut GlobalMap) -> Result<()> { if !self.flush_pending { return Ok(()); } self.flush_pending = false; - self.process_sk_msg(&ProposerAcceptorMessage::FlushWAL) + let shared_state = global.get(&self.ttid); + self.process_sk_msg(shared_state, &ProposerAcceptorMessage::FlushWAL) } /// Make safekeeper process a message and send a reply to the TCP - fn process_sk_msg(&mut self, msg: &ProposerAcceptorMessage) -> Result<()> { - let shared_state = self.tli.as_mut().unwrap(); + fn process_sk_msg( + &mut self, + shared_state: &mut SharedState, + msg: &ProposerAcceptorMessage, + ) -> Result<()> { let mut reply = shared_state.sk.process_msg(msg)?; if let Some(reply) = &mut reply { // // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn diff --git a/libs/walproposer/src/simtest/simple_client.rs b/libs/walproposer/src/simtest/simple_client.rs index f2c84abe55..f4c8f9f456 100644 --- a/libs/walproposer/src/simtest/simple_client.rs +++ b/libs/walproposer/src/simtest/simple_client.rs @@ -1,7 +1,10 @@ use std::sync::Arc; use safekeeper::{ - simlib::{network::{Delay, NetworkOptions}, world::World}, + simlib::{ + network::{Delay, NetworkOptions}, + world::World, + }, simtest::{start_simulation, Options}, }; @@ -27,10 +30,8 @@ fn run_rust_c_test() { start_simulation(Options { world, time_limit: 1_000_000, - client_fn: Box::new(move |_, server_id| { - unsafe { - RunClientC(server_id); - } + client_fn: Box::new(move |_, server_id| unsafe { + RunClientC(server_id); }), u32_data, }); diff --git a/libs/walproposer/src/simtest/storage.rs b/libs/walproposer/src/simtest/storage.rs index 51c5e6dbcf..57b675728c 100644 --- a/libs/walproposer/src/simtest/storage.rs +++ b/libs/walproposer/src/simtest/storage.rs @@ -1,9 +1,9 @@ use std::ops::Deref; -use safekeeper::{safekeeper::SafeKeeperState, control_file, wal_storage}; use anyhow::Result; -use utils::lsn::Lsn; use postgres_ffi::XLogSegNo; +use safekeeper::{control_file, safekeeper::SafeKeeperState, wal_storage}; +use utils::lsn::Lsn; pub struct InMemoryState { persisted_state: SafeKeeperState, @@ -11,9 +11,7 @@ pub struct InMemoryState { impl InMemoryState { pub fn new(persisted_state: SafeKeeperState) -> Self { - InMemoryState { - persisted_state, - } + InMemoryState { persisted_state } } } diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index ad16d4dc32..58e682dbb7 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -18,23 +18,32 @@ use crate::{ simtest::safekeeper::run_server, }; +use super::disk::Disk; + struct SkNode { node: Arc, id: u32, + disk: Arc, } impl SkNode { fn new(node: Arc) -> Self { - let res = Self { id: node.id, node }; + let disk = Arc::new(Disk::new()); + let res = Self { + id: node.id, + node, + disk, + }; res.launch(); res } fn launch(&self) { let id = self.id; + let disk = self.disk.clone(); // start the server thread self.node.launch(move |os| { - let res = run_server(os); + let res = run_server(os, disk); println!("server {} finished: {:?}", id, res); }); }