reintroduce task-based execution

This commit is contained in:
Christian Schwarz
2024-11-28 20:50:06 +01:00
parent 6bd39f95f5
commit a2a3613185
3 changed files with 44 additions and 6 deletions

View File

@@ -131,6 +131,14 @@ pub struct DiskUsageEvictionTaskConfig {
pub struct PageServicePipeliningConfig {
/// Causes runtime errors if larger than max get_vectored batch size.
pub max_batch_size: NonZeroUsize,
/// Selects how the page service runs its pipeline stages; see the variants of
/// [`PageServiceProtocolPipeliningMode`] for the available strategies.
pub protocol_pipelining_mode: PageServiceProtocolPipeliningMode,
}
/// Execution strategy for the page service pipeline stages.
///
/// Because of the `kebab-case` serde renaming, the variants are (de)serialized
/// as `concurrent-futures` and `tasks`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum PageServiceProtocolPipeliningMode {
/// The `read_messages`, `batcher` and `executor` futures are awaited together
/// with a single `tokio::join!`.
ConcurrentFutures,
/// The `read_messages` and `batcher` futures are each handed to `tokio::spawn`;
/// their join handles are then awaited together with the `executor` future.
/// A join error from either spawned task is turned into a panic in the caller.
Tasks,
}
pub mod statvfs {
@@ -409,6 +417,7 @@ impl Default for ConfigToml {
no_sync: None,
page_service_pipelining: Some(PageServicePipeliningConfig {
max_batch_size: NonZeroUsize::new(32).unwrap(),
protocol_pipelining_mode: PageServiceProtocolPipeliningMode::ConcurrentFutures,
}),
}
}

View File

@@ -7,7 +7,7 @@ use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::config::PageServicePipeliningConfig;
use pageserver_api::config::{PageServicePipeliningConfig, PageServiceProtocolPipeliningMode};
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -1166,7 +1166,10 @@ impl PageServerHandler {
// the `handle_*` function will fail with an error that bubbles up and results in
// the Executor stage exiting with Err(QueryError::Shutdown).
let PageServicePipeliningConfig { max_batch_size } = pipelining_config;
let PageServicePipeliningConfig {
max_batch_size,
protocol_pipelining_mode,
} = pipelining_config;
// Cancellation root for the pipeline.
// If any one stage exits, this gets cancelled.
@@ -1314,8 +1317,26 @@ impl PageServerHandler {
//
let read_messages_res;
let _batcher_res: ();
let executor_res: Result<(), QueryError>;
(read_messages_res, (), executor_res) = tokio::join!(read_messages, batcher, executor);
match protocol_pipelining_mode {
PageServiceProtocolPipeliningMode::ConcurrentFutures => {
(read_messages_res, _batcher_res, executor_res) =
tokio::join!(read_messages, batcher, executor);
}
PageServiceProtocolPipeliningMode::Tasks => {
// These tasks are not tracked anywhere.
let read_messages_task = tokio::spawn(read_messages);
let batcher_task = tokio::spawn(batcher);
let (read_messages_task_res, batcher_task_res, executor_res_) =
tokio::join!(read_messages_task, batcher_task, executor,);
(read_messages_res, _batcher_res, executor_res) = (
read_messages_task_res.expect("propagated panic from read_messages"),
batcher_task_res.expect("propagated panic from batcher"),
executor_res_,
);
}
}
(read_messages_res, executor_res)
}

View File

@@ -17,15 +17,20 @@ TARGET_RUNTIME = 30
# One pipelining configuration used to parametrize the tests in this file.
# NOTE(review): the field names match the Rust `PageServicePipeliningConfig`;
# presumably this is serialized into the pageserver config — confirm at the use site.
@dataclass
class PageServicePipeliningConfig:
# Batch size limit under test (the lists in this file use values from 1 to 32).
max_batch_size: int
# Mode name; the config lists in this file fill it from PROTOCOL_PIPELINING_MODES.
protocol_pipelining_mode: str
# Mode names that every config list in this file iterates over, so each
# max_batch_size is exercised once per mode.
PROTOCOL_PIPELINING_MODES = ["concurrent-futures", "tasks"]
NON_BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
for max_batch_size in [1, 32]:
NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size))
for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
for max_batch_size in [1, 2, 4, 8, 16, 32]:
BATCHABLE.append(PageServicePipeliningConfig(max_batch_size))
for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
@pytest.mark.parametrize(
@@ -252,7 +257,10 @@ def test_throughput(
PRECISION_CONFIGS: list[Optional[PageServicePipeliningConfig]] = [None]
for max_batch_size in [1, 32]:
PRECISION_CONFIGS.append(PageServicePipeliningConfig(max_batch_size))
for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
PRECISION_CONFIGS.append(
PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode)
)
@pytest.mark.parametrize(