neon/safekeeper/tests/walproposer_sim/safekeeper_disk.rs
Arthur Petukhovsky 4be2223a4c Discrete event simulation for safekeepers (#5804)
This PR contains the first version of
[FoundationDB-like](https://www.youtube.com/watch?v=4fFDFbi3toc)
simulation testing for the safekeeper and walproposer.

### desim

This is the core framework for running deterministic simulations. It
operates on threads, which makes it possible to test synchronous code
(like the walproposer).

`libs/desim/src/executor.rs` contains the implementation of deterministic
thread execution. This is achieved by blocking all threads and allowing
only a single thread at a time to make an execution step. All of the
executor's threads are blocked using the `yield_me(after_ms)` function,
which a thread calls when it wants to sleep or wait for an external
notification (like blocking on a channel until it has a ready message).
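
From inside a simulated thread this looks roughly like the sketch below.
The stubs are illustrative (only the `yield_me` name comes from the
description above, and its exact signature in desim may differ):

```rust
/// Illustrative stub: the real desim executor parks the calling thread here
/// and later wakes exactly one runnable thread at a time, in a deterministic
/// order driven by the virtual clock.
fn yield_me(_after_ms: u64) {
    // In desim this blocks the OS thread until the executor schedules it again.
}

/// A simulated node's main loop: every wait goes through the single yield
/// point, so the executor fully controls the interleaving of all threads.
fn node_main_loop() {
    for _ in 0..3 {
        yield_me(10); // sleep for 10 virtual milliseconds
        // ... process timers, incoming messages, etc. ...
    }
}

fn main() {
    node_main_loop();
}
```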

`libs/desim/src/chan.rs` contains the implementation of a channel (a basic
sync primitive). It has unlimited capacity, and any thread can push
messages to it or read messages from it.
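
The semantics described (unbounded capacity, any thread may send or
receive, receivers block until a message is ready) roughly match this
stand-alone sketch. It is only an illustration built on a `Condvar`;
desim's real channel blocks through the executor's `yield_me` instead:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

/// Illustrative unbounded multi-producer/multi-consumer channel.
struct Chan<T> {
    inner: Arc<(Mutex<VecDeque<T>>, Condvar)>,
}

impl<T> Clone for Chan<T> {
    fn clone(&self) -> Self {
        Chan { inner: Arc::clone(&self.inner) }
    }
}

impl<T> Chan<T> {
    fn new() -> Self {
        Chan { inner: Arc::new((Mutex::new(VecDeque::new()), Condvar::new())) }
    }

    /// Never blocks: the queue has unlimited capacity.
    fn send(&self, t: T) {
        let (queue, cond) = &*self.inner;
        queue.lock().unwrap().push_back(t);
        cond.notify_one();
    }

    /// Blocks until a message is available.
    fn recv(&self) -> T {
        let (queue, cond) = &*self.inner;
        let mut q = queue.lock().unwrap();
        loop {
            if let Some(t) = q.pop_front() {
                return t;
            }
            q = cond.wait(q).unwrap();
        }
    }
}

fn main() {
    let chan = Chan::new();
    let tx = chan.clone();
    std::thread::spawn(move || tx.send("hello"));
    println!("{}", chan.recv()); // blocks until the message arrives
}
```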

`libs/desim/src/network.rs` has a very naive implementation of a network
(only reliable TCP-like connections are supported for now) that can apply
arbitrary delays to each packet and inject failures that break
connections with some probability.
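
One way such delays and failure injection can remain reproducible is to
draw every decision from a seeded RNG, roughly as in this sketch (the
`Delay` type and its fields are hypothetical, not desim's actual
configuration):

```rust
use rand::{rngs::StdRng, Rng, SeedableRng};

/// Hypothetical per-packet delay/failure model.
struct Delay {
    min_ms: u64,
    max_ms: u64,
    /// Probability of breaking the connection instead of delivering.
    fail_prob: f64,
}

impl Delay {
    /// Returns Some(delay in virtual ms) for a delivered packet,
    /// or None if the connection should be torn down.
    fn sample(&self, rng: &mut StdRng) -> Option<u64> {
        if rng.gen_bool(self.fail_prob) {
            return None;
        }
        Some(rng.gen_range(self.min_ms..=self.max_ms))
    }
}

fn main() {
    // The same seed always produces the same sequence of delays/failures,
    // which is what makes each simulation run reproducible.
    let mut rng = StdRng::seed_from_u64(42);
    let delay = Delay { min_ms: 1, max_ms: 100, fail_prob: 0.01 };
    println!("{:?}", delay.sample(&mut rng));
}
```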

`libs/desim/src/world.rs` ties everything together, providing the concept
of virtual nodes with network connections between them.

### walproposer_sim

Contains everything needed to run the walproposer and safekeepers in a simulation.

`safekeeper.rs` reimplements all the necessary pieces of `receive_wal.rs`,
`send_wal.rs`, and `timelines_global_map.rs`.

`walproposer_api.rs` implements all walproposer callbacks on top of the
simulation library.

`simulation.rs` defines a schedule – a set of events, like `restart <sk>`
or `write_wal`, that should happen at time `<ts>`. It also has code to
spawn walproposer/safekeeper threads and provide their configs.
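
A schedule could be represented roughly like this (the variant and type
names are a guess based on the events mentioned above, not necessarily the
actual definitions in `simulation.rs`):

```rust
/// Hypothetical event variants mirroring the description above.
enum Event {
    RestartSafekeeper(usize), // `restart <sk>`: restart safekeeper by index
    RestartWalProposer,
    WriteWal(u32), // `write_wal`: append this many transactions
}

/// A schedule: events paired with the virtual timestamp `<ts>`
/// at which they should fire.
type Schedule = Vec<(u64, Event)>;

fn example_schedule() -> Schedule {
    vec![
        (0, Event::WriteWal(10)),
        (50, Event::RestartSafekeeper(1)),
        (100, Event::WriteWal(5)),
    ]
}

fn main() {
    println!("{} events", example_schedule().len());
}
```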

### tests

`simple_test.rs` has tests that simply start the walproposer and 3
safekeepers together in a simulation and verify that they don't crash
right away.

`misc_test.rs` has tests covering more advanced simulation cases, like
crashing or restarting threads, testing memory deallocation, etc.

`random_test.rs` is the main test: it checks thousands of random seeds
(schedules) for correctness. It roughly corresponds to running a real
Python integration test in an environment with a very unstable network and
CPU, but in a deterministic way (each seed results in the same execution
log) and much, much faster.
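
The overall shape is a loop over seeds, where each seed fully determines
the schedule and, through the deterministic executor, the whole execution.
A hedged sketch (`generate_schedule` and the assertion are placeholders
for the real schedule generator and simulation run):

```rust
use rand::{rngs::StdRng, Rng, SeedableRng};

/// Placeholder schedule generator: in the real test the seed drives the
/// random restart/write_wal events and their timestamps.
fn generate_schedule(seed: u64) -> Vec<(u64, u32)> {
    let mut rng = StdRng::seed_from_u64(seed);
    (0..16)
        .map(|_| (rng.gen_range(0..2000), rng.gen_range(1..8)))
        .collect()
}

#[test]
fn sweep_random_seeds() {
    for seed in 0..1000 {
        let schedule = generate_schedule(seed);
        // The real test runs the simulation here and checks safety
        // invariants; same seed => same schedule => same execution log.
        assert_eq!(schedule, generate_schedule(seed));
    }
}
```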

Closes #547

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
2024-02-12 20:29:57 +00:00


use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Instant;

use anyhow::Result;
use bytes::{Buf, BytesMut};
use futures::future::BoxFuture;
use parking_lot::Mutex;
use postgres_ffi::{waldecoder::WalStreamDecoder, XLogSegNo};
use safekeeper::state::TimelinePersistentState;
use safekeeper::{control_file, metrics::WalStorageMetrics, wal_storage};
use tracing::{debug, info};
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;

use super::block_storage::BlockStorage;
/// All safekeeper state that is usually saved to disk.
pub struct SafekeeperDisk {
    pub timelines: Mutex<HashMap<TenantTimelineId, Arc<TimelineDisk>>>,
}

impl Default for SafekeeperDisk {
    fn default() -> Self {
        Self::new()
    }
}

impl SafekeeperDisk {
    pub fn new() -> Self {
        SafekeeperDisk {
            timelines: Mutex::new(HashMap::new()),
        }
    }

    pub fn put_state(
        &self,
        ttid: &TenantTimelineId,
        state: TimelinePersistentState,
    ) -> Arc<TimelineDisk> {
        self.timelines
            .lock()
            .entry(*ttid)
            .and_modify(|e| {
                let mut mu = e.state.lock();
                *mu = state.clone();
            })
            .or_insert_with(|| {
                Arc::new(TimelineDisk {
                    state: Mutex::new(state),
                    wal: Mutex::new(BlockStorage::new()),
                })
            })
            .clone()
    }
}

/// Control file state and WAL storage.
pub struct TimelineDisk {
    pub state: Mutex<TimelinePersistentState>,
    pub wal: Mutex<BlockStorage>,
}
/// Implementation of `control_file::Storage` trait.
pub struct DiskStateStorage {
    persisted_state: TimelinePersistentState,
    disk: Arc<TimelineDisk>,
    last_persist_at: Instant,
}

impl DiskStateStorage {
    pub fn new(disk: Arc<TimelineDisk>) -> Self {
        let guard = disk.state.lock();
        let state = guard.clone();
        drop(guard);
        DiskStateStorage {
            persisted_state: state,
            disk,
            last_persist_at: Instant::now(),
        }
    }
}

#[async_trait::async_trait]
impl control_file::Storage for DiskStateStorage {
    /// Persist safekeeper state on disk and update internal state.
    async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
        self.persisted_state = s.clone();
        *self.disk.state.lock() = s.clone();
        Ok(())
    }

    /// Timestamp of last persist.
    fn last_persist_at(&self) -> Instant {
        // TODO: don't rely on it in tests
        self.last_persist_at
    }
}

impl Deref for DiskStateStorage {
    type Target = TimelinePersistentState;

    fn deref(&self) -> &Self::Target {
        &self.persisted_state
    }
}
/// Implementation of `wal_storage::Storage` trait.
pub struct DiskWALStorage {
    /// Written to disk, but possibly still in the cache and not fully persisted.
    /// Can also be ahead of record_lsn, if we happen to be in the middle of a WAL record.
    write_lsn: Lsn,

    /// The LSN of the last WAL record written to disk. May still not be fully flushed.
    write_record_lsn: Lsn,

    /// The LSN of the last WAL record flushed to disk.
    flush_record_lsn: Lsn,

    /// Decoder is required for detecting boundaries of WAL records.
    decoder: WalStreamDecoder,

    /// Bytes of WAL records that are not yet written to disk.
    unflushed_bytes: BytesMut,

    /// Contains BlockStorage for WAL.
    disk: Arc<TimelineDisk>,
}

impl DiskWALStorage {
    pub fn new(disk: Arc<TimelineDisk>, state: &TimelinePersistentState) -> Result<Self> {
        let write_lsn = if state.commit_lsn == Lsn(0) {
            Lsn(0)
        } else {
            Self::find_end_of_wal(disk.clone(), state.commit_lsn)?
        };

        let flush_lsn = write_lsn;
        Ok(DiskWALStorage {
            write_lsn,
            write_record_lsn: flush_lsn,
            flush_record_lsn: flush_lsn,
            decoder: WalStreamDecoder::new(flush_lsn, 16),
            unflushed_bytes: BytesMut::new(),
            disk,
        })
    }

    fn find_end_of_wal(disk: Arc<TimelineDisk>, start_lsn: Lsn) -> Result<Lsn> {
        let mut buf = [0; 8192];
        let mut pos = start_lsn.0;
        let mut decoder = WalStreamDecoder::new(start_lsn, 16);
        let mut result = start_lsn;
        loop {
            disk.wal.lock().read(pos, &mut buf);
            pos += buf.len() as u64;
            decoder.feed_bytes(&buf);

            loop {
                match decoder.poll_decode() {
                    Ok(Some(record)) => result = record.0,
                    Err(e) => {
                        debug!(
                            "find_end_of_wal reached end at {:?}, decode error: {:?}",
                            result, e
                        );
                        return Ok(result);
                    }
                    Ok(None) => break, // need more data
                }
            }
        }
    }
}
#[async_trait::async_trait]
impl wal_storage::Storage for DiskWALStorage {
    /// LSN of last durably stored WAL record.
    fn flush_lsn(&self) -> Lsn {
        self.flush_record_lsn
    }

    /// Write piece of WAL from buf to disk, but not necessarily sync it.
    async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
        if self.write_lsn != startpos {
            panic!("write_wal called with wrong startpos");
        }

        self.unflushed_bytes.extend_from_slice(buf);
        self.write_lsn += buf.len() as u64;

        if self.decoder.available() != startpos {
            info!(
                "restart decoder from {} to {}",
                self.decoder.available(),
                startpos,
            );
            self.decoder = WalStreamDecoder::new(startpos, 16);
        }
        self.decoder.feed_bytes(buf);
        loop {
            match self.decoder.poll_decode()? {
                None => break, // no full record yet
                Some((lsn, _rec)) => {
                    self.write_record_lsn = lsn;
                }
            }
        }

        Ok(())
    }

    /// Truncate WAL at specified LSN, which must be the end of WAL record.
    async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
        if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
            panic!(
                "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
                self.write_lsn, end_pos
            );
        }

        self.flush_wal().await?;

        // write zeroes to disk from end_pos until self.write_lsn
        let buf = [0; 8192];
        let mut pos = end_pos.0;
        while pos < self.write_lsn.0 {
            self.disk.wal.lock().write(pos, &buf);
            pos += buf.len() as u64;
        }

        self.write_lsn = end_pos;
        self.write_record_lsn = end_pos;
        self.flush_record_lsn = end_pos;
        self.unflushed_bytes.clear();
        self.decoder = WalStreamDecoder::new(end_pos, 16);
        Ok(())
    }

    /// Durably store WAL on disk, up to the last written WAL record.
    async fn flush_wal(&mut self) -> Result<()> {
        if self.flush_record_lsn == self.write_record_lsn {
            // no need to do extra flush
            return Ok(());
        }

        let num_bytes = self.write_record_lsn.0 - self.flush_record_lsn.0;
        self.disk.wal.lock().write(
            self.flush_record_lsn.0,
            &self.unflushed_bytes[..num_bytes as usize],
        );
        self.unflushed_bytes.advance(num_bytes as usize);
        self.flush_record_lsn = self.write_record_lsn;
        Ok(())
    }

    /// Remove all segments <= given segno. Returns function doing that as we
    /// want to perform it without timeline lock.
    fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
        Box::pin(async move { Ok(()) })
    }

    /// Release resources associated with the storage -- technically, close FDs.
    /// Currently we don't remove timelines until restart (#3146), so need to
    /// spare descriptors. This would be useful for temporary tli detach as
    /// well.
    fn close(&mut self) {}

    /// Get metrics for this timeline.
    fn get_metrics(&self) -> WalStorageMetrics {
        WalStorageMetrics::default()
    }
}