diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs
index 5d3783bed0..6d6e0e1559 100644
--- a/pageserver/client/src/page_service.rs
+++ b/pageserver/client/src/page_service.rs
@@ -1,4 +1,3 @@
-
 use futures::{
     stream::{SplitSink, SplitStream},
     SinkExt, StreamExt,
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 2555c43ae1..3669172d3a 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -6556,22 +6556,20 @@ mod tests {
         // Pick a big LSN such that we query over all the changes.
         let reads_lsn = Lsn(u64::MAX - 1);

-        let io_concurrency_levels = vec![
-            SelectedIoConcurrency::Serial,
-            SelectedIoConcurrency::Parallel,
+        let gate = Gate::default();
+        let io_concurrency_levels: Vec<Box<dyn Fn() -> SelectedIoConcurrency>> = vec![
+            Box::new(|| SelectedIoConcurrency::Serial),
+            Box::new(|| SelectedIoConcurrency::FuturesUnordered(gate.enter().unwrap())),
         ];

-        for io_concurrency_level in io_concurrency_levels {
+        for (io_concurrency_level_idx, io_concurrency_level) in
+            io_concurrency_levels.into_iter().enumerate()
+        {
             for read in reads.clone() {
-                // The type is not Copy() because FuturesUnordered variant is not Copy.
-                let io_concurrency_level = match io_concurrency_level {
-                    SelectedIoConcurrency::Serial => SelectedIoConcurrency::Serial,
-                    SelectedIoConcurrency::Parallel => SelectedIoConcurrency::Parallel,
-                    SelectedIoConcurrency::FuturesUnordered(_) => unreachable!("not used"),
-                };
+                let io_concurrency_level = io_concurrency_level();
+
                 info!(
-                    "Doing vectored read on {:?} with IO concurrency {:?}",
-                    read, io_concurrency_level
+                    "Doing vectored read on {read:?} with IO concurrency {io_concurrency_level_idx:?}",
                 );

                 let vectored_res = tline
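The tenant.rs test above switches from a plain `Vec<SelectedIoConcurrency>` to boxed factory closures because the remaining `FuturesUnordered` variant owns a `GateGuard`, which is neither `Copy` nor `Clone` and therefore cannot be reused across iterations. Below is a minimal standalone sketch of that pattern with simplified stand-in types (nothing in it is code from this patch; `Guard` stands in for the gate guard held by `SelectedIoConcurrency::FuturesUnordered`):

```rust
// Sketch only: factory closures for a non-cloneable selection value.
struct Guard;

enum Selected {
    Serial,
    FuturesUnordered(Guard),
}

fn acquire_guard() -> Guard {
    // In the real test this is `gate.enter().unwrap()`.
    Guard
}

fn main() {
    // Each closure mints a fresh value, so the non-cloneable variant is
    // constructed once per (level, read) combination instead of being reused.
    let levels: Vec<Box<dyn Fn() -> Selected>> = vec![
        Box::new(|| Selected::Serial),
        Box::new(|| Selected::FuturesUnordered(acquire_guard())),
    ];

    for (idx, make_level) in levels.into_iter().enumerate() {
        for read in 0..3 {
            match make_level() {
                Selected::Serial => println!("level {idx}, read {read}: serial"),
                Selected::FuturesUnordered(_guard) => {
                    println!("level {idx}, read {read}: futures-unordered")
                }
            }
        }
    }
}
```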
"serial" => SelectedIoConcurrency::Serial, "futures-unordered" => SelectedIoConcurrency::FuturesUnordered(gate_guard), x => panic!( @@ -234,7 +229,6 @@ impl IoConcurrency { pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self { match io_concurrency { SelectedIoConcurrency::Serial => IoConcurrency::Serial, - SelectedIoConcurrency::Parallel => IoConcurrency::Parallel, SelectedIoConcurrency::FuturesUnordered(gate_guard) => { let (barriers_tx, barrier_rx) = tokio::sync::mpsc::unbounded_channel(); let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -405,7 +399,6 @@ impl IoConcurrency { pub(crate) fn clone(&self) -> Self { match self { IoConcurrency::Serial => IoConcurrency::Serial, - IoConcurrency::Parallel => IoConcurrency::Parallel, IoConcurrency::FuturesUnordered { ios_tx, barriers_tx, @@ -433,9 +426,6 @@ impl IoConcurrency { tracing::trace!(%io_num, "spawning IO"); match self { IoConcurrency::Serial => fut.await, - IoConcurrency::Parallel => { - tokio::spawn(fut); - } IoConcurrency::FuturesUnordered { ios_tx, .. } => { let mut fut = Box::pin(fut); // opportunistic poll to give some boost (unproven if it helps, but sounds like a good idea) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 60a77fec14..67ea9d4f22 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3435,7 +3435,6 @@ impl Timeline { trace!("waiting for futures to complete"); match &reconstruct_state.io_concurrency { super::storage_layer::IoConcurrency::Serial => (), - super::storage_layer::IoConcurrency::Parallel => (), super::storage_layer::IoConcurrency::FuturesUnordered { barriers_tx, .. } => { let (tx, rx) = tokio::sync::oneshot::channel(); match barriers_tx.send(tx) { @@ -5779,8 +5778,6 @@ impl Timeline { lsn: Lsn, ctx: &RequestContext, ) -> anyhow::Result> { - use super::storage_layer::SelectedIoConcurrency; - let mut all_data = Vec::new(); let guard = self.layers.read().await; for layer in guard.layer_map()?.iter_historic_layers() { diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 7a77817b18..75a2675dc5 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -15,7 +15,6 @@ pub(super) mod tokio_epoll_uring_ext; use tokio_epoll_uring::IoBuf; use tracing::Instrument; - pub(crate) use super::api::IoEngineKind; #[derive(Clone, Copy)] #[repr(u8)] diff --git a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py index cb132b7ad9..24d8384520 100644 --- a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py +++ b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py @@ -67,7 +67,7 @@ def test_pageserver_characterize_throughput_with_n_tenants( # which by default uses 64 connections @pytest.mark.parametrize("n_clients", [1]) @pytest.mark.parametrize("n_tenants", [1]) -@pytest.mark.parametrize("io_concurrency", ["serial", "parallel", "futures-unordered"]) +@pytest.mark.parametrize("io_concurrency", ["serial", "futures-unordered"]) @pytest.mark.parametrize("ps_direct_io_mode", ["direct"]) @pytest.mark.timeout(2400) @skip_on_ci( diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py index 
diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py
index a2a4015cbf..49ee939790 100644
--- a/test_runner/performance/pageserver/test_page_service_batching.py
+++ b/test_runner/performance/pageserver/test_page_service_batching.py
@@ -32,7 +32,7 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):


 PS_DIRECT_IO = ["direct"]
-PS_IO_CONCURRENCY = ["serial", "parallel", "futures-unordered"]
+PS_IO_CONCURRENCY = ["serial", "futures-unordered"]
 EXECUTION = ["concurrent-futures"]

 NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
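With the `Parallel` (tokio::spawn-per-IO) variant removed, value-reconstruct IO is either awaited inline (`Serial`) or pushed over `ios_tx` to a sidecar task that drains a `FuturesUnordered`, with completion observed through a oneshot barrier sent over `barriers_tx` (the timeline.rs hunk above). The patch does not show the sidecar loop itself; the following is a rough, self-contained sketch of that shape under those assumptions, using tokio and futures, with every name other than `ios_tx`/`barriers_tx`/`spawn_io` invented for illustration:

```rust
// Sketch only: a channel-fed sidecar draining a FuturesUnordered, plus a
// oneshot "barrier" to wait for all submitted IOs. The real IoConcurrency
// in storage_layer.rs is more involved (gate guard, tracing, etc.).
use std::{future::Future, pin::Pin};

use futures::{stream::FuturesUnordered, StreamExt};

type IoFuture = Pin<Box<dyn Send + Future<Output = ()>>>;

enum IoConcurrency {
    Serial,
    FuturesUnordered {
        ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
        barriers_tx: tokio::sync::mpsc::UnboundedSender<tokio::sync::oneshot::Sender<()>>,
    },
}

impl IoConcurrency {
    fn spawn_futures_unordered() -> Self {
        let (ios_tx, mut ios_rx) = tokio::sync::mpsc::unbounded_channel::<IoFuture>();
        let (barriers_tx, mut barriers_rx) =
            tokio::sync::mpsc::unbounded_channel::<tokio::sync::oneshot::Sender<()>>();
        tokio::spawn(async move {
            let mut ios: FuturesUnordered<IoFuture> = FuturesUnordered::new();
            loop {
                tokio::select! {
                    // Drive whatever IOs are currently in flight.
                    Some(()) = ios.next(), if !ios.is_empty() => {}
                    // A new IO submitted via spawn_io().
                    maybe_io = ios_rx.recv() => match maybe_io {
                        Some(io) => ios.push(io),
                        None => break, // all senders gone
                    },
                    // A barrier: drain outstanding IOs, then acknowledge.
                    maybe_barrier = barriers_rx.recv() => match maybe_barrier {
                        Some(done_tx) => {
                            while ios.next().await.is_some() {}
                            let _ = done_tx.send(());
                        }
                        None => break,
                    },
                }
            }
        });
        IoConcurrency::FuturesUnordered { ios_tx, barriers_tx }
    }

    async fn spawn_io(&self, fut: impl Future<Output = ()> + Send + 'static) {
        match self {
            IoConcurrency::Serial => fut.await, // inline, no concurrency
            IoConcurrency::FuturesUnordered { ios_tx, .. } => {
                let _ = ios_tx.send(Box::pin(fut));
            }
        }
    }

    // Mirrors the timeline.rs "waiting for futures to complete" hunk.
    async fn wait_for_ios(&self) {
        match self {
            IoConcurrency::Serial => (),
            IoConcurrency::FuturesUnordered { barriers_tx, .. } => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                if barriers_tx.send(tx).is_ok() {
                    let _ = rx.await;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let io = IoConcurrency::spawn_futures_unordered();
    for i in 0..4 {
        io.spawn_io(async move { println!("io {i} done") }).await;
    }
    io.wait_for_ios().await;
}
```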