task-based mode

This commit is contained in:
Christian Schwarz
2024-11-22 08:56:30 +01:00
parent a3d1cf636b
commit c1040bc25d
3 changed files with 154 additions and 135 deletions

View File

@@ -129,7 +129,8 @@ pub struct DiskUsageEvictionTaskConfig {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PageServicePipeliningConfig {
pub max_batch_size: usize,
// Causes runtime errors if larger than max get_vectored batch size.
pub max_batch_size: NonZeroUsize,
}
pub mod statvfs {

View File

@@ -25,6 +25,7 @@ use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::io;
use std::num::NonZeroUsize;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
@@ -757,6 +758,7 @@ impl PageServerHandler {
/// Post-condition: `maybe_carry` is Some()
#[instrument(skip_all, level = tracing::Level::TRACE)]
fn pagestream_do_batch(
max_batch_size: NonZeroUsize,
maybe_carry: &mut Option<Box<BatchedFeMessage>>,
this_msg: Box<BatchedFeMessage>,
) -> Option<Box<BatchedFeMessage>> {
@@ -785,9 +787,9 @@ impl PageServerHandler {
},
) if (|| {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
if accum_pages.len() >= max_batch_size.get() {
trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), max_batch_size.get());
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
@@ -1008,46 +1010,44 @@ impl PageServerHandler {
let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
let request_span = info_span!("request", shard_id = tracing::field::Empty);
let read_message_task: JoinHandle<Result<_, QueryError>> = tokio::spawn(
{
let cancel = self.cancel.child_token();
let ctx = ctx.attached_child();
async move {
scopeguard::defer! {
debug!("exiting");
}
let mut pgb_reader = pgb_reader;
loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
&cancel,
&ctx,
request_span.clone(),
)
.await?;
let msg = match msg {
Some(msg) => msg,
None => {
debug!("pagestream subprotocol end observed");
break;
}
};
match requests_tx.send(msg).await {
Ok(()) => {}
Err(tokio::sync::mpsc::error::SendError(_)) => {
debug!("downstream is gone");
break;
}
let read_messages = {
let cancel = self.cancel.child_token();
let ctx = ctx.attached_child();
async move {
scopeguard::defer! {
debug!("exiting");
}
let mut pgb_reader = pgb_reader;
loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
&cancel,
&ctx,
request_span.clone(),
)
.await?;
let msg = match msg {
Some(msg) => msg,
None => {
debug!("pagestream subprotocol end observed");
break;
}
};
match requests_tx.send(msg).await {
Ok(()) => {}
Err(tokio::sync::mpsc::error::SendError(_)) => {
debug!("downstream is gone");
break;
}
}
Ok((pgb_reader, timeline_handles))
}
Ok((pgb_reader, timeline_handles))
}
.instrument(tracing::info_span!("read_protocol")),
);
}
.instrument(tracing::info_span!("read_messages"));
enum BatchState {
Building(Option<Box<BatchedFeMessage>>),
@@ -1065,89 +1065,106 @@ impl PageServerHandler {
std::sync::Mutex::new(BatchState::Building(None)),
));
let notify_batcher = Arc::new(tokio::sync::Notify::new());
tokio::spawn(
{
let notify_batcher = notify_batcher.clone();
async move {
scopeguard::defer! {
debug!("exiting");
}
loop {
let maybe_req = requests_rx.recv().await;
let Some(req) = maybe_req else {
batch_tx.send_modify(|pending_batch| {
let mut guard = pending_batch.lock().unwrap();
match &mut *guard {
BatchState::Building(carry) => {
*guard = BatchState::UpstreamDead(carry.take());
}
BatchState::UpstreamDead(_) => panic!("twice"),
let batcher = {
let notify_batcher = notify_batcher.clone();
let max_batch_size = self
.pipelining_config
.as_ref()
.map(|PageServicePipeliningConfig { max_batch_size, .. }| *max_batch_size)
.unwrap_or(NonZeroUsize::new(1).unwrap());
async move {
scopeguard::defer! {
debug!("exiting");
}
loop {
let maybe_req = requests_rx.recv().await;
let Some(req) = maybe_req else {
batch_tx.send_modify(|pending_batch| {
let mut guard = pending_batch.lock().unwrap();
match &mut *guard {
BatchState::Building(carry) => {
*guard = BatchState::UpstreamDead(carry.take());
}
});
break;
};
// don't read new requests before this one has been processed
let mut req = Some(req);
loop {
let mut wait_notified = None;
let batched = batch_tx.send_if_modified(|pending_batch| {
let mut guard = pending_batch.lock().unwrap();
let building = guard.must_building_mut();
match Self::pagestream_do_batch(building, req.take().unwrap()) {
Some(req_was_not_batched) => {
req.replace(req_was_not_batched);
wait_notified = Some(notify_batcher.notified());
false
}
None => true,
}
});
if batched {
break;
} else {
wait_notified.unwrap().await;
BatchState::UpstreamDead(_) => panic!("twice"),
}
});
break;
};
// don't read new requests before this one has been processed
let mut req = Some(req);
loop {
let mut wait_notified = None;
let batched = batch_tx.send_if_modified(|pending_batch| {
let mut guard = pending_batch.lock().unwrap();
let building = guard.must_building_mut();
match Self::pagestream_do_batch(
max_batch_size,
building,
req.take().unwrap(),
) {
Some(req_was_not_batched) => {
req.replace(req_was_not_batched);
wait_notified = Some(notify_batcher.notified());
false
}
None => true,
}
});
if batched {
break;
} else {
wait_notified.unwrap().await;
}
}
}
}
.instrument(tracing::info_span!("batching")),
);
let mut stop = false;
while !stop {
match batch_rx.changed().await {
Ok(()) => {}
Err(_) => {
debug!("batch_rx observed disconnection of batcher");
}
};
let maybe_batch = {
let borrow = batch_rx.borrow();
let mut guard = borrow.lock().unwrap();
match &mut *guard {
BatchState::Building(maybe_batch) => maybe_batch.take(),
BatchState::UpstreamDead(maybe_batch) => {
debug!("upstream dead");
stop = true;
maybe_batch.take()
}
}
};
let Some(batch) = maybe_batch else {
break;
};
notify_batcher.notify_one();
debug!("processing batch");
self.pagesteam_handle_batched_message(pgb, *batch, &ctx)
.await?;
}
.instrument(tracing::info_span!("batcher"));
let (pgb_reader, timeline_handles) = read_message_task
.await
.context("read message task panicked")?
// if the client made a protocol error, this is where we bubble up the QueryError
?;
let executor = async {
let mut stop = false;
while !stop {
match batch_rx.changed().await {
Ok(()) => {}
Err(_) => {
debug!("batch_rx observed disconnection of batcher");
}
};
let maybe_batch = {
let borrow = batch_rx.borrow();
let mut guard = borrow.lock().unwrap();
match &mut *guard {
BatchState::Building(maybe_batch) => maybe_batch.take(),
BatchState::UpstreamDead(maybe_batch) => {
debug!("upstream dead");
stop = true;
maybe_batch.take()
}
}
};
let Some(batch) = maybe_batch else {
break;
};
notify_batcher.notify_one();
debug!("processing batch");
self.pagesteam_handle_batched_message(pgb, *batch, &ctx)
.await?;
}
Ok(())
};
let (read_message_task_res, _, executor_res): (_, (), _) =
tokio::join!(read_messages, batcher, executor);
let (pgb_reader, timeline_handles) = match (read_message_task_res, executor_res) {
(_, Err(e)) => {
return Err(e);
}
(Err(e), _) => {
return Err(e);
}
(Ok((pgb_reader, timeline_handles)), Ok(())) => (pgb_reader, timeline_handles),
};
debug!("pagestream subprotocol shut down cleanly");

View File

@@ -13,28 +13,24 @@ from fixtures.utils import humantime_to_ms
TARGET_RUNTIME = 5
MAX_BATCH_SIZES = [None, 1, 2, 4, 8, 16, 32]
@pytest.mark.parametrize(
"tablesize_mib, batch_timeout, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
"tablesize_mib, max_batch_size, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
[
# the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
(50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
(50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
(50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"),
(50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
# the next 4 cases demonstrate how batchable workloads benefit from batching
(50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
(50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
(50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"),
(50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
(50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
*[
(50, n, TARGET_RUNTIME, 1, 128, f"not batchable max batch size {n}") for n in MAX_BATCH_SIZES
],
*[
(50, n, TARGET_RUNTIME, 100, 128, f"batchable max batch size {n}") for n in MAX_BATCH_SIZES
]
],
)
def test_getpage_merge_smoke(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
tablesize_mib: int,
batch_timeout: Optional[str],
max_batch_size: Optional[int],
target_runtime: int,
effective_io_concurrency: int,
readhead_buffer_size: int,
@@ -52,9 +48,9 @@ def test_getpage_merge_smoke(
params.update(
{
"tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
"batch_timeout": (
-1 if batch_timeout is None else 1e3 * humantime_to_ms(batch_timeout),
{"unit": "us"},
"max_batch_size": (
-1 if max_batch_size is None else max_batch_size,
{},
),
# target_runtime is just a polite ask to the workload to run for this long
"effective_io_concurrency": (effective_io_concurrency, {}),
@@ -171,7 +167,9 @@ def test_getpage_merge_smoke(
after = get_metrics()
return (after - before).normalize(iters - 1)
env.pageserver.patch_config_toml_nonrecursive({"server_side_batch_timeout": batch_timeout})
env.pageserver.patch_config_toml_nonrecursive({"page_service_pipelining": {
"max_batch_size": max_batch_size,
}} if max_batch_size is not None else {})
env.pageserver.restart()
metrics = workload()
@@ -201,13 +199,13 @@ def test_getpage_merge_smoke(
@pytest.mark.parametrize(
"batch_timeout", [None, "10us", "20us", "50us", "100us", "200us", "500us", "1ms"]
"max_batch_size", [None, 1, 32]
)
def test_timer_precision(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
batch_timeout: Optional[str],
max_batch_size: Optional[int],
):
"""
Determine the batching timeout precision (mean latency) and tail latency impact.
@@ -223,7 +221,10 @@ def test_timer_precision(
#
def patch_ps_config(ps_config):
ps_config["server_side_batch_timeout"] = batch_timeout
if max_batch_size is not None:
ps_config["page_service_pipelining"] = {
"max_batch_size": max_batch_size,
}
neon_env_builder.pageserver_config_override = patch_ps_config