page_service: rewrite batching to work without a timeout (#9851)
# Problem
The timeout-based batching adds latency to unbatchable workloads.
We can choose a short batching timeout (e.g. 10us) but that requires
high-resolution timers, which tokio doesn't have.
I thoroughly explored options to use OS timers (see
[this](https://github.com/neondatabase/neon/pull/9822) abandoned PR).
In short, it's not an attractive option because any timer implementation
adds non-trivial overheads.
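For a concrete sense of the granularity problem, here is a small illustration (my sketch, not code from this PR): tokio timers operate at roughly millisecond resolution, so a nominal 10us sleep still costs about 1ms:

```rust
// Illustrative sketch, not part of this PR: tokio's timers have roughly
// millisecond granularity, so a nominal 10us sleep is rounded up to ~1ms.
#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();
    for _ in 0..100 {
        tokio::time::sleep(std::time::Duration::from_micros(10)).await;
    }
    // Prints ~100ms rather than ~1ms: each 10us sleep pays ~1ms of timer
    // granularity, which is exactly the latency a 10us batching timeout
    // would add to unbatchable workloads.
    println!("100 x 10us sleeps took {:?}", start.elapsed());
}
```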
# Solution
The insight is that, in the steady state of a batchable workload, the
time we spend in `get_vectored` will be hundreds of microseconds anyway.
If we prepare the next batch concurrently with `get_vectored`, we will
have a sizeable batch ready once `get_vectored` of the current batch is
done, and we do not need an explicit timeout. For example, if
`get_vectored` takes 300us and reading one request takes ~10us, up to
~30 requests can accumulate in the next batch while the current one
executes.
This can be reasonably described as **pipelining of the protocol
handler**.
# Implementation
We model the sub-protocol handler for pagestream requests
(`handle_pagerequests`) as two futures that form a pipeline:
1. Batching: read requests from the connection and fill the current
batch.
2. Execution: `take` the current batch, execute it using `get_vectored`,
and send the response.
The Batching and Execution stages are connected through a new type of
channel called `spsc_fold`.
See the long comment in `handle_pagerequests_pipelined` for details.
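As an illustration, here is a minimal sketch of that pipeline shape using the `spsc_fold` channel added in this PR. The batch type (a `Vec` of page numbers) and `MAX_BATCH_SIZE` are toy stand-ins for the real `BatchedFeMessage` handling in `page_service.rs`:

```rust
// Minimal sketch of the two-stage pipeline, using the `spsc_fold` channel
// added by this PR (`utils::sync::spsc_fold`). Types are toy stand-ins.
use utils::sync::spsc_fold;

const MAX_BATCH_SIZE: usize = 32;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = spsc_fold::channel::<Vec<u32>>();

    // Stage 1: Batching. Reads "requests" and folds them into the pending
    // batch. If the batch is full, the try_fold closure hands the value
    // back via Err and `send` parks until Execution consumes the batch.
    let batching = async move {
        for page_no in 0..1000u32 {
            tx.send(vec![page_no], |batch, incoming| {
                if batch.len() < MAX_BATCH_SIZE {
                    batch.extend(incoming);
                    Ok(())
                } else {
                    Err(incoming)
                }
            })
            .await
            .unwrap();
        }
        // Dropping `tx` here makes `recv` return Err(SenderGone) below.
    };

    // Stage 2: Execution. Takes whatever has accumulated; while it
    // "executes" the batch, Stage 1 keeps filling the next one.
    let execution = async move {
        while let Ok(batch) = rx.recv().await {
            println!("executing batch of {} requests", batch.len());
        }
    };

    // Running both stages with join! on one task corresponds to the
    // `concurrent-futures` execution mode; the `tasks` mode would
    // `tokio::spawn` each stage instead.
    tokio::join!(batching, execution);
}
```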
# Changes
- Refactor `handle_pagerequests`
- separate functions for
- reading one protocol message; produces a `BatchedFeMessage` with just
one page request in it
- batching; tries to merge an incoming `BatchedFeMessage` into an
existing `BatchedFeMessage`; returns `None` on success and returns the
incoming message if merging isn't possible
- execution of a batched message
- unify the timeline handle acquisition & request span construction; both
now happen in the function that reads the protocol message
- Implement the serial and pipelined models
- serial: what we had before any of the batching changes
- read one protocol message
- execute protocol messages
- pipelined: the design described above
- optionality for executing the pipeline either via concurrent
futures or via tokio tasks
- Pageserver config
- remove batching timeout field
- add ability to configure pipelining mode
- add ability to limit max batch size for pipelined configurations
(required for the rollout, cf
https://github.com/neondatabase/cloud/issues/20620 )
- add ability to configure the execution mode (see the example config
after this list)
- Tests
- remove `batch_timeout` parametrization
- rename `test_getpage_merge_smoke` to `test_throughput`
- add parametrization to test different max batch sizes and execution
modes
- rename `test_timer_precision` to `test_latency`
- rename the test case file to `test_page_service_batching.py`
- better descriptions of what the tests actually do
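For illustration, the resulting config surface in `pageserver.toml` looks like this (a sketch derived from the `PageServicePipeliningConfig` serde definitions in this PR; the values are examples, not recommendations):

```toml
# Default (and the rollout starting point): no pipelining.
#   page_service_pipelining = { mode = "serial" }

# Pipelined mode: bounded batch size plus an execution strategy,
# either "concurrent-futures" or "tasks".
page_service_pipelining = { mode = "pipelined", max_batch_size = 32, execution = "concurrent-futures" }
```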
## On holding the `TimelineHandle` in the pending batch
While batching, we hold the `TimelineHandle` in the pending batch.
Therefore, the timeline will not finish shutting down while we're
batching.
This is not a problem in practice because the concurrently ongoing
`get_vectored` call will fail quickly with an error indicating that the
timeline is shutting down.
This results in the Execution stage returning a `QueryError::Shutdown`,
which causes the pipeline / entire page service connection to shut down.
This drops all references to the
`Arc<Mutex<Option<Box<BatchedFeMessage>>>>` object, thereby dropping the
contained `TimelineHandle`s.
=> fixes https://github.com/neondatabase/neon/issues/9850
# Performance
Results from a local run of the benchmarks are in [this empty
commit](1cf5b1463f) in the PR branch.
Key take-aways:
* `concurrent-futures` and `tasks` deliver identical `batching_factor`
* tail latency impact unknown, cf
https://github.com/neondatabase/neon/issues/9837
* `concurrent-futures` has higher throughput than `tasks` in all
workloads (=lower `time` metric)
* In unbatchable workloads, `concurrent-futures` has 5% higher
`CPU-per-throughput` than that of `tasks`, and 15% higher than that of
`serial`.
* In batchable-32 workload, `concurrent-futures` has 8% lower
`CPU-per-throughput` than that of `tasks` (comparison to tput of
`serial` is irrelevant)
* In unbatchable workloads, mean and tail latencies of
`concurrent-futures` are practically identical to those of `serial`,
whereas `tasks` adds 20-30us of overhead
Overall, `concurrent-futures` seems like a slightly more attractive
choice.
# Rollout
This change is disabled by default.
Rollout plan:
- https://github.com/neondatabase/cloud/issues/20620
# Refs
- epic: https://github.com/neondatabase/neon/issues/9376
- this sub-task: https://github.com/neondatabase/neon/issues/9377
- the abandoned attempt to improve batching timeout resolution:
https://github.com/neondatabase/neon/pull/9820
- closes https://github.com/neondatabase/neon/issues/9850
- fixes https://github.com/neondatabase/neon/issues/9835
Committed by GitHub
Parent: 973a8d2680
Commit: aa4ec11af9

Cargo.lock (generated, 10 lines changed):
@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4

 [[package]]
 name = "RustyXML"
@@ -1717,6 +1717,12 @@ dependencies = [
  "utils",
 ]

+[[package]]
+name = "diatomic-waker"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab03c107fafeb3ee9f5925686dbb7a73bc76e3932abb0d2b365cb64b169cf04c"
+
 [[package]]
 name = "diesel"
 version = "2.2.3"
@@ -7045,6 +7051,7 @@ dependencies = [
  "chrono",
  "const_format",
  "criterion",
+ "diatomic-waker",
  "fail",
  "futures",
  "git-version",
@@ -7063,6 +7070,7 @@ dependencies = [
  "rand 0.8.5",
  "regex",
  "routerify",
+ "scopeguard",
  "sentry",
  "serde",
  "serde_assert",

@@ -83,6 +83,7 @@ comfy-table = "7.1"
 const_format = "0.2"
 crc32c = "0.6"
 dashmap = { version = "5.5.0", features = ["raw-api"] }
+diatomic-waker = { version = "0.2.3" }
 either = "1.8"
 enum-map = "2.4.2"
 enumset = "1.0.12"

@@ -118,9 +118,8 @@ pub struct ConfigToml {
     pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub no_sync: Option<bool>,
-    #[serde(with = "humantime_serde")]
-    pub server_side_batch_timeout: Option<Duration>,
     pub wal_receiver_protocol: PostgresClientProtocol,
+    pub page_service_pipelining: PageServicePipeliningConfig,
 }

 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -137,6 +136,28 @@ pub struct DiskUsageEvictionTaskConfig {
     pub eviction_order: EvictionOrder,
 }

+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(tag = "mode", rename_all = "kebab-case")]
+#[serde(deny_unknown_fields)]
+pub enum PageServicePipeliningConfig {
+    Serial,
+    Pipelined(PageServicePipeliningConfigPipelined),
+}
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct PageServicePipeliningConfigPipelined {
+    /// Causes runtime errors if larger than max get_vectored batch size.
+    pub max_batch_size: NonZeroUsize,
+    pub execution: PageServiceProtocolPipelinedExecutionStrategy,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub enum PageServiceProtocolPipelinedExecutionStrategy {
+    ConcurrentFutures,
+    Tasks,
+}
+
 pub mod statvfs {
     pub mod mock {
         #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -332,8 +353,6 @@ pub mod defaults {

     pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;

-    pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None;
-
     pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
         utils::postgres_client::PostgresClientProtocol::Vanilla;
 }
@@ -420,11 +439,10 @@ impl Default for ConfigToml {
             ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
             l0_flush: None,
             virtual_file_io_mode: None,
-            server_side_batch_timeout: DEFAULT_SERVER_SIDE_BATCH_TIMEOUT
-                .map(|duration| humantime::parse_duration(duration).unwrap()),
             tenant_config: TenantConfigToml::default(),
             no_sync: None,
             wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
+            page_service_pipelining: PageServicePipeliningConfig::Serial,
         }
     }
 }

@@ -19,6 +19,7 @@ bincode.workspace = true
 bytes.workspace = true
 camino.workspace = true
 chrono.workspace = true
+diatomic-waker.workspace = true
 git-version.workspace = true
 hex = { workspace = true, features = ["serde"] }
 humantime.workspace = true
@@ -45,6 +46,7 @@ tracing.workspace = true
 tracing-error.workspace = true
 tracing-subscriber = { workspace = true, features = ["json", "registry"] }
 rand.workspace = true
+scopeguard.workspace = true
 strum.workspace = true
 strum_macros.workspace = true
 url.workspace = true

@@ -1,3 +1,5 @@
 pub mod heavier_once_cell;

 pub mod gate;
+
+pub mod spsc_fold;

libs/utils/src/sync/spsc_fold.rs (new file, 452 lines):
@@ -0,0 +1,452 @@
use core::{future::poll_fn, task::Poll};
use std::sync::{Arc, Mutex};

use diatomic_waker::DiatomicWaker;

pub struct Sender<T> {
    state: Arc<Inner<T>>,
}

pub struct Receiver<T> {
    state: Arc<Inner<T>>,
}

struct Inner<T> {
    wake_receiver: DiatomicWaker,
    wake_sender: DiatomicWaker,
    value: Mutex<State<T>>,
}

enum State<T> {
    NoData,
    HasData(T),
    TryFoldFailed, // transient state
    SenderWaitsForReceiverToConsume(T),
    SenderGone(Option<T>),
    ReceiverGone,
    AllGone,
    SenderDropping,   // transient state
    ReceiverDropping, // transient state
}

pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
    let inner = Inner {
        wake_receiver: DiatomicWaker::new(),
        wake_sender: DiatomicWaker::new(),
        value: Mutex::new(State::NoData),
    };

    let state = Arc::new(inner);
    (
        Sender {
            state: state.clone(),
        },
        Receiver { state },
    )
}

#[derive(Debug, thiserror::Error)]
pub enum SendError {
    #[error("receiver is gone")]
    ReceiverGone,
}

impl<T: Send> Sender<T> {
    /// # Panics
    ///
    /// If `try_fold` panics, any subsequent call to `send` will panic.
    pub async fn send<F>(&mut self, value: T, try_fold: F) -> Result<(), SendError>
    where
        F: Fn(&mut T, T) -> Result<(), T>,
    {
        let mut value = Some(value);
        poll_fn(|cx| {
            let mut guard = self.state.value.lock().unwrap();
            match &mut *guard {
                State::NoData => {
                    *guard = State::HasData(value.take().unwrap());
                    self.state.wake_receiver.notify();
                    Poll::Ready(Ok(()))
                }
                State::HasData(_) => {
                    let State::HasData(acc_mut) = &mut *guard else {
                        unreachable!("this match arm guarantees that the guard is HasData");
                    };
                    match try_fold(acc_mut, value.take().unwrap()) {
                        Ok(()) => {
                            // no need to wake receiver, if it was waiting it already
                            // got a wake-up when we transitioned from NoData to HasData
                            Poll::Ready(Ok(()))
                        }
                        Err(unfoldable_value) => {
                            value = Some(unfoldable_value);
                            let State::HasData(acc) =
                                std::mem::replace(&mut *guard, State::TryFoldFailed)
                            else {
                                unreachable!("this match arm guarantees that the guard is HasData");
                            };
                            *guard = State::SenderWaitsForReceiverToConsume(acc);
                            // SAFETY: send is single threaded due to `&mut self` requirement,
                            // therefore register is not concurrent.
                            unsafe {
                                self.state.wake_sender.register(cx.waker());
                            }
                            Poll::Pending
                        }
                    }
                }
                State::SenderWaitsForReceiverToConsume(_data) => {
                    // Really, we shouldn't be polled until receiver has consumed and wakes us.
                    Poll::Pending
                }
                State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)),
                State::SenderGone(_)
                | State::AllGone
                | State::SenderDropping
                | State::ReceiverDropping
                | State::TryFoldFailed => {
                    unreachable!();
                }
            }
        })
        .await
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        scopeguard::defer! {
            self.state.wake_receiver.notify()
        };
        let Ok(mut guard) = self.state.value.lock() else {
            return;
        };
        *guard = match std::mem::replace(&mut *guard, State::SenderDropping) {
            State::NoData => State::SenderGone(None),
            State::HasData(data) | State::SenderWaitsForReceiverToConsume(data) => {
                State::SenderGone(Some(data))
            }
            State::ReceiverGone => State::AllGone,
            State::TryFoldFailed
            | State::SenderGone(_)
            | State::AllGone
            | State::SenderDropping
            | State::ReceiverDropping => {
                unreachable!("unreachable state {:?}", guard.discriminant_str())
            }
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum RecvError {
    #[error("sender is gone")]
    SenderGone,
}

impl<T: Send> Receiver<T> {
    pub async fn recv(&mut self) -> Result<T, RecvError> {
        poll_fn(|cx| {
            let mut guard = self.state.value.lock().unwrap();
            match &mut *guard {
                State::NoData => {
                    // SAFETY: recv is single threaded due to `&mut self` requirement,
                    // therefore register is not concurrent.
                    unsafe {
                        self.state.wake_receiver.register(cx.waker());
                    }
                    Poll::Pending
                }
                guard @ State::HasData(_)
                | guard @ State::SenderWaitsForReceiverToConsume(_)
                | guard @ State::SenderGone(Some(_)) => {
                    let data = guard
                        .take_data()
                        .expect("in these states, data is guaranteed to be present");
                    self.state.wake_sender.notify();
                    Poll::Ready(Ok(data))
                }
                State::SenderGone(None) => Poll::Ready(Err(RecvError::SenderGone)),
                State::ReceiverGone
                | State::AllGone
                | State::SenderDropping
                | State::ReceiverDropping
                | State::TryFoldFailed => {
                    unreachable!("unreachable state {:?}", guard.discriminant_str());
                }
            }
        })
        .await
    }
}

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        scopeguard::defer! {
            self.state.wake_sender.notify()
        };
        let Ok(mut guard) = self.state.value.lock() else {
            return;
        };
        *guard = match std::mem::replace(&mut *guard, State::ReceiverDropping) {
            State::NoData => State::ReceiverGone,
            State::HasData(_) | State::SenderWaitsForReceiverToConsume(_) => State::ReceiverGone,
            State::SenderGone(_) => State::AllGone,
            State::TryFoldFailed
            | State::ReceiverGone
            | State::AllGone
            | State::SenderDropping
            | State::ReceiverDropping => {
                unreachable!("unreachable state {:?}", guard.discriminant_str())
            }
        }
    }
}

impl<T> State<T> {
    fn take_data(&mut self) -> Option<T> {
        match self {
            State::HasData(_) => {
                let State::HasData(data) = std::mem::replace(self, State::NoData) else {
                    unreachable!("this match arm guarantees that the state is HasData");
                };
                Some(data)
            }
            State::SenderWaitsForReceiverToConsume(_) => {
                let State::SenderWaitsForReceiverToConsume(data) =
                    std::mem::replace(self, State::NoData)
                else {
                    unreachable!(
                        "this match arm guarantees that the state is SenderWaitsForReceiverToConsume"
                    );
                };
                Some(data)
            }
            State::SenderGone(data) => Some(data.take().unwrap()),
            State::NoData
            | State::TryFoldFailed
            | State::ReceiverGone
            | State::AllGone
            | State::SenderDropping
            | State::ReceiverDropping => None,
        }
    }
    fn discriminant_str(&self) -> &'static str {
        match self {
            State::NoData => "NoData",
            State::HasData(_) => "HasData",
            State::TryFoldFailed => "TryFoldFailed",
            State::SenderWaitsForReceiverToConsume(_) => "SenderWaitsForReceiverToConsume",
            State::SenderGone(_) => "SenderGone",
            State::ReceiverGone => "ReceiverGone",
            State::AllGone => "AllGone",
            State::SenderDropping => "SenderDropping",
            State::ReceiverDropping => "ReceiverDropping",
        }
    }
}

#[cfg(test)]
mod tests {

    use super::*;

    const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);

    #[tokio::test]
    async fn test_send_recv() {
        let (mut sender, mut receiver) = channel();

        sender
            .send(42, |acc, val| {
                *acc += val;
                Ok(())
            })
            .await
            .unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received, 42);
    }

    #[tokio::test]
    async fn test_send_recv_with_fold() {
        let (mut sender, mut receiver) = channel();

        sender
            .send(1, |acc, val| {
                *acc += val;
                Ok(())
            })
            .await
            .unwrap();
        sender
            .send(2, |acc, val| {
                *acc += val;
                Ok(())
            })
            .await
            .unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received, 3);
    }

    #[tokio::test(start_paused = true)]
    async fn test_sender_waits_for_receiver_if_try_fold_fails() {
        let (mut sender, mut receiver) = channel();

        sender.send(23, |_, _| panic!("first send")).await.unwrap();

        let send_fut = sender.send(42, |_, val| Err(val));
        let mut send_fut = std::pin::pin!(send_fut);

        tokio::select! {
            _ = tokio::time::sleep(FOREVER) => {},
            _ = &mut send_fut => {
                panic!("send should not complete");
            },
        }

        let val = receiver.recv().await.unwrap();
        assert_eq!(val, 23);

        tokio::select! {
            _ = tokio::time::sleep(FOREVER) => {
                panic!("receiver should have consumed the value");
            },
            _ = &mut send_fut => { },
        }

        let val = receiver.recv().await.unwrap();
        assert_eq!(val, 42);
    }

    #[tokio::test(start_paused = true)]
    async fn test_sender_errors_if_waits_for_receiver_and_receiver_drops() {
        let (mut sender, receiver) = channel();

        sender.send(23, |_, _| unreachable!()).await.unwrap();

        let send_fut = sender.send(42, |_, val| Err(val));
        let send_fut = std::pin::pin!(send_fut);

        drop(receiver);

        let result = send_fut.await;
        assert!(matches!(result, Err(SendError::ReceiverGone)));
    }

    #[tokio::test(start_paused = true)]
    async fn test_receiver_errors_if_waits_for_sender_and_sender_drops() {
        let (sender, mut receiver) = channel::<()>();

        let recv_fut = receiver.recv();
        let recv_fut = std::pin::pin!(recv_fut);

        drop(sender);

        let result = recv_fut.await;
        assert!(matches!(result, Err(RecvError::SenderGone)));
    }

    #[tokio::test(start_paused = true)]
    async fn test_receiver_errors_if_waits_for_sender_and_sender_drops_with_data() {
        let (mut sender, mut receiver) = channel();

        sender.send(42, |_, _| unreachable!()).await.unwrap();

        {
            let recv_fut = receiver.recv();
            let recv_fut = std::pin::pin!(recv_fut);

            drop(sender);

            let val = recv_fut.await.unwrap();
            assert_eq!(val, 42);
        }

        let result = receiver.recv().await;
        assert!(matches!(result, Err(RecvError::SenderGone)));
    }

    #[tokio::test(start_paused = true)]
    async fn test_receiver_waits_for_sender_if_no_data() {
        let (mut sender, mut receiver) = channel();

        let recv_fut = receiver.recv();
        let mut recv_fut = std::pin::pin!(recv_fut);

        tokio::select! {
            _ = tokio::time::sleep(FOREVER) => {},
            _ = &mut recv_fut => {
                panic!("recv should not complete");
            },
        }

        sender.send(42, |_, _| Ok(())).await.unwrap();

        let val = recv_fut.await.unwrap();
        assert_eq!(val, 42);
    }

    #[tokio::test]
    async fn test_receiver_gone_while_nodata() {
        let (mut sender, receiver) = channel();
        drop(receiver);

        let result = sender.send(42, |_, _| Ok(())).await;
        assert!(matches!(result, Err(SendError::ReceiverGone)));
    }

    #[tokio::test]
    async fn test_sender_gone_while_nodata() {
        let (sender, mut receiver) = super::channel::<usize>();
        drop(sender);

        let result = receiver.recv().await;
        assert!(matches!(result, Err(RecvError::SenderGone)));
    }

    #[tokio::test(start_paused = true)]
    async fn test_receiver_drops_after_sender_went_to_sleep() {
        let (mut sender, receiver) = channel();
        let state = receiver.state.clone();

        sender.send(23, |_, _| unreachable!()).await.unwrap();

        let send_task = tokio::spawn(async move { sender.send(42, |_, v| Err(v)).await });

        tokio::time::sleep(FOREVER).await;

        assert!(matches!(
            &*state.value.lock().unwrap(),
            &State::SenderWaitsForReceiverToConsume(_)
        ));

        drop(receiver);

        let err = send_task
            .await
            .unwrap()
            .expect_err("should unblock immediately");
        assert!(matches!(err, SendError::ReceiverGone));
    }

    #[tokio::test(start_paused = true)]
    async fn test_sender_drops_after_receiver_went_to_sleep() {
        let (sender, mut receiver) = channel::<usize>();
        let state = sender.state.clone();

        let recv_task = tokio::spawn(async move { receiver.recv().await });

        tokio::time::sleep(FOREVER).await;

        assert!(matches!(&*state.value.lock().unwrap(), &State::NoData));

        drop(sender);

        let err = recv_task.await.unwrap().expect_err("should error");
        assert!(matches!(err, RecvError::SenderGone));
    }
}

@@ -188,11 +188,9 @@ pub struct PageServerConf {
     /// Optionally disable disk syncs (unsafe!)
     pub no_sync: bool,

-    /// Maximum amount of time for which a get page request request
-    /// might be held up for request merging.
-    pub server_side_batch_timeout: Option<Duration>,
-
     pub wal_receiver_protocol: PostgresClientProtocol,
+
+    pub page_service_pipelining: pageserver_api::config::PageServicePipeliningConfig,
 }

 /// Token for authentication to safekeepers
@@ -350,10 +348,10 @@ impl PageServerConf {
             concurrent_tenant_warmup,
             concurrent_tenant_size_logical_size_queries,
             virtual_file_io_engine,
-            server_side_batch_timeout,
             tenant_config,
             no_sync,
             wal_receiver_protocol,
+            page_service_pipelining,
         } = config_toml;

         let mut conf = PageServerConf {
@@ -393,11 +391,11 @@ impl PageServerConf {
             image_compression,
             timeline_offloading,
             ephemeral_bytes_per_memory_kb,
-            server_side_batch_timeout,
             import_pgdata_upcall_api,
             import_pgdata_upcall_api_token: import_pgdata_upcall_api_token.map(SecretString::from),
             import_pgdata_aws_endpoint_url,
             wal_receiver_protocol,
+            page_service_pipelining,

             // ------------------------------------------------------------
             // fields that require additional validation or custom handling

@@ -356,6 +356,25 @@ async fn timed<Fut: std::future::Future>(
     }
 }

+/// Like [`timed`], but the warning timeout only starts after `cancel` has been cancelled.
+async fn timed_after_cancellation<Fut: std::future::Future>(
+    fut: Fut,
+    name: &str,
+    warn_at: std::time::Duration,
+    cancel: &CancellationToken,
+) -> <Fut as std::future::Future>::Output {
+    let mut fut = std::pin::pin!(fut);
+
+    tokio::select! {
+        _ = cancel.cancelled() => {
+            timed(fut, name, warn_at).await
+        }
+        ret = &mut fut => {
+            ret
+        }
+    }
+}
+
 #[cfg(test)]
 mod timed_tests {
     use super::timed;

(File diff suppressed because it is too large.)
@@ -3804,9 +3804,10 @@ class Endpoint(PgProtocol, LogUtils):
         # shared_buffers = 512kB to make postgres use LFC intensively
         # neon.max_file_cache_size and neon.file_cache size limit are
         # set to 1MB because small LFC is better for testing (helps to find more problems)
+        lfc_path_escaped = str(lfc_path).replace("'", "''")
         config_lines = [
             "shared_buffers = 512kB",
-            f"neon.file_cache_path = '{self.lfc_path()}'",
+            f"neon.file_cache_path = '{lfc_path_escaped}'",
             "neon.max_file_cache_size = 1MB",
             "neon.file_cache_size_limit = 1MB",
         ] + config_lines

@@ -11,36 +11,95 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
 from fixtures.utils import humantime_to_ms

-TARGET_RUNTIME = 60
+TARGET_RUNTIME = 30
+
+
+@dataclass
+class PageServicePipeliningConfig:
+    pass
+
+
+@dataclass
+class PageServicePipeliningConfigSerial(PageServicePipeliningConfig):
+    mode: str = "serial"
+
+
+@dataclass
+class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
+    max_batch_size: int
+    execution: str
+    mode: str = "pipelined"
+
+
+EXECUTION = ["concurrent-futures", "tasks"]
+
+NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
+for max_batch_size in [1, 32]:
+    for execution in EXECUTION:
+        NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
+
+BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
+for max_batch_size in [1, 2, 4, 8, 16, 32]:
+    for execution in EXECUTION:
+        BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))


-@pytest.mark.skip("See https://github.com/neondatabase/neon/pull/9820#issue-2675856095")
 @pytest.mark.parametrize(
-    "tablesize_mib, batch_timeout, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
+    "tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
     [
-        # the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
-        (50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
-        (50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
-        (50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
-        # the next 4 cases demonstrate how batchable workloads benefit from batching
-        (50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
-        (50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
-        (50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
-        (50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
+        # non-batchable workloads
+        # (A separate benchmark will consider latency).
+        *[
+            (
+                50,
+                config,
+                TARGET_RUNTIME,
+                1,
+                128,
+                f"not batchable {dataclasses.asdict(config)}",
+            )
+            for config in NON_BATCHABLE
+        ],
+        # batchable workloads should show throughput and CPU efficiency improvements
+        *[
+            (
+                50,
+                config,
+                TARGET_RUNTIME,
+                100,
+                128,
+                f"batchable {dataclasses.asdict(config)}",
+            )
+            for config in BATCHABLE
+        ],
     ],
 )
-def test_getpage_merge_smoke(
+def test_throughput(
     neon_env_builder: NeonEnvBuilder,
     zenbenchmark: NeonBenchmarker,
     tablesize_mib: int,
-    batch_timeout: str | None,
+    pipelining_config: PageServicePipeliningConfig,
     target_runtime: int,
     effective_io_concurrency: int,
     readhead_buffer_size: int,
     name: str,
 ):
     """
-    Do a bunch of sequential scans and ensure that the pageserver does some merging.
+    Do a bunch of sequential scans with varying compute and pipelining configurations.
+    Primary performance metrics are the achieved batching factor and throughput (wall clock time).
+    Resource utilization is also interesting - we currently measure CPU time.
+
+    The test is a fixed-runtime based type of test (target_runtime).
+    Hence, the results are normalized to the number of iterations completed within target runtime.
+
+    If the compute doesn't provide pipeline depth (effective_io_concurrency=1),
+    performance should be about identical in all configurations.
+    Pipelining can still yield improvements in these scenarios because it parses the
+    next request while the current one is still being executed.
+
+    If the compute provides pipeline depth (effective_io_concurrency=100), then
+    pipelining configs, especially with max_batch_size>1 should yield dramatic improvements
+    in all performance metrics.
     """

     #
@@ -51,14 +110,16 @@ def test_getpage_merge_smoke(
     params.update(
         {
             "tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
-            "batch_timeout": (
-                -1 if batch_timeout is None else 1e3 * humantime_to_ms(batch_timeout),
-                {"unit": "us"},
-            ),
             # target_runtime is just a polite ask to the workload to run for this long
             "effective_io_concurrency": (effective_io_concurrency, {}),
             "readhead_buffer_size": (readhead_buffer_size, {}),
-            # name is not a metric
+            # name is not a metric, we just use it to identify the test easily in the `test_...[...]`` notation
         }
     )
+    params.update(
+        {
+            f"pipelining_config.{k}": (v, {})
+            for k, v in dataclasses.asdict(pipelining_config).items()
+        }
+    )

@@ -170,7 +231,9 @@ def test_getpage_merge_smoke(
         after = get_metrics()
         return (after - before).normalize(iters - 1)

-    env.pageserver.patch_config_toml_nonrecursive({"server_side_batch_timeout": batch_timeout})
+    env.pageserver.patch_config_toml_nonrecursive(
+        {"page_service_pipelining": dataclasses.asdict(pipelining_config)}
+    )
     env.pageserver.restart()
     metrics = workload()

@@ -199,23 +262,30 @@ def test_getpage_merge_smoke(
     )


-@pytest.mark.skip("See https://github.com/neondatabase/neon/pull/9820#issue-2675856095")
+PRECISION_CONFIGS: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
+for max_batch_size in [1, 32]:
+    for execution in EXECUTION:
+        PRECISION_CONFIGS.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
+
+
 @pytest.mark.parametrize(
-    "batch_timeout", [None, "10us", "20us", "50us", "100us", "200us", "500us", "1ms"]
+    "pipelining_config,name",
+    [(config, f"{dataclasses.asdict(config)}") for config in PRECISION_CONFIGS],
 )
-def test_timer_precision(
+def test_latency(
     neon_env_builder: NeonEnvBuilder,
     zenbenchmark: NeonBenchmarker,
     pg_bin: PgBin,
-    batch_timeout: str | None,
+    pipelining_config: PageServicePipeliningConfig,
+    name: str,
 ):
     """
-    Determine the batching timeout precision (mean latency) and tail latency impact.
+    Measure the latency impact of pipelining in an un-batchable workloads.

-    The baseline is `None`; an ideal batching timeout implementation would increase
-    the mean latency by exactly `batch_timeout`.
+    An ideal implementation should not increase average or tail latencies for such workloads.

-    That is not the case with the current implementation, will be addressed in future changes.
+    We don't have support in pagebench to create queue depth yet.
+    => https://github.com/neondatabase/neon/issues/9837
     """

     #
@@ -223,7 +293,8 @@ def test_timer_precision(
     #

     def patch_ps_config(ps_config):
-        ps_config["server_side_batch_timeout"] = batch_timeout
+        if pipelining_config is not None:
+            ps_config["page_service_pipelining"] = dataclasses.asdict(pipelining_config)

     neon_env_builder.pageserver_config_override = patch_ps_config