mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
remov the "parallel" mode, as we won't ever enable this in practice and benchmarks have shown very limited upside over futures-unordered
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
|
||||
use futures::{
|
||||
stream::{SplitSink, SplitStream},
|
||||
SinkExt, StreamExt,
|
||||
|
||||
@@ -6556,22 +6556,20 @@ mod tests {
|
||||
// Pick a big LSN such that we query over all the changes.
|
||||
let reads_lsn = Lsn(u64::MAX - 1);
|
||||
|
||||
let io_concurrency_levels = vec![
|
||||
SelectedIoConcurrency::Serial,
|
||||
SelectedIoConcurrency::Parallel,
|
||||
let gate = Gate::default();
|
||||
let io_concurrency_levels: Vec<Box<dyn Fn() -> SelectedIoConcurrency>> = vec![
|
||||
Box::new(|| SelectedIoConcurrency::Serial),
|
||||
Box::new(|| SelectedIoConcurrency::FuturesUnordered(gate.enter().unwrap())),
|
||||
];
|
||||
|
||||
for io_concurrency_level in io_concurrency_levels {
|
||||
for (io_concurrency_level_idx, io_concurrency_level) in
|
||||
io_concurrency_levels.into_iter().enumerate()
|
||||
{
|
||||
for read in reads.clone() {
|
||||
// The type is not Copy() because FuturesUnordered variant is not Copy.
|
||||
let io_concurrency_level = match io_concurrency_level {
|
||||
SelectedIoConcurrency::Serial => SelectedIoConcurrency::Serial,
|
||||
SelectedIoConcurrency::Parallel => SelectedIoConcurrency::Parallel,
|
||||
SelectedIoConcurrency::FuturesUnordered(_) => unreachable!("not used"),
|
||||
};
|
||||
let io_concurrency_level = io_concurrency_level();
|
||||
|
||||
info!(
|
||||
"Doing vectored read on {:?} with IO concurrency {:?}",
|
||||
read, io_concurrency_level
|
||||
"Doing vectored read on {read:?} with IO concurrency {io_concurrency_level_idx:?}",
|
||||
);
|
||||
|
||||
let vectored_res = tline
|
||||
|
||||
@@ -172,7 +172,6 @@ pub(crate) struct ValuesReconstructState {
|
||||
/// we've built enough confidence.
|
||||
pub(crate) enum IoConcurrency {
|
||||
Serial,
|
||||
Parallel,
|
||||
FuturesUnordered {
|
||||
ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
|
||||
barriers_tx: tokio::sync::mpsc::UnboundedSender<tokio::sync::oneshot::Sender<()>>,
|
||||
@@ -184,7 +183,6 @@ type IoFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
|
||||
|
||||
pub(crate) enum SelectedIoConcurrency {
|
||||
Serial,
|
||||
Parallel,
|
||||
FuturesUnordered(GateGuard),
|
||||
}
|
||||
|
||||
@@ -192,7 +190,6 @@ impl std::fmt::Debug for IoConcurrency {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
IoConcurrency::Serial => write!(f, "Serial"),
|
||||
IoConcurrency::Parallel => write!(f, "Parallel"),
|
||||
IoConcurrency::FuturesUnordered { .. } => write!(f, "FuturesUnordered"),
|
||||
}
|
||||
}
|
||||
@@ -202,7 +199,6 @@ impl std::fmt::Debug for SelectedIoConcurrency {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
SelectedIoConcurrency::Serial => write!(f, "Serial"),
|
||||
SelectedIoConcurrency::Parallel => write!(f, "Parallel"),
|
||||
SelectedIoConcurrency::FuturesUnordered(_) => write!(f, "FuturesUnordered"),
|
||||
}
|
||||
}
|
||||
@@ -220,7 +216,6 @@ impl IoConcurrency {
|
||||
.unwrap_or_else(|_| "serial".to_string())
|
||||
});
|
||||
let selected = match IO_CONCURRENCY.as_str() {
|
||||
"parallel" => SelectedIoConcurrency::Parallel, // TODO: clonable gateguard, pass through Arc<Gate>? ?
|
||||
"serial" => SelectedIoConcurrency::Serial,
|
||||
"futures-unordered" => SelectedIoConcurrency::FuturesUnordered(gate_guard),
|
||||
x => panic!(
|
||||
@@ -234,7 +229,6 @@ impl IoConcurrency {
|
||||
pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self {
|
||||
match io_concurrency {
|
||||
SelectedIoConcurrency::Serial => IoConcurrency::Serial,
|
||||
SelectedIoConcurrency::Parallel => IoConcurrency::Parallel,
|
||||
SelectedIoConcurrency::FuturesUnordered(gate_guard) => {
|
||||
let (barriers_tx, barrier_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
@@ -405,7 +399,6 @@ impl IoConcurrency {
|
||||
pub(crate) fn clone(&self) -> Self {
|
||||
match self {
|
||||
IoConcurrency::Serial => IoConcurrency::Serial,
|
||||
IoConcurrency::Parallel => IoConcurrency::Parallel,
|
||||
IoConcurrency::FuturesUnordered {
|
||||
ios_tx,
|
||||
barriers_tx,
|
||||
@@ -433,9 +426,6 @@ impl IoConcurrency {
|
||||
tracing::trace!(%io_num, "spawning IO");
|
||||
match self {
|
||||
IoConcurrency::Serial => fut.await,
|
||||
IoConcurrency::Parallel => {
|
||||
tokio::spawn(fut);
|
||||
}
|
||||
IoConcurrency::FuturesUnordered { ios_tx, .. } => {
|
||||
let mut fut = Box::pin(fut);
|
||||
// opportunistic poll to give some boost (unproven if it helps, but sounds like a good idea)
|
||||
|
||||
@@ -3435,7 +3435,6 @@ impl Timeline {
|
||||
trace!("waiting for futures to complete");
|
||||
match &reconstruct_state.io_concurrency {
|
||||
super::storage_layer::IoConcurrency::Serial => (),
|
||||
super::storage_layer::IoConcurrency::Parallel => (),
|
||||
super::storage_layer::IoConcurrency::FuturesUnordered { barriers_tx, .. } => {
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
match barriers_tx.send(tx) {
|
||||
@@ -5779,8 +5778,6 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<(Key, Bytes)>> {
|
||||
use super::storage_layer::SelectedIoConcurrency;
|
||||
|
||||
let mut all_data = Vec::new();
|
||||
let guard = self.layers.read().await;
|
||||
for layer in guard.layer_map()?.iter_historic_layers() {
|
||||
|
||||
@@ -15,7 +15,6 @@ pub(super) mod tokio_epoll_uring_ext;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tracing::Instrument;
|
||||
|
||||
|
||||
pub(crate) use super::api::IoEngineKind;
|
||||
#[derive(Clone, Copy)]
|
||||
#[repr(u8)]
|
||||
|
||||
@@ -67,7 +67,7 @@ def test_pageserver_characterize_throughput_with_n_tenants(
|
||||
# which by default uses 64 connections
|
||||
@pytest.mark.parametrize("n_clients", [1])
|
||||
@pytest.mark.parametrize("n_tenants", [1])
|
||||
@pytest.mark.parametrize("io_concurrency", ["serial", "parallel", "futures-unordered"])
|
||||
@pytest.mark.parametrize("io_concurrency", ["serial", "futures-unordered"])
|
||||
@pytest.mark.parametrize("ps_direct_io_mode", ["direct"])
|
||||
@pytest.mark.timeout(2400)
|
||||
@skip_on_ci(
|
||||
|
||||
@@ -32,7 +32,7 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
|
||||
|
||||
|
||||
PS_DIRECT_IO = ["direct"]
|
||||
PS_IO_CONCURRENCY = ["serial", "parallel", "futures-unordered"]
|
||||
PS_IO_CONCURRENCY = ["serial", "futures-unordered"]
|
||||
EXECUTION = ["concurrent-futures"]
|
||||
|
||||
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
|
||||
|
||||
Reference in New Issue
Block a user