abandon execution='tasks' since it's impossible without Arc<RequestContext>

This commit is contained in:
Christian Schwarz
2024-11-29 19:29:19 +01:00
parent bf162f92d1
commit e95d5effac
3 changed files with 6 additions and 35 deletions

View File

@@ -148,14 +148,6 @@ pub enum PageServicePipeliningConfig {
pub struct PageServicePipeliningConfigPipelined {
/// Causes runtime errors if larger than max get_vectored batch size.
pub max_batch_size: NonZeroUsize,
pub execution: PageServiceProtocolPipelinedExecutionStrategy,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum PageServiceProtocolPipelinedExecutionStrategy {
ConcurrentFutures,
Tasks,
}
pub mod statvfs {
@@ -445,7 +437,6 @@ impl Default for ConfigToml {
page_service_pipelining: PageServicePipeliningConfig::Pipelined(
PageServicePipeliningConfigPipelined {
max_batch_size: NonZeroUsize::new(32).unwrap(),
execution: PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures,
},
),
}

View File

@@ -7,10 +7,7 @@ use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::config::{PageServicePipeliningConfig, PageServicePipeliningConfigPipelined};
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -1259,10 +1256,7 @@ impl PageServerHandler {
// the batch that was in flight when the Batcher encountered an error,
// thereby behaving identically to a serial implementation.
let PageServicePipeliningConfigPipelined {
max_batch_size,
execution,
} = pipelining_config;
let PageServicePipeliningConfigPipelined { max_batch_size } = pipelining_config;
// Macro to _define_ a pipeline stage.
macro_rules! pipeline_stage {
@@ -1349,15 +1343,7 @@ impl PageServerHandler {
//
// Execute the stages.
//
match execution {
PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
tokio::join!(read_messages, executor)
}
PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
todo!()
}
}
tokio::join!(read_messages, executor)
}
/// Helper function to handle the LSN from client request.

View File

@@ -27,21 +27,16 @@ class PageServicePipeliningConfigSerial(PageServicePipeliningConfig):
@dataclass
class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
max_batch_size: int
execution: str
mode: str = "pipelined"
EXECUTION = ["concurrent-futures", "tasks"]
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size))
BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 2, 4, 8, 16, 32]:
for execution in EXECUTION:
BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size))
@pytest.mark.parametrize(
@@ -264,8 +259,7 @@ def test_throughput(
PRECISION_CONFIGS: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
PRECISION_CONFIGS.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
PRECISION_CONFIGS.append(PageServicePipeliningConfigPipelined(max_batch_size))
@pytest.mark.parametrize(