From f54cf567fff27400bbca6a9747d92cd4437a1b35 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sat, 14 Sep 2024 13:22:28 +0000 Subject: [PATCH] implement io concurrency control (serial / parallel). bug still happens --- pageserver/src/tenant/storage_layer.rs | 52 +++++++++++++++++++ .../src/tenant/storage_layer/delta_layer.rs | 2 +- .../src/tenant/storage_layer/image_layer.rs | 2 +- .../tenant/storage_layer/inmemory_layer.rs | 2 +- 4 files changed, 55 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index a344948d5c..2a9b83d0c2 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -154,6 +154,37 @@ pub(crate) struct ValuesReconstructState { // Statistics that are still accessible as a caller of `get_vectored_impl`. layers_visited: u32, delta_layers_visited: u32, + + io_concurrency: IoConcurrency, +} + +enum IoConcurrency { + Serial { + prev_io: Option>, + }, + Parallel, +} + +impl IoConcurrency { + pub(crate) fn spawn_io(&mut self, fut: F) + where + F: std::future::Future + Send + 'static, + { + match self { + IoConcurrency::Serial { prev_io } => { + let prev = prev_io.take(); + *prev_io = Some(tokio::spawn(async move { + if let Some(prev) = prev { + prev.await.unwrap(); + } + fut.await; + })); + } + IoConcurrency::Parallel => { + tokio::spawn(fut); + } + } + } } impl ValuesReconstructState { @@ -164,9 +195,30 @@ impl ValuesReconstructState { keys_with_image_coverage: None, layers_visited: 0, delta_layers_visited: 0, + io_concurrency: { + static IO_CONCURRENCY: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(|| { + std::env::var("NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY").unwrap() + }); + match IO_CONCURRENCY.as_str() { + "parallel" => IoConcurrency::Parallel, + "serial" => IoConcurrency::Serial { prev_io: None }, + x => panic!( + "Invalid value for NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY: {}", + x + ), + } + }, } } + pub(crate) fn spawn_io(&mut self, fut: F) + where + F: std::future::Future + Send + 'static, + { + self.io_concurrency.spawn_io(fut); + } + /// Associate a key with the error which it encountered and mark it as done pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) { let previous = self.keys.insert(key, Err(err)); diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 0c209fb2ac..e77e358d0b 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1009,7 +1009,7 @@ impl DeltaLayerInner { let read_from = self.file.clone(); let read_ctx = ctx.attached_child(); - tokio::task::spawn(async move { + reconstruct_state.spawn_io(async move { let vectored_blob_reader = VectoredBlobReader::new(&read_from); let buf = BytesMut::with_capacity(buf_size); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 2e3dbcff49..a51465010c 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -612,7 +612,7 @@ impl ImageLayerInner { let read_from = self.file.clone(); let read_ctx = ctx.attached_child(); - tokio::task::spawn(async move { + reconstruct_state.spawn_io(async move { let buf = BytesMut::with_capacity(buf_size); let vectored_blob_reader = VectoredBlobReader::new(&read_from); let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index e60ac3452d..462d49a3df 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -478,7 +478,7 @@ impl InMemoryLayer { let read_from = inner.file.clone(); let read_ctx = ctx.attached_child(); - tokio::task::spawn(async move { + reconstruct_state.spawn_io(async move { let locked = read_from.read().await; let f = vectored_dio_read::execute( &*locked,