WIP fix: one task per connection to drive all the IO futures

Christian Schwarz
2024-12-17 19:04:50 +01:00
parent e209fd8d77
commit 8e6c01ddea
8 changed files with 383 additions and 84 deletions

View File

@@ -175,7 +175,7 @@ thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.17", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-epoll-uring = {path = "../tokio-epoll-uring/tokio-epoll-uring" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}

View File

@@ -25,6 +25,7 @@ use tokio_tar::{Builder, EntryType, Header};
use crate::context::RequestContext;
use crate::pgdatadir_mapping::Version;
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};
@@ -303,7 +304,17 @@ where
for part in slru_partitions.parts {
let blocks = self
.timeline
.get_vectored(part, self.lsn, self.ctx)
.get_vectored(
part,
self.lsn,
IoConcurrency::spawn_from_env(
self.timeline
.gate
.enter()
.map_err(|e| BasebackupError::Server(e.into()))?,
),
self.ctx,
)
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
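The hunk above threads an IoConcurrency value into get_vectored, built from a guard on the timeline's gate. A minimal sketch of that caller-side pattern, using only the names visible in this hunk (the helper function itself is hypothetical, not part of the commit):

    // Hypothetical helper: entering the gate fails once timeline shutdown has
    // begun, which basebackup surfaces as a server error instead of spawning
    // a sidecar IO task that could outlive the timeline.
    fn io_concurrency_for_request(timeline: &Timeline) -> Result<IoConcurrency, BasebackupError> {
        let guard = timeline
            .gate
            .enter()
            .map_err(|e| BasebackupError::Server(e.into()))?;
        Ok(IoConcurrency::spawn_from_env(guard))
    }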

View File

@@ -39,6 +39,7 @@ use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::sync::gate::Gate;
use utils::sync::spsc_fold;
use utils::{
auth::{Claims, Scope, SwappableJwtAuth},
@@ -61,6 +62,7 @@ use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
use crate::tenant::mgr::ShardSelector;
use crate::tenant::mgr::TenantManager;
use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::{self, WaitLsnError};
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
@@ -89,6 +91,7 @@ pub struct Listener {
pub struct Connections {
cancel: CancellationToken,
tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
gate: Arc<Gate>,
}
pub fn spawn(
@@ -98,6 +101,7 @@ pub fn spawn(
tcp_listener: tokio::net::TcpListener,
) -> Listener {
let cancel = CancellationToken::new();
let gate = Arc::new(Gate::default());
let libpq_ctx = RequestContext::todo_child(
TaskKind::LibpqEndpointListener,
// listener task shouldn't need to download anything. (We will
@@ -116,6 +120,7 @@ pub fn spawn(
conf.page_service_pipelining.clone(),
libpq_ctx,
cancel.clone(),
gate,
)
.map(anyhow::Ok),
));
@@ -133,11 +138,16 @@ impl Listener {
}
impl Connections {
pub(crate) async fn shutdown(self) {
let Self { cancel, mut tasks } = self;
let Self {
cancel,
mut tasks,
gate,
} = self;
cancel.cancel();
while let Some(res) = tasks.join_next().await {
Self::handle_connection_completion(res);
}
gate.close().await;
}
fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
@@ -165,6 +175,7 @@ pub async fn libpq_listener_main(
pipelining_config: PageServicePipeliningConfig,
listener_ctx: RequestContext,
listener_cancel: CancellationToken,
gate: Arc<Gate>,
) -> Connections {
let connections_cancel = CancellationToken::new();
let mut connection_handler_tasks = tokio::task::JoinSet::default();
@@ -196,6 +207,7 @@ pub async fn libpq_listener_main(
pipelining_config.clone(),
connection_ctx,
connections_cancel.child_token(),
Arc::clone(&gate),
));
}
Err(err) => {
@@ -210,6 +222,7 @@ pub async fn libpq_listener_main(
Connections {
cancel: connections_cancel,
tasks: connection_handler_tasks,
gate,
}
}
@@ -224,6 +237,7 @@ async fn page_service_conn_main(
pipelining_config: PageServicePipeliningConfig,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate: Arc<Gate>,
) -> ConnectionHandlerResult {
let _guard = LIVE_CONNECTIONS
.with_label_values(&["page_service"])
@@ -278,6 +292,7 @@ async fn page_service_conn_main(
pipelining_config,
connection_ctx,
cancel.clone(),
gate,
);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
@@ -324,6 +339,8 @@ struct PageServerHandler {
timeline_handles: Option<TimelineHandles>,
pipelining_config: PageServicePipeliningConfig,
gate: Arc<Gate>,
}
struct TimelineHandles {
@@ -616,6 +633,7 @@ impl PageServerHandler {
pipelining_config: PageServicePipeliningConfig,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate: Arc<Gate>,
) -> Self {
PageServerHandler {
auth,
@@ -624,6 +642,7 @@ impl PageServerHandler {
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
gate,
}
}
@@ -906,6 +925,7 @@ impl PageServerHandler {
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
batch: BatchedFeMessage,
io_concurrency: IoConcurrency,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<(), QueryError>
@@ -965,6 +985,7 @@ impl PageServerHandler {
&shard,
effective_request_lsn,
pages,
io_concurrency,
ctx,
)
.instrument(span.clone())
@@ -1140,6 +1161,14 @@ impl PageServerHandler {
}
}
let io_concurrency = IoConcurrency::spawn_from_env(match self.gate.enter() {
Ok(guard) => guard,
Err(_) => {
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown);
}
});
let pgb_reader = pgb
.split()
.context("implementation error: split pgb into reader and writer")?;
@@ -1160,6 +1189,7 @@ impl PageServerHandler {
timeline_handles,
request_span,
pipelining_config,
io_concurrency,
&ctx,
)
.await
@@ -1172,6 +1202,7 @@ impl PageServerHandler {
timeline_id,
timeline_handles,
request_span,
io_concurrency,
&ctx,
)
.await
@@ -1198,6 +1229,7 @@ impl PageServerHandler {
timeline_id: TimelineId,
mut timeline_handles: TimelineHandles,
request_span: Span,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> (
(PostgresBackendReader<IO>, TimelineHandles),
@@ -1238,7 +1270,13 @@ impl PageServerHandler {
trace!("handling message");
let err = self
.pagesteam_handle_batched_message(pgb_writer, msg, &cancel, ctx)
.pagesteam_handle_batched_message(
pgb_writer,
msg,
io_concurrency.clone(),
&cancel,
ctx,
)
.await;
match err {
Ok(()) => {}
@@ -1262,6 +1300,7 @@ impl PageServerHandler {
mut timeline_handles: TimelineHandles,
request_span: Span,
pipelining_config: PageServicePipeliningConfigPipelined,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> (
(PostgresBackendReader<IO>, TimelineHandles),
@@ -1402,8 +1441,14 @@ impl PageServerHandler {
}
};
batch.throttle(&self.cancel).await?;
self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx)
.await?;
self.pagesteam_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
&cancel,
&ctx,
)
.await?;
}
}
});
@@ -1652,6 +1697,7 @@ impl PageServerHandler {
timeline: &Timeline,
effective_lsn: Lsn,
requests: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), PageStreamError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -1664,6 +1710,7 @@ impl PageServerHandler {
.get_rel_page_at_lsn_batched(
requests.iter().map(|(reltag, blkno, _)| (reltag, blkno)),
effective_lsn,
io_concurrency,
ctx,
)
.await;
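Taken together, the page_service hunks wire a single Gate through the listener, every connection handler, and shutdown. A sketch of that lifecycle under the same names, with bodies reduced to the ordering that matters (both functions are illustrative, not verbatim from the commit):

    // Each pagestream loop enters the shared gate once and turns the guard
    // into its per-connection IoConcurrency; a closed gate means shutdown.
    fn io_concurrency_for_connection(gate: &Arc<Gate>) -> Result<IoConcurrency, QueryError> {
        match gate.enter() {
            Ok(guard) => Ok(IoConcurrency::spawn_from_env(guard)),
            Err(_) => Err(QueryError::Shutdown),
        }
    }

    // Shutdown cancels and joins the connection handlers first, then closes
    // the gate, which only completes once every sidecar IO task has dropped
    // its guard, i.e. after all in-flight IO has drained.
    async fn shutdown(connections: Connections) {
        let Connections { cancel, mut tasks, gate } = connections;
        cancel.cancel();
        while let Some(res) = tasks.join_next().await {
            let _ = res;
        }
        gate.close().await;
    }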

View File

@@ -17,6 +17,7 @@ use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::GetVectoredError;
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
@@ -208,6 +209,7 @@ impl Timeline {
.get_rel_page_at_lsn_batched(
pages.iter().map(|(tag, blknum)| (tag, blknum)),
effective_lsn,
IoConcurrency::todo(),
ctx,
)
.await;
@@ -246,6 +248,7 @@ impl Timeline {
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
effective_lsn: Lsn,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -309,7 +312,10 @@ impl Timeline {
acc.to_keyspace()
};
match self.get_vectored(keyspace, effective_lsn, ctx).await {
match self
.get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
.await
{
Ok(results) => {
for (key, res) in results {
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();

View File

@@ -5720,7 +5720,7 @@ mod tests {
use pageserver_api::value::Value;
use pageserver_compaction::helpers::overlaps_with;
use rand::{thread_rng, Rng};
use storage_layer::{PersistentLayerKey, SelectedIoConcurrency};
use storage_layer::{IoConcurrency, PersistentLayerKey, SelectedIoConcurrency};
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::{CompactOptions, DeltaLayerTestDesc};
@@ -6563,6 +6563,12 @@ mod tests {
for io_concurrency_level in io_concurrency_levels {
for read in reads.clone() {
// The type is not Copy because the FuturesUnordered variant is not Copy.
let io_concurrency_level = match io_concurrency_level {
SelectedIoConcurrency::Serial => SelectedIoConcurrency::Serial,
SelectedIoConcurrency::Parallel => SelectedIoConcurrency::Parallel,
SelectedIoConcurrency::FuturesUnordered(_) => unreachable!("not used"),
};
info!(
"Doing vectored read on {:?} with IO concurrency {:?}",
read, io_concurrency_level
@@ -6572,7 +6578,9 @@ mod tests {
.get_vectored_impl(
read.clone(),
reads_lsn,
&mut ValuesReconstructState::new_with_io_concurrency(io_concurrency_level),
&mut ValuesReconstructState::new(IoConcurrency::spawn(
io_concurrency_level,
)),
&ctx,
)
.await;
@@ -6655,7 +6663,7 @@ mod tests {
.get_vectored_impl(
aux_keyspace.clone(),
read_lsn,
&mut ValuesReconstructState::new(),
&mut ValuesReconstructState::new(IoConcurrency::todo()),
&ctx,
)
.await;
@@ -6803,7 +6811,7 @@ mod tests {
.get_vectored_impl(
read.clone(),
current_lsn,
&mut ValuesReconstructState::new(),
&mut ValuesReconstructState::new(IoConcurrency::todo()),
&ctx,
)
.await?;
@@ -6938,7 +6946,7 @@ mod tests {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
&mut ValuesReconstructState::new(),
&mut ValuesReconstructState::new(IoConcurrency::todo()),
&ctx,
)
.await;
@@ -7437,7 +7445,7 @@ mod tests {
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::default(),
&mut ValuesReconstructState::new(IoConcurrency::todo()),
&ctx,
)
.await?
@@ -7627,7 +7635,7 @@ mod tests {
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<(BTreeMap<Key, Result<Bytes, PageReconstructError>>, usize)> {
let mut reconstruct_state = ValuesReconstructState::default();
let mut reconstruct_state = ValuesReconstructState::new(IoConcurrency::todo());
let res = tline
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await?;
@@ -7851,7 +7859,7 @@ mod tests {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut reconstruct_state = ValuesReconstructState::new(IoConcurrency::todo());
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),

View File

@@ -12,6 +12,8 @@ pub mod merge_iterator;
use crate::context::{AccessStatsBehavior, RequestContext};
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use pageserver_api::key::{Key, NON_INHERITED_SPARSE_RANGE};
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
@@ -26,7 +28,9 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::Poll;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio_util::sync::CancellationToken;
use tracing::{trace, Instrument};
use utils::sync::gate::GateGuard;
use utils::lsn::Lsn;
@@ -170,11 +174,250 @@ pub(crate) enum IoConcurrency {
Serial,
Parallel,
FuturesUnordered {
futures: futures::stream::FuturesUnordered<Pin<Box<dyn Send + Future<Output = ()>>>>,
ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
barriers_tx: tokio::sync::mpsc::UnboundedSender<tokio::sync::oneshot::Sender<()>>,
cancel_task_on_drop: Arc<tokio_util::sync::DropGuard>,
},
}
type IoFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
pub(crate) enum SelectedIoConcurrency {
Serial,
Parallel,
FuturesUnordered(GateGuard),
}
impl std::fmt::Debug for IoConcurrency {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
IoConcurrency::Serial => write!(f, "Serial"),
IoConcurrency::Parallel => write!(f, "Parallel"),
IoConcurrency::FuturesUnordered { .. } => write!(f, "FuturesUnordered"),
}
}
}
impl std::fmt::Debug for SelectedIoConcurrency {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SelectedIoConcurrency::Serial => write!(f, "Serial"),
SelectedIoConcurrency::Parallel => write!(f, "Parallel"),
SelectedIoConcurrency::FuturesUnordered(_) => write!(f, "FuturesUnordered"),
}
}
}
impl IoConcurrency {
#[deprecated]
pub(crate) fn todo() -> Self {
// To test FuturesUnordered, we can create a gate guard here and Box::leak it
Self::spawn(SelectedIoConcurrency::Serial)
}
pub(crate) fn spawn_from_env(gate_guard: GateGuard) -> IoConcurrency {
static IO_CONCURRENCY: once_cell::sync::Lazy<String> = once_cell::sync::Lazy::new(|| {
std::env::var("NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY")
.unwrap_or_else(|_| "serial".to_string())
});
let selected = match IO_CONCURRENCY.as_str() {
"parallel" => SelectedIoConcurrency::Parallel, // TODO: clonable gateguard, pass through Arc<Gate>? ?
"serial" => SelectedIoConcurrency::Serial,
"futures-unordered" => SelectedIoConcurrency::FuturesUnordered(gate_guard),
x => panic!(
"Invalid value for NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY: {}",
x
),
};
Self::spawn(selected)
}
pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self {
match io_concurrency {
SelectedIoConcurrency::Serial => IoConcurrency::Serial,
SelectedIoConcurrency::Parallel => IoConcurrency::Parallel,
SelectedIoConcurrency::FuturesUnordered(gate_guard) => {
let (barriers_tx, barrier_rx) = tokio::sync::mpsc::unbounded_channel();
let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
let (cancel, _cancel_task_on_drop) = {
let t = CancellationToken::new();
(t.clone(), Arc::new(t.drop_guard()))
};
static TASK_ID: AtomicUsize = AtomicUsize::new(0);
let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let span =
tracing::trace_span!(parent: None, "futures_unordered_io", task_id = task_id);
trace!(task_id, "spawning");
tokio::spawn(async move {
trace!("start");
scopeguard::defer!({ trace!("end") });
type IosRx = tokio::sync::mpsc::UnboundedReceiver<IoFuture>;
type BarrierReqRx =
tokio::sync::mpsc::UnboundedReceiver<tokio::sync::oneshot::Sender<()>>;
type BarrierDoneTx = tokio::sync::oneshot::Sender<()>;
enum State {
Waiting {
// invariant: is_empty(), but we recycle the allocation
empty_futures: FuturesUnordered<IoFuture>,
ios_rx: IosRx,
barrier_rx: BarrierReqRx,
},
Executing {
futures: FuturesUnordered<IoFuture>,
ios_rx: IosRx,
barrier_rx: BarrierReqRx,
},
Barriering {
futures: FuturesUnordered<IoFuture>,
ios_rx: IosRx,
barrier_rx: BarrierReqRx,
barrier_done: BarrierDoneTx,
},
ShuttingDown {
futures: FuturesUnordered<IoFuture>,
barrier_done: Option<BarrierDoneTx>,
},
}
let mut state = State::Waiting {
empty_futures: FuturesUnordered::new(),
ios_rx,
barrier_rx,
};
loop {
match state {
State::Waiting {
empty_futures,
mut ios_rx,
mut barrier_rx,
} => {
assert!(empty_futures.is_empty());
tokio::select! {
() = cancel.cancelled() => {
state = State::ShuttingDown { futures: empty_futures, barrier_done: None };
}
fut = ios_rx.recv() => {
if let Some(fut) = fut {
empty_futures.push(fut);
state = State::Executing { futures: empty_futures, ios_rx, barrier_rx };
} else {
state = State::ShuttingDown { futures: empty_futures, barrier_done: None }
}
}
barrier_done = barrier_rx.recv() => {
if let Some(barrier_done) = barrier_done {
state = State::Barriering { futures: empty_futures, ios_rx, barrier_rx, barrier_done };
} else {
state = State::ShuttingDown { futures: empty_futures, barrier_done: None };
}
}
}
}
State::Executing {
mut futures,
mut ios_rx,
mut barrier_rx,
} => {
tokio::select! {
() = cancel.cancelled() => {
state = State::ShuttingDown { futures, barrier_done: None };
}
res = futures.next() => {
assert!(res.is_some());
if futures.is_empty() {
state = State::Waiting { empty_futures: futures, ios_rx, barrier_rx };
} else {
state = State::Executing { futures, ios_rx, barrier_rx };
}
}
fut = ios_rx.recv() => {
if let Some(fut) = fut {
futures.push(fut);
state = State::Executing { futures, ios_rx, barrier_rx };
} else {
state = State::ShuttingDown { futures, barrier_done: None };
}
}
barrier_done = barrier_rx.recv() => {
if let Some(barrier_done) = barrier_done {
state = State::Barriering { futures, ios_rx, barrier_rx, barrier_done };
} else {
state = State::ShuttingDown { futures, barrier_done: None };
}
}
}
}
State::Barriering {
mut futures,
ios_rx,
barrier_rx,
barrier_done,
} => {
if futures.is_empty() {
barrier_done.send(()).unwrap();
state = State::Waiting {
empty_futures: futures,
ios_rx,
barrier_rx,
};
} else {
tokio::select! {
() = cancel.cancelled() => {
state = State::ShuttingDown { futures, barrier_done: Some(barrier_done) };
}
res = futures.next() => {
assert!(res.is_some());
if futures.is_empty() {
barrier_done.send(()).unwrap();
state = State::Waiting { empty_futures: futures , ios_rx, barrier_rx };
} else {
state = State::Barriering { futures, ios_rx, barrier_rx, barrier_done };
}
}
// in barriering mode, we don't accept new IOs or new barrier requests
}
}
}
State::ShuttingDown {
mut futures,
barrier_done,
} => {
trace!("shutting down");
while let Some(()) = futures.next().await {
// drain
}
if let Some(barrier_done) = barrier_done {
barrier_done.send(()).unwrap();
}
break;
}
}
}
drop(gate_guard); // drop it last, right before we exit
}.instrument(span));
IoConcurrency::FuturesUnordered {
ios_tx,
barriers_tx,
cancel_task_on_drop: _cancel_task_on_drop,
}
}
}
}
pub(crate) fn clone(&self) -> Self {
match self {
IoConcurrency::Serial => IoConcurrency::Serial,
IoConcurrency::Parallel => IoConcurrency::Parallel,
IoConcurrency::FuturesUnordered {
ios_tx,
barriers_tx,
cancel_task_on_drop,
} => IoConcurrency::FuturesUnordered {
ios_tx: ios_tx.clone(),
barriers_tx: barriers_tx.clone(),
cancel_task_on_drop: cancel_task_on_drop.clone(),
},
}
}
pub(crate) async fn spawn_io<F>(&mut self, fut: F)
where
F: std::future::Future<Output = ()> + Send + 'static,
@@ -193,12 +436,16 @@ impl IoConcurrency {
IoConcurrency::Parallel => {
tokio::spawn(fut);
}
IoConcurrency::FuturesUnordered { futures } => {
IoConcurrency::FuturesUnordered { ios_tx, .. } => {
let mut fut = Box::pin(fut);
match futures::poll!(&mut fut) {
Poll::Ready(()) => {}
Poll::Pending => {
futures.push(fut);
// opportunistic poll to give the IO a head start (unproven whether it helps, but it seems like a good idea)
if let Poll::Ready(()) = futures::poll!(&mut fut) {
return;
}
match ios_tx.send(fut) {
Ok(()) => {}
Err(_) => {
unreachable!("the io task must have exited, likely it panicked")
}
}
}
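The two hunks above are the core of the commit: instead of keeping a FuturesUnordered inside the reconstruct state, IoConcurrency::spawn starts one sidecar task per connection that owns the FuturesUnordered, and spawn_io ships boxed IO futures to it over a channel after one opportunistic poll. A self-contained sketch of that pattern, stripped of the Waiting/Executing/Barriering/ShuttingDown state machine, the cancellation token, and the gate guard that the real task carries:

    use std::{future::Future, pin::Pin};
    use futures::{stream::FuturesUnordered, StreamExt};

    type IoFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
    type BarrierDoneTx = tokio::sync::oneshot::Sender<()>;

    // Returns the two senders the IoConcurrency handle keeps: one to submit
    // IO futures, one to request "everything submitted so far has finished".
    fn spawn_sidecar() -> (
        tokio::sync::mpsc::UnboundedSender<IoFuture>,
        tokio::sync::mpsc::UnboundedSender<BarrierDoneTx>,
    ) {
        let (ios_tx, mut ios_rx) = tokio::sync::mpsc::unbounded_channel::<IoFuture>();
        let (barriers_tx, mut barriers_rx) = tokio::sync::mpsc::unbounded_channel::<BarrierDoneTx>();
        tokio::spawn(async move {
            let mut futures: FuturesUnordered<IoFuture> = FuturesUnordered::new();
            loop {
                tokio::select! {
                    // Drive in-flight IOs; the guard avoids polling an empty stream.
                    Some(()) = futures.next(), if !futures.is_empty() => {}
                    // Accept newly submitted IOs; channel closure means the
                    // senders are gone, so shut down.
                    maybe_fut = ios_rx.recv() => match maybe_fut {
                        Some(fut) => futures.push(fut),
                        None => break,
                    },
                    // A barrier request: drain everything in flight, then
                    // acknowledge; no new IOs or barriers are accepted meanwhile.
                    Some(done) = barriers_rx.recv() => {
                        while futures.next().await.is_some() {}
                        let _ = done.send(());
                    }
                }
            }
            // Drain what is left before exiting; the real task drops its
            // GateGuard only after this point.
            while futures.next().await.is_some() {}
        });
        (ios_tx, barriers_tx)
    }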
@@ -206,54 +453,15 @@ impl IoConcurrency {
}
}
#[cfg(test)]
#[derive(Debug, Copy, Clone)]
pub(crate) enum SelectedIoConcurrency {
Serial,
Parallel,
}
impl ValuesReconstructState {
pub(crate) fn new() -> Self {
pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
keys_with_image_coverage: None,
layers_visited: 0,
delta_layers_visited: 0,
io_concurrency: {
static IO_CONCURRENCY: once_cell::sync::Lazy<String> =
once_cell::sync::Lazy::new(|| {
std::env::var("NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY")
.unwrap_or_else(|_| "serial".to_string())
});
match IO_CONCURRENCY.as_str() {
"parallel" => IoConcurrency::Parallel,
"serial" => IoConcurrency::Serial,
"futures-unordered" => IoConcurrency::FuturesUnordered {
futures: futures::stream::FuturesUnordered::new(),
},
x => panic!(
"Invalid value for NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY: {}",
x
),
}
},
}
}
#[cfg(test)]
pub(crate) fn new_with_io_concurrency(io_concurrency: SelectedIoConcurrency) -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
keys_with_image_coverage: None,
layers_visited: 0,
delta_layers_visited: 0,
io_concurrency: match io_concurrency {
SelectedIoConcurrency::Serial => IoConcurrency::Serial,
SelectedIoConcurrency::Parallel => IoConcurrency::Parallel,
},
io_concurrency,
}
}
@@ -345,12 +553,6 @@ impl ValuesReconstructState {
}
}
impl Default for ValuesReconstructState {
fn default() -> Self {
Self::new()
}
}
/// A key that uniquely identifies a layer in a timeline
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub(crate) enum LayerId {
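With the Default impl and the env-var lookup removed from ValuesReconstructState, callers now choose the IO concurrency explicitly. A hedged usage sketch (the helper and its error handling are illustrative; the real callsites either use spawn_from_env with a gate guard or the deprecated IoConcurrency::todo() stopgap):

    // Illustrative only: build the reconstruct state from an explicit
    // IoConcurrency derived from the timeline's gate.
    fn reconstruct_state_for(timeline: &Timeline) -> anyhow::Result<ValuesReconstructState> {
        let guard = timeline.gate.enter()?;                        // fails once shutdown has begun
        let io_concurrency = IoConcurrency::spawn_from_env(guard); // serial / parallel / futures-unordered
        Ok(ValuesReconstructState::new(io_concurrency))
    }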

View File

@@ -9,7 +9,7 @@ use utils::{
use super::failpoints::{Failpoint, FailpointKind};
use super::*;
use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
use crate::{context::DownloadBehavior, tenant::storage_layer::{IoConcurrency, LayerVisibilityHint}};
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
/// Used in tests to advance a future to the wanted await point, and not further.
@@ -55,7 +55,7 @@ async fn smoke_test() {
};
let img_before = {
let mut data = ValuesReconstructState::default();
let mut data = ValuesReconstructState::new(IoConcurrency::todo());
layer
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
@@ -90,7 +90,7 @@ async fn smoke_test() {
// on accesses when the layer is evicted, it will automatically be downloaded.
let img_after = {
let mut data = ValuesReconstructState::default();
let mut data = ValuesReconstructState::new(IoConcurrency::todo());
layer
.get_values_reconstruct_data(
controlfile_keyspace.clone(),

View File

@@ -78,7 +78,8 @@ use crate::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
storage_layer::{
inmemory_layer::IndexEntry, PersistentLayerDesc, ValueReconstructSituation,
inmemory_layer::IndexEntry, IoConcurrency, PersistentLayerDesc,
ValueReconstructSituation,
},
},
walingest::WalLagCooldown,
@@ -1014,7 +1015,7 @@ impl Timeline {
ranges: vec![key..key.next()],
};
let mut reconstruct_state = ValuesReconstructState::new();
let mut reconstruct_state = ValuesReconstructState::new(IoConcurrency::todo());
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
@@ -1057,6 +1058,7 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
io_concurrency: super::storage_layer::IoConcurrency,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if !lsn.is_valid() {
@@ -1091,7 +1093,7 @@ impl Timeline {
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(),
&mut ValuesReconstructState::new(io_concurrency),
ctx,
)
.await;
@@ -1147,7 +1149,7 @@ impl Timeline {
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::default(),
&mut ValuesReconstructState::new(IoConcurrency::todo()),
ctx,
)
.await;
@@ -3429,17 +3431,33 @@ impl Timeline {
}
}
match reconstruct_state.io_concurrency {
// TODO: internalize
trace!("waiting for futures to complete");
match &reconstruct_state.io_concurrency {
super::storage_layer::IoConcurrency::Serial => (),
super::storage_layer::IoConcurrency::Parallel => (),
super::storage_layer::IoConcurrency::FuturesUnordered { ref mut futures } => {
trace!("waiting for futures to complete");
while let Some(()) = futures.next().await {
trace!("future completed");
super::storage_layer::IoConcurrency::FuturesUnordered { barriers_tx, .. } => {
let (tx, rx) = tokio::sync::oneshot::channel();
match barriers_tx.send(tx) {
Ok(()) => {}
Err(_) => {
return Err(GetVectoredError::Other(anyhow::anyhow!(
"concurrent io task dropped its barriers_rx, likely it panicked"
)));
}
}
trace!("futures completed");
match rx.await {
Ok(()) => {}
Err(_) => {
return Err(GetVectoredError::Other(anyhow::anyhow!(
"concurrent io task dropped the barrier_done, likely it panicked"
)));
}
}
}
}
trace!("futures completed");
Ok(TimelineVisitOutcome {
completed_keyspace,
@@ -4182,7 +4200,12 @@ impl Timeline {
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.get_vectored(
key_request_accum.consume_keyspace(),
lsn,
IoConcurrency::todo(),
ctx,
)
.await?;
if self.cancel.is_cancelled() {
@@ -4263,7 +4286,7 @@ impl Timeline {
start: Key,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
// Metadata keys image layer creation.
let mut reconstruct_state = ValuesReconstructState::default();
let mut reconstruct_state = ValuesReconstructState::new(IoConcurrency::todo());
let begin = Instant::now();
let data = self
.get_vectored_impl(partition.clone(), lsn, &mut reconstruct_state, ctx)
@@ -5757,12 +5780,14 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Bytes)>> {
use super::storage_layer::SelectedIoConcurrency;
let mut all_data = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
let mut reconstruct_data = ValuesReconstructState::default();
let mut reconstruct_data = ValuesReconstructState::new(IoConcurrency::todo());
layer
.get_values_reconstruct_data(
KeySpace::single(Key::MIN..Key::MAX),
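Where get_vectored_impl previously drained the FuturesUnordered it owned, it now asks the sidecar task for a barrier before returning. A sketch of that handshake, simplified from the hunk above (the real code maps both failure modes to GetVectoredError::Other):

    // Ship a oneshot sender to the sidecar task; it completes the channel only
    // after every IO future submitted so far has finished.
    async fn wait_for_inflight_ios(
        barriers_tx: &tokio::sync::mpsc::UnboundedSender<tokio::sync::oneshot::Sender<()>>,
    ) -> anyhow::Result<()> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        barriers_tx
            .send(tx)
            .map_err(|_| anyhow::anyhow!("io task dropped its barriers_rx, likely it panicked"))?;
        rx.await
            .map_err(|_| anyhow::anyhow!("io task dropped the barrier_done, likely it panicked"))?;
        Ok(())
    }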