feature: runtime crate and global runtimes (#49)

* feat: init common runtime crate

* feat: tokio Runtime wrapper and global runtime functions

* feat: adds block_on_read, block_on_write, block_on_bg functions to runtime

* refactor: panic when configure global runtimes which are already initialized

* refactor: adds read/write/bg thread pool size

* fix: code style

* fix: clippy warning

* fix: test_metric panic

* fix: address CR problems

* log: adds log when creating runtime
This commit is contained in:
dennis zhuang
2022-06-21 16:09:15 +08:00
committed by GitHub
parent 6ec870625f
commit 379d2e2f50
8 changed files with 537 additions and 2 deletions

31
Cargo.lock generated
View File

@@ -603,6 +603,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "common-runtime"
version = "0.1.0"
dependencies = [
"common-error",
"common-telemetry",
"metrics",
"once_cell",
"paste",
"snafu",
"tokio",
"tokio-test",
]
[[package]]
name = "common-telemetry"
version = "0.1.0"
@@ -2032,9 +2046,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.10.0"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225"
[[package]]
name = "opendal"
@@ -3370,6 +3384,19 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-test"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
dependencies = [
"async-stream",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
]
[[package]]
name = "tokio-util"
version = "0.6.9"

View File

@@ -3,6 +3,7 @@ members = [
"src/common/base",
"src/common/error",
"src/common/function",
"src/common/runtime",
"src/common/telemetry",
"src/common/query",
"src/common/recordbatch",

View File

@@ -0,0 +1,17 @@
[package]
name = "common-runtime"
version = "0.1.0"
edition = "2021"
[dependencies]
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
metrics = "0.18"
once_cell = "1.12"
paste = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18", features = ["full"] }
[dev-dependencies]
tokio-test = "0.4"

View File

@@ -0,0 +1,25 @@
use std::any::Any;
use common_error::prelude::*;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum Error {
#[snafu(display("Failed to build runtime, source: {}", source))]
BuildRuntime {
source: std::io::Error,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,197 @@
//! Global runtimes
use std::future::Future;
use std::sync::{Mutex, Once};
use common_telemetry::logging;
use once_cell::sync::Lazy;
use paste::paste;
use crate::{Builder, JoinHandle, Runtime};
const READ_WORKERS: usize = 8;
const WRITE_WORKERS: usize = 8;
const BG_WORKERS: usize = 8;
pub fn create_runtime(thread_name: &str, worker_threads: usize) -> Runtime {
logging::info!(
"Creating runtime, thread name: {}, work_threads: {}.",
thread_name,
worker_threads
);
Builder::default()
.thread_name(thread_name)
.worker_threads(worker_threads)
.build()
.expect("Fail to create runtime")
}
struct GlobalRuntimes {
read_runtime: Runtime,
write_runtime: Runtime,
bg_runtime: Runtime,
}
macro_rules! define_spawn {
($type: ident) => {
paste! {
fn [<spawn_ $type>]<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.[<$type _runtime>].spawn(future)
}
fn [<spawn_blocking_ $type>]<F, R>(&self, future: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.[<$type _runtime>].spawn_blocking(future)
}
fn [<block_on_ $type>]<F: Future>(&self, future: F) -> F::Output {
self.[<$type _runtime>].block_on(future)
}
}
};
}
impl GlobalRuntimes {
define_spawn!(read);
define_spawn!(write);
define_spawn!(bg);
fn new(read: Option<Runtime>, write: Option<Runtime>, background: Option<Runtime>) -> Self {
Self {
read_runtime: read.unwrap_or_else(|| create_runtime("read-worker", READ_WORKERS)),
write_runtime: write.unwrap_or_else(|| create_runtime("write-worker", WRITE_WORKERS)),
bg_runtime: background.unwrap_or_else(|| create_runtime("bg-worker", BG_WORKERS)),
}
}
}
#[derive(Default)]
struct ConfigRuntimes {
read_runtime: Option<Runtime>,
write_runtime: Option<Runtime>,
bg_runtime: Option<Runtime>,
already_init: bool,
}
static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| {
let mut c = CONFIG_RUNTIMES.lock().unwrap();
let read = c.read_runtime.take();
let write = c.write_runtime.take();
let background = c.bg_runtime.take();
c.already_init = true;
GlobalRuntimes::new(read, write, background)
});
static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
Lazy::new(|| Mutex::new(ConfigRuntimes::default()));
/// Initialize the global runtimes
///
/// # Panics
/// Panics when the global runtimes are already initialized.
/// You should call this function before using any runtime functions.
pub fn init_global_runtimes(
read: Option<Runtime>,
write: Option<Runtime>,
background: Option<Runtime>,
) {
static START: Once = Once::new();
START.call_once(move || {
let mut c = CONFIG_RUNTIMES.lock().unwrap();
assert!(!c.already_init, "Global runtimes already initialized");
c.read_runtime = read;
c.write_runtime = write;
c.bg_runtime = background;
});
}
macro_rules! define_global_runtime_spawn {
($type: ident) => {
paste! {
#[doc = "Returns the global `" $type "` thread pool."]
pub fn [<$type _runtime>]() -> Runtime {
GLOBAL_RUNTIMES.[<$type _runtime>].clone()
}
#[doc = "Spawn a future and execute it in `" $type "` thread pool."]
pub fn [<spawn_ $type>]<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
GLOBAL_RUNTIMES.[<spawn_ $type>](future)
}
#[doc = "Run the blocking operation in `" $type "` thread pool."]
pub fn [<spawn_blocking_ $type>]<F, R>(future: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
GLOBAL_RUNTIMES.[<spawn_blocking_ $type>](future)
}
#[doc = "Run a future to complete in `" $type "` thread pool."]
pub fn [<block_on_ $type>]<F: Future>(future: F) -> F::Output {
GLOBAL_RUNTIMES.[<block_on_ $type>](future)
}
}
};
}
define_global_runtime_spawn!(read);
define_global_runtime_spawn!(write);
define_global_runtime_spawn!(bg);
#[cfg(test)]
mod tests {
use tokio_test::assert_ok;
use super::*;
#[test]
fn test_spawn_block_on() {
let handle = spawn_read(async { 1 + 1 });
assert_eq!(2, block_on_read(handle).unwrap());
let handle = spawn_write(async { 2 + 2 });
assert_eq!(4, block_on_write(handle).unwrap());
let handle = spawn_bg(async { 3 + 3 });
assert_eq!(6, block_on_bg(handle).unwrap());
}
macro_rules! define_spawn_blocking_test {
($type: ident) => {
paste! {
#[test]
fn [<test_spawn_ $type _from_blocking>]() {
let runtime = [<$type _runtime>]();
let out = runtime.block_on(async move {
let inner = assert_ok!(
[<spawn_blocking_ $type>](move || {
[<spawn_ $type>](async move { "hello" })
}).await
);
assert_ok!(inner.await)
});
assert_eq!(out, "hello")
}
}
};
}
define_spawn_blocking_test!(read);
define_spawn_blocking_test!(write);
define_spawn_blocking_test!(bg);
}

View File

@@ -0,0 +1,12 @@
pub mod error;
mod global;
pub mod metric;
pub mod runtime;
pub use global::{
bg_runtime, block_on_bg, block_on_read, block_on_write, create_runtime, init_global_runtimes,
read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_read, spawn_blocking_write,
spawn_read, spawn_write, write_runtime,
};
pub use crate::runtime::{Builder, JoinHandle, Runtime};

View File

@@ -0,0 +1,4 @@
//! Runtime metrics
pub const THREAD_NAME_LABEL: &str = "thread.name";
pub const METRIC_RUNTIME_THREADS_ALIVE: &str = "runtime.threads.alive";
pub const METRIC_RUNTIME_THREADS_IDLE: &str = "runtime.threads.idle";

View File

@@ -0,0 +1,252 @@
use std::sync::Arc;
use std::thread;
use std::{future::Future, time::Duration};
use metrics::{decrement_gauge, increment_gauge};
use snafu::ResultExt;
use tokio::runtime::{Builder as RuntimeBuilder, Handle};
use tokio::sync::oneshot;
pub use tokio::task::JoinHandle;
use crate::error::*;
use crate::metric::*;
/// A runtime to run future tasks
#[derive(Clone)]
pub struct Runtime {
handle: Handle,
// Used to receive a drop signal when dropper is dropped, inspired by databend
_dropper: Arc<Dropper>,
}
/// Dropping the dropper will cause runtime to shutdown.
pub struct Dropper {
close: Option<oneshot::Sender<()>>,
}
impl Drop for Dropper {
fn drop(&mut self) {
// Send a signal to say i am dropping.
self.close.take().map(|v| v.send(()));
}
}
impl Runtime {
/// Spawn a future and execute it in this thread pool
///
/// Similar to tokio::runtime::Runtime::spawn()
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.handle.spawn(future)
}
/// Run the provided function on an executor dedicated to blocking
/// operations.
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.handle.spawn_blocking(func)
}
/// Run a future to complete, this is the runtime's entry point
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.block_on(future)
}
}
pub struct Builder {
thread_name: String,
builder: RuntimeBuilder,
}
impl Default for Builder {
fn default() -> Self {
Self {
thread_name: "default-worker".to_string(),
builder: RuntimeBuilder::new_multi_thread(),
}
}
}
impl Builder {
/// Sets the number of worker threads the Runtime will use.
///
/// This can be any number above 0. The default value is the number of cores available to the system.
pub fn worker_threads(&mut self, val: usize) -> &mut Self {
self.builder.worker_threads(val);
self
}
/// Specifies the limit for additional threads spawned by the Runtime.
///
/// These threads are used for blocking operations like tasks spawned through spawn_blocking,
/// they are not always active and will exit if left idle for too long, You can change this timeout duration
/// with thread_keep_alive. The default value is 512.
pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
self.builder.max_blocking_threads(val);
self
}
/// Sets a custom timeout for a thread in the blocking pool.
///
/// By default, the timeout for a thread is set to 10 seconds.
pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
self.builder.thread_keep_alive(duration);
self
}
/// Sets name of threads spawned by the Runtime thread pool
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
self.thread_name = val.into();
self
}
pub fn build(&mut self) -> Result<Runtime> {
let runtime = self
.builder
.enable_all()
.thread_name(self.thread_name.clone())
.on_thread_start(on_thread_start(self.thread_name.clone()))
.on_thread_stop(on_thread_stop(self.thread_name.clone()))
.on_thread_park(on_thread_park(self.thread_name.clone()))
.on_thread_unpark(on_thread_unpark(self.thread_name.clone()))
.build()
.context(BuildRuntimeSnafu)?;
let handle = runtime.handle().clone();
let (send_stop, recv_stop) = oneshot::channel();
// Block the runtime to shutdown.
let _ = thread::Builder::new()
.name(format!("{}-blocker", self.thread_name))
.spawn(move || runtime.block_on(recv_stop));
Ok(Runtime {
handle,
_dropper: Arc::new(Dropper {
close: Some(send_stop),
}),
})
}
}
fn on_thread_start(thread_name: String) -> impl Fn() + 'static {
move || {
let labels = [(THREAD_NAME_LABEL, thread_name.clone())];
increment_gauge!(METRIC_RUNTIME_THREADS_ALIVE, 1.0, &labels);
}
}
fn on_thread_stop(thread_name: String) -> impl Fn() + 'static {
move || {
let labels = [(THREAD_NAME_LABEL, thread_name.clone())];
decrement_gauge!(METRIC_RUNTIME_THREADS_ALIVE, 1.0, &labels);
}
}
fn on_thread_park(thread_name: String) -> impl Fn() + 'static {
move || {
let labels = [(THREAD_NAME_LABEL, thread_name.clone())];
increment_gauge!(METRIC_RUNTIME_THREADS_IDLE, 1.0, &labels);
}
}
fn on_thread_unpark(thread_name: String) -> impl Fn() + 'static {
move || {
let labels = [(THREAD_NAME_LABEL, thread_name.clone())];
decrement_gauge!(METRIC_RUNTIME_THREADS_IDLE, 1.0, &labels);
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, thread, time::Duration};
use common_telemetry::metric;
use tokio::sync::oneshot;
use tokio_test::assert_ok;
use super::*;
fn runtime() -> Arc<Runtime> {
common_telemetry::init_default_metrics_recorder();
let runtime = Builder::default()
.worker_threads(2)
.thread_name("test_spawn_join")
.build();
assert!(runtime.is_ok());
Arc::new(runtime.unwrap())
}
#[test]
fn test_metric() {
common_telemetry::init_default_metrics_recorder();
let runtime = Builder::default()
.worker_threads(5)
.thread_name("test_runtime_metric")
.build()
.unwrap();
// wait threads created
thread::sleep(Duration::from_millis(50));
runtime.spawn(async {
thread::sleep(Duration::from_millis(50));
});
thread::sleep(Duration::from_millis(10));
let handle = metric::try_handle().unwrap();
let metric_text = handle.render();
assert!(metric_text.contains("runtime_threads_idle{thread_name=\"test_runtime_metric\"}"));
assert!(metric_text.contains("runtime_threads_alive{thread_name=\"test_runtime_metric\"}"));
}
#[test]
fn block_on_async() {
let runtime = runtime();
let out = runtime.block_on(async {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx.send("ZOMG").unwrap();
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_from_blocking() {
let runtime = runtime();
let runtime1 = runtime.clone();
let out = runtime.block_on(async move {
let runtime2 = runtime1.clone();
let inner = assert_ok!(
runtime1
.spawn_blocking(move || { runtime2.spawn(async move { "hello" }) })
.await
);
assert_ok!(inner.await)
});
assert_eq!(out, "hello")
}
#[test]
fn test_spawn_join() {
let runtime = runtime();
let handle = runtime.spawn(async { 1 + 1 });
assert_eq!(2, runtime.block_on(handle).unwrap());
}
}