Compare commits

...

20 Commits

Author SHA1 Message Date
Christian Schwarz
81d715b187 another rename 2023-06-07 16:27:53 +02:00
Christian Schwarz
0afd20068b title 2023-06-07 16:27:53 +02:00
Christian Schwarz
f3d7bf9e09 rename the crate and fix some compile errors that I missed a while back 2023-06-07 16:27:53 +02:00
Christian Schwarz
748c06cff8 docs improvements 2023-06-07 16:27:53 +02:00
Christian Schwarz
0d82862d55 generic GetReconstructPathError 2023-06-07 16:27:53 +02:00
Christian Schwarz
f2bd71d0a8 more docs 2023-06-07 16:27:53 +02:00
Christian Schwarz
de9521214d all-in on immutable + better crate comment 2023-06-07 16:27:53 +02:00
Christian Schwarz
8d9207040f WIP doc comments (more aspirational than reality) 2023-06-07 16:27:53 +02:00
Christian Schwarz
8e57d95026 completely immutable variant of the design 2023-06-07 16:27:53 +02:00
Christian Schwarz
6c71fc6646 struct type for in-memory layer put failure 2023-06-07 16:27:53 +02:00
Christian Schwarz
e6a36b5236 unused PutError accessors 2023-06-07 16:27:53 +02:00
Christian Schwarz
56f57172dd move code around to prep for alternative impl 2023-06-07 16:27:53 +02:00
Christian Schwarz
74ad719ede fix most unused variable warnings 2023-06-07 16:27:53 +02:00
Christian Schwarz
5570384672 WIP 2023-06-07 16:27:53 +02:00
Christian Schwarz
321e74b5ee implement support for non-blocking SeqWait::wait_for_timeout and use it 2023-06-07 16:27:53 +02:00
Christian Schwarz
712a516a2f switch to a Types trait to declutter the generics 2023-06-07 16:27:53 +02:00
Christian Schwarz
be5ba04dca rename trait to HistoricStuff 2023-06-07 16:27:53 +02:00
Christian Schwarz
b4b1292e15 WIP version of fully generic layer map built atop the new seqwait 2023-06-07 16:27:53 +02:00
Christian Schwarz
9b992c621d seqwait: handle wait_for_timeout() with a zero duration efficiently 2023-06-07 16:27:49 +02:00
Christian Schwarz
1fa17ed486 seqwait: add support for communicating immutable data between advance and wait
The long-term plan is to make LayerMap an immutable data structure that is
multi-versioned. Meaning, we will not modify LayerMap in place but create
a (cheap) copy and modify that copy. Once we're done making modifications,
we make the copy available to readers through the SeqWait.

The modifications will be made by a _single_ task, the pageserver actor.
But _many_ readers can wait_for & use the same or multiple versions of the LayerMap.

So, there's a new method `split_spmc` that splits up a `SeqWait` into a
not-clonable producer (Advance) and a clonable consumer (Wait).

(SeqWait itself is mpmc, but, for the actor architecture, it makes sense
 to enforce spmc in the type system)

2023-06-07 16:26:48 +02:00
7 changed files with 856 additions and 46 deletions

Cargo.lock generated
View File

@@ -3991,6 +3991,16 @@ dependencies = [
"time-core",
]
[[package]]
name = "timeline_data_path"
version = "0.1.0"
dependencies = [
"thiserror",
"tokio",
"utils",
"workspace_hack",
]
[[package]]
name = "tinytemplate"
version = "1.2.1"
@@ -4564,6 +4574,7 @@ dependencies = [
"byteorder",
"bytes",
"criterion",
"either",
"futures",
"heapless",
"hex",

View File

@@ -0,0 +1,13 @@
[package]
name = "timeline_data_path"
version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
utils.workspace = true
workspace_hack.workspace = true
tokio.workspace = true
thiserror.workspace = true

View File

@@ -0,0 +1,396 @@
//! The Timeline's core data path.
//!
//! # Overview
//!
//! This crate implements the core data path of a Timeline inside Pageserver:
//!
//! 1. Ingestion of WAL records from `walreceiver`, via in-memory layers, into persistent L0 layers.
//! 1. `GetPage@LSN`: retrieval of WAL records and page images for feeding into WAL redo.
//! 1. Data re-shuffling through compaction (TODO).
//! 1. Page image creation & garbage collection through GC (TODO).
//!
//! This crate assumes the following concepts, but is fully generic over their implementation:
//!
//! - **Delta Records**: data is written into the system in the form of self-descriptive deltas.
//! For the Pageserver use case, these deltas are derived from Postgres WAL records.
//! - **Page Numbers**: Delta Records always affect a single key.
//! That key is called a page number because, in the Pageserver use case, the Postgres table page numbers are the keys.
//! - **LSN**: When writing Delta Records into the system, they are associated with a monotonically increasing LSN.
//! Subsequently written Delta Records must have increasing LSNs.
//! - **Page Images**: Delta Records for a given page can be used to reconstruct the page. Think of it like squashing diffs.
//! - When sorting the Delta Records for a given key by their LSN, any prefix of that sorting can be squashed into a page image.
//! - Delta Records following such a squash can be squashed into that page image.
//! - In Pageserver, WAL redo implements the (pure) function of squashing (see the sketch after this list).
//! - **In-Memory Layer**: an object that represents an "unfinished" L0 layer file, holding Delta Records in insertion order.
//! "Unfinished" means that we're still writing Delta Records to that file.
//! - **Historic Layer**: an object that represents a "finished" layer file, at any compaction level.
//! Such objects reside on disk and/or in remote storage.
//! They may contain Delta Records, Page Images, or a mixture thereof. It doesn't matter.
//! - **HistoricStuff**: an efficient lookup data structure to find the list of Historic Layer objects
//! that hold the Delta Records / PageImages required to reconstruct a Page Image at a given LSN.
//!
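//! As a minimal sketch of the squashing concept (toy types only; `squash` is not part of this crate's API):
//!
//! ```ignore
//! // A delta is a self-descriptive change; a "page image" is the squash of an
//! // LSN-ordered prefix of a key's deltas (in Pageserver, applying a delta is WAL redo).
//! type DeltaRecord = &'static str;
//! type PageImage = Vec<&'static str>;
//!
//! fn squash(base: Option<PageImage>, deltas_in_lsn_order: &[DeltaRecord]) -> PageImage {
//!     let mut img = base.unwrap_or_default();
//!     img.extend_from_slice(deltas_in_lsn_order);
//!     img
//! }
//! ```
//!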
//! # API
//!
//! The core idea is that of a specialized single-producer multi-consumer structure,
//! embodied by a Read-end and a Write-end.
//!
//! The Write-end is used to push new `DeltaRecord @ LSN`s into the system.
//! In Pageserver, this is used by the `WalReceiver`.
//!
//! The Read-end provides the `GetPage@LSN` API.
//! In the current iteration, we actually return something called `ReconstructWork`.
//! I.e., we leave the work of reading the values from the layers, and of invoking WAL redo, to the caller.
//! Find rationale for this design in the *Scope* section.
//!
//! ## Immutability
//!
//! The traits defined by this crate assume immutable data structures that are multi-versioned.
//!
//! As an example for what "immutable" means, take the case where we add a new Historic Layer to HistoricStuff.
//! Traditionally, one would use shared mutable state, i.e. `Arc<RwLock<...>>`.
//! To insert the new Historic Layer, we would acquire the RwLock in write mode and modify a lookup data structure to accommodate the new layer.
//! The Read-ends would use RwLock in read mode to read from the data structure.
//!
//! Conversely, with *immutable data structures*, writers create new versions (aka *snapshots*) of the lookup data structure.
//! New reads on the Read-ends will use the new snapshot, while ongoing reads continue to use the old version(s).
//! An efficient implementation would likely share the Historic Layer objects, e.g., using `Arc`.
//! And maybe there's internally mutable state inside the layer objects, e.g., to track residence (i.e., *on-demand downloaded* vs *evicted*).
//! But the important point is that there's no synchronization / lock-holding at any higher level, except when grabbing a reference to the snapshot (Read-end), or when publishing a new snapshot (Write-end).
//!
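//! A minimal sketch of the difference (the toy `Lookup` type below is not part of this crate's API;
//! in this crate, snapshot publication happens through the `SeqWait` machinery):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! #[derive(Clone, Default)]
//! struct Lookup {
//!     layers: Arc<Vec<String>>, // Historic Layer objects are shared, not copied
//! }
//!
//! impl Lookup {
//!     // Writers never mutate in place; they build and publish a new snapshot.
//!     fn with_layer(&self, name: &str) -> Lookup {
//!         let mut layers = Vec::clone(&self.layers);
//!         layers.push(name.to_string());
//!         Lookup { layers: Arc::new(layers) }
//!     }
//! }
//! // Ongoing readers keep using the snapshot they already hold;
//! // new readers grab the latest published snapshot.
//! ```
//!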
//! ## Scope
//!
//! The following concerns are considered implementation details from the perspective of this crate:
//!
//! - **Layer File Persistence**: `HistoricStuff::make_historic` is responsible for this.
//! - **Reading Layer Files**: the `ReconstructWork` that the Read-end returns from `GetPage@LSN` requests contains the list of layers to consult.
//! The crate consumer is responsible for reading the layers & doing WAL redo.
//! Likely the implementation of `HistoricStuff` plays a role here, because it is responsible for persisting the layer files.
//! - **Layer Eviction & On-Demand Download**: this is just an aspect of the above.
//! The crate consumer can choose to implement eviction & on-demand download however they wish.
//! The only requirement is that the Historic Layers don't change their contents, i.e., they always return the same reconstruct values for the same lookup.
//! - For example, a `LayerCache` module or service could take care of layer uploads, eviction, and on-demand downloads.
//! Initially, the `layer cache` can be local-only.
//! But in the future, it can span multiple machines / clustered pageservers, aka "sharding".
//!
//! # Example
//!
//! The [`new`] function is the entrypoint to this crate.
//!
//! See the test cases for how it is used.
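//!
//! A rough sketch of the call flow (assuming some `MyTypes` zero-sized type that implements [`Types`],
//! analogous to `TestTypes` in the test module; error handling elided):
//!
//! ```ignore
//! // Load `lsn_counter` and `historic` from whatever persistent state exists.
//! let (reader, mut writer) = new::<MyTypes>(lsn_counter, historic);
//!
//! // Write-end: the single ingest task pushes Delta Records in LSN order.
//! writer.put(key, lsn, delta).await?;
//!
//! // Read-end: many tasks can wait for an LSN and receive ReconstructWork.
//! let work = reader.get(key, lsn).await?;
//! // `work.inmem_records` plus the records along `work.historic_path` feed into WAL redo.
//! ```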
use std::{marker::PhantomData, time::Duration};
use utils::seqwait::{self, Advance, SeqWait, Wait};
#[cfg(test)]
mod tests;
/// Collection of types / type bounds used by Read-end and Write-end.
///
/// See the [`crate`]-level docs' *Overview* section to learn about
/// the meaning of each associated `type`.
///
/// # Usage
///
/// Define a zero-sized-type and impl this Trait for it.
/// Then use that zero-sized-type as the single generic argument to [`new`]
/// and almost all types declared in this crate.
///
/// It might feel a bit weird, but the alternative is to have umpteen generic
/// types per `impl` with repetitive trait bounds.
///
/// Search the test cases for an example of how this can be used to improve testability.
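///
/// A minimal sketch (all `My*` names below are hypothetical, not exported by this crate):
///
/// ```ignore
/// struct MyTypes;
///
/// impl Types for MyTypes {
///     type Key = u32;
///     type Lsn = u64;
///     type LsnCounter = MyLsnCounter;       // impls seqwait::MonotonicCounter<u64>
///     type DeltaRecord = Vec<u8>;
///     type HistoricLayer = std::sync::Arc<MyHistoricLayer>;
///     type InMemoryLayer = MyInMemoryLayer; // impls InMemoryLayer<Types = MyTypes>
///     type HistoricStuff = MyHistoricStuff; // impls HistoricStuff<Types = MyTypes>
///     type GetReconstructPathError = std::convert::Infallible;
/// }
/// ```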
pub trait Types {
type Key: Copy;
type Lsn: Ord + Copy;
type LsnCounter: seqwait::MonotonicCounter<Self::Lsn> + Copy;
type DeltaRecord;
type HistoricLayer;
type InMemoryLayer: InMemoryLayer<Types = Self> + Clone;
type HistoricStuff: HistoricStuff<Types = Self> + Clone;
type GetReconstructPathError: std::error::Error;
}
/// Error returned by [`InMemoryLayer::put`].
#[derive(thiserror::Error)]
pub struct InMemoryLayerPutError<DeltaRecord> {
delta: DeltaRecord,
kind: InMemoryLayerPutErrorKind,
}
/// Part of [`InMemoryLayerPutError`].
#[derive(Debug)]
pub enum InMemoryLayerPutErrorKind {
LayerFull,
AlreadyHaveRecordForKeyAndLsn,
}
impl<DeltaRecord> std::fmt::Debug for InMemoryLayerPutError<DeltaRecord> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerPutError")
// would require DeltaRecord to impl Debug
// .field("delta", &self.delta)
.field("kind", &self.kind)
.finish()
}
}
/// An in-memory layer. See [`crate`] docs for details on this concept.
pub trait InMemoryLayer: std::fmt::Debug + Default + Clone {
type Types: Types;
fn put(
&mut self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
delta: <Self::Types as Types>::DeltaRecord,
) -> Result<Self, InMemoryLayerPutError<<Self::Types as Types>::DeltaRecord>>;
fn get(
&self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
) -> Vec<<Self::Types as Types>::DeltaRecord>;
}
/// The manager of [`Types::HistoricLayer`]s.
pub trait HistoricStuff {
type Types: Types;
fn get_reconstruct_path(
&self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
) -> Result<
Vec<<Self::Types as Types>::HistoricLayer>,
<Self::Types as Types>::GetReconstructPathError,
>;
/// Produce a new version of `self` that includes the given inmem layer.
fn make_historic(&self, inmem: <Self::Types as Types>::InMemoryLayer) -> Self;
}
/// A snapshot of the data. See [`crate`]-level docs section on *immutability* for details.
struct Snapshot<T: Types> {
_types: PhantomData<T>,
inmem: Option<T::InMemoryLayer>,
historic: T::HistoricStuff,
}
impl<T: Types> Clone for Snapshot<T> {
fn clone(&self) -> Self {
Self {
_types: self._types.clone(),
inmem: self.inmem.clone(),
historic: self.historic.clone(),
}
}
}
/// The Read-end. See [`crate`]-level docs for details.
pub struct Reader<T: Types> {
wait: Wait<T::LsnCounter, T::Lsn, Snapshot<T>>,
}
/// The Write-end. See [`crate`]-level docs for details.
pub struct Writer<T: Types> {
advance: Advance<T::LsnCounter, T::Lsn, Snapshot<T>>,
}
/// Set up a pair of Read-end and Write-end. This is the entrypoint to this crate.
///
/// The idea is that the caller loads the arguments from persistent state that `HistoricStuff` wrote at an earlier point in time.
pub fn new<T: Types>(lsn: T::LsnCounter, historic: T::HistoricStuff) -> (Reader<T>, Writer<T>) {
let state = Snapshot {
_types: PhantomData::<T>::default(),
inmem: None,
historic,
};
let (wait, advance) = SeqWait::new(lsn, state).split_spmc();
let reader = Reader { wait };
let writer = Writer { advance };
(reader, writer)
}
/// Error returned by the get-page operations.
#[derive(Debug, thiserror::Error)]
pub enum GetError<T: Types> {
#[error(transparent)]
SeqWait(seqwait::SeqWaitError),
#[error(transparent)]
GetReconstructPath(T::GetReconstructPathError),
}
/// Self-contained set of objects required to reconstruct a page image for the given `key` @ `lsn`.
///
/// This is returned by the `get` methods of [`Reader`] and [`Writer`].
///
/// To reconstruct the page image, stack up (top to bottom) `inmem_records` plus all records found for `key` and `lsn` along the `historic_path` until an initial page image is found.
/// Then feed that stack to WAL-redo to get the page image.
///
/// See [`crate`]-level docs on *scope* for why we don't return page images from these functions.
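///
/// A hedged sketch of what a caller might do with it (`read_layer` and `walredo` are hypothetical helpers
/// belonging to the crate consumer, not to this crate):
///
/// ```ignore
/// // `work` was returned by `Reader::get` or `Writer::get_nowait`.
/// let mut records = work.inmem_records; // newest-first in-memory records
/// for layer in &work.historic_path {
///     // read this layer's records / page image for (work.key, work.lsn);
///     // stop once an initial page image has been found
///     records.extend(read_layer(layer, work.key, work.lsn));
/// }
/// let page_image = walredo(work.key, work.lsn, records);
/// ```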
pub struct ReconstructWork<T: Types> {
pub key: T::Key,
pub lsn: T::Lsn,
pub inmem_records: Vec<T::DeltaRecord>,
pub historic_path: Vec<T::HistoricLayer>,
}
impl<T: Types> Reader<T> {
/// This is the `GetPage@LSN` operation.
///
/// See the [`crate`]-level docs for why we return [`ReconstructWork`] instead of a Page Image here.
pub async fn get(&self, key: T::Key, lsn: T::Lsn) -> Result<ReconstructWork<T>, GetError<T>> {
// XXX dedup with Writer::get_nowait
let state = self.wait.wait_for(lsn).await.map_err(GetError::SeqWait)?;
let inmem_records = state
.inmem
.as_ref()
.map(|iml| iml.get(key, lsn))
.unwrap_or_default();
let historic_path = state
.historic
.get_reconstruct_path(key, lsn)
.map_err(GetError::GetReconstructPath)?;
Ok(ReconstructWork {
key,
lsn,
inmem_records,
historic_path,
})
}
}
/// Error returned by the `put` operation.
#[derive(thiserror::Error)]
pub struct PutError<T: Types> {
/// The `delta` record which we failed to `put`.
pub delta: T::DeltaRecord,
/// Description of what went wrong.
pub kind: PutErrorKind,
}
/// Part of [`PutError`].
#[derive(Debug)]
pub enum PutErrorKind {
AlreadyHaveInMemoryRecordForKeyAndLsn,
}
impl<T: Types> std::fmt::Debug for PutError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PutError")
// would need to require Debug for DeltaRecord
// .field("delta", &self.delta)
.field("kind", &self.kind)
.finish()
}
}
impl<T: Types> Writer<T> {
/// Insert data into the system.
pub async fn put(
&mut self,
key: T::Key,
lsn: T::Lsn,
delta: T::DeltaRecord,
) -> Result<(), PutError<T>> {
let (_snapshot_lsn, snapshot) = self.advance.get_current_data();
// TODO ensure snapshot_lsn <= lsn?
let mut inmem = snapshot
.inmem
.unwrap_or_else(|| T::InMemoryLayer::default());
// XXX: use the Advance as witness and only allow witness to access inmem in write mode
match inmem.put(key, lsn, delta) {
Ok(new_inmem) => {
let new_snapshot = Snapshot {
_types: PhantomData,
inmem: Some(new_inmem),
historic: snapshot.historic,
};
self.advance.advance(lsn, Some(new_snapshot));
}
Err(InMemoryLayerPutError {
delta,
kind: InMemoryLayerPutErrorKind::AlreadyHaveRecordForKeyAndLsn,
}) => {
return Err(PutError {
delta,
kind: PutErrorKind::AlreadyHaveInMemoryRecordForKeyAndLsn,
});
}
Err(InMemoryLayerPutError {
delta,
kind: InMemoryLayerPutErrorKind::LayerFull,
}) => {
let new_historic = snapshot.historic.make_historic(inmem);
let mut new_inmem = T::InMemoryLayer::default();
let new_inmem = new_inmem
.put(key, lsn, delta)
.expect("put into default inmem layer must not fail");
let new_state = Snapshot {
_types: PhantomData::<T>::default(),
inmem: Some(new_inmem),
historic: new_historic,
};
self.advance.advance(lsn, Some(new_state));
}
}
Ok(())
}
/// Force flushing of the current in-memory layer.
///
/// Usually, flushing happens only if the in-memory layer is full.
/// Use this API to make it happen in other circumstances (shutdown, periodic ticker, etc.).
pub async fn force_flush(&mut self) -> tokio::io::Result<()> {
let (snapshot_lsn, snapshot) = self.advance.get_current_data();
let Snapshot {
_types,
inmem,
historic,
} = snapshot;
// XXX: use the Advance as witness and only allow witness to access inmem in "write" mode
let Some(inmem) = inmem else {
// nothing to do
return Ok(());
};
let new_historic = historic.make_historic(inmem);
let new_snapshot = Snapshot {
_types: PhantomData::<T>::default(),
inmem: None,
historic: new_historic,
};
self.advance.advance(snapshot_lsn, Some(new_snapshot)); // TODO: should fail if we're past snapshot_lsn
Ok(())
}
/// `get` at the given LSN, without blocking.
///
/// Fails with a timeout error if the `lsn` isn't there yet.
/// That makes sense because the only thing that could end the wait is a concurrent `self.put()`,
/// and concurrent `put()` calls are forbidden by the single-producer design.
pub async fn get_nowait(
&self,
key: T::Key,
lsn: T::Lsn,
) -> Result<ReconstructWork<T>, GetError<T>> {
// XXX dedup with Reader::get
let state = self
.advance
.wait_for_timeout(lsn, Duration::from_secs(0))
// The await is never going to block because we pass from_secs(0).
.await
.map_err(GetError::SeqWait)?;
let inmem_records = state
.inmem
.as_ref()
.map(|iml| iml.get(key, lsn))
.unwrap_or_default();
let historic_path = state
.historic
.get_reconstruct_path(key, lsn)
.map_err(GetError::GetReconstructPath)?;
Ok(ReconstructWork {
key,
lsn,
inmem_records,
historic_path,
})
}
}

View File

@@ -0,0 +1,170 @@
use std::collections::{btree_map::Entry, BTreeMap};
use std::sync::Arc;
use utils::seqwait;
/// The ZST for which we impl the `super::Types` type collection trait.
struct TestTypes;
impl super::Types for TestTypes {
type Key = usize;
type Lsn = usize;
type LsnCounter = UsizeCounter;
type DeltaRecord = &'static str;
type HistoricLayer = Arc<TestHistoricLayer>;
type InMemoryLayer = TestInMemoryLayer;
type HistoricStuff = TestHistoricStuff;
type GetReconstructPathError = std::convert::Infallible; // the test impls never fail
}
/// For testing, our in-memory layer is a simple hashmap.
#[derive(Clone, Default, Debug)]
struct TestInMemoryLayer {
by_key: BTreeMap<usize, BTreeMap<usize, &'static str>>,
}
/// For testing, our historic layers are just in-memory layer objects with `frozen==true`.
struct TestHistoricLayer(TestInMemoryLayer);
/// This is the data structure that impls the `HistoricStuff` trait.
#[derive(Default, Clone)]
struct TestHistoricStuff {
by_key: BTreeMap<usize, BTreeMap<usize, Arc<TestHistoricLayer>>>,
}
/// `seqwait::MonotonicCounter` impl
#[derive(Copy, Clone)]
pub struct UsizeCounter(usize);
// Our testing impl of HistoricStuff references the frozen InMemoryLayer objects
// from all the (key,lsn) entries that it covers.
// This mimics the (much more efficient) search tree in the real impl.
impl super::HistoricStuff for TestHistoricStuff {
type Types = TestTypes;
fn get_reconstruct_path(
&self,
key: usize,
lsn: usize,
) -> Result<Vec<Arc<TestHistoricLayer>>, std::convert::Infallible> {
let Some(bk) = self.by_key.get(&key) else {
return Ok(vec![]);
};
Ok(bk.range(..=lsn).rev().map(|(_, l)| Arc::clone(l)).collect())
}
fn make_historic(&self, inmem: TestInMemoryLayer) -> Self {
// For the purposes of testing, just turn the in-memory layer into a historic layer through the type system
let historic = Arc::new(TestHistoricLayer(inmem));
// Deep-copy
let mut copy = self.by_key.clone();
// Add the references to `inmem` to the deep-copied struct
for (k, v) in historic.0.by_key.iter() {
for (lsn, _deltas) in v.into_iter() {
let by_key = copy.entry(*k).or_default();
let overwritten = by_key.insert(*lsn, historic.clone());
assert!(matches!(overwritten, None), "layers must not overlap");
}
}
Self { by_key: copy }
}
}
impl super::InMemoryLayer for TestInMemoryLayer {
type Types = TestTypes;
fn put(
&mut self,
key: usize,
lsn: usize,
delta: &'static str,
) -> Result<Self, super::InMemoryLayerPutError<&'static str>> {
let mut clone = self.clone();
drop(self);
let by_key = clone.by_key.entry(key).or_default();
match by_key.entry(lsn) {
Entry::Occupied(_record) => {
return Err(super::InMemoryLayerPutError {
delta,
kind: super::InMemoryLayerPutErrorKind::AlreadyHaveRecordForKeyAndLsn,
});
}
Entry::Vacant(vacant) => vacant.insert(delta),
};
Ok(clone)
}
fn get(&self, key: usize, lsn: usize) -> Vec<&'static str> {
let by_key = match self.by_key.get(&key) {
Some(by_key) => by_key,
None => return vec![],
};
by_key
.range(..=lsn)
.map(|(_, v)| v)
.rev()
.cloned()
.collect()
}
}
impl UsizeCounter {
pub fn new(initial: usize) -> Self {
UsizeCounter(initial)
}
}
impl seqwait::MonotonicCounter<usize> for UsizeCounter {
fn cnt_advance(&mut self, new_val: usize) {
assert!(self.0 < new_val);
self.0 = new_val;
}
fn cnt_value(&self) -> usize {
self.0
}
}
#[test]
fn basic() {
let lm = TestHistoricStuff::default();
let (r, mut rw) = super::new::<TestTypes>(UsizeCounter::new(0), lm);
let r = Arc::new(r);
let r2 = Arc::clone(&r);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let read_jh = rt.spawn(async move { r.get(0, 10).await });
let mut rw = rt.block_on(async move {
rw.put(0, 1, "foo").await.unwrap();
rw.put(1, 1, "bar").await.unwrap();
rw.put(0, 10, "baz").await.unwrap();
rw
});
let read_res = rt.block_on(read_jh).unwrap().unwrap();
assert!(
read_res.historic_path.is_empty(),
"we have pushed less than needed for flush"
);
assert_eq!(read_res.inmem_records, vec!["baz", "foo"]);
let rw = rt.block_on(async move {
rw.put(0, 11, "blup").await.unwrap();
rw
});
let read_res = rt.block_on(async move { r2.get(0, 11).await.unwrap() });
assert_eq!(read_res.historic_path.len(), 0);
assert_eq!(read_res.inmem_records, vec!["blup", "baz", "foo"]);
drop(rw);
}

View File

@@ -37,6 +37,7 @@ uuid = { version = "1.2", features = ["v4", "serde"] }
metrics.workspace = true
workspace_hack.workspace = true
either.workspace = true
[dev-dependencies]
byteorder.workspace = true

View File

@@ -1,12 +1,13 @@
#![warn(missing_docs)]
use either::Either;
use std::cmp::{Eq, Ordering, PartialOrd};
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::mem;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio::time::timeout;
/// An error happened while waiting for a number
@@ -36,45 +37,48 @@ pub trait MonotonicCounter<V> {
}
/// Internal components of a `SeqWait`
struct SeqWaitInt<S, V>
struct SeqWaitInt<S, V, T>
where
S: MonotonicCounter<V>,
V: Ord,
T: Clone,
{
waiters: BinaryHeap<Waiter<V>>,
waiters: BinaryHeap<Waiter<V, T>>,
current: S,
shutdown: bool,
data: T,
}
struct Waiter<T>
struct Waiter<V, T>
where
T: Ord,
V: Ord,
T: Clone,
{
wake_num: T, // wake me when this number arrives ...
wake_channel: Sender<()>, // ... by sending a message to this channel
wake_num: V, // wake me when this number arrives ...
wake_channel: Sender<T>, // ... by sending a message to this channel
}
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
// to get that.
impl<T: Ord> PartialOrd for Waiter<T> {
impl<V: Ord, T: Clone> PartialOrd for Waiter<V, T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.wake_num.partial_cmp(&self.wake_num)
}
}
impl<T: Ord> Ord for Waiter<T> {
impl<V: Ord, T: Clone> Ord for Waiter<V, T> {
fn cmp(&self, other: &Self) -> Ordering {
other.wake_num.cmp(&self.wake_num)
}
}
impl<T: Ord> PartialEq for Waiter<T> {
impl<V: Ord, T: Clone> PartialEq for Waiter<V, T> {
fn eq(&self, other: &Self) -> bool {
other.wake_num == self.wake_num
}
}
impl<T: Ord> Eq for Waiter<T> {}
impl<V: Ord, T: Clone> Eq for Waiter<V, T> {}
/// A tool for waiting on a sequence number
///
@@ -92,25 +96,28 @@ impl<T: Ord> Eq for Waiter<T> {}
///
/// <S> means Storage, <V> is type of counter that this storage exposes.
///
pub struct SeqWait<S, V>
pub struct SeqWait<S, V, T>
where
S: MonotonicCounter<V>,
V: Ord,
T: Clone,
{
internal: Mutex<SeqWaitInt<S, V>>,
internal: Mutex<SeqWaitInt<S, V, T>>,
}
impl<S, V> SeqWait<S, V>
impl<S, V, T> SeqWait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// Create a new `SeqWait`, initialized to a particular number
pub fn new(starting_num: S) -> Self {
pub fn new(starting_num: S, data: T) -> Self {
let internal = SeqWaitInt {
waiters: BinaryHeap::new(),
current: starting_num,
shutdown: false,
data,
};
SeqWait {
internal: Mutex::new(internal),
@@ -144,10 +151,13 @@ where
///
/// This call won't complete until someone has called `advance`
/// with a number greater than or equal to the one we're waiting for.
pub async fn wait_for(&self, num: V) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
Ok(Some(mut rx)) => rx.changed().await.map_err(|_| SeqWaitError::Shutdown),
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
match self.queue_for_wait(num, false) {
Ok(Either::Left(data)) => Ok(data),
Ok(Either::Right(rx)) => match rx.await {
Err(_) => Err(SeqWaitError::Shutdown),
Ok(data) => Ok(data),
},
Err(e) => Err(e),
}
}
@@ -159,15 +169,18 @@ where
///
/// If that hasn't happened after the specified timeout duration,
/// [`SeqWaitError::Timeout`] will be returned.
///
/// Pass a zero `timeout_duration` to guarantee that the future returned
/// by this function never awaits.
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
Ok(Some(mut rx)) => match timeout(timeout_duration, rx.changed()).await {
Ok(Ok(())) => Ok(()),
) -> Result<T, SeqWaitError> {
match self.queue_for_wait(num, timeout_duration.is_zero()) {
Ok(Either::Left(data)) => Ok(data),
Ok(Either::Right(rx)) => match timeout(timeout_duration, rx).await {
Ok(Ok(data)) => Ok(data),
Ok(Err(_)) => Err(SeqWaitError::Shutdown),
Err(_) => Err(SeqWaitError::Timeout),
},
@@ -177,41 +190,50 @@ where
/// Register and return a channel that will receive the current data when the number arrives,
/// or return the data directly if it has already arrived.
fn queue_for_wait(&self, num: V) -> Result<Option<Receiver<()>>, SeqWaitError> {
fn queue_for_wait(&self, num: V, nowait: bool) -> Result<Either<T, Receiver<T>>, SeqWaitError> {
let mut internal = self.internal.lock().unwrap();
if internal.current.cnt_value() >= num {
return Ok(None);
return Ok(Either::Left(internal.data.clone()));
}
if internal.shutdown {
return Err(SeqWaitError::Shutdown);
}
if nowait {
return Err(SeqWaitError::Timeout);
}
// Create a new channel.
let (tx, rx) = channel(());
let (tx, rx) = channel();
internal.waiters.push(Waiter {
wake_num: num,
wake_channel: tx,
});
// Drop the lock as we exit this scope.
Ok(Some(rx))
Ok(Either::Right(rx))
}
/// Announce a new number has arrived
///
/// All waiters at this value or below will be woken.
///
/// If `new_data` is Some(), it will update the internal data,
/// even if `num` is smaller than the internal counter.
/// In that case, however, it will not cause a wake-up.
///
/// Returns the old number.
pub fn advance(&self, num: V) -> V {
pub fn advance(&self, num: V, new_data: Option<T>) -> V {
let old_value;
let wake_these = {
let (wake_these, with_data) = {
let mut internal = self.internal.lock().unwrap();
if let Some(new_data) = new_data {
internal.data = new_data;
}
old_value = internal.current.cnt_value();
if old_value >= num {
return old_value;
}
internal.current.cnt_advance(num);
// Pop all waiters <= num from the heap. Collect them in a vector, and
// wake them up after releasing the lock.
let mut wake_these = Vec::new();
@@ -221,13 +243,13 @@ where
}
wake_these.push(internal.waiters.pop().unwrap().wake_channel);
}
wake_these
(wake_these, internal.data.clone())
};
for tx in wake_these {
// This can fail if there are no receivers.
// We don't care; discard the error.
let _ = tx.send(());
let _ = tx.send(with_data.clone());
}
old_value
}
@@ -236,6 +258,106 @@ where
pub fn load(&self) -> S {
self.internal.lock().unwrap().current
}
/// Split the seqwait into a part that can only do wait,
/// and another part that can do advance + wait.
///
/// The wait-only part can be cloned, the advance part cannot be cloned.
/// This provides a single-producer multi-consumer scheme.
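///
/// A minimal sketch of the intended usage (`counter`, `initial_data`, `num`, and `data` are placeholders;
/// see also the `split_spmc_api_surface` test):
///
/// ```ignore
/// let (wait, advance) = SeqWait::new(counter, initial_data).split_spmc();
/// let wait2 = wait.clone();             // Wait can be cloned: many consumers
/// advance.advance(num, Some(data));     // Advance cannot be cloned: single producer
/// let latest = wait2.wait_for(num).await?;
/// ```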
pub fn split_spmc(self) -> (Wait<S, V, T>, Advance<S, V, T>) {
let inner = Arc::new(self);
let w = Wait {
inner: inner.clone(),
};
let a = Advance { inner };
(w, a)
}
}
/// See [`SeqWait::split_spmc`].
pub struct Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
inner: Arc<SeqWait<S, V, T>>,
}
/// See [`SeqWait::split_spmc`].
pub struct Advance<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
inner: Arc<SeqWait<S, V, T>>,
}
impl<S, V, T> Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// See [`SeqWait::wait_for`].
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
self.inner.wait_for(num).await
}
/// See [`SeqWait::wait_for_timeout`].
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<T, SeqWaitError> {
self.inner.wait_for_timeout(num, timeout_duration).await
}
}
impl<S, V, T> Advance<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// See [`SeqWait::advance`].
pub fn advance(&self, num: V, new_data: Option<T>) -> V {
self.inner.advance(num, new_data)
}
/// See [`SeqWait::wait_for`].
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
self.inner.wait_for(num).await
}
/// See [`SeqWait::wait_for_timeout`].
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<T, SeqWaitError> {
self.inner.wait_for_timeout(num, timeout_duration).await
}
/// Get a `Clone::clone` of the current data inside the seqwait.
pub fn get_current_data(&self) -> (V, T) {
let inner = self.inner.internal.lock().unwrap();
(inner.current.cnt_value(), inner.data.clone())
}
}
impl<S, V, T> Clone for Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
#[cfg(test)]
@@ -256,12 +378,12 @@ mod tests {
#[tokio::test]
async fn seqwait() {
let seq = Arc::new(SeqWait::new(0));
let seq = Arc::new(SeqWait::new(0, ()));
let seq2 = Arc::clone(&seq);
let seq3 = Arc::clone(&seq);
let jh1 = tokio::task::spawn(async move {
seq2.wait_for(42).await.expect("wait_for 42");
let old = seq2.advance(100);
let old = seq2.advance(100, None);
assert_eq!(old, 99);
seq2.wait_for_timeout(999, Duration::from_millis(100))
.await
@@ -272,12 +394,12 @@ mod tests {
seq3.wait_for(0).await.expect("wait_for 0");
});
tokio::time::sleep(Duration::from_millis(200)).await;
let old = seq.advance(99);
let old = seq.advance(99, None);
assert_eq!(old, 0);
seq.wait_for(100).await.expect("wait_for 100");
// Calling advance with a smaller value is a no-op
assert_eq!(seq.advance(98), 100);
assert_eq!(seq.advance(98, None), 100);
assert_eq!(seq.load(), 100);
jh1.await.unwrap();
@@ -288,7 +410,7 @@ mod tests {
#[tokio::test]
async fn seqwait_timeout() {
let seq = Arc::new(SeqWait::new(0));
let seq = Arc::new(SeqWait::new(0, ()));
let seq2 = Arc::clone(&seq);
let jh = tokio::task::spawn(async move {
let timeout = Duration::from_millis(1);
@@ -298,10 +420,104 @@ mod tests {
tokio::time::sleep(Duration::from_millis(200)).await;
// This will attempt to wake, but nothing will happen
// because the waiter already dropped its Receiver.
let old = seq.advance(99);
let old = seq.advance(99, None);
assert_eq!(old, 0);
jh.await.unwrap();
seq.shutdown();
}
#[tokio::test]
async fn data_basic() {
let seq = Arc::new(SeqWait::new(0, "a"));
let seq2 = Arc::clone(&seq);
let jh = tokio::task::spawn(async move {
let data = seq.wait_for(2).await.unwrap();
assert_eq!(data, "b");
});
seq2.advance(1, Some("x"));
seq2.advance(2, Some("b"));
jh.await.unwrap();
}
#[test]
fn data_always_most_recent() {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let seq = Arc::new(SeqWait::new(0, "a"));
let seq2 = Arc::clone(&seq);
let jh = rt.spawn(async move {
let data = seq.wait_for(2).await.unwrap();
assert_eq!(data, "d");
});
// jh is not running until we poll it, thanks to current thread runtime
rt.block_on(async move {
seq2.advance(2, Some("b"));
seq2.advance(3, Some("c"));
seq2.advance(4, Some("d"));
});
rt.block_on(jh).unwrap();
}
#[tokio::test]
async fn split_spmc_api_surface() {
let seq = SeqWait::new(0, 1);
let (w, a) = seq.split_spmc();
let _ = w.wait_for(1);
let _ = w.wait_for_timeout(0, Duration::from_secs(10));
let _ = w.clone();
let _ = a.advance(1, None);
let _ = a.wait_for(1);
let _ = a.wait_for_timeout(0, Duration::from_secs(10));
// TODO would be nice to have must-not-compile tests for Advance not being clonable.
}
#[tokio::test]
async fn new_data_same_lsn() {
let seq = Arc::new(SeqWait::new(0, "a"));
seq.advance(1, Some("b"));
let data = seq.wait_for(1).await.unwrap();
assert_eq!(data, "b", "the regular case where lsn and data advance");
seq.advance(1, Some("c"));
let data = seq.wait_for(1).await.unwrap();
assert_eq!(
data, "c",
"no lsn advance still gives new data for old lsn wait_for's"
);
let (start_wait_for_sender, start_wait_for_receiver) = tokio::sync::oneshot::channel();
// ensure we don't wake waiters for data-only change
let jh = tokio::spawn({
let seq = seq.clone();
async move {
start_wait_for_receiver.await.unwrap();
match tokio::time::timeout(Duration::from_secs(2), seq.wait_for(2)).await {
Ok(_) => {
assert!(
false,
"advance should not wake waiters if data changes but LSN doesn't"
);
}
Err(_) => {
// Good, we weren't woken up.
}
}
}
});
seq.advance(1, Some("d"));
start_wait_for_sender.send(()).unwrap();
jh.await.unwrap();
}
}

View File

@@ -145,7 +145,7 @@ pub struct Timeline {
// 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
// first WAL record when the node is started up. But here, we just
// keep track of it.
last_record_lsn: SeqWait<RecordLsn, Lsn>,
last_record_lsn: SeqWait<RecordLsn, Lsn, ()>,
// All WAL records have been processed and stored durably on files on
// local disk, up to this LSN. On crash and restart, we need to re-process
@@ -1270,10 +1270,13 @@ impl Timeline {
remote_client: remote_client.map(Arc::new),
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
last: disk_consistent_lsn,
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
}),
last_record_lsn: SeqWait::new(
RecordLsn {
last: disk_consistent_lsn,
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
},
(),
),
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
@@ -2420,7 +2423,7 @@ impl Timeline {
assert!(new_lsn.is_aligned());
self.metrics.last_record_gauge.set(new_lsn.0 as i64);
self.last_record_lsn.advance(new_lsn);
self.last_record_lsn.advance(new_lsn, None);
}
fn freeze_inmem_layer(&self, write_lock_held: bool) {