diff --git a/Cargo.lock b/Cargo.lock index c7af140f7d..56f74bcdac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -244,6 +244,19 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "async-timer" +version = "1.0.0-beta.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d420af8e042475e58a20d91af8eda7d6528989418c03f3f527e1c3415696f70" +dependencies = [ + "error-code", + "libc", + "tokio", + "wasm-bindgen", + "web-time", +] + [[package]] name = "async-trait" version = "0.1.68" @@ -1920,6 +1933,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "error-code" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d9305ccc6942a704f4335694ecd3de2ea531b114ac2d51f5f843750787a92f" + [[package]] name = "event-listener" version = "2.5.3" @@ -3590,6 +3609,7 @@ dependencies = [ "arc-swap", "async-compression", "async-stream", + "async-timer", "bit_field", "byteorder", "bytes", diff --git a/Cargo.toml b/Cargo.toml index dbda930535..7797eb29ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ anyhow = { version = "1.0", features = ["backtrace"] } arc-swap = "1.6" async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] } atomic-take = "1.1.0" +async-timer = { version= "1.0.0-beta.15", features = ["tokio1"] } azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] } azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] } azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 143d8236df..ed2ee2f5c2 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -15,6 +15,7 @@ anyhow.workspace = true arc-swap.workspace = true async-compression.workspace = true async-stream.workspace = true +async-timer.workspace = true bit_field.workspace = true byteorder.workspace = true bytes.workspace = true diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3472dda378..3e01887355 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -3,6 +3,7 @@ use anyhow::{bail, Context}; use async_compression::tokio::write::GzipEncoder; +use async_timer::Timer; use bytes::Buf; use futures::FutureExt; use itertools::Itertools; @@ -22,6 +23,7 @@ use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::borrow::Cow; use std::io; +use std::pin::Pin; use std::str; use std::str::FromStr; use std::sync::Arc; @@ -316,6 +318,8 @@ struct PageServerHandler { /// See [`PageServerConf::server_side_batch_timeout`] server_side_batch_timeout: Option, + + server_side_batch_timer: Pin>, } struct Carry { @@ -585,6 +589,7 @@ impl PageServerHandler { timeline_handles: TimelineHandles::new(tenant_manager), cancel, server_side_batch_timeout, + server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration } } @@ -661,9 +666,8 @@ impl PageServerHandler { .msg ])); } else { - batching_deadline_storage = Some(Box::pin(async move { - tokio::time::sleep(batch_timeout).await; - })); + self.server_side_batch_timer.restart(batch_timeout); + batching_deadline_storage = Some(&mut self.server_side_batch_timer); Either::Right( batching_deadline_storage.as_mut().expect("we just set it"), )