From 689788cbbaf9db2c86b6be22e23cde475e1cbca0 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 20 Nov 2024 14:36:46 +0100
Subject: [PATCH] async-timer based approach (again, with data)

Yep, it's clearly the best one, with the best batching factor at the lowest CPU usage.

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e004780d0a205e081458b46db
---
 Cargo.lock                                                   | 12 ++++++++++++
 Cargo.toml                                                   |  1 +
 pageserver/Cargo.toml                                        |  1 +
 pageserver/src/page_service.rs                               | 17 ++++++++++++++---
 .../pageserver/test_pageserver_getpage_merge.py              |  4 +++-
 5 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c7af140f7d..9196015057 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -244,6 +244,17 @@ dependencies = [
  "syn 2.0.52",
 ]

+[[package]]
+name = "async-timer"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba5fa6ed76cb2aa820707b4eb9ec46f42da9ce70b0eafab5e5e34942b38a44d5"
+dependencies = [
+ "libc",
+ "wasm-bindgen",
+ "winapi",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.68"
@@ -3590,6 +3601,7 @@ dependencies = [
  "arc-swap",
  "async-compression",
  "async-stream",
+ "async-timer",
  "bit_field",
  "byteorder",
  "bytes",
diff --git a/Cargo.toml b/Cargo.toml
index dbda930535..d74efd51f8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,7 @@ anyhow = { version = "1.0", features = ["backtrace"] }
 arc-swap = "1.6"
 async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
 atomic-take = "1.1.0"
+async-timer = "0.7.4"
 azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
 azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
 azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 143d8236df..ed2ee2f5c2 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -15,6 +15,7 @@ anyhow.workspace = true
 arc-swap.workspace = true
 async-compression.workspace = true
 async-stream.workspace = true
+async-timer.workspace = true
 bit_field.workspace = true
 byteorder.workspace = true
 bytes.workspace = true
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 3472dda378..c32f889958 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -3,6 +3,7 @@

 use anyhow::{bail, Context};
 use async_compression::tokio::write::GzipEncoder;
+use async_timer::Oneshot;
 use bytes::Buf;
 use futures::FutureExt;
 use itertools::Itertools;
@@ -22,6 +23,7 @@ use pq_proto::FeStartupPacket;
 use pq_proto::{BeMessage, FeMessage, RowDescriptor};
 use std::borrow::Cow;
 use std::io;
+use std::pin::Pin;
 use std::str;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -316,6 +318,8 @@ struct PageServerHandler {

     /// See [`PageServerConf::server_side_batch_timeout`]
     server_side_batch_timeout: Option<Duration>,
+
+    server_side_batch_timer: Pin<Box<async_timer::oneshot::Timer>>,
 }

 struct Carry {
@@ -585,6 +589,9 @@ impl PageServerHandler {
             timeline_handles: TimelineHandles::new(tenant_manager),
             cancel,
             server_side_batch_timeout,
+            server_side_batch_timer: Box::pin(async_timer::oneshot::Timer::new(
+                Duration::from_secs(999),
+            )), // reset each iteration
         }
     }

@@ -661,9 +668,13 @@ impl PageServerHandler {
                     .msg
                 ]));
             } else {
-                batching_deadline_storage = Some(Box::pin(async move {
-                    tokio::time::sleep(batch_timeout).await;
-                }));
+                std::future::poll_fn(|ctx| {
+                    self.server_side_batch_timer
+                        .restart(batch_timeout, ctx.waker());
+                    std::task::Poll::Ready(())
+                })
+                .await;
+                batching_deadline_storage = Some(&mut self.server_side_batch_timer);
                 Either::Right(
                     batching_deadline_storage.as_mut().expect("we just set it"),
                 )
diff --git a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py
index be7cf66c79..850bc1b8df 100644
--- a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py
+++ b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py
@@ -9,7 +9,7 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder
 from fixtures.utils import humantime_to_ms

-TARGET_RUNTIME = 60
+TARGET_RUNTIME = 5


 @pytest.mark.parametrize(
@@ -18,10 +18,12 @@ TARGET_RUNTIME = 60
         # the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
         (50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
         (50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
+        (50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"),
         (50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
         # the next 4 cases demonstrate how batchable workloads benefit from batching
         (50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
         (50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
+        (50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"),
         (50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
         (50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
     ],
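
Note on the page_service.rs hunks above: the key idea is to keep a single long-lived
async-timer oneshot per connection handler and re-arm it with restart() on every
batching round, instead of allocating a fresh tokio::time::sleep future per batch.
Below is a minimal standalone sketch of that pattern, not pageserver code: it assumes
the async-timer 0.7 oneshot API exactly as used in the patch (Oneshot::new / restart)
and a tokio runtime for the example main; the Batcher type and batch_deadline method
are illustrative names only.

    use std::pin::Pin;
    use std::time::Duration;

    use async_timer::Oneshot; // trait that provides `new` and `restart`

    struct Batcher {
        // One long-lived timer, parked with a dummy duration; it is re-armed
        // via `restart` before every use, so no per-batch future allocation.
        deadline: Pin<Box<async_timer::oneshot::Timer>>,
    }

    impl Batcher {
        fn new() -> Self {
            Self {
                deadline: Box::pin(async_timer::oneshot::Timer::new(Duration::from_secs(999))),
            }
        }

        // Re-arm the shared timer for this batching round, then await it as
        // the batch deadline.
        async fn batch_deadline(&mut self, batch_timeout: Duration) {
            // `restart` needs a waker, so the re-arm happens inside poll_fn.
            std::future::poll_fn(|ctx| {
                self.deadline.restart(batch_timeout, ctx.waker());
                std::task::Poll::Ready(())
            })
            .await;
            // The timer now fires `batch_timeout` after the restart above.
            (&mut self.deadline).await;
        }
    }

    #[tokio::main]
    async fn main() {
        let mut batcher = Batcher::new();
        for i in 0..3 {
            batcher.batch_deadline(Duration::from_millis(10)).await;
            println!("batch {i}: deadline elapsed, flush the accumulated requests");
        }
    }

restart() both re-arms the timer and registers the current task's waker, which is why
the patch performs the re-arm inside std::future::poll_fn before awaiting the timer.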