Support virtual disk in tests

This commit is contained in:
Arthur Petukhovsky
2023-08-17 13:09:55 +00:00
parent 413ce2cfe8
commit 731ed3bb64
6 changed files with 232 additions and 111 deletions

View File

@@ -0,0 +1,34 @@
use std::collections::HashMap;
use std::sync::Arc;
use safekeeper::safekeeper::SafeKeeperState;
use safekeeper::simlib::sync::Mutex;
use utils::id::TenantTimelineId;
pub struct Disk {
pub timelines: Mutex<HashMap<TenantTimelineId, Arc<TimelineDisk>>>,
}
impl Disk {
pub fn new() -> Self {
Disk {
timelines: Mutex::new(HashMap::new()),
}
}
pub fn put(&self, ttid: &TenantTimelineId, state: SafeKeeperState) -> Arc<TimelineDisk> {
self.timelines
.lock()
.entry(ttid.clone())
.or_insert_with(|| {
Arc::new(TimelineDisk {
state: Mutex::new(state),
})
})
.clone()
}
}
pub struct TimelineDisk {
pub state: Mutex<SafeKeeperState>,
}

View File

@@ -4,5 +4,6 @@ pub mod simple_client;
#[cfg(test)]
pub mod wp_sk;
pub mod disk;
pub mod safekeeper;
pub mod storage;

View File

@@ -2,7 +2,7 @@
//! Gets messages from the network, passes them down to consensus module and
//! sends replies back.
use std::{collections::HashMap, path::PathBuf, time::Duration};
use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use anyhow::{bail, Result};
use bytes::BytesMut;
@@ -24,13 +24,13 @@ use utils::{
use crate::simtest::storage::{DummyWalStore, InMemoryState};
use super::disk::Disk;
struct ConnState {
tcp: TCP,
conf: SafeKeeperConf,
greeting: bool,
ttid: TenantTimelineId,
tli: Option<SharedState>,
flush_pending: bool,
}
@@ -38,96 +38,59 @@ struct SharedState {
sk: SafeKeeper<InMemoryState, DummyWalStore>,
}
pub fn run_server(os: NodeOs) -> Result<()> {
println!("started server {}", os.id());
let conf = SafeKeeperConf {
workdir: PathBuf::from("."),
my_id: NodeId(os.id() as u64),
listen_pg_addr: String::new(),
listen_http_addr: String::new(),
no_sync: false,
broker_endpoint: "/".parse::<Uri>().unwrap(),
broker_keepalive_interval: Duration::from_secs(0),
heartbeat_timeout: Duration::from_secs(0),
remote_storage: None,
max_offloader_lag_bytes: 0,
backup_runtime_threads: None,
wal_backup_enabled: false,
auth: None,
};
let mut conns: HashMap<i64, ConnState> = HashMap::new();
let epoll = os.epoll();
loop {
// waiting for the next message
let mut next_event = Some(epoll.recv());
loop {
let event = match next_event {
Some(event) => event,
None => break,
};
match event {
NodeEvent::Accept(tcp) => {
conns.insert(
tcp.id(),
ConnState {
tcp,
conf: conf.clone(),
greeting: false,
ttid: TenantTimelineId::empty(),
tli: None,
flush_pending: false,
},
);
}
NodeEvent::Message((msg, tcp)) => {
let conn = conns.get_mut(&tcp.id());
if let Some(conn) = conn {
let res = conn.process_any(msg);
if res.is_err() {
println!("conn {:?} error: {:?}", tcp, res);
conns.remove(&tcp.id());
}
} else {
println!("conn {:?} was closed, dropping msg {:?}", tcp, msg);
}
}
NodeEvent::Internal(_) => {}
NodeEvent::Closed(_) => {}
NodeEvent::WakeTimeout(_) => {}
}
// TODO: make simulator support multiple events per tick
next_event = epoll.try_recv();
}
conns.retain(|_, conn| {
let res = conn.flush();
if res.is_err() {
println!("conn {:?} error: {:?}", conn.tcp, res);
}
res.is_ok()
});
}
struct GlobalMap {
timelines: HashMap<TenantTimelineId, SharedState>,
conf: SafeKeeperConf,
disk: Arc<Disk>,
}
impl ConnState {
fn process_any(&mut self, any: AnyMessage) -> Result<()> {
if let AnyMessage::Bytes(copy_data) = any {
let msg = ProposerAcceptorMessage::parse(copy_data)?;
// println!("got msg: {:?}", msg);
return self.process(msg);
} else {
bail!("unexpected message, expected AnyMessage::Bytes");
impl GlobalMap {
fn new(disk: Arc<Disk>, conf: SafeKeeperConf) -> Result<Self> {
let mut timelines = HashMap::new();
for (&ttid, disk) in disk.timelines.lock().iter() {
info!("loading timeline {}", ttid);
let state = disk.state.lock().clone();
if state.server.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(ttid));
}
if state.server.pg_version == UNKNOWN_SERVER_VERSION {
bail!(TimelineError::UninitialinzedPgVersion(ttid));
}
if state.commit_lsn < state.local_start_lsn {
bail!(
"commit_lsn {} is higher than local_start_lsn {}",
state.commit_lsn,
state.local_start_lsn
);
}
// TODO: implement "persistent" storage for tests
let control_store = InMemoryState::new(state.clone());
// TODO: implement "persistent" storage for tests
let wal_store = DummyWalStore::new();
let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
timelines.insert(ttid.clone(), SharedState { sk });
}
Ok(Self {
timelines,
conf,
disk,
})
}
fn create_timeline(&mut self, ttid: TenantTimelineId, server_info: ServerInfo) -> Result<()> {
fn create(&mut self, ttid: TenantTimelineId, server_info: ServerInfo) -> Result<()> {
if self.timelines.contains_key(&ttid) {
bail!("timeline {} already exists", ttid);
}
info!("creating new timeline {}", ttid);
self.ttid = ttid;
let commit_lsn = Lsn::INVALID;
let local_start_lsn = Lsn::INVALID;
@@ -159,12 +122,120 @@ impl ConnState {
let sk = SafeKeeper::new(control_store, wal_store, self.conf.my_id)?;
self.tli = Some(SharedState { sk });
self.timelines.insert(ttid.clone(), SharedState { sk });
Ok(())
}
fn process(&mut self, msg: ProposerAcceptorMessage) -> Result<()> {
fn get(&mut self, ttid: &TenantTimelineId) -> &mut SharedState {
self.timelines.get_mut(ttid).expect("timeline must exist")
}
fn has_tli(&self, ttid: &TenantTimelineId) -> bool {
self.timelines.contains_key(ttid)
}
}
pub fn run_server(os: NodeOs, disk: Arc<Disk>) -> Result<()> {
println!("started server {}", os.id());
let conf = SafeKeeperConf {
workdir: PathBuf::from("."),
my_id: NodeId(os.id() as u64),
listen_pg_addr: String::new(),
listen_http_addr: String::new(),
no_sync: false,
broker_endpoint: "/".parse::<Uri>().unwrap(),
broker_keepalive_interval: Duration::from_secs(0),
heartbeat_timeout: Duration::from_secs(0),
remote_storage: None,
max_offloader_lag_bytes: 0,
backup_runtime_threads: None,
wal_backup_enabled: false,
auth: None,
};
let mut global = GlobalMap::new(disk, conf.clone())?;
let mut conns: HashMap<i64, ConnState> = HashMap::new();
let epoll = os.epoll();
loop {
// waiting for the next message
let mut next_event = Some(epoll.recv());
loop {
let event = match next_event {
Some(event) => event,
None => break,
};
match event {
NodeEvent::Accept(tcp) => {
conns.insert(
tcp.id(),
ConnState {
tcp,
greeting: false,
ttid: TenantTimelineId::empty(),
flush_pending: false,
},
);
}
NodeEvent::Message((msg, tcp)) => {
let conn = conns.get_mut(&tcp.id());
if let Some(conn) = conn {
let res = conn.process_any(msg, &mut global);
if res.is_err() {
println!("conn {:?} error: {:?}", tcp, res);
conns.remove(&tcp.id());
}
} else {
println!("conn {:?} was closed, dropping msg {:?}", tcp, msg);
}
}
NodeEvent::Internal(_) => {}
NodeEvent::Closed(_) => {}
NodeEvent::WakeTimeout(_) => {}
}
// TODO: make simulator support multiple events per tick
next_event = epoll.try_recv();
}
conns.retain(|_, conn| {
let res = conn.flush(&mut global);
if res.is_err() {
println!("conn {:?} error: {:?}", conn.tcp, res);
}
res.is_ok()
});
}
}
impl ConnState {
fn process_any(&mut self, any: AnyMessage, global: &mut GlobalMap) -> Result<()> {
if let AnyMessage::Bytes(copy_data) = any {
let msg = ProposerAcceptorMessage::parse(copy_data)?;
// println!("got msg: {:?}", msg);
return self.process(msg, global);
} else {
bail!("unexpected message, expected AnyMessage::Bytes");
}
}
fn init_timeline(
&mut self,
ttid: TenantTimelineId,
server_info: ServerInfo,
global: &mut GlobalMap,
) -> Result<()> {
self.ttid = ttid;
if global.has_tli(&ttid) {
return Ok(());
}
global.create(ttid, server_info)
}
fn process(&mut self, msg: ProposerAcceptorMessage, global: &mut GlobalMap) -> Result<()> {
if !self.greeting {
self.greeting = true;
@@ -180,7 +251,7 @@ impl ConnState {
wal_seg_size: greeting.wal_seg_size,
};
let ttid = TenantTimelineId::new(greeting.tenant_id, greeting.timeline_id);
self.create_timeline(ttid, server_info)?
self.init_timeline(ttid, server_info, global)?
}
_ => {
bail!("unexpected message {msg:?} instead of greeting");
@@ -188,15 +259,18 @@ impl ConnState {
}
}
let tli = global.get(&self.ttid);
match msg {
ProposerAcceptorMessage::AppendRequest(append_request) => {
self.flush_pending = true;
self.process_sk_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(
append_request,
))?;
self.process_sk_msg(
tli,
&ProposerAcceptorMessage::NoFlushAppendRequest(append_request),
)?;
}
other => {
self.process_sk_msg(&other)?;
self.process_sk_msg(tli, &other)?;
}
}
@@ -205,17 +279,21 @@ impl ConnState {
/// Process FlushWAL if needed.
// TODO: add extra flushes, to verify that extra flushes don't break anything
fn flush(&mut self) -> Result<()> {
fn flush(&mut self, global: &mut GlobalMap) -> Result<()> {
if !self.flush_pending {
return Ok(());
}
self.flush_pending = false;
self.process_sk_msg(&ProposerAcceptorMessage::FlushWAL)
let shared_state = global.get(&self.ttid);
self.process_sk_msg(shared_state, &ProposerAcceptorMessage::FlushWAL)
}
/// Make safekeeper process a message and send a reply to the TCP
fn process_sk_msg(&mut self, msg: &ProposerAcceptorMessage) -> Result<()> {
let shared_state = self.tli.as_mut().unwrap();
fn process_sk_msg(
&mut self,
shared_state: &mut SharedState,
msg: &ProposerAcceptorMessage,
) -> Result<()> {
let mut reply = shared_state.sk.process_msg(msg)?;
if let Some(reply) = &mut reply {
// // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn

View File

@@ -1,7 +1,10 @@
use std::sync::Arc;
use safekeeper::{
simlib::{network::{Delay, NetworkOptions}, world::World},
simlib::{
network::{Delay, NetworkOptions},
world::World,
},
simtest::{start_simulation, Options},
};
@@ -27,10 +30,8 @@ fn run_rust_c_test() {
start_simulation(Options {
world,
time_limit: 1_000_000,
client_fn: Box::new(move |_, server_id| {
unsafe {
RunClientC(server_id);
}
client_fn: Box::new(move |_, server_id| unsafe {
RunClientC(server_id);
}),
u32_data,
});

View File

@@ -1,9 +1,9 @@
use std::ops::Deref;
use safekeeper::{safekeeper::SafeKeeperState, control_file, wal_storage};
use anyhow::Result;
use utils::lsn::Lsn;
use postgres_ffi::XLogSegNo;
use safekeeper::{control_file, safekeeper::SafeKeeperState, wal_storage};
use utils::lsn::Lsn;
pub struct InMemoryState {
persisted_state: SafeKeeperState,
@@ -11,9 +11,7 @@ pub struct InMemoryState {
impl InMemoryState {
pub fn new(persisted_state: SafeKeeperState) -> Self {
InMemoryState {
persisted_state,
}
InMemoryState { persisted_state }
}
}

View File

@@ -18,23 +18,32 @@ use crate::{
simtest::safekeeper::run_server,
};
use super::disk::Disk;
struct SkNode {
node: Arc<Node>,
id: u32,
disk: Arc<Disk>,
}
impl SkNode {
fn new(node: Arc<Node>) -> Self {
let res = Self { id: node.id, node };
let disk = Arc::new(Disk::new());
let res = Self {
id: node.id,
node,
disk,
};
res.launch();
res
}
fn launch(&self) {
let id = self.id;
let disk = self.disk.clone();
// start the server thread
self.node.launch(move |os| {
let res = run_server(os);
let res = run_server(os, disk);
println!("server {} finished: {:?}", id, res);
});
}