From 379d2e2f50494dd1f9493bee593ffd333741465b Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Tue, 21 Jun 2022 16:09:15 +0800 Subject: [PATCH] feature: runtime crate and global runtimes (#49) * feat: init common runtime crate * feat: tokio Runtime wrapper and global runtime functions * feat: adds block_on_read, block_on_write, block_on_bg functions to runtime * refactor: panic when configure global runtimes which are already initialized * refactor: adds read/write/bg thread pool size * fix: code style * fix: clippy warning * fix: test_metric panic * fix: address CR problems * log: adds log when creating runtime --- Cargo.lock | 31 +++- Cargo.toml | 1 + src/common/runtime/Cargo.toml | 17 ++ src/common/runtime/src/error.rs | 25 +++ src/common/runtime/src/global.rs | 197 +++++++++++++++++++++++ src/common/runtime/src/lib.rs | 12 ++ src/common/runtime/src/metric.rs | 4 + src/common/runtime/src/runtime.rs | 252 ++++++++++++++++++++++++++++++ 8 files changed, 537 insertions(+), 2 deletions(-) create mode 100644 src/common/runtime/Cargo.toml create mode 100644 src/common/runtime/src/error.rs create mode 100644 src/common/runtime/src/global.rs create mode 100644 src/common/runtime/src/lib.rs create mode 100644 src/common/runtime/src/metric.rs create mode 100644 src/common/runtime/src/runtime.rs diff --git a/Cargo.lock b/Cargo.lock index 6c873666f6..d8e2355e5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -603,6 +603,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "common-runtime" +version = "0.1.0" +dependencies = [ + "common-error", + "common-telemetry", + "metrics", + "once_cell", + "paste", + "snafu", + "tokio", + "tokio-test", +] + [[package]] name = "common-telemetry" version = "0.1.0" @@ -2032,9 +2046,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.10.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" +checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" [[package]] name = "opendal" @@ -3370,6 +3384,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-test" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-util" version = "0.6.9" diff --git a/Cargo.toml b/Cargo.toml index a2d48d13d7..a0d58a55f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "src/common/base", "src/common/error", "src/common/function", + "src/common/runtime", "src/common/telemetry", "src/common/query", "src/common/recordbatch", diff --git a/src/common/runtime/Cargo.toml b/src/common/runtime/Cargo.toml new file mode 100644 index 0000000000..8327e73b09 --- /dev/null +++ b/src/common/runtime/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "common-runtime" +version = "0.1.0" +edition = "2021" + + +[dependencies] +common-error = { path = "../error" } +common-telemetry = { path = "../telemetry" } +metrics = "0.18" +once_cell = "1.12" +paste = "1.0" +snafu = { version = "0.7", features = ["backtraces"] } +tokio = { version = "1.18", features = ["full"] } + +[dev-dependencies] +tokio-test = "0.4" diff --git a/src/common/runtime/src/error.rs b/src/common/runtime/src/error.rs new file mode 100644 index 0000000000..b6b5504db2 --- /dev/null +++ b/src/common/runtime/src/error.rs @@ -0,0 +1,25 @@ +use std::any::Any; + +use common_error::prelude::*; + +pub type Result = std::result::Result; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +pub enum Error { + #[snafu(display("Failed to build runtime, source: {}", source))] + BuildRuntime { + source: std::io::Error, + backtrace: Backtrace, + }, +} + +impl ErrorExt for Error { + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs new file mode 100644 index 0000000000..2fec97b6be --- /dev/null +++ b/src/common/runtime/src/global.rs @@ -0,0 +1,197 @@ +//! Global runtimes +use std::future::Future; +use std::sync::{Mutex, Once}; + +use common_telemetry::logging; +use once_cell::sync::Lazy; +use paste::paste; + +use crate::{Builder, JoinHandle, Runtime}; + +const READ_WORKERS: usize = 8; +const WRITE_WORKERS: usize = 8; +const BG_WORKERS: usize = 8; + +pub fn create_runtime(thread_name: &str, worker_threads: usize) -> Runtime { + logging::info!( + "Creating runtime, thread name: {}, work_threads: {}.", + thread_name, + worker_threads + ); + Builder::default() + .thread_name(thread_name) + .worker_threads(worker_threads) + .build() + .expect("Fail to create runtime") +} + +struct GlobalRuntimes { + read_runtime: Runtime, + write_runtime: Runtime, + bg_runtime: Runtime, +} + +macro_rules! define_spawn { + ($type: ident) => { + paste! { + + fn [](&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.[<$type _runtime>].spawn(future) + } + + fn [](&self, future: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.[<$type _runtime>].spawn_blocking(future) + } + + fn [](&self, future: F) -> F::Output { + self.[<$type _runtime>].block_on(future) + } + } + }; +} + +impl GlobalRuntimes { + define_spawn!(read); + define_spawn!(write); + define_spawn!(bg); + + fn new(read: Option, write: Option, background: Option) -> Self { + Self { + read_runtime: read.unwrap_or_else(|| create_runtime("read-worker", READ_WORKERS)), + write_runtime: write.unwrap_or_else(|| create_runtime("write-worker", WRITE_WORKERS)), + bg_runtime: background.unwrap_or_else(|| create_runtime("bg-worker", BG_WORKERS)), + } + } +} + +#[derive(Default)] +struct ConfigRuntimes { + read_runtime: Option, + write_runtime: Option, + bg_runtime: Option, + already_init: bool, +} + +static GLOBAL_RUNTIMES: Lazy = Lazy::new(|| { + let mut c = CONFIG_RUNTIMES.lock().unwrap(); + let read = c.read_runtime.take(); + let write = c.write_runtime.take(); + let background = c.bg_runtime.take(); + c.already_init = true; + + GlobalRuntimes::new(read, write, background) +}); + +static CONFIG_RUNTIMES: Lazy> = + Lazy::new(|| Mutex::new(ConfigRuntimes::default())); + +/// Initialize the global runtimes +/// +/// # Panics +/// Panics when the global runtimes are already initialized. +/// You should call this function before using any runtime functions. +pub fn init_global_runtimes( + read: Option, + write: Option, + background: Option, +) { + static START: Once = Once::new(); + START.call_once(move || { + let mut c = CONFIG_RUNTIMES.lock().unwrap(); + assert!(!c.already_init, "Global runtimes already initialized"); + c.read_runtime = read; + c.write_runtime = write; + c.bg_runtime = background; + }); +} + +macro_rules! define_global_runtime_spawn { + ($type: ident) => { + paste! { + #[doc = "Returns the global `" $type "` thread pool."] + pub fn [<$type _runtime>]() -> Runtime { + GLOBAL_RUNTIMES.[<$type _runtime>].clone() + } + + #[doc = "Spawn a future and execute it in `" $type "` thread pool."] + pub fn [](future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + GLOBAL_RUNTIMES.[](future) + } + + #[doc = "Run the blocking operation in `" $type "` thread pool."] + pub fn [](future: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + GLOBAL_RUNTIMES.[](future) + } + + #[doc = "Run a future to complete in `" $type "` thread pool."] + pub fn [](future: F) -> F::Output { + GLOBAL_RUNTIMES.[](future) + } + } + }; +} + +define_global_runtime_spawn!(read); +define_global_runtime_spawn!(write); +define_global_runtime_spawn!(bg); + +#[cfg(test)] +mod tests { + use tokio_test::assert_ok; + + use super::*; + + #[test] + fn test_spawn_block_on() { + let handle = spawn_read(async { 1 + 1 }); + assert_eq!(2, block_on_read(handle).unwrap()); + + let handle = spawn_write(async { 2 + 2 }); + assert_eq!(4, block_on_write(handle).unwrap()); + + let handle = spawn_bg(async { 3 + 3 }); + assert_eq!(6, block_on_bg(handle).unwrap()); + } + + macro_rules! define_spawn_blocking_test { + ($type: ident) => { + paste! { + #[test] + fn []() { + let runtime = [<$type _runtime>](); + let out = runtime.block_on(async move { + let inner = assert_ok!( + [](move || { + [](async move { "hello" }) + }).await + ); + + assert_ok!(inner.await) + }); + + assert_eq!(out, "hello") + } + } + }; + } + + define_spawn_blocking_test!(read); + define_spawn_blocking_test!(write); + define_spawn_blocking_test!(bg); +} diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs new file mode 100644 index 0000000000..e2e78a1d25 --- /dev/null +++ b/src/common/runtime/src/lib.rs @@ -0,0 +1,12 @@ +pub mod error; +mod global; +pub mod metric; +pub mod runtime; + +pub use global::{ + bg_runtime, block_on_bg, block_on_read, block_on_write, create_runtime, init_global_runtimes, + read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_read, spawn_blocking_write, + spawn_read, spawn_write, write_runtime, +}; + +pub use crate::runtime::{Builder, JoinHandle, Runtime}; diff --git a/src/common/runtime/src/metric.rs b/src/common/runtime/src/metric.rs new file mode 100644 index 0000000000..8138fdac97 --- /dev/null +++ b/src/common/runtime/src/metric.rs @@ -0,0 +1,4 @@ +//! Runtime metrics +pub const THREAD_NAME_LABEL: &str = "thread.name"; +pub const METRIC_RUNTIME_THREADS_ALIVE: &str = "runtime.threads.alive"; +pub const METRIC_RUNTIME_THREADS_IDLE: &str = "runtime.threads.idle"; diff --git a/src/common/runtime/src/runtime.rs b/src/common/runtime/src/runtime.rs new file mode 100644 index 0000000000..3b2842433d --- /dev/null +++ b/src/common/runtime/src/runtime.rs @@ -0,0 +1,252 @@ +use std::sync::Arc; +use std::thread; +use std::{future::Future, time::Duration}; + +use metrics::{decrement_gauge, increment_gauge}; +use snafu::ResultExt; +use tokio::runtime::{Builder as RuntimeBuilder, Handle}; +use tokio::sync::oneshot; +pub use tokio::task::JoinHandle; + +use crate::error::*; +use crate::metric::*; + +/// A runtime to run future tasks +#[derive(Clone)] +pub struct Runtime { + handle: Handle, + // Used to receive a drop signal when dropper is dropped, inspired by databend + _dropper: Arc, +} + +/// Dropping the dropper will cause runtime to shutdown. +pub struct Dropper { + close: Option>, +} + +impl Drop for Dropper { + fn drop(&mut self) { + // Send a signal to say i am dropping. + self.close.take().map(|v| v.send(())); + } +} + +impl Runtime { + /// Spawn a future and execute it in this thread pool + /// + /// Similar to tokio::runtime::Runtime::spawn() + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.handle.spawn(future) + } + + /// Run the provided function on an executor dedicated to blocking + /// operations. + pub fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.handle.spawn_blocking(func) + } + + /// Run a future to complete, this is the runtime's entry point + pub fn block_on(&self, future: F) -> F::Output { + self.handle.block_on(future) + } +} + +pub struct Builder { + thread_name: String, + builder: RuntimeBuilder, +} + +impl Default for Builder { + fn default() -> Self { + Self { + thread_name: "default-worker".to_string(), + builder: RuntimeBuilder::new_multi_thread(), + } + } +} + +impl Builder { + /// Sets the number of worker threads the Runtime will use. + /// + /// This can be any number above 0. The default value is the number of cores available to the system. + pub fn worker_threads(&mut self, val: usize) -> &mut Self { + self.builder.worker_threads(val); + self + } + + /// Specifies the limit for additional threads spawned by the Runtime. + /// + /// These threads are used for blocking operations like tasks spawned through spawn_blocking, + /// they are not always active and will exit if left idle for too long, You can change this timeout duration + /// with thread_keep_alive. The default value is 512. + pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self { + self.builder.max_blocking_threads(val); + self + } + + /// Sets a custom timeout for a thread in the blocking pool. + /// + /// By default, the timeout for a thread is set to 10 seconds. + pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self { + self.builder.thread_keep_alive(duration); + self + } + + /// Sets name of threads spawned by the Runtime thread pool + pub fn thread_name(&mut self, val: impl Into) -> &mut Self { + self.thread_name = val.into(); + self + } + + pub fn build(&mut self) -> Result { + let runtime = self + .builder + .enable_all() + .thread_name(self.thread_name.clone()) + .on_thread_start(on_thread_start(self.thread_name.clone())) + .on_thread_stop(on_thread_stop(self.thread_name.clone())) + .on_thread_park(on_thread_park(self.thread_name.clone())) + .on_thread_unpark(on_thread_unpark(self.thread_name.clone())) + .build() + .context(BuildRuntimeSnafu)?; + + let handle = runtime.handle().clone(); + let (send_stop, recv_stop) = oneshot::channel(); + // Block the runtime to shutdown. + let _ = thread::Builder::new() + .name(format!("{}-blocker", self.thread_name)) + .spawn(move || runtime.block_on(recv_stop)); + + Ok(Runtime { + handle, + _dropper: Arc::new(Dropper { + close: Some(send_stop), + }), + }) + } +} + +fn on_thread_start(thread_name: String) -> impl Fn() + 'static { + move || { + let labels = [(THREAD_NAME_LABEL, thread_name.clone())]; + increment_gauge!(METRIC_RUNTIME_THREADS_ALIVE, 1.0, &labels); + } +} + +fn on_thread_stop(thread_name: String) -> impl Fn() + 'static { + move || { + let labels = [(THREAD_NAME_LABEL, thread_name.clone())]; + decrement_gauge!(METRIC_RUNTIME_THREADS_ALIVE, 1.0, &labels); + } +} + +fn on_thread_park(thread_name: String) -> impl Fn() + 'static { + move || { + let labels = [(THREAD_NAME_LABEL, thread_name.clone())]; + increment_gauge!(METRIC_RUNTIME_THREADS_IDLE, 1.0, &labels); + } +} + +fn on_thread_unpark(thread_name: String) -> impl Fn() + 'static { + move || { + let labels = [(THREAD_NAME_LABEL, thread_name.clone())]; + decrement_gauge!(METRIC_RUNTIME_THREADS_IDLE, 1.0, &labels); + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, thread, time::Duration}; + + use common_telemetry::metric; + use tokio::sync::oneshot; + use tokio_test::assert_ok; + + use super::*; + + fn runtime() -> Arc { + common_telemetry::init_default_metrics_recorder(); + let runtime = Builder::default() + .worker_threads(2) + .thread_name("test_spawn_join") + .build(); + assert!(runtime.is_ok()); + Arc::new(runtime.unwrap()) + } + + #[test] + fn test_metric() { + common_telemetry::init_default_metrics_recorder(); + let runtime = Builder::default() + .worker_threads(5) + .thread_name("test_runtime_metric") + .build() + .unwrap(); + // wait threads created + thread::sleep(Duration::from_millis(50)); + + runtime.spawn(async { + thread::sleep(Duration::from_millis(50)); + }); + + thread::sleep(Duration::from_millis(10)); + + let handle = metric::try_handle().unwrap(); + let metric_text = handle.render(); + + assert!(metric_text.contains("runtime_threads_idle{thread_name=\"test_runtime_metric\"}")); + assert!(metric_text.contains("runtime_threads_alive{thread_name=\"test_runtime_metric\"}")); + } + + #[test] + fn block_on_async() { + let runtime = runtime(); + + let out = runtime.block_on(async { + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + tx.send("ZOMG").unwrap(); + }); + + assert_ok!(rx.await) + }); + + assert_eq!(out, "ZOMG"); + } + + #[test] + fn spawn_from_blocking() { + let runtime = runtime(); + let runtime1 = runtime.clone(); + let out = runtime.block_on(async move { + let runtime2 = runtime1.clone(); + let inner = assert_ok!( + runtime1 + .spawn_blocking(move || { runtime2.spawn(async move { "hello" }) }) + .await + ); + + assert_ok!(inner.await) + }); + + assert_eq!(out, "hello") + } + + #[test] + fn test_spawn_join() { + let runtime = runtime(); + let handle = runtime.spawn(async { 1 + 1 }); + + assert_eq!(2, runtime.block_on(handle).unwrap()); + } +}