neon/libs/utils/src/sync/spsc_fold.rs
Christian Schwarz aa4ec11af9 page_service: rewrite batching to work without a timeout (#9851)
# Problem

The timeout-based batching adds latency to unbatchable workloads.

We can choose a short batching timeout (e.g. 10us) but that requires
high-resolution timers, which tokio doesn't have.
I thoroughly explored options to use OS timers (see
[this](https://github.com/neondatabase/neon/pull/9822) abandoned PR).
In short, it's not an attractive option because any timer implementation
adds non-trivial overheads.

# Solution

The insight is that, in the steady state of a batchable workload, the
time we spend in `get_vectored` will be hundreds of microseconds anyway.

If we prepare the next batch concurrently with `get_vectored`, we will
have a sizeable batch ready by the time `get_vectored` for the current
batch is done, and no explicit timeout is needed.

This can be reasonably described as **pipelining of the protocol
handler**.

# Implementation

We model the sub-protocol handler for pagestream requests
(`handle_pagerequests`) as two futures that form a pipeline:

1. Batching: read requests from the connection and fill the current
   batch.
2. Execution: `take` the current batch, execute it using `get_vectored`,
   and send the response.

The Batching and Execution stages are connected through a new type of
channel called `spsc_fold`.

See the long comment in `handle_pagerequests_pipelined` for details.
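
For orientation, here is a minimal sketch of that structure. The `Batch` type, the I/O stubs, and the `max_batch_size` handling are simplified stand-ins for the real `BatchedFeMessage` handling in `handle_pagerequests_pipelined`; only the `spsc_fold` API (the file included below) is used as-is, and the `utils::sync::spsc_fold` import path is assumed from the file's location.

```rust
use utils::sync::spsc_fold; // assumed path, based on libs/utils/src/sync/spsc_fold.rs

/// Simplified stand-in for `BatchedFeMessage`.
struct Batch {
    requests: Vec<u64>,
}

async fn read_request() -> Option<u64> {
    None // stub: would read one protocol message from the connection
}

async fn execute_batch(_batch: Batch) {
    // stub: would call `get_vectored` and send the responses
}

async fn pipelined_sketch(max_batch_size: usize) {
    let (mut tx, mut rx) = spsc_fold::channel::<Batch>();

    // Stage 1: Batching. Fold incoming requests into the batch that the
    // Execution stage has not yet picked up.
    let batching = async move {
        while let Some(req) = read_request().await {
            let incoming = Batch { requests: vec![req] };
            let sent = tx
                .send(incoming, |acc, incoming| {
                    if acc.requests.len() < max_batch_size {
                        acc.requests.extend(incoming.requests);
                        Ok(()) // merged into the pending batch
                    } else {
                        Err(incoming) // can't merge; sender waits for Execution
                    }
                })
                .await;
            if sent.is_err() {
                break; // receiver gone, i.e., the Execution stage shut down
            }
        }
        // dropping `tx` lets the Execution stage observe `SenderGone`
    };

    // Stage 2: Execution. Take whatever batch is ready and execute it.
    let execution = async move {
        while let Ok(batch) = rx.recv().await {
            execute_batch(batch).await;
        }
    };

    // `concurrent-futures` execution mode: both stages on one task.
    tokio::join!(batching, execution);
}
```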

# Changes

- Refactor `handle_pagerequests`
  - separate functions for
    - reading one protocol message; produces a `BatchedFeMessage` with
      just one page request in it
    - batching; tries to merge an incoming `BatchedFeMessage` into an
      existing `BatchedFeMessage`; returns `None` on success and hands
      the incoming message back if merging isn't possible
    - execution of a batched message
  - unify the timeline handle acquisition & request span construction;
    it now happens in the function that reads the protocol message
- Implement serial and pipelined model
  - serial: what we had before any of the batching changes
    - read one protocol message
    - execute protocol messages
  - pipelined: the design described above
  - optionality for how the pipeline is executed: either as concurrent
    futures or as tokio tasks (see the sketch after this list)
- Pageserver config
  - remove batching timeout field
  - add ability to configure pipelining mode
  - add ability to limit max batch size for pipelined configurations
    (required for the rollout, cf
    https://github.com/neondatabase/cloud/issues/20620)
  - ability to configure execution mode
- Tests
  - remove `batch_timeout` parametrization
  - rename `test_getpage_merge_smoke` to `test_throughput`
  - add parametrization to test different max batch sizes and execution
    modes
  - rename `test_timer_precision` to `test_latency`
  - rename the test case file to `test_page_service_batching.py`
  - better descriptions of what the tests actually do
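
To make the "concurrent futures vs. tokio tasks" option concrete, here is a sketch of the two execution modes. The enum and function names are illustrative, not the actual pageserver config types; in the benchmark results below, `concurrent-futures` corresponds to the first arm and `tasks` to the second.

```rust
/// Illustrative only; the real pageserver config type is named differently.
enum PipelineExecution {
    ConcurrentFutures,
    Tasks,
}

async fn run_pipeline<B, E>(execution: PipelineExecution, batching: B, executing: E)
where
    B: std::future::Future<Output = ()> + Send + 'static,
    E: std::future::Future<Output = ()> + Send + 'static,
{
    match execution {
        // Both stages polled on the same task: no cross-task wakeups,
        // but the two stages never run on two CPUs at the same time.
        PipelineExecution::ConcurrentFutures => {
            tokio::join!(batching, executing);
        }
        // Each stage on its own tokio task: allows parallelism, at the
        // cost of spawning and inter-task wakeup overhead.
        PipelineExecution::Tasks => {
            let batching = tokio::spawn(batching);
            let executing = tokio::spawn(executing);
            let (b, e) = tokio::join!(batching, executing);
            b.unwrap();
            e.unwrap();
        }
    }
}
```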

## On holding the `TimelineHandle` in the pending batch

While batching, we hold the `TimelineHandle` in the pending batch.
Therefore, the timeline will not finish shutting down while we're
batching.

This is not a problem in practice because the concurrently ongoing
`get_vectored` call will fail quickly with an error indicating that the
timeline is shutting down.
This results in the Execution stage returning a `QueryError::Shutdown`,
which causes the pipeline / entire page service connection to shut down.
This drops all references to the
`Arc<Mutex<Option<Box<BatchedFeMessage>>>>` object, thereby dropping the
contained `TimelineHandle`s.

This fixes https://github.com/neondatabase/neon/issues/9850.

# Performance

Results from a local run of the benchmarks are in [this empty
commit](1cf5b1463f) in the PR branch.

Key take-aways:
* `concurrent-futures` and `tasks` deliver identical `batching_factor`
* tail latency impact unknown, cf
https://github.com/neondatabase/neon/issues/9837
* `concurrent-futures` has higher throughput than `tasks` in all
workloads (=lower `time` metric)
* In unbatchable workloads, `concurrent-futures` has 5% higher
`CPU-per-throughput` than `tasks`, and 15% higher than `serial`.
* In the batchable-32 workload, `concurrent-futures` has 8% lower
`CPU-per-throughput` than `tasks` (comparison to the throughput of
`serial` is irrelevant).
* In unbatchable workloads, mean and tail latencies of
`concurrent-futures` are practically identical to those of `serial`,
whereas `tasks` adds 20-30us of overhead.

Overall, `concurrent-futures` seems like a slightly more attractive
choice.

# Rollout

This change is disabled by default.

Rollout plan:
- https://github.com/neondatabase/cloud/issues/20620

# Refs

- epic: https://github.com/neondatabase/neon/issues/9376
- this sub-task: https://github.com/neondatabase/neon/issues/9377
- the abandoned attempt to improve batching timeout resolution:
https://github.com/neondatabase/neon/pull/9820
- closes https://github.com/neondatabase/neon/issues/9850
- fixes https://github.com/neondatabase/neon/issues/9835
2024-11-30 00:16:24 +00:00


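//! A single-producer, single-consumer channel where `send` *folds* the new
//! value into the value that the receiver has not yet consumed.
//!
//! [`Sender::send`] takes a `try_fold` closure. If the channel already holds an
//! unconsumed value, `try_fold` is called to merge the new value into it; if
//! the merge is rejected, the sender waits until the receiver has consumed the
//! held value and then stores the new value on its own.
//! This is the channel that connects the Batching and Execution stages of the
//! pipelined pagestream protocol handler described in the commit message above.
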
use core::{future::poll_fn, task::Poll};
use std::sync::{Arc, Mutex};
use diatomic_waker::DiatomicWaker;
pub struct Sender<T> {
    state: Arc<Inner<T>>,
}

pub struct Receiver<T> {
    state: Arc<Inner<T>>,
}

struct Inner<T> {
    wake_receiver: DiatomicWaker,
    wake_sender: DiatomicWaker,
    value: Mutex<State<T>>,
}

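/// The shared state machine between [`Sender`] and [`Receiver`].
/// The "transient" variants below only exist while one side holds the mutex
/// and swaps the state out via `std::mem::replace`; they are never observed
/// by the other side.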
enum State<T> {
    NoData,
    HasData(T),
    TryFoldFailed, // transient state
    SenderWaitsForReceiverToConsume(T),
    SenderGone(Option<T>),
    ReceiverGone,
    AllGone,
    SenderDropping,   // transient state
    ReceiverDropping, // transient state
}

pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
    let inner = Inner {
        wake_receiver: DiatomicWaker::new(),
        wake_sender: DiatomicWaker::new(),
        value: Mutex::new(State::NoData),
    };
    let state = Arc::new(inner);
    (
        Sender {
            state: state.clone(),
        },
        Receiver { state },
    )
}

#[derive(Debug, thiserror::Error)]
pub enum SendError {
    #[error("receiver is gone")]
    ReceiverGone,
}

impl<T: Send> Sender<T> {
    /// # Panics
    ///
    /// If `try_fold` panics, any subsequent call to `send` will panic.
    pub async fn send<F>(&mut self, value: T, try_fold: F) -> Result<(), SendError>
    where
        F: Fn(&mut T, T) -> Result<(), T>,
    {
        let mut value = Some(value);
        poll_fn(|cx| {
            let mut guard = self.state.value.lock().unwrap();
            match &mut *guard {
                State::NoData => {
                    *guard = State::HasData(value.take().unwrap());
                    self.state.wake_receiver.notify();
                    Poll::Ready(Ok(()))
                }
                State::HasData(_) => {
                    let State::HasData(acc_mut) = &mut *guard else {
                        unreachable!("this match arm guarantees that the guard is HasData");
                    };
                    match try_fold(acc_mut, value.take().unwrap()) {
                        Ok(()) => {
                            // no need to wake receiver, if it was waiting it already
                            // got a wake-up when we transitioned from NoData to HasData
                            Poll::Ready(Ok(()))
                        }
                        Err(unfoldable_value) => {
                            value = Some(unfoldable_value);
                            let State::HasData(acc) =
                                std::mem::replace(&mut *guard, State::TryFoldFailed)
                            else {
                                unreachable!("this match arm guarantees that the guard is HasData");
                            };
                            *guard = State::SenderWaitsForReceiverToConsume(acc);
                            // SAFETY: send is single threaded due to `&mut self` requirement,
                            // therefore register is not concurrent.
                            unsafe {
                                self.state.wake_sender.register(cx.waker());
                            }
                            Poll::Pending
                        }
                    }
                }
                State::SenderWaitsForReceiverToConsume(_data) => {
                    // Really, we shouldn't be polled until receiver has consumed and wakes us.
                    Poll::Pending
                }
                State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)),
                State::SenderGone(_)
                | State::AllGone
                | State::SenderDropping
                | State::ReceiverDropping
                | State::TryFoldFailed => {
                    unreachable!();
                }
            }
        })
        .await
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        scopeguard::defer! {
            self.state.wake_receiver.notify()
        };
        let Ok(mut guard) = self.state.value.lock() else {
            return;
        };
        *guard = match std::mem::replace(&mut *guard, State::SenderDropping) {
            State::NoData => State::SenderGone(None),
            State::HasData(data) | State::SenderWaitsForReceiverToConsume(data) => {
                State::SenderGone(Some(data))
            }
            State::ReceiverGone => State::AllGone,
            State::TryFoldFailed
            | State::SenderGone(_)
            | State::AllGone
            | State::SenderDropping
            | State::ReceiverDropping => {
                unreachable!("unreachable state {:?}", guard.discriminant_str())
            }
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum RecvError {
    #[error("sender is gone")]
    SenderGone,
}

impl<T: Send> Receiver<T> {
    pub async fn recv(&mut self) -> Result<T, RecvError> {
        poll_fn(|cx| {
            let mut guard = self.state.value.lock().unwrap();
            match &mut *guard {
                State::NoData => {
                    // SAFETY: recv is single threaded due to `&mut self` requirement,
                    // therefore register is not concurrent.
                    unsafe {
                        self.state.wake_receiver.register(cx.waker());
                    }
                    Poll::Pending
                }
                guard @ State::HasData(_)
                | guard @ State::SenderWaitsForReceiverToConsume(_)
                | guard @ State::SenderGone(Some(_)) => {
                    let data = guard
                        .take_data()
                        .expect("in these states, data is guaranteed to be present");
                    self.state.wake_sender.notify();
                    Poll::Ready(Ok(data))
                }
                State::SenderGone(None) => Poll::Ready(Err(RecvError::SenderGone)),
                State::ReceiverGone
                | State::AllGone
                | State::SenderDropping
                | State::ReceiverDropping
                | State::TryFoldFailed => {
                    unreachable!("unreachable state {:?}", guard.discriminant_str());
                }
            }
        })
        .await
    }
}

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        scopeguard::defer! {
            self.state.wake_sender.notify()
        };
        let Ok(mut guard) = self.state.value.lock() else {
            return;
        };
        *guard = match std::mem::replace(&mut *guard, State::ReceiverDropping) {
            State::NoData => State::ReceiverGone,
            State::HasData(_) | State::SenderWaitsForReceiverToConsume(_) => State::ReceiverGone,
            State::SenderGone(_) => State::AllGone,
            State::TryFoldFailed
            | State::ReceiverGone
            | State::AllGone
            | State::SenderDropping
            | State::ReceiverDropping => {
                unreachable!("unreachable state {:?}", guard.discriminant_str())
            }
        }
    }
}

impl<T> State<T> {
    fn take_data(&mut self) -> Option<T> {
        match self {
            State::HasData(_) => {
                let State::HasData(data) = std::mem::replace(self, State::NoData) else {
                    unreachable!("this match arm guarantees that the state is HasData");
                };
                Some(data)
            }
            State::SenderWaitsForReceiverToConsume(_) => {
                let State::SenderWaitsForReceiverToConsume(data) =
                    std::mem::replace(self, State::NoData)
                else {
                    unreachable!(
                        "this match arm guarantees that the state is SenderWaitsForReceiverToConsume"
                    );
                };
                Some(data)
            }
            State::SenderGone(data) => Some(data.take().unwrap()),
            State::NoData
            | State::TryFoldFailed
            | State::ReceiverGone
            | State::AllGone
            | State::SenderDropping
            | State::ReceiverDropping => None,
        }
    }

    fn discriminant_str(&self) -> &'static str {
        match self {
            State::NoData => "NoData",
            State::HasData(_) => "HasData",
            State::TryFoldFailed => "TryFoldFailed",
            State::SenderWaitsForReceiverToConsume(_) => "SenderWaitsForReceiverToConsume",
            State::SenderGone(_) => "SenderGone",
            State::ReceiverGone => "ReceiverGone",
            State::AllGone => "AllGone",
            State::SenderDropping => "SenderDropping",
            State::ReceiverDropping => "ReceiverDropping",
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);

    #[tokio::test]
    async fn test_send_recv() {
        let (mut sender, mut receiver) = channel();
        sender
            .send(42, |acc, val| {
                *acc += val;
                Ok(())
            })
            .await
            .unwrap();
        let received = receiver.recv().await.unwrap();
        assert_eq!(received, 42);
    }

    #[tokio::test]
    async fn test_send_recv_with_fold() {
        let (mut sender, mut receiver) = channel();
        sender
            .send(1, |acc, val| {
                *acc += val;
                Ok(())
            })
            .await
            .unwrap();
        sender
            .send(2, |acc, val| {
                *acc += val;
                Ok(())
            })
            .await
            .unwrap();
        let received = receiver.recv().await.unwrap();
        assert_eq!(received, 3);
    }

    #[tokio::test(start_paused = true)]
    async fn test_sender_waits_for_receiver_if_try_fold_fails() {
        let (mut sender, mut receiver) = channel();
        sender.send(23, |_, _| panic!("first send")).await.unwrap();
        let send_fut = sender.send(42, |_, val| Err(val));
        let mut send_fut = std::pin::pin!(send_fut);
        tokio::select! {
            _ = tokio::time::sleep(FOREVER) => {},
            _ = &mut send_fut => {
                panic!("send should not complete");
            },
        }
        let val = receiver.recv().await.unwrap();
        assert_eq!(val, 23);
        tokio::select! {
            _ = tokio::time::sleep(FOREVER) => {
                panic!("receiver should have consumed the value");
            },
            _ = &mut send_fut => { },
        }
        let val = receiver.recv().await.unwrap();
        assert_eq!(val, 42);
    }

    #[tokio::test(start_paused = true)]
    async fn test_sender_errors_if_waits_for_receiver_and_receiver_drops() {
        let (mut sender, receiver) = channel();
        sender.send(23, |_, _| unreachable!()).await.unwrap();
        let send_fut = sender.send(42, |_, val| Err(val));
        let send_fut = std::pin::pin!(send_fut);
        drop(receiver);
        let result = send_fut.await;
        assert!(matches!(result, Err(SendError::ReceiverGone)));
    }

    #[tokio::test(start_paused = true)]
    async fn test_receiver_errors_if_waits_for_sender_and_sender_drops() {
        let (sender, mut receiver) = channel::<()>();
        let recv_fut = receiver.recv();
        let recv_fut = std::pin::pin!(recv_fut);
        drop(sender);
        let result = recv_fut.await;
        assert!(matches!(result, Err(RecvError::SenderGone)));
    }

    #[tokio::test(start_paused = true)]
    async fn test_receiver_errors_if_waits_for_sender_and_sender_drops_with_data() {
        let (mut sender, mut receiver) = channel();
        sender.send(42, |_, _| unreachable!()).await.unwrap();
        {
            let recv_fut = receiver.recv();
            let recv_fut = std::pin::pin!(recv_fut);
            drop(sender);
            let val = recv_fut.await.unwrap();
            assert_eq!(val, 42);
        }
        let result = receiver.recv().await;
        assert!(matches!(result, Err(RecvError::SenderGone)));
    }

    #[tokio::test(start_paused = true)]
    async fn test_receiver_waits_for_sender_if_no_data() {
        let (mut sender, mut receiver) = channel();
        let recv_fut = receiver.recv();
        let mut recv_fut = std::pin::pin!(recv_fut);
        tokio::select! {
            _ = tokio::time::sleep(FOREVER) => {},
            _ = &mut recv_fut => {
                panic!("recv should not complete");
            },
        }
        sender.send(42, |_, _| Ok(())).await.unwrap();
        let val = recv_fut.await.unwrap();
        assert_eq!(val, 42);
    }

    #[tokio::test]
    async fn test_receiver_gone_while_nodata() {
        let (mut sender, receiver) = channel();
        drop(receiver);
        let result = sender.send(42, |_, _| Ok(())).await;
        assert!(matches!(result, Err(SendError::ReceiverGone)));
    }

    #[tokio::test]
    async fn test_sender_gone_while_nodata() {
        let (sender, mut receiver) = super::channel::<usize>();
        drop(sender);
        let result = receiver.recv().await;
        assert!(matches!(result, Err(RecvError::SenderGone)));
    }

    #[tokio::test(start_paused = true)]
    async fn test_receiver_drops_after_sender_went_to_sleep() {
        let (mut sender, receiver) = channel();
        let state = receiver.state.clone();
        sender.send(23, |_, _| unreachable!()).await.unwrap();
        let send_task = tokio::spawn(async move { sender.send(42, |_, v| Err(v)).await });
        tokio::time::sleep(FOREVER).await;
        assert!(matches!(
            &*state.value.lock().unwrap(),
            &State::SenderWaitsForReceiverToConsume(_)
        ));
        drop(receiver);
        let err = send_task
            .await
            .unwrap()
            .expect_err("should unblock immediately");
        assert!(matches!(err, SendError::ReceiverGone));
    }

    #[tokio::test(start_paused = true)]
    async fn test_sender_drops_after_receiver_went_to_sleep() {
        let (sender, mut receiver) = channel::<usize>();
        let state = sender.state.clone();
        let recv_task = tokio::spawn(async move { receiver.recv().await });
        tokio::time::sleep(FOREVER).await;
        assert!(matches!(&*state.value.lock().unwrap(), &State::NoData));
        drop(sender);
        let err = recv_task.await.unwrap().expect_err("should error");
        assert!(matches!(err, RecvError::SenderGone));
    }
}