the final choice: async-timer 1.0beta15 with features=["tokio1"]

This commit is contained in:
Christian Schwarz
2024-11-21 11:15:02 +01:00
parent 89b6cb8eba
commit fa7ce2ca07
4 changed files with 29 additions and 3 deletions

20
Cargo.lock generated
View File

@@ -244,6 +244,19 @@ dependencies = [
"syn 2.0.52",
]
[[package]]
name = "async-timer"
version = "1.0.0-beta.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d420af8e042475e58a20d91af8eda7d6528989418c03f3f527e1c3415696f70"
dependencies = [
"error-code",
"libc",
"tokio",
"wasm-bindgen",
"web-time",
]
[[package]]
name = "async-trait"
version = "0.1.68"
@@ -1920,6 +1933,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "error-code"
version = "3.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5d9305ccc6942a704f4335694ecd3de2ea531b114ac2d51f5f843750787a92f"
[[package]]
name = "event-listener"
version = "2.5.3"
@@ -3590,6 +3609,7 @@ dependencies = [
"arc-swap",
"async-compression",
"async-stream",
"async-timer",
"bit_field",
"byteorder",
"bytes",

View File

@@ -47,6 +47,7 @@ anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
async-timer = { version= "1.0.0-beta.15", features = ["tokio1"] }
azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }

View File

@@ -15,6 +15,7 @@ anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-timer.workspace = true
bit_field.workspace = true
byteorder.workspace = true
bytes.workspace = true

View File

@@ -3,6 +3,7 @@
use anyhow::{bail, Context};
use async_compression::tokio::write::GzipEncoder;
use async_timer::Timer;
use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
@@ -22,6 +23,7 @@ use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::io;
use std::pin::Pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
@@ -316,6 +318,8 @@ struct PageServerHandler {
/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,
server_side_batch_timer: Pin<Box<async_timer::timer::Platform>>,
}
struct Carry {
@@ -585,6 +589,7 @@ impl PageServerHandler {
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
server_side_batch_timeout,
server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration
}
}
@@ -661,9 +666,8 @@ impl PageServerHandler {
.msg
]));
} else {
batching_deadline_storage = Some(Box::pin(async move {
tokio::time::sleep(batch_timeout).await;
}));
self.server_side_batch_timer.restart(batch_timeout);
batching_deadline_storage = Some(&mut self.server_side_batch_timer);
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)