From 8e6c01ddea4f74a09c10722a53bf84b3d076ac92 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 17 Dec 2024 19:04:50 +0100
Subject: [PATCH] WIP fix: one task per connection to drive all the IO futures

---
 Cargo.toml | 2 +-
 pageserver/src/basebackup.rs | 13 +-
 pageserver/src/page_service.rs | 55 +++-
 pageserver/src/pgdatadir_mapping.rs | 8 +-
 pageserver/src/tenant.rs | 24 +-
 pageserver/src/tenant/storage_layer.rs | 308 +++++++++++++++---
 .../src/tenant/storage_layer/layer/tests.rs | 6 +-
 pageserver/src/tenant/timeline.rs | 51 ++-
 8 files changed, 383 insertions(+), 84 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 0654c25a3d..6ff584d053 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -175,7 +175,7 @@ thiserror = "1.0"
 tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
 tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
 tokio = { version = "1.17", features = ["macros"] }
-tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
+tokio-epoll-uring = {path = "../tokio-epoll-uring/tokio-epoll-uring" }
 tokio-io-timeout = "1.2.0"
 tokio-postgres-rustls = "0.12.0"
 tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index cae0ffb980..2e50a59e45 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -25,6 +25,7 @@ use tokio_tar::{Builder, EntryType, Header};
 use crate::context::RequestContext;
 use crate::pgdatadir_mapping::Version;
+use crate::tenant::storage_layer::IoConcurrency;
 use crate::tenant::Timeline;
 use pageserver_api::reltag::{RelTag, SlruKind};
@@ -303,7 +304,17 @@ where
 for part in slru_partitions.parts {
 let blocks = self
 .timeline
- .get_vectored(part, self.lsn, self.ctx)
+ .get_vectored(
+ part,
+ self.lsn,
+ IoConcurrency::spawn_from_env(
+ self.timeline
+ .gate
+ .enter()
+ .map_err(|e| BasebackupError::Server(e.into()))?,
+ ),
+ self.ctx,
+ )
 .await
 .map_err(|e| BasebackupError::Server(e.into()))?;
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 928da3852d..03f2beac8c 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -39,6 +39,7 @@ use tokio::io::{AsyncWriteExt, BufWriter};
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
+use utils::sync::gate::Gate;
 use utils::sync::spsc_fold;
 use utils::{
 auth::{Claims, Scope, SwappableJwtAuth},
@@ -61,6 +62,7 @@ use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
 use crate::tenant::mgr::ShardSelector;
 use crate::tenant::mgr::TenantManager;
 use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
+use crate::tenant::storage_layer::IoConcurrency;
 use crate::tenant::timeline::{self, WaitLsnError};
 use crate::tenant::GetTimelineError;
 use crate::tenant::PageReconstructError;
@@ -89,6 +91,7 @@ pub struct Listener {
 pub struct Connections {
 cancel: CancellationToken,
 tasks: tokio::task::JoinSet,
+ gate: Arc,
 }

 pub fn spawn(
@@ -98,6 +101,7 @@
 tcp_listener: tokio::net::TcpListener,
 ) -> Listener {
 let cancel = CancellationToken::new();
+ let gate = Arc::new(Gate::default());
 let libpq_ctx = RequestContext::todo_child(
 TaskKind::LibpqEndpointListener,
 // listener task shouldn't need to download anything.
(We will @@ -116,6 +120,7 @@ pub fn spawn( conf.page_service_pipelining.clone(), libpq_ctx, cancel.clone(), + gate, ) .map(anyhow::Ok), )); @@ -133,11 +138,16 @@ impl Listener { } impl Connections { pub(crate) async fn shutdown(self) { - let Self { cancel, mut tasks } = self; + let Self { + cancel, + mut tasks, + gate, + } = self; cancel.cancel(); while let Some(res) = tasks.join_next().await { Self::handle_connection_completion(res); } + gate.close().await; } fn handle_connection_completion(res: Result, tokio::task::JoinError>) { @@ -165,6 +175,7 @@ pub async fn libpq_listener_main( pipelining_config: PageServicePipeliningConfig, listener_ctx: RequestContext, listener_cancel: CancellationToken, + gate: Arc, ) -> Connections { let connections_cancel = CancellationToken::new(); let mut connection_handler_tasks = tokio::task::JoinSet::default(); @@ -196,6 +207,7 @@ pub async fn libpq_listener_main( pipelining_config.clone(), connection_ctx, connections_cancel.child_token(), + Arc::clone(&gate), )); } Err(err) => { @@ -210,6 +222,7 @@ pub async fn libpq_listener_main( Connections { cancel: connections_cancel, tasks: connection_handler_tasks, + gate, } } @@ -224,6 +237,7 @@ async fn page_service_conn_main( pipelining_config: PageServicePipeliningConfig, connection_ctx: RequestContext, cancel: CancellationToken, + gate: Arc, ) -> ConnectionHandlerResult { let _guard = LIVE_CONNECTIONS .with_label_values(&["page_service"]) @@ -278,6 +292,7 @@ async fn page_service_conn_main( pipelining_config, connection_ctx, cancel.clone(), + gate, ); let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; @@ -324,6 +339,8 @@ struct PageServerHandler { timeline_handles: Option, pipelining_config: PageServicePipeliningConfig, + + gate: Arc, } struct TimelineHandles { @@ -616,6 +633,7 @@ impl PageServerHandler { pipelining_config: PageServicePipeliningConfig, connection_ctx: RequestContext, cancel: CancellationToken, + gate: Arc, ) -> Self { PageServerHandler { auth, @@ -624,6 +642,7 @@ impl PageServerHandler { timeline_handles: Some(TimelineHandles::new(tenant_manager)), cancel, pipelining_config, + gate, } } @@ -906,6 +925,7 @@ impl PageServerHandler { &mut self, pgb_writer: &mut PostgresBackend, batch: BatchedFeMessage, + io_concurrency: IoConcurrency, cancel: &CancellationToken, ctx: &RequestContext, ) -> Result<(), QueryError> @@ -965,6 +985,7 @@ impl PageServerHandler { &shard, effective_request_lsn, pages, + io_concurrency, ctx, ) .instrument(span.clone()) @@ -1140,6 +1161,14 @@ impl PageServerHandler { } } + let io_concurrency = IoConcurrency::spawn_from_env(match self.gate.enter() { + Ok(guard) => guard, + Err(_) => { + info!("shutdown request received in page handler"); + return Err(QueryError::Shutdown); + } + }); + let pgb_reader = pgb .split() .context("implementation error: split pgb into reader and writer")?; @@ -1160,6 +1189,7 @@ impl PageServerHandler { timeline_handles, request_span, pipelining_config, + io_concurrency, &ctx, ) .await @@ -1172,6 +1202,7 @@ impl PageServerHandler { timeline_id, timeline_handles, request_span, + io_concurrency, &ctx, ) .await @@ -1198,6 +1229,7 @@ impl PageServerHandler { timeline_id: TimelineId, mut timeline_handles: TimelineHandles, request_span: Span, + io_concurrency: IoConcurrency, ctx: &RequestContext, ) -> ( (PostgresBackendReader, TimelineHandles), @@ -1238,7 +1270,13 @@ impl PageServerHandler { trace!("handling message"); let err = self - .pagesteam_handle_batched_message(pgb_writer, msg, &cancel, ctx) + 
.pagesteam_handle_batched_message( + pgb_writer, + msg, + io_concurrency.clone(), + &cancel, + ctx, + ) .await; match err { Ok(()) => {} @@ -1262,6 +1300,7 @@ impl PageServerHandler { mut timeline_handles: TimelineHandles, request_span: Span, pipelining_config: PageServicePipeliningConfigPipelined, + io_concurrency: IoConcurrency, ctx: &RequestContext, ) -> ( (PostgresBackendReader, TimelineHandles), @@ -1402,8 +1441,14 @@ impl PageServerHandler { } }; batch.throttle(&self.cancel).await?; - self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx) - .await?; + self.pagesteam_handle_batched_message( + pgb_writer, + batch, + io_concurrency.clone(), + &cancel, + &ctx, + ) + .await?; } } }); @@ -1652,6 +1697,7 @@ impl PageServerHandler { timeline: &Timeline, effective_lsn: Lsn, requests: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>, + io_concurrency: IoConcurrency, ctx: &RequestContext, ) -> Vec> { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -1664,6 +1710,7 @@ impl PageServerHandler { .get_rel_page_at_lsn_batched( requests.iter().map(|(reltag, blkno, _)| (reltag, blkno)), effective_lsn, + io_concurrency, ctx, ) .await; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 255bd01e25..648128c86d 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -17,6 +17,7 @@ use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id, }; +use crate::tenant::storage_layer::IoConcurrency; use crate::tenant::timeline::GetVectoredError; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; @@ -208,6 +209,7 @@ impl Timeline { .get_rel_page_at_lsn_batched( pages.iter().map(|(tag, blknum)| (tag, blknum)), effective_lsn, + IoConcurrency::todo(), ctx, ) .await; @@ -246,6 +248,7 @@ impl Timeline { &self, pages: impl ExactSizeIterator, effective_lsn: Lsn, + io_concurrency: IoConcurrency, ctx: &RequestContext, ) -> Vec> { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -309,7 +312,10 @@ impl Timeline { acc.to_keyspace() }; - match self.get_vectored(keyspace, effective_lsn, ctx).await { + match self + .get_vectored(keyspace, effective_lsn, io_concurrency, ctx) + .await + { Ok(results) => { for (key, res) in results { let mut key_slots = keys_slots.remove(&key).unwrap().into_iter(); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 6cfa5704c4..2555c43ae1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5720,7 +5720,7 @@ mod tests { use pageserver_api::value::Value; use pageserver_compaction::helpers::overlaps_with; use rand::{thread_rng, Rng}; - use storage_layer::{PersistentLayerKey, SelectedIoConcurrency}; + use storage_layer::{IoConcurrency, PersistentLayerKey, SelectedIoConcurrency}; use tests::storage_layer::ValuesReconstructState; use tests::timeline::{GetVectoredError, ShutdownMode}; use timeline::{CompactOptions, DeltaLayerTestDesc}; @@ -6563,6 +6563,12 @@ mod tests { for io_concurrency_level in io_concurrency_levels { for read in reads.clone() { + // The type is not Copy() because FuturesUnordered variant is not Copy. 
+ let io_concurrency_level = match io_concurrency_level { + SelectedIoConcurrency::Serial => SelectedIoConcurrency::Serial, + SelectedIoConcurrency::Parallel => SelectedIoConcurrency::Parallel, + SelectedIoConcurrency::FuturesUnordered(_) => unreachable!("not used"), + }; info!( "Doing vectored read on {:?} with IO concurrency {:?}", read, io_concurrency_level @@ -6572,7 +6578,9 @@ mod tests { .get_vectored_impl( read.clone(), reads_lsn, - &mut ValuesReconstructState::new_with_io_concurrency(io_concurrency_level), + &mut ValuesReconstructState::new(IoConcurrency::spawn( + io_concurrency_level, + )), &ctx, ) .await; @@ -6655,7 +6663,7 @@ mod tests { .get_vectored_impl( aux_keyspace.clone(), read_lsn, - &mut ValuesReconstructState::new(), + &mut ValuesReconstructState::new(IoConcurrency::todo()), &ctx, ) .await; @@ -6803,7 +6811,7 @@ mod tests { .get_vectored_impl( read.clone(), current_lsn, - &mut ValuesReconstructState::new(), + &mut ValuesReconstructState::new(IoConcurrency::todo()), &ctx, ) .await?; @@ -6938,7 +6946,7 @@ mod tests { ranges: vec![child_gap_at_key..child_gap_at_key.next()], }, query_lsn, - &mut ValuesReconstructState::new(), + &mut ValuesReconstructState::new(IoConcurrency::todo()), &ctx, ) .await; @@ -7437,7 +7445,7 @@ mod tests { .get_vectored_impl( keyspace.clone(), lsn, - &mut ValuesReconstructState::default(), + &mut ValuesReconstructState::new(IoConcurrency::todo()), &ctx, ) .await? @@ -7627,7 +7635,7 @@ mod tests { lsn: Lsn, ctx: &RequestContext, ) -> anyhow::Result<(BTreeMap>, usize)> { - let mut reconstruct_state = ValuesReconstructState::default(); + let mut reconstruct_state = ValuesReconstructState::new(IoConcurrency::todo()); let res = tline .get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx) .await?; @@ -7851,7 +7859,7 @@ mod tests { lsn: Lsn, ctx: &RequestContext, ) -> Result, GetVectoredError> { - let mut reconstruct_state = ValuesReconstructState::new(); + let mut reconstruct_state = ValuesReconstructState::new(IoConcurrency::todo()); let mut res = tline .get_vectored_impl( KeySpace::single(key..key.next()), diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 0e9e27e135..add35b0ad5 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -12,6 +12,8 @@ pub mod merge_iterator; use crate::context::{AccessStatsBehavior, RequestContext}; use bytes::Bytes; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use pageserver_api::key::{Key, NON_INHERITED_SPARSE_RANGE}; use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; use pageserver_api::record::NeonWalRecord; @@ -26,7 +28,9 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::task::Poll; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio_util::sync::CancellationToken; use tracing::{trace, Instrument}; +use utils::sync::gate::GateGuard; use utils::lsn::Lsn; @@ -170,11 +174,250 @@ pub(crate) enum IoConcurrency { Serial, Parallel, FuturesUnordered { - futures: futures::stream::FuturesUnordered>>>, + ios_tx: tokio::sync::mpsc::UnboundedSender, + barriers_tx: tokio::sync::mpsc::UnboundedSender>, + cancel_task_on_drop: Arc, }, } +type IoFuture = Pin>>; + +pub(crate) enum SelectedIoConcurrency { + Serial, + Parallel, + FuturesUnordered(GateGuard), +} + +impl std::fmt::Debug for IoConcurrency { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IoConcurrency::Serial => write!(f, "Serial"), + IoConcurrency::Parallel => 
write!(f, "Parallel"), + IoConcurrency::FuturesUnordered { .. } => write!(f, "FuturesUnordered"), + } + } +} + +impl std::fmt::Debug for SelectedIoConcurrency { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SelectedIoConcurrency::Serial => write!(f, "Serial"), + SelectedIoConcurrency::Parallel => write!(f, "Parallel"), + SelectedIoConcurrency::FuturesUnordered(_) => write!(f, "FuturesUnordered"), + } + } +} + impl IoConcurrency { + #[deprecated] + pub(crate) fn todo() -> Self { + // To test futuresunordered, we can create a gate guard here and Box::leak it + Self::spawn(SelectedIoConcurrency::Serial) + } + pub(crate) fn spawn_from_env(gate_guard: GateGuard) -> IoConcurrency { + static IO_CONCURRENCY: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { + std::env::var("NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY") + .unwrap_or_else(|_| "serial".to_string()) + }); + let selected = match IO_CONCURRENCY.as_str() { + "parallel" => SelectedIoConcurrency::Parallel, // TODO: clonable gateguard, pass through Arc? ? + "serial" => SelectedIoConcurrency::Serial, + "futures-unordered" => SelectedIoConcurrency::FuturesUnordered(gate_guard), + x => panic!( + "Invalid value for NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY: {}", + x + ), + }; + Self::spawn(selected) + } + + pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self { + match io_concurrency { + SelectedIoConcurrency::Serial => IoConcurrency::Serial, + SelectedIoConcurrency::Parallel => IoConcurrency::Parallel, + SelectedIoConcurrency::FuturesUnordered(gate_guard) => { + let (barriers_tx, barrier_rx) = tokio::sync::mpsc::unbounded_channel(); + let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel(); + let (cancel, _cancel_task_on_drop) = { + let t = CancellationToken::new(); + (t.clone(), Arc::new(t.drop_guard())) + }; + static TASK_ID: AtomicUsize = AtomicUsize::new(0); + let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let span = + tracing::trace_span!(parent: None, "futures_unordered_io", task_id = task_id); + trace!(task_id, "spawning"); + tokio::spawn(async move { + trace!("start"); + scopeguard::defer!({ trace!("end") }); + type IosRx = tokio::sync::mpsc::UnboundedReceiver; + type BarrierReqRx = + tokio::sync::mpsc::UnboundedReceiver>; + type BarrierDoneTx = tokio::sync::oneshot::Sender<()>; + enum State { + Waiting { + // invariant: is_empty(), but we recycle the allocation + empty_futures: FuturesUnordered, + ios_rx: IosRx, + barrier_rx: BarrierReqRx, + }, + Executing { + futures: FuturesUnordered, + ios_rx: IosRx, + barrier_rx: BarrierReqRx, + }, + Barriering { + futures: FuturesUnordered, + ios_rx: IosRx, + barrier_rx: BarrierReqRx, + barrier_done: BarrierDoneTx, + }, + ShuttingDown { + futures: FuturesUnordered, + barrier_done: Option, + }, + } + let mut state = State::Waiting { + empty_futures: FuturesUnordered::new(), + ios_rx, + barrier_rx, + }; + loop { + match state { + State::Waiting { + empty_futures, + mut ios_rx, + mut barrier_rx, + } => { + assert!(empty_futures.is_empty()); + tokio::select! 
{ + () = cancel.cancelled() => { + state = State::ShuttingDown { futures: empty_futures, barrier_done: None }; + } + fut = ios_rx.recv() => { + if let Some(fut) = fut { + empty_futures.push(fut); + state = State::Executing { futures: empty_futures, ios_rx, barrier_rx }; + } else { + state = State::ShuttingDown { futures: empty_futures, barrier_done: None } + } + } + barrier_done = barrier_rx.recv() => { + if let Some(barrier_done) = barrier_done { + state = State::Barriering { futures: empty_futures, ios_rx, barrier_rx, barrier_done }; + } else { + state = State::ShuttingDown { futures: empty_futures, barrier_done: None }; + } + } + } + } + State::Executing { + mut futures, + mut ios_rx, + mut barrier_rx, + } => { + tokio::select! { + () = cancel.cancelled() => { + state = State::ShuttingDown { futures, barrier_done: None }; + } + res = futures.next() => { + assert!(res.is_some()); + if futures.is_empty() { + state = State::Waiting { empty_futures: futures, ios_rx, barrier_rx }; + } else { + state = State::Executing { futures, ios_rx, barrier_rx }; + } + } + fut = ios_rx.recv() => { + if let Some(fut) = fut { + futures.push(fut); + state = State::Executing { futures, ios_rx, barrier_rx }; + } else { + state = State::ShuttingDown { futures, barrier_done: None }; + } + } + barrier_done = barrier_rx.recv() => { + if let Some(barrier_done) = barrier_done { + state = State::Barriering { futures, ios_rx, barrier_rx, barrier_done }; + } else { + state = State::ShuttingDown { futures, barrier_done: None }; + } + } + } + } + State::Barriering { + mut futures, + ios_rx, + barrier_rx, + barrier_done, + } => { + if futures.is_empty() { + barrier_done.send(()).unwrap(); + state = State::Waiting { + empty_futures: futures, + ios_rx, + barrier_rx, + }; + } else { + tokio::select! { + () = cancel.cancelled() => { + state = State::ShuttingDown { futures, barrier_done: Some(barrier_done) }; + } + res = futures.next() => { + assert!(res.is_some()); + if futures.is_empty() { + barrier_done.send(()).unwrap(); + state = State::Waiting { empty_futures: futures , ios_rx, barrier_rx }; + } else { + state = State::Barriering { futures, ios_rx, barrier_rx, barrier_done }; + } + } + // in barriering mode, we don't accept new IOs or new barrier requests + } + } + } + State::ShuttingDown { + mut futures, + barrier_done, + } => { + trace!("shutting down"); + while let Some(()) = futures.next().await { + // drain + } + if let Some(barrier_done) = barrier_done { + barrier_done.send(()).unwrap(); + } + break; + } + } + } + drop(gate_guard); // drop it right before we exitlast + }.instrument(span)); + IoConcurrency::FuturesUnordered { + ios_tx, + barriers_tx, + cancel_task_on_drop: _cancel_task_on_drop, + } + } + } + } + + pub(crate) fn clone(&self) -> Self { + match self { + IoConcurrency::Serial => IoConcurrency::Serial, + IoConcurrency::Parallel => IoConcurrency::Parallel, + IoConcurrency::FuturesUnordered { + ios_tx, + barriers_tx, + cancel_task_on_drop, + } => IoConcurrency::FuturesUnordered { + ios_tx: ios_tx.clone(), + barriers_tx: barriers_tx.clone(), + cancel_task_on_drop: cancel_task_on_drop.clone(), + }, + } + } + pub(crate) async fn spawn_io(&mut self, fut: F) where F: std::future::Future + Send + 'static, @@ -193,12 +436,16 @@ impl IoConcurrency { IoConcurrency::Parallel => { tokio::spawn(fut); } - IoConcurrency::FuturesUnordered { futures } => { + IoConcurrency::FuturesUnordered { ios_tx, .. 
} => { let mut fut = Box::pin(fut); - match futures::poll!(&mut fut) { - Poll::Ready(()) => {} - Poll::Pending => { - futures.push(fut); + // opportunistic poll to give some boost (unproven if it helps, but sounds like a good idea) + if let Poll::Ready(()) = futures::poll!(&mut fut) { + return; + } + match ios_tx.send(fut) { + Ok(()) => {} + Err(_) => { + unreachable!("the io task must have exited, likely it panicked") } } } @@ -206,54 +453,15 @@ impl IoConcurrency { } } -#[cfg(test)] -#[derive(Debug, Copy, Clone)] -pub(crate) enum SelectedIoConcurrency { - Serial, - Parallel, -} - impl ValuesReconstructState { - pub(crate) fn new() -> Self { + pub(crate) fn new(io_concurrency: IoConcurrency) -> Self { Self { keys: HashMap::new(), keys_done: KeySpaceRandomAccum::new(), keys_with_image_coverage: None, layers_visited: 0, delta_layers_visited: 0, - io_concurrency: { - static IO_CONCURRENCY: once_cell::sync::Lazy = - once_cell::sync::Lazy::new(|| { - std::env::var("NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY") - .unwrap_or_else(|_| "serial".to_string()) - }); - match IO_CONCURRENCY.as_str() { - "parallel" => IoConcurrency::Parallel, - "serial" => IoConcurrency::Serial, - "futures-unordered" => IoConcurrency::FuturesUnordered { - futures: futures::stream::FuturesUnordered::new(), - }, - x => panic!( - "Invalid value for NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY: {}", - x - ), - } - }, - } - } - - #[cfg(test)] - pub(crate) fn new_with_io_concurrency(io_concurrency: SelectedIoConcurrency) -> Self { - Self { - keys: HashMap::new(), - keys_done: KeySpaceRandomAccum::new(), - keys_with_image_coverage: None, - layers_visited: 0, - delta_layers_visited: 0, - io_concurrency: match io_concurrency { - SelectedIoConcurrency::Serial => IoConcurrency::Serial, - SelectedIoConcurrency::Parallel => IoConcurrency::Parallel, - }, + io_concurrency, } } @@ -345,12 +553,6 @@ impl ValuesReconstructState { } } -impl Default for ValuesReconstructState { - fn default() -> Self { - Self::new() - } -} - /// A key that uniquely identifies a layer in a timeline #[derive(Debug, PartialEq, Eq, Clone, Hash)] pub(crate) enum LayerId { diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index f59f6e2a8a..160a41d610 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -9,7 +9,7 @@ use utils::{ use super::failpoints::{Failpoint, FailpointKind}; use super::*; -use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint}; +use crate::{context::DownloadBehavior, tenant::storage_layer::{IoConcurrency, LayerVisibilityHint}}; use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness}; /// Used in tests to advance a future to wanted await point, and not futher. @@ -55,7 +55,7 @@ async fn smoke_test() { }; let img_before = { - let mut data = ValuesReconstructState::default(); + let mut data = ValuesReconstructState::new(IoConcurrency::todo()); layer .get_values_reconstruct_data( controlfile_keyspace.clone(), @@ -90,7 +90,7 @@ async fn smoke_test() { // on accesses when the layer is evicted, it will automatically be downloaded. 
let img_after = { - let mut data = ValuesReconstructState::default(); + let mut data = ValuesReconstructState::new(IoConcurrency::todo()); layer .get_values_reconstruct_data( controlfile_keyspace.clone(), diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index cd52843ecf..c6ed7acad9 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -78,7 +78,8 @@ use crate::{ layer_map::{LayerMap, SearchResult}, metadata::TimelineMetadata, storage_layer::{ - inmemory_layer::IndexEntry, PersistentLayerDesc, ValueReconstructSituation, + inmemory_layer::IndexEntry, IoConcurrency, PersistentLayerDesc, + ValueReconstructSituation, }, }, walingest::WalLagCooldown, @@ -1014,7 +1015,7 @@ impl Timeline { ranges: vec![key..key.next()], }; - let mut reconstruct_state = ValuesReconstructState::new(); + let mut reconstruct_state = ValuesReconstructState::new(IoConcurrency::todo()); let vectored_res = self .get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx) @@ -1057,6 +1058,7 @@ impl Timeline { &self, keyspace: KeySpace, lsn: Lsn, + io_concurrency: super::storage_layer::IoConcurrency, ctx: &RequestContext, ) -> Result>, GetVectoredError> { if !lsn.is_valid() { @@ -1091,7 +1093,7 @@ impl Timeline { .get_vectored_impl( keyspace.clone(), lsn, - &mut ValuesReconstructState::new(), + &mut ValuesReconstructState::new(io_concurrency), ctx, ) .await; @@ -1147,7 +1149,7 @@ impl Timeline { .get_vectored_impl( keyspace.clone(), lsn, - &mut ValuesReconstructState::default(), + &mut ValuesReconstructState::new(IoConcurrency::todo()), ctx, ) .await; @@ -3429,17 +3431,33 @@ impl Timeline { } } - match reconstruct_state.io_concurrency { + // TODO: internalize + trace!("waiting for futures to complete"); + match &reconstruct_state.io_concurrency { super::storage_layer::IoConcurrency::Serial => (), super::storage_layer::IoConcurrency::Parallel => (), - super::storage_layer::IoConcurrency::FuturesUnordered { ref mut futures } => { - trace!("waiting for futures to complete"); - while let Some(()) = futures.next().await { - trace!("future completed"); + super::storage_layer::IoConcurrency::FuturesUnordered { barriers_tx, .. } => { + let (tx, rx) = tokio::sync::oneshot::channel(); + match barriers_tx.send(tx) { + Ok(()) => {} + Err(_) => { + return Err(GetVectoredError::Other(anyhow::anyhow!( + "concurrent io task dropped its barriers_rx, likely it panicked" + ))); + } } - trace!("futures completed"); + match rx.await { + Ok(()) => {} + Err(_) => { + return Err(GetVectoredError::Other(anyhow::anyhow!( + "concurrent io task dropped the barrier_done, likely it panicked" + ))); + } + } + } } + trace!("futures completed"); Ok(TimelineVisitOutcome { completed_keyspace, @@ -4182,7 +4200,12 @@ impl Timeline { || (last_key_in_range && key_request_accum.raw_size() > 0) { let results = self - .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx) + .get_vectored( + key_request_accum.consume_keyspace(), + lsn, + IoConcurrency::todo(), + ctx, + ) .await?; if self.cancel.is_cancelled() { @@ -4263,7 +4286,7 @@ impl Timeline { start: Key, ) -> Result { // Metadata keys image layer creation. 
- let mut reconstruct_state = ValuesReconstructState::default(); + let mut reconstruct_state = ValuesReconstructState::new(IoConcurrency::todo()); let begin = Instant::now(); let data = self .get_vectored_impl(partition.clone(), lsn, &mut reconstruct_state, ctx) @@ -5757,12 +5780,14 @@ impl Timeline { lsn: Lsn, ctx: &RequestContext, ) -> anyhow::Result> { + use super::storage_layer::SelectedIoConcurrency; + let mut all_data = Vec::new(); let guard = self.layers.read().await; for layer in guard.layer_map()?.iter_historic_layers() { if !layer.is_delta() && layer.image_layer_lsn() == lsn { let layer = guard.get_from_desc(&layer); - let mut reconstruct_data = ValuesReconstructState::default(); + let mut reconstruct_data = ValuesReconstructState::new(IoConcurrency::todo()); layer .get_values_reconstruct_data( KeySpace::single(Key::MIN..Key::MAX),
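Note on the new concurrency mode: the heart of this patch is IoConcurrency::FuturesUnordered. Instead of each get_vectored call polling its own FuturesUnordered inline, every page_service connection now spawns one sidecar task (in IoConcurrency::spawn) that owns the FuturesUnordered, receives boxed IO futures over an unbounded channel, and answers barrier requests once everything submitted so far has completed. The snippet below is a minimal, self-contained sketch of that pattern only, not the patch's code: the names Sidecar, submit, and barrier are invented for illustration, and the real implementation additionally handles cancellation, the gate guard, and the explicit Waiting/Executing/Barriering/ShuttingDown states.

// Sketch of the sidecar-task pattern (requires the `tokio` and `futures` crates).
use std::future::Future;
use std::pin::Pin;

use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::sync::{mpsc, oneshot};

/// A boxed IO future, mirroring the patch's `IoFuture` alias.
type IoFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

enum Msg {
    Io(IoFuture),
    Barrier(oneshot::Sender<()>),
}

struct Sidecar {
    tx: mpsc::UnboundedSender<Msg>,
}

impl Sidecar {
    /// Spawn the task that drives all submitted IO futures to completion.
    fn spawn() -> Self {
        let (tx, mut rx) = mpsc::unbounded_channel::<Msg>();
        tokio::spawn(async move {
            let mut in_flight: FuturesUnordered<IoFuture> = FuturesUnordered::new();
            loop {
                tokio::select! {
                    msg = rx.recv() => match msg {
                        Some(Msg::Io(fut)) => in_flight.push(fut),
                        Some(Msg::Barrier(done)) => {
                            // Everything submitted before the barrier is already in
                            // `in_flight` (the channel is FIFO); drain it, then ack.
                            while in_flight.next().await.is_some() {}
                            let _ = done.send(());
                        }
                        None => break, // all senders dropped: shut down
                    },
                    // Keep making progress on in-flight IOs between messages.
                    Some(()) = in_flight.next(), if !in_flight.is_empty() => {}
                }
            }
            // Drain whatever is still running before the task exits.
            while in_flight.next().await.is_some() {}
        });
        Self { tx }
    }

    /// Submit an IO future; it runs concurrently with the caller.
    fn submit(&self, fut: impl Future<Output = ()> + Send + 'static) {
        let _ = self.tx.send(Msg::Io(Box::pin(fut)));
    }

    /// Wait until every IO submitted so far has completed.
    async fn barrier(&self) {
        let (tx, rx) = oneshot::channel();
        let _ = self.tx.send(Msg::Barrier(tx));
        let _ = rx.await;
    }
}

#[tokio::main]
async fn main() {
    let sidecar = Sidecar::spawn();
    for i in 0..4 {
        sidecar.submit(async move { println!("io {i} done") });
    }
    sidecar.barrier().await; // corresponds to the barriers_tx round-trip in get_vectored_impl
}

Pushing the futures through a channel is what lets the reconstruct path submit IOs without holding them inside ValuesReconstructState, at the cost of boxing each future and keeping one extra task per connection alive under the page_service gate.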