feat: Limit CPU in runtime (#3685) (#4782)

feat: add throttle runtime (#3685)
This commit is contained in:
pa
2024-10-24 15:30:24 +08:00
committed by GitHub
parent fcde0a4874
commit 9d3ee6384a
31 changed files with 787 additions and 41 deletions

30
Cargo.lock generated
View File

@@ -1808,6 +1808,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "clocksource"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "129026dd5a8a9592d96916258f3a5379589e513ea5e86aeb0bd2530286e44e9e"
dependencies = [
"libc",
"time",
"winapi",
]
[[package]]
name = "cmake"
version = "0.1.51"
@@ -2332,16 +2343,24 @@ name = "common-runtime"
version = "0.9.5"
dependencies = [
"async-trait",
"clap 4.5.19",
"common-error",
"common-macro",
"common-telemetry",
"futures",
"lazy_static",
"num_cpus",
"once_cell",
"parking_lot 0.12.3",
"paste",
"pin-project",
"prometheus",
"rand",
"ratelimit",
"serde",
"serde_json",
"snafu 0.8.5",
"tempfile",
"tokio",
"tokio-metrics",
"tokio-metrics-collector",
@@ -9205,6 +9224,17 @@ dependencies = [
"rand",
]
[[package]]
name = "ratelimit"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c1bb13e2dcfa2232ac6887157aad8d9b3fe4ca57f7c8d4938ff5ea9be742300"
dependencies = [
"clocksource",
"parking_lot 0.12.3",
"thiserror",
]
[[package]]
name = "raw-cpuid"
version = "11.2.0"

View File

@@ -140,6 +140,7 @@ opentelemetry-proto = { version = "0.5", features = [
"with-serde",
"logs",
] }
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
@@ -148,6 +149,7 @@ promql-parser = { version = "0.4.1" }
prost = "0.12"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
ratelimit = "0.9"
regex = "1.8"
regex-automata = { version = "0.4" }
reqwest = { version = "0.12", default-features = false, features = [

View File

@@ -28,7 +28,7 @@ enum_dispatch = "0.3"
futures-util.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
parking_lot.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true

View File

@@ -4,21 +4,36 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lib]
path = "src/lib.rs"
[[bin]]
name = "common-runtime-bin"
path = "src/bin.rs"
[lints]
workspace = true
[dependencies]
async-trait.workspace = true
clap.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
futures.workspace = true
lazy_static.workspace = true
num_cpus.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
paste.workspace = true
pin-project.workspace = true
prometheus.workspace = true
rand.workspace = true
ratelimit.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-metrics = "0.3"
tokio-metrics-collector = { git = "https://github.com/MichaelScofield/tokio-metrics-collector.git", rev = "89d692d5753d28564a7aac73c6ac5aba22243ba0" }

View File

@@ -0,0 +1,60 @@
# Greptime Runtime
## Run performance test for different priority & workload type
```
# workspace is at this subcrate
cargo run --release -- --loop-cnt 500
```
## Related PRs & issues
- Preliminary support cpu limitation
ISSUE: https://github.com/GreptimeTeam/greptimedb/issues/3685
PR: https://github.com/GreptimeTeam/greptimedb/pull/4782
## CPU resource constraints (ThrottleableRuntime)
To achieve CPU resource constraints, we adopt the concept of rate limiting. When creating a future, we first wrap it with another layer of future to intercept the poll operation during runtime. By using the ratelimit library, we can simply implement a mechanism that allows only a limited number of polls for a batch of tasks under a certain priority within a specific time frame (the current token generation interval is set to 10ms).
The default used runtime can be switched by
``` rust
pub type Runtime = DefaultRuntime;
```
in `runtime.rs`.
We tested four type of workload with 5 priorities, whose setup are as follows:
``` rust
impl Priority {
fn ratelimiter_count(&self) -> Result<Option<Ratelimiter>> {
let max = 8000;
let gen_per_10ms = match self {
Priority::VeryLow => Some(2000),
Priority::Low => Some(4000),
Priority::Middle => Some(6000),
Priority::High => Some(8000),
Priority::VeryHigh => None,
};
if let Some(gen_per_10ms) = gen_per_10ms {
Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // generate poll count per 10ms
.max_tokens(max) // reserved token for batch request
.build()
.context(BuildRuntimeRateLimiterSnafu)
.map(Some)
} else {
Ok(None)
}
}
}
```
This is the preliminary experimental effect so far:
![](resources/rdme-exp.png)
## TODO
- Introduce PID to achieve more accurate limitation.

Binary file not shown.

After

Width:  |  Height:  |  Size: 226 KiB

View File

@@ -0,0 +1,205 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use clap::Parser;
#[derive(Debug, Default, Parser)]
pub struct Command {
#[clap(long)]
loop_cnt: usize,
}
fn main() {
common_telemetry::init_default_ut_logging();
let cmd = Command::parse();
test_diff_priority_cpu::test_diff_workload_priority(cmd.loop_cnt);
}
mod test_diff_priority_cpu {
use std::path::PathBuf;
use common_runtime::runtime::{BuilderBuild, Priority, RuntimeTrait};
use common_runtime::{Builder, Runtime};
use common_telemetry::debug;
use tempfile::TempDir;
fn compute_pi_str(precision: usize) -> String {
let mut pi = 0.0;
let mut sign = 1.0;
for i in 0..precision {
pi += sign / (2 * i + 1) as f64;
sign *= -1.0;
}
pi *= 4.0;
format!("{:.prec$}", pi, prec = precision)
}
macro_rules! def_workload_enum {
($($variant:ident),+) => {
#[derive(Debug)]
enum WorkloadType {
$($variant),+
}
/// array of workloads for iteration
const WORKLOADS: &'static [WorkloadType] = &[
$( WorkloadType::$variant ),+
];
};
}
def_workload_enum!(
ComputeHeavily,
ComputeHeavily2,
WriteFile,
SpawnBlockingWriteFile
);
async fn workload_compute_heavily() {
let prefix = 10;
for _ in 0..3000 {
let _ = compute_pi_str(prefix);
tokio::task::yield_now().await;
}
}
async fn workload_compute_heavily2() {
let prefix = 30;
for _ in 0..2000 {
let _ = compute_pi_str(prefix);
tokio::task::yield_now().await;
}
}
async fn workload_write_file(_idx: u64, tempdir: PathBuf) {
use tokio::io::AsyncWriteExt;
let prefix = 50;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(tempdir.join(format!("pi_{}", prefix)))
.await
.unwrap();
for i in 0..200 {
let pi = compute_pi_str(prefix);
if i % 2 == 0 {
file.write_all(pi.as_bytes()).await.unwrap();
}
}
}
async fn workload_spawn_blocking_write_file(tempdir: PathBuf) {
use std::io::Write;
let prefix = 100;
let mut file = Some(
std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(tempdir.join(format!("pi_{}", prefix)))
.unwrap(),
);
for i in 0..100 {
let pi = compute_pi_str(prefix);
if i % 2 == 0 {
let mut file1 = file.take().unwrap();
file = Some(
tokio::task::spawn_blocking(move || {
file1.write_all(pi.as_bytes()).unwrap();
file1
})
.await
.unwrap(),
);
}
}
}
pub fn test_diff_workload_priority(loop_cnt: usize) {
let tempdir = tempfile::tempdir().unwrap();
let priorities = [
Priority::VeryLow,
Priority::Low,
Priority::Middle,
Priority::High,
Priority::VeryHigh,
];
for wl in WORKLOADS {
for p in priorities.iter() {
let runtime: Runtime = Builder::default()
.runtime_name("test")
.thread_name("test")
.worker_threads(8)
.priority(*p)
.build()
.expect("Fail to create runtime");
let runtime2 = runtime.clone();
runtime.block_on(test_spec_priority_and_workload(
*p, runtime2, wl, &tempdir, loop_cnt,
));
}
}
}
async fn test_spec_priority_and_workload(
priority: Priority,
runtime: Runtime,
workload_id: &WorkloadType,
tempdir: &TempDir,
loop_cnt: usize,
) {
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
debug!(
"testing cpu usage for priority {:?} workload_id {:?}",
priority, workload_id,
);
// start monitor thread
let mut tasks = vec![];
let start = std::time::Instant::now();
for i in 0..loop_cnt {
// persist cpu usage in json: {priority}.{workload_id}
match *workload_id {
WorkloadType::ComputeHeavily => {
tasks.push(runtime.spawn(workload_compute_heavily()));
}
WorkloadType::ComputeHeavily2 => {
tasks.push(runtime.spawn(workload_compute_heavily2()));
}
WorkloadType::SpawnBlockingWriteFile => {
tasks.push(runtime.spawn(workload_spawn_blocking_write_file(
tempdir.path().to_path_buf(),
)));
}
WorkloadType::WriteFile => {
tasks.push(
runtime.spawn(workload_write_file(i as u64, tempdir.path().to_path_buf())),
);
}
}
}
for task in tasks {
task.await.unwrap();
}
let elapsed = start.elapsed();
debug!(
"test cpu usage for priority {:?} workload_id {:?} elapsed {}ms",
priority,
workload_id,
elapsed.as_millis()
);
}
}

View File

@@ -33,6 +33,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build runtime rate limiter"))]
BuildRuntimeRateLimiter {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: ratelimit::Error,
},
#[snafu(display("Repeated task {} is already started", name))]
IllegalState {
name: String,

View File

@@ -21,6 +21,7 @@ use once_cell::sync::Lazy;
use paste::paste;
use serde::{Deserialize, Serialize};
use crate::runtime::{BuilderBuild, RuntimeTrait};
use crate::{Builder, JoinHandle, Runtime};
const GLOBAL_WORKERS: usize = 8;

View File

@@ -17,6 +17,8 @@ pub mod global;
mod metrics;
mod repeated_task;
pub mod runtime;
pub mod runtime_default;
pub mod runtime_throttleable;
pub use global::{
block_on_compact, block_on_global, compact_runtime, create_runtime, global_runtime,

View File

@@ -23,6 +23,7 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::error::{IllegalStateSnafu, Result, WaitGcTaskStopSnafu};
use crate::runtime::RuntimeTrait;
use crate::Runtime;
/// Task to execute repeatedly.

View File

@@ -19,24 +19,20 @@ use std::thread;
use std::time::Duration;
use snafu::ResultExt;
use tokio::runtime::{Builder as RuntimeBuilder, Handle};
use tokio::runtime::Builder as RuntimeBuilder;
use tokio::sync::oneshot;
pub use tokio::task::{JoinError, JoinHandle};
use crate::error::*;
use crate::metrics::*;
use crate::runtime_default::DefaultRuntime;
use crate::runtime_throttleable::ThrottleableRuntime;
// configurations
pub type Runtime = DefaultRuntime;
static RUNTIME_ID: AtomicUsize = AtomicUsize::new(0);
/// A runtime to run future tasks
#[derive(Clone, Debug)]
pub struct Runtime {
name: String,
handle: Handle,
// Used to receive a drop signal when dropper is dropped, inspired by databend
_dropper: Arc<Dropper>,
}
/// Dropping the dropper will cause runtime to shutdown.
#[derive(Debug)]
pub struct Dropper {
@@ -50,45 +46,42 @@ impl Drop for Dropper {
}
}
impl Runtime {
pub fn builder() -> Builder {
pub trait RuntimeTrait {
/// Get a runtime builder
fn builder() -> Builder {
Builder::default()
}
/// Spawn a future and execute it in this thread pool
///
/// Similar to tokio::runtime::Runtime::spawn()
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.handle.spawn(future)
}
F::Output: Send + 'static;
/// Run the provided function on an executor dedicated to blocking
/// operations.
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.handle.spawn_blocking(func)
}
R: Send + 'static;
/// Run a future to complete, this is the runtime's entry point
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.block_on(future)
}
fn block_on<F: Future>(&self, future: F) -> F::Output;
pub fn name(&self) -> &str {
&self.name
}
/// Get the name of the runtime
fn name(&self) -> &str;
}
pub trait BuilderBuild<R: RuntimeTrait> {
fn build(&mut self) -> Result<R>;
}
pub struct Builder {
runtime_name: String,
thread_name: String,
priority: Priority,
builder: RuntimeBuilder,
}
@@ -98,11 +91,17 @@ impl Default for Builder {
runtime_name: format!("runtime-{}", RUNTIME_ID.fetch_add(1, Ordering::Relaxed)),
thread_name: "default-worker".to_string(),
builder: RuntimeBuilder::new_multi_thread(),
priority: Priority::VeryHigh,
}
}
}
impl Builder {
pub fn priority(&mut self, priority: Priority) -> &mut Self {
self.priority = priority;
self
}
/// Sets the number of worker threads the Runtime will use.
///
/// This can be any number above 0. The default value is the number of cores available to the system.
@@ -139,8 +138,10 @@ impl Builder {
self.thread_name = val.into();
self
}
}
pub fn build(&mut self) -> Result<Runtime> {
impl BuilderBuild<DefaultRuntime> for Builder {
fn build(&mut self) -> Result<DefaultRuntime> {
let runtime = self
.builder
.enable_all()
@@ -163,13 +164,48 @@ impl Builder {
#[cfg(tokio_unstable)]
register_collector(name.clone(), &handle);
Ok(Runtime {
name,
Ok(DefaultRuntime::new(
&name,
handle,
_dropper: Arc::new(Dropper {
Arc::new(Dropper {
close: Some(send_stop),
}),
})
))
}
}
impl BuilderBuild<ThrottleableRuntime> for Builder {
fn build(&mut self) -> Result<ThrottleableRuntime> {
let runtime = self
.builder
.enable_all()
.thread_name(self.thread_name.clone())
.on_thread_start(on_thread_start(self.thread_name.clone()))
.on_thread_stop(on_thread_stop(self.thread_name.clone()))
.on_thread_park(on_thread_park(self.thread_name.clone()))
.on_thread_unpark(on_thread_unpark(self.thread_name.clone()))
.build()
.context(BuildRuntimeSnafu)?;
let name = self.runtime_name.clone();
let handle = runtime.handle().clone();
let (send_stop, recv_stop) = oneshot::channel();
// Block the runtime to shutdown.
let _ = thread::Builder::new()
.name(format!("{}-blocker", self.thread_name))
.spawn(move || runtime.block_on(recv_stop));
#[cfg(tokio_unstable)]
register_collector(name.clone(), &handle);
ThrottleableRuntime::new(
&name,
self.priority,
handle,
Arc::new(Dropper {
close: Some(send_stop),
}),
)
}
}
@@ -213,8 +249,18 @@ fn on_thread_unpark(thread_name: String) -> impl Fn() + 'static {
}
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum Priority {
VeryLow = 0,
Low = 1,
Middle = 2,
High = 3,
VeryHigh = 4,
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::thread;
use std::time::Duration;
@@ -235,12 +281,12 @@ mod tests {
#[test]
fn test_metric() {
let runtime = Builder::default()
let runtime: Runtime = Builder::default()
.worker_threads(5)
.thread_name("test_runtime_metric")
.build()
.unwrap();
// wait threads created
// wait threads create
thread::sleep(Duration::from_millis(50));
let _handle = runtime.spawn(async {

View File

@@ -0,0 +1,77 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use std::sync::Arc;
use tokio::runtime::Handle;
pub use tokio::task::JoinHandle;
use crate::runtime::{Dropper, RuntimeTrait};
use crate::Builder;
/// A runtime to run future tasks
#[derive(Clone, Debug)]
pub struct DefaultRuntime {
name: String,
handle: Handle,
// Used to receive a drop signal when dropper is dropped, inspired by databend
_dropper: Arc<Dropper>,
}
impl DefaultRuntime {
pub(crate) fn new(name: &str, handle: Handle, dropper: Arc<Dropper>) -> Self {
Self {
name: name.to_string(),
handle,
_dropper: dropper,
}
}
}
impl RuntimeTrait for DefaultRuntime {
fn builder() -> Builder {
Builder::default()
}
/// Spawn a future and execute it in this thread pool
///
/// Similar to tokio::runtime::Runtime::spawn()
fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.handle.spawn(future)
}
/// Run the provided function on an executor dedicated to blocking
/// operations.
fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.handle.spawn_blocking(func)
}
/// Run a future to complete, this is the runtime's entry point
fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.block_on(future)
}
fn name(&self) -> &str {
&self.name
}
}

View File

@@ -0,0 +1,285 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::FutureExt;
use ratelimit::Ratelimiter;
use snafu::ResultExt;
use tokio::runtime::Handle;
pub use tokio::task::JoinHandle;
use tokio::time::Sleep;
use crate::error::{BuildRuntimeRateLimiterSnafu, Result};
use crate::runtime::{Dropper, Priority, RuntimeTrait};
use crate::Builder;
struct RuntimeRateLimiter {
pub ratelimiter: Option<Ratelimiter>,
}
impl Debug for RuntimeRateLimiter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RuntimeThrottleShareWithFuture")
.field(
"ratelimiter_max_tokens",
&self.ratelimiter.as_ref().map(|v| v.max_tokens()),
)
.field(
"ratelimiter_refill_amount",
&self.ratelimiter.as_ref().map(|v| v.refill_amount()),
)
.finish()
}
}
/// A runtime to run future tasks
#[derive(Clone, Debug)]
pub struct ThrottleableRuntime {
name: String,
handle: Handle,
shared_with_future: Arc<RuntimeRateLimiter>,
// Used to receive a drop signal when dropper is dropped, inspired by databend
_dropper: Arc<Dropper>,
}
impl ThrottleableRuntime {
pub(crate) fn new(
name: &str,
priority: Priority,
handle: Handle,
dropper: Arc<Dropper>,
) -> Result<Self> {
Ok(Self {
name: name.to_string(),
handle,
shared_with_future: Arc::new(RuntimeRateLimiter {
ratelimiter: priority.ratelimiter_count()?,
}),
_dropper: dropper,
})
}
}
impl RuntimeTrait for ThrottleableRuntime {
fn builder() -> Builder {
Builder::default()
}
/// Spawn a future and execute it in this thread pool
///
/// Similar to tokio::runtime::Runtime::spawn()
fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.handle
.spawn(ThrottleFuture::new(self.shared_with_future.clone(), future))
}
/// Run the provided function on an executor dedicated to blocking
/// operations.
fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.handle.spawn_blocking(func)
}
/// Run a future to complete, this is the runtime's entry point
fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.block_on(future)
}
fn name(&self) -> &str {
&self.name
}
}
enum State {
Pollable,
Throttled(Pin<Box<Sleep>>),
}
impl State {
fn unwrap_backoff(&mut self) -> &mut Pin<Box<Sleep>> {
match self {
State::Throttled(sleep) => sleep,
_ => panic!("unwrap_backoff failed"),
}
}
}
#[pin_project::pin_project]
pub struct ThrottleFuture<F: Future + Send + 'static> {
#[pin]
future: F,
/// RateLimiter of this future
handle: Arc<RuntimeRateLimiter>,
state: State,
}
impl<F> ThrottleFuture<F>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
fn new(handle: Arc<RuntimeRateLimiter>, future: F) -> Self {
Self {
future,
handle,
state: State::Pollable,
}
}
}
impl<F> Future for ThrottleFuture<F>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
type Output = F::Output;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.state {
State::Pollable => {}
State::Throttled(ref mut sleep) => match sleep.poll_unpin(cx) {
Poll::Ready(_) => {
*this.state = State::Pollable;
}
Poll::Pending => return Poll::Pending,
},
};
if let Some(ratelimiter) = &this.handle.ratelimiter {
if let Err(wait) = ratelimiter.try_wait() {
*this.state = State::Throttled(Box::pin(tokio::time::sleep(wait)));
match this.state.unwrap_backoff().poll_unpin(cx) {
Poll::Ready(_) => {
*this.state = State::Pollable;
}
Poll::Pending => {
return Poll::Pending;
}
}
}
}
let poll_res = this.future.poll(cx);
match poll_res {
Poll::Ready(r) => Poll::Ready(r),
Poll::Pending => Poll::Pending,
}
}
}
impl Priority {
fn ratelimiter_count(&self) -> Result<Option<Ratelimiter>> {
let max = 8000;
let gen_per_10ms = match self {
Priority::VeryLow => Some(2000),
Priority::Low => Some(4000),
Priority::Middle => Some(6000),
Priority::High => Some(8000),
Priority::VeryHigh => None,
};
if let Some(gen_per_10ms) = gen_per_10ms {
Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // generate poll count per 10ms
.max_tokens(max) // reserved token for batch request
.build()
.context(BuildRuntimeRateLimiterSnafu)
.map(Some)
} else {
Ok(None)
}
}
}
#[cfg(test)]
mod tests {
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::time::Duration;
use super::*;
use crate::runtime::BuilderBuild;
#[tokio::test]
async fn test_throttleable_runtime_spawn_simple() {
for p in [
Priority::VeryLow,
Priority::Low,
Priority::Middle,
Priority::High,
Priority::VeryHigh,
] {
let runtime: ThrottleableRuntime = Builder::default()
.runtime_name("test")
.thread_name("test")
.worker_threads(8)
.priority(p)
.build()
.expect("Fail to create runtime");
// Spawn a simple future that returns 42
let handle = runtime.spawn(async {
tokio::time::sleep(Duration::from_millis(10)).await;
42
});
let result = handle.await.expect("Task panicked");
assert_eq!(result, 42);
}
}
#[tokio::test]
async fn test_throttleable_runtime_spawn_complex() {
let tempdir = tempfile::tempdir().unwrap();
for p in [
Priority::VeryLow,
Priority::Low,
Priority::Middle,
Priority::High,
Priority::VeryHigh,
] {
let runtime: ThrottleableRuntime = Builder::default()
.runtime_name("test")
.thread_name("test")
.worker_threads(8)
.priority(p)
.build()
.expect("Fail to create runtime");
let tempdirpath = tempdir.path().to_path_buf();
let handle = runtime.spawn(async move {
let mut file = File::create(tempdirpath.join("test.txt")).await.unwrap();
file.write_all(b"Hello, world!").await.unwrap();
42
});
let result = handle.await.expect("Task panicked");
assert_eq!(result, 42);
}
}
}

View File

@@ -26,7 +26,7 @@ opentelemetry = { version = "0.21.0", default-features = false, features = [
opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] }
opentelemetry-semantic-conventions = "0.13.0"
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
parking_lot = { version = "0.12" }
parking_lot.workspace = true
prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -23,6 +23,7 @@ use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_runtime::runtime::{BuilderBuild, RuntimeTrait};
use common_runtime::Runtime;
use datafusion_expr::LogicalPlan;
use query::dataframe::DataFrame;

View File

@@ -42,7 +42,7 @@ humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true
once_cell.workspace = true
parking_lot = "0.12"
parking_lot.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true

View File

@@ -24,6 +24,7 @@ pub mod mock {
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_meta::peer::Peer;
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
use tokio::sync::mpsc;

View File

@@ -21,6 +21,7 @@ use async_trait::async_trait;
use async_walkdir::{Filtering, WalkDir};
use base64::prelude::BASE64_URL_SAFE;
use base64::Engine;
use common_runtime::runtime::RuntimeTrait;
use common_telemetry::{info, warn};
use futures::{FutureExt, StreamExt};
use moka::future::Cache;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_runtime::runtime::RuntimeTrait;
use common_runtime::JoinHandle;
use futures::Future;
use rustpython_vm::builtins::PyBaseExceptionRef;

View File

@@ -72,7 +72,7 @@ openmetrics-parser = "0.4"
# opensrv-mysql = "0.7.0"
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "6bbc3b65e6b19212c4f7fc4f40c20daf6f452deb" }
opentelemetry-proto.workspace = true
parking_lot = "0.12"
parking_lot.workspace = true
pgwire = { version = "0.25.0", default-features = false, features = ["server-api-ring"] }
pin-project = "1.0"
pipeline.workspace = true

View File

@@ -25,6 +25,7 @@ use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_runtime::runtime::RuntimeTrait;
use common_runtime::Runtime;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, error, tracing};

View File

@@ -18,6 +18,7 @@ use api::v1::region::region_server::Region as RegionServer;
use api::v1::region::{region_request, RegionRequest, RegionResponse};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_runtime::runtime::RuntimeTrait;
use common_runtime::Runtime;
use common_telemetry::tracing::info_span;
use common_telemetry::tracing_context::{FutureExt, TracingContext};

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use auth::UserProviderRef;
use common_runtime::runtime::RuntimeTrait;
use common_runtime::Runtime;
use common_telemetry::{debug, warn};
use futures::StreamExt;

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use ::auth::UserProviderRef;
use async_trait::async_trait;
use common_runtime::runtime::RuntimeTrait;
use common_runtime::Runtime;
use common_telemetry::{debug, warn};
use futures::StreamExt;

View File

@@ -22,6 +22,7 @@ use async_trait::async_trait;
use auth::tests::MockUserProvider;
use auth::UserProviderRef;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use servers::error::{Result, StartGrpcSnafu, TcpBindSnafu};
use servers::grpc::flight::FlightCraftWrapper;

View File

@@ -19,6 +19,7 @@ use std::time::Duration;
use auth::tests::{DatabaseAuthInfo, MockUserProvider};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_recordbatch::RecordBatch;
use common_runtime::runtime::BuilderBuild;
use common_runtime::Builder as RuntimeBuilder;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, Schema};

View File

@@ -19,6 +19,7 @@ use std::time::Duration;
use auth::tests::{DatabaseAuthInfo, MockUserProvider};
use auth::UserProviderRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_runtime::runtime::BuilderBuild;
use common_runtime::Builder as RuntimeBuilder;
use pgwire::api::Type;
use rand::rngs::StdRng;

View File

@@ -35,6 +35,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::runtime::BuilderBuild;
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};

View File

@@ -25,7 +25,8 @@ use common_base::secrets::ExposeSecret;
use common_config::Configurable;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_runtime::Builder as RuntimeBuilder;
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use common_telemetry::warn;
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
@@ -494,7 +495,7 @@ pub async fn setup_grpc_server_with(
) -> (String, TestGuard, Arc<GrpcServer>) {
let instance = setup_standalone_instance(name, store_type).await;
let runtime = RuntimeBuilder::default()
let runtime: Runtime = RuntimeBuilder::default()
.worker_threads(2)
.thread_name("grpc-handlers")
.build()

View File

@@ -25,6 +25,7 @@ use common_catalog::consts::MITO_ENGINE;
use common_grpc::channel_manager::ClientTlsOption;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_runtime::runtime::{BuilderBuild, RuntimeTrait};
use common_runtime::Runtime;
use common_test_util::find_workspace_path;
use servers::grpc::builder::GrpcServerBuilder;