Validate logs

This commit is contained in:
Arthur Petukhovsky
2023-09-18 17:12:40 +00:00
parent 61e6b24cb2
commit 6a00ad3aab
8 changed files with 241 additions and 92 deletions

1
Cargo.lock generated
View File

@@ -4660,6 +4660,7 @@ dependencies = [
"rand",
"regex",
"safekeeper",
"scopeguard",
"serde",
"thiserror",
"tracing",

View File

@@ -21,6 +21,7 @@ thiserror.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["json"] }
serde.workspace = true
scopeguard.workspace = true
utils.workspace = true
safekeeper.workspace = true
postgres_ffi.workspace = true

View File

@@ -278,6 +278,7 @@ void InitMyInsert()
{
CurrBytePos = sim_redo_start_lsn;
PrevBytePos = InvalidXLogRecPtr;
sim_latest_available_lsn = sim_redo_start_lsn;
}
static void MyBeginInsert()
@@ -419,7 +420,6 @@ MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags)
// Now write it to disk.
MyCopyXLogRecordToWAL(rechdr->xl_tot_len, &hdr_rdt, StartPos, EndPos);
return EndPos;
}

View File

@@ -14,7 +14,7 @@ use safekeeper::{
},
simlib::{network::TCP, node_os::NodeOs, proto::AnyMessage, world::NodeEvent},
timeline::TimelineError,
SafeKeeperConf,
SafeKeeperConf, wal_storage::Storage,
};
use tracing::{debug, info_span};
use utils::{
@@ -149,6 +149,7 @@ impl GlobalMap {
pub fn run_server(os: NodeOs, disk: Arc<Disk>) -> Result<()> {
let _enter = info_span!("safekeeper", id = os.id()).entered();
debug!("started server");
os.log_event("started;safekeeper".to_owned());
let conf = SafeKeeperConf {
workdir: PathBuf::from("."),
my_id: NodeId(os.id() as u64),
@@ -168,6 +169,12 @@ pub fn run_server(os: NodeOs, disk: Arc<Disk>) -> Result<()> {
let mut global = GlobalMap::new(disk, conf.clone())?;
let mut conns: HashMap<i64, ConnState> = HashMap::new();
for (&ttid, shared_state) in global.timelines.iter_mut() {
let flush_lsn = shared_state.sk.wal_store.flush_lsn();
let commit_lsn = shared_state.sk.state.commit_lsn;
os.log_event(format!("tli_loaded;{};{}", flush_lsn.0, commit_lsn.0));
}
let epoll = os.epoll();
loop {
// waiting for the next message

View File

@@ -1,4 +1,4 @@
use std::{ffi::CString, path::Path, str::FromStr, sync::Arc};
use std::{ffi::CString, path::Path, str::FromStr, sync::Arc, collections::HashMap};
use rand::{Rng, SeedableRng};
use safekeeper::simlib::{
@@ -6,7 +6,7 @@ use safekeeper::simlib::{
proto::AnyMessage,
time::EmptyEvent,
world::World,
world::{Node, NodeEvent},
world::{Node, NodeEvent, SEvent, NodeId},
};
use tracing::{debug, error, info, warn};
use utils::{id::TenantTimelineId, lsn::Lsn};
@@ -253,9 +253,6 @@ impl Test {
WalProposer {
node: client_node,
txes: Vec::new(),
last_committed_tx: 0,
commit_lsn: Lsn(0),
}
}
@@ -281,23 +278,15 @@ impl Test {
// fake walproposer
let mut wp = WalProposer {
node: wait_node.clone(),
txes: Vec::new(),
last_committed_tx: 0,
commit_lsn: Lsn(0),
};
let mut sync_in_progress = true;
let mut skipped_tx = 0;
let mut started_tx = 0;
let mut finished_tx = 0;
let mut schedule_ptr = 0;
loop {
if !sync_in_progress {
finished_tx += wp.update();
}
if sync_in_progress && wait_node.is_finished() {
let res = wait_node.result.lock().clone();
if res.0 != 0 {
@@ -324,10 +313,8 @@ impl Test {
match action {
TestAction::WriteTx(size) => {
if !sync_in_progress && !wait_node.is_finished() {
for _ in 0..*size {
started_tx += 1;
wp.write_tx();
}
started_tx += *size;
wp.write_tx(*size);
debug!("written {} transactions", size);
} else {
skipped_tx += size;
@@ -367,7 +354,6 @@ impl Test {
debug!("finished schedule");
debug!("skipped_tx: {}", skipped_tx);
debug!("started_tx: {}", started_tx);
debug!("finished_tx: {}", finished_tx);
Ok(())
}
@@ -375,59 +361,13 @@ impl Test {
pub struct WalProposer {
pub node: Arc<Node>,
pub txes: Vec<Lsn>,
pub last_committed_tx: usize,
pub commit_lsn: Lsn,
}
impl WalProposer {
pub fn write_tx(&mut self) -> usize {
let new_ptr = unsafe { MyInsertRecord() };
pub fn write_tx(&mut self, cnt: usize) {
self.node
.network_chan()
.send(NodeEvent::Internal(AnyMessage::LSN(new_ptr as u64)));
let tx_id = self.txes.len();
self.txes.push(Lsn(new_ptr as u64));
tx_id
}
/// Updates committed status.
pub fn update(&mut self) -> u64 {
let last_result = self.node.result.lock().clone();
if last_result.0 != 1 {
// not an LSN update
return 0;
}
let mut commited_now = 0;
let lsn_str = last_result.1;
let lsn = Lsn::from_str(&lsn_str);
match lsn {
Ok(lsn) => {
self.commit_lsn = lsn;
debug!("commit_lsn: {}", lsn);
while self.last_committed_tx < self.txes.len()
&& self.txes[self.last_committed_tx] <= lsn
{
debug!(
"Tx #{} was commited at {}, last_commit_lsn={}",
self.last_committed_tx, self.txes[self.last_committed_tx], self.commit_lsn
);
commited_now += 1;
self.last_committed_tx += 1;
}
}
Err(e) => {
error!("failed to parse LSN: {:?}", e);
}
}
commited_now
.send(NodeEvent::Internal(AnyMessage::Just32(cnt as u32)));
}
pub fn stop(&self) {
@@ -490,3 +430,181 @@ pub fn generate_network_opts(seed: u64) -> NetworkOptions {
},
}
}
/// Role of a simulated node, taken from the second field of the
/// `started;<role>[;...]` event the node logs when it starts.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
enum NodeKind {
    /// No `started` event has been processed for this node yet.
    #[default]
    Unknown,
    /// The node logged `started;safekeeper`.
    Safekeeper,
    /// The node logged `started;walproposer;<is_sync>`.
    WalProposer,
}
/// Per-node state reconstructed from the event log while validating it.
#[derive(Clone, Debug, Default)]
struct NodeInfo {
    /// Role announced by the node's `started` event.
    kind: NodeKind,

    // walproposer
    /// Non-zero third field of `started;walproposer;<flag>`.
    is_sync: bool,
    /// Term reported by the node's latest `prop_elected` event.
    term: u64,
    /// LSN reported by the node's latest `prop_elected` event.
    epoch_lsn: u64,

    // safekeeper
    /// Commit LSN reported by the node's latest `tli_loaded` event.
    commit_lsn: u64,
    /// Flush LSN reported by the node's latest `tli_loaded` event.
    flush_lsn: u64,
}

impl NodeInfo {
    /// Records the node's role the first time it is seen.
    ///
    /// # Panics
    /// Panics if a role was already recorded and `kind` differs from it.
    fn init_kind(&mut self, kind: NodeKind) {
        if self.kind == NodeKind::Unknown {
            self.kind = kind;
        } else {
            assert_eq!(self.kind, kind, "node changed its kind between `started` events");
        }
    }

    /// Applies a `started;<role>[;<is_sync>]` event to this node.
    ///
    /// `data` is the raw event text. For `walproposer` the third field is parsed
    /// as a `u8` and stored in `is_sync` (non-zero means `true`).
    ///
    /// # Panics
    /// Panics if `data` does not begin with `started`, names an unknown role,
    /// lacks or garbles the walproposer sync flag, or contradicts a previously
    /// recorded role. The panic message includes the offending event text.
    fn started(&mut self, data: &str) {
        let mut parts = data.split(';');
        assert_eq!(
            parts.next(),
            Some("started"),
            "not a `started` event: {:?}",
            data
        );
        match parts.next() {
            Some("safekeeper") => self.init_kind(NodeKind::Safekeeper),
            Some("walproposer") => {
                self.init_kind(NodeKind::WalProposer);
                let raw = parts
                    .next()
                    .unwrap_or_else(|| panic!("event {:?} is missing the is_sync flag", data));
                let flag: u8 = raw.parse().unwrap_or_else(|err| {
                    panic!("event {:?}: bad is_sync flag {:?}: {}", data, raw, err)
                });
                self.is_sync = flag != 0;
            }
            // The log is external input to the validator, so this branch is
            // reachable whenever the log is malformed; report what was seen.
            other => panic!("unexpected node kind {:?} in event {:?}", other, data),
        }
    }
}
/// Cluster-wide bookkeeping accumulated while replaying the event log.
#[derive(Debug, Default)]
struct GlobalState {
    /// Per-node state, indexed by node id; grown on demand by [`GlobalState::get`].
    nodes: Vec<NodeInfo>,
    /// Latest value taken from a `commit_lsn` event.
    commit_lsn: u64,
    /// Current write position: end of the last `write_wal`, or the LSN of the
    /// last `prop_elected` when that moved it.
    write_lsn: u64,
    /// Largest `write_wal` end LSN seen so far (seeded by the validator).
    max_write_lsn: u64,
    /// Sum of `end_lsn - start_lsn` over all `write_wal` events.
    written_wal: u64,
    /// Sum of the record counts over all `write_wal` events.
    written_records: u64,
}

impl GlobalState {
    /// Creates an empty state with every counter at zero and no known nodes.
    fn new() -> Self {
        Self::default()
    }

    /// Returns the entry for node `id`, first padding the table with default
    /// entries so that index `id` exists.
    fn get(&mut self, id: u32) -> &mut NodeInfo {
        let idx = id as usize;
        let needed = idx + 1;
        if self.nodes.len() < needed {
            self.nodes.resize_with(needed, NodeInfo::default);
        }
        &mut self.nodes[idx]
    }
}
/// Replays the simulation event log in order and asserts that it is consistent.
///
/// Each event carries the id of the node that logged it and a `;`-separated
/// text payload. Handled payloads:
///
/// * `started;...` — registers the node's role (see `NodeInfo::started`).
/// * safekeeper `tli_loaded;<flush_lsn>;<commit_lsn>` — recorded on the node.
/// * walproposer `prop_elected;<lsn>;<term>;<prev_lsn>;<prev_term>` — LSN and
///   term must not go backwards relative to the previous entry, and the LSN
///   must be between the global commit LSN and the highest LSN ever written.
/// * walproposer `write_wal;<start_lsn>;<end_lsn>;<count>` — must come from a
///   non-sync walproposer and start exactly at the current write position.
/// * walproposer `commit_lsn;<lsn>` — must be monotonically non-decreasing.
///
/// # Panics
/// Panics on the first violated invariant, on an unknown event name, on a
/// missing or non-numeric field, or on an event from a node that has not
/// logged `started`.
pub fn validate_events(events: Vec<SEvent>) {
    // NOTE(review): presumably the LSN at which WAL begins right after initdb in
    // the simulation; used as the initial upper bound for elected LSNs — confirm.
    const INITDB_LSN: u64 = 21623024;

    /// Takes the next `;`-separated field from `parts` and parses it as `u64`,
    /// panicking with the full event text if the field is absent or malformed.
    fn next_u64<'a>(
        parts: &mut impl Iterator<Item = &'a str>,
        field: &str,
        event: &str,
    ) -> u64 {
        let raw = parts
            .next()
            .unwrap_or_else(|| panic!("event {:?} is missing field `{}`", event, field));
        raw.parse::<u64>().unwrap_or_else(|err| {
            panic!(
                "event {:?}: field `{}` = {:?} is not a valid u64: {}",
                event, field, raw, err
            )
        })
    }

    // `take_hook` installs the default panic hook while validation runs. The
    // previous hook is put back only on success: `set_hook` itself panics when
    // called from a panicking thread, so it must not run during unwinding.
    let hook = std::panic::take_hook();
    scopeguard::defer_on_success! {
        std::panic::set_hook(hook);
    };

    let mut state = GlobalState::new();
    state.max_write_lsn = INITDB_LSN;

    for event in events {
        debug!("{:?}", event);

        let node = state.get(event.node);
        if event.data.starts_with("started;") {
            node.started(&event.data);
            continue;
        }
        assert!(
            node.kind != NodeKind::Unknown,
            "event {:?} came from a node that has not logged `started`",
            event.data
        );

        // Work on a copy so `state` is no longer mutably borrowed and its global
        // counters can be read and updated below; the copy is stored back at the end.
        let mut node = node.clone();

        let mut parts = event.data.split(';');
        let name = parts.next().expect("split always yields at least one item");

        match node.kind {
            NodeKind::Safekeeper => match name {
                "tli_loaded" => {
                    let flush_lsn = next_u64(&mut parts, "flush_lsn", &event.data);
                    let commit_lsn = next_u64(&mut parts, "commit_lsn", &event.data);
                    node.flush_lsn = flush_lsn;
                    node.commit_lsn = commit_lsn;
                }
                other => panic!(
                    "unknown safekeeper event {:?} in {:?}",
                    other, event.data
                ),
            },
            NodeKind::WalProposer => match name {
                "prop_elected" => {
                    let prop_lsn = next_u64(&mut parts, "prop_lsn", &event.data);
                    let prop_term = next_u64(&mut parts, "prop_term", &event.data);
                    let prev_lsn = next_u64(&mut parts, "prev_lsn", &event.data);
                    let prev_term = next_u64(&mut parts, "prev_term", &event.data);

                    assert!(
                        prop_lsn >= prev_lsn,
                        "elected at lsn {} below previous entry lsn {}",
                        prop_lsn, prev_lsn
                    );
                    assert!(
                        prop_term >= prev_term,
                        "elected with term {} below previous entry term {}",
                        prop_term, prev_term
                    );
                    assert!(
                        prop_lsn >= state.commit_lsn,
                        "elected at lsn {} below commit_lsn {}",
                        prop_lsn, state.commit_lsn
                    );

                    match prop_lsn.cmp(&state.write_lsn) {
                        std::cmp::Ordering::Greater => {
                            assert!(
                                prop_lsn <= state.max_write_lsn,
                                "elected at lsn {} beyond max written lsn {}",
                                prop_lsn, state.max_write_lsn
                            );
                            debug!("moving write_lsn up from {} to {}", state.write_lsn, prop_lsn);
                            state.write_lsn = prop_lsn;
                        }
                        std::cmp::Ordering::Less => {
                            debug!("moving write_lsn down from {} to {}", state.write_lsn, prop_lsn);
                            state.write_lsn = prop_lsn;
                        }
                        std::cmp::Ordering::Equal => {}
                    }

                    node.epoch_lsn = prop_lsn;
                    node.term = prop_term;
                }
                "write_wal" => {
                    assert!(
                        !node.is_sync,
                        "sync walproposer must not write WAL: {:?}",
                        event.data
                    );
                    let start_lsn = next_u64(&mut parts, "start_lsn", &event.data);
                    let end_lsn = next_u64(&mut parts, "end_lsn", &event.data);
                    let cnt = next_u64(&mut parts, "cnt", &event.data);

                    // TODO: If we allow writing WAL before winning the election
                    // NOTE(review): the TODO above is unfinished in the original;
                    // presumably the checks below would need revisiting then.
                    assert!(
                        start_lsn >= state.commit_lsn,
                        "write starts at {} below commit_lsn {}",
                        start_lsn, state.commit_lsn
                    );
                    assert!(
                        end_lsn >= start_lsn,
                        "write ends at {} before its start {}",
                        end_lsn, start_lsn
                    );
                    assert!(
                        start_lsn == state.write_lsn,
                        "write starts at {} but write_lsn is {}",
                        start_lsn, state.write_lsn
                    );

                    // Subtract only after `end_lsn >= start_lsn` has been checked,
                    // so a bad event trips the assertion rather than underflowing.
                    state.written_wal += end_lsn - start_lsn;
                    state.written_records += cnt;

                    state.write_lsn = end_lsn;
                    state.max_write_lsn = state.max_write_lsn.max(end_lsn);
                }
                "commit_lsn" => {
                    let lsn = next_u64(&mut parts, "lsn", &event.data);
                    assert!(
                        lsn >= state.commit_lsn,
                        "commit_lsn moved backwards from {} to {}",
                        state.commit_lsn, lsn
                    );
                    state.commit_lsn = lsn;
                }
                other => panic!(
                    "unknown walproposer event {:?} in {:?}",
                    other, event.data
                ),
            },
            NodeKind::Unknown => unreachable!("unknown node kind is rejected above"),
        }

        // update the node in the state struct
        *state.get(event.node) = node;
    }
}

View File

@@ -20,7 +20,7 @@ use crate::{
simtest::{
log::{init_logger, SimClock},
safekeeper::run_server,
util::{generate_schedule, TestConfig, generate_network_opts},
util::{generate_schedule, TestConfig, generate_network_opts, validate_events},
}, enable_debug,
};
@@ -60,9 +60,8 @@ fn run_walproposer_generate_wal() {
test.poll_for_duration(30);
for i in 0..100 {
wp.write_tx();
wp.write_tx(1);
test.poll_for_duration(5);
wp.update();
}
}
@@ -80,19 +79,13 @@ fn crash_safekeeper() {
let mut wp = test.launch_walproposer(lsn);
test.poll_for_duration(30);
wp.update();
wp.write_tx();
wp.write_tx();
wp.write_tx();
wp.write_tx(3);
test.servers[0].restart();
test.poll_for_duration(100);
wp.update();
test.poll_for_duration(1000);
wp.update();
}
#[test]
@@ -109,13 +102,9 @@ fn test_simple_restart() {
let mut wp = test.launch_walproposer(lsn);
test.poll_for_duration(30);
wp.update();
wp.write_tx();
wp.write_tx();
wp.write_tx();
wp.write_tx(3);
test.poll_for_duration(100);
wp.update();
wp.stop();
drop(wp);
@@ -207,8 +196,8 @@ fn test_random_schedules() -> anyhow::Result<()> {
warn!("Running test with seed {}", seed);
let schedule = generate_schedule(seed);
test.run_schedule(&schedule)?;
test.run_schedule(&schedule).unwrap();
validate_events(test.world.take_events());
test.world.deallocate();
}
@@ -231,7 +220,7 @@ fn test_one_schedule() -> anyhow::Result<()> {
// test.run_schedule(&schedule)?;
// test.world.deallocate();
let seed = 11245530003696902397;
let seed = 3649773280641776194;
config.network = generate_network_opts(seed);
info!("network: {:?}", config.network);
let test = config.start(seed);
@@ -240,6 +229,7 @@ fn test_one_schedule() -> anyhow::Result<()> {
let schedule = generate_schedule(seed);
info!("schedule: {:?}", schedule);
test.run_schedule(&schedule).unwrap();
validate_events(test.world.take_events());
test.world.deallocate();
Ok(())

View File

@@ -370,6 +370,8 @@ void WalProposerRust()
InitMyInsert();
sim_log("started;walproposer;%d", (int) syncSafekeepers);
#if PG_VERSION_NUM < 150000
ThisTimeLineID = 1;
#endif
@@ -459,6 +461,8 @@ WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos)
}
#ifdef SIMLIB
XLogRecPtr MyInsertRecord();
int
SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events)
{
@@ -484,9 +488,28 @@ SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events)
}
}
walprop_log(FATAL, "unknown tcp connection");
} else if (event.tag == Internal && event.any_message == LSN) {
} else if (event.tag == Internal && event.any_message == Just32) {
uint32_t tx_count;
XLogRecPtr start_lsn = sim_latest_available_lsn;
XLogRecPtr finish_lsn = sim_latest_available_lsn;
Assert(!syncSafekeepers);
sim_epoll_rcv(0);
sim_msg_get_lsn(&sim_latest_available_lsn);
sim_msg_get_just_u32(&tx_count);
// don't write WAL before winning the election
if (propEpochStartLsn != 0)
{
for (uint32_t i = 0; i < tx_count; i++)
{
finish_lsn = MyInsertRecord();
}
sim_log("write_wal;%lu;%lu;%d", start_lsn, finish_lsn, (int) tx_count);
sim_latest_available_lsn = finish_lsn;
}
*occurred_events = (WaitEvent) {
.events = WL_LATCH_SET,
};
@@ -1548,7 +1571,16 @@ DetermineEpochStartLsn(void)
safekeeper[donor].host, safekeeper[donor].port,
LSN_FORMAT_ARGS(truncateLsn));
sim_log("prop_elected;%lu", propEpochStartLsn);
{
XLogRecPtr prev_lsn = 0;
term_t prev_term = 0;
if (propTermHistory.n_entries > 1)
{
prev_lsn = propTermHistory.entries[propTermHistory.n_entries - 2].lsn;
prev_term = propTermHistory.entries[propTermHistory.n_entries - 2].term;
}
sim_log("prop_elected;%lu;%lu;%lu;%lu", propEpochStartLsn, propTerm, prev_lsn, prev_term);
}
/*
* Ensure the basebackup we are running (at RedoStartLsn) matches LSN

View File

@@ -5,7 +5,7 @@ use std::{
panic::AssertUnwindSafe,
sync::{atomic::AtomicU64, Arc},
};
use tracing::{debug, info};
use tracing::{debug, info, trace};
use super::{
chan::Chan,
@@ -298,7 +298,7 @@ impl World {
std::mem::swap(&mut connections, &mut self.connections.lock());
for conn in connections {
conn.deallocate();
debug!("conn strong count: {}", Arc::strong_count(&conn));
trace!("conn strong count: {}", Arc::strong_count(&conn));
}
let mut nodes = Vec::new();
@@ -313,7 +313,7 @@ impl World {
for weak_ptr in weak_ptrs {
let node = weak_ptr.upgrade();
if node.is_none() {
debug!("node is already deallocated");
trace!("node is already deallocated");
continue;
}
let node = node.unwrap();