diff --git a/Cargo.lock b/Cargo.lock index e93d75563b..d103e0181b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1665,7 +1665,9 @@ name = "common-function-macro" version = "0.2.0" dependencies = [ "arc-swap", + "backtrace", "common-query", + "common-telemetry", "datatypes", "proc-macro2", "quote", @@ -1681,11 +1683,14 @@ dependencies = [ "api", "arrow-flight", "async-trait", + "backtrace", "common-base", "common-error", + "common-function-macro", "common-query", "common-recordbatch", "common-runtime", + "common-telemetry", "criterion 0.4.0", "dashmap", "datafusion", @@ -1835,6 +1840,7 @@ dependencies = [ "opentelemetry-jaeger", "parking_lot", "serde", + "tokio", "tracing", "tracing-appender", "tracing-bunyan-formatter", @@ -8941,9 +8947,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.27.0" +version = "1.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" +checksum = "c3c786bf8134e5a3a166db9b29ab8f48134739014a3eca7bc6bfa95d673b136f" dependencies = [ "autocfg", "bytes", @@ -8956,7 +8962,7 @@ dependencies = [ "socket2 0.4.9", "tokio-macros", "tracing", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -8971,9 +8977,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 3a46a61b6b..1cd431c9c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.33" tempfile = "3" -tokio = { version = "1.24.2", features = ["full"] } +tokio = { version = "1.28", features = ["full"] } tokio-util = { version = "0.7", features = ["io-util", "compat"] } tonic = { version = "0.9", features = ["tls"] } uuid = { version = "1", features = ["serde", "v4", "fast-rng"] } diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 03c9839949..24541c5f76 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -56,8 +56,9 @@ impl Database { Self { catalog: catalog.into(), schema: schema.into(), + dbname: "".to_string(), client, - ..Default::default() + ctx: FlightContext::default(), } } @@ -70,9 +71,11 @@ impl Database { /// environment pub fn new_with_dbname(dbname: impl Into, client: Client) -> Self { Self { + catalog: "".to_string(), + schema: "".to_string(), dbname: dbname.into(), client, - ..Default::default() + ctx: FlightContext::default(), } } diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index bd02294aa3..ea80dde095 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -11,6 +11,7 @@ path = "src/bin/greptime.rs" [features] mem-prof = ["tikv-jemallocator", "tikv-jemalloc-ctl"] +tokio-console = ["common-telemetry/tokio-console"] [dependencies] anymap = "1.0.0-beta.2" diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index ada4ce21eb..d419a2432d 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -20,7 +20,7 @@ use clap::Parser; use cmd::error::Result; use cmd::options::{Options, TopLevelOptions}; use cmd::{cli, datanode, frontend, metasrv, standalone}; -use common_telemetry::logging::{error, info}; +use common_telemetry::logging::{error, info, TracingOptions}; #[derive(Parser)] #[clap(name = "greptimedb", version = print_version())] @@ -31,6 +31,10 @@ struct Command { log_level: Option, #[clap(subcommand)] subcmd: SubCommand, + + #[cfg(feature = "tokio-console")] + #[clap(long)] + tokio_console_addr: Option, } pub enum Application { @@ -172,10 +176,14 @@ async fn main() -> Result<()> { let opts = cmd.load_options()?; let logging_opts = opts.logging_options(); + let tracing_opts = TracingOptions { + #[cfg(feature = "tokio-console")] + tokio_console_addr: cmd.tokio_console_addr.clone(), + }; common_telemetry::set_panic_hook(); common_telemetry::init_default_metrics_recorder(); - let _guard = common_telemetry::init_global_logging(app_name, logging_opts); + let _guard = common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts); let mut app = cmd.build(opts).await?; diff --git a/src/common/function-macro/Cargo.toml b/src/common/function-macro/Cargo.toml index c9078dc13e..bc7bdd3672 100644 --- a/src/common/function-macro/Cargo.toml +++ b/src/common/function-macro/Cargo.toml @@ -8,6 +8,8 @@ license.workspace = true proc-macro = true [dependencies] +common-telemetry = { path = "../telemetry" } +backtrace = "0.3" quote = "1.0" syn = "1.0" proc-macro2 = "1.0" diff --git a/src/common/function-macro/src/lib.rs b/src/common/function-macro/src/lib.rs index 0afa5b7e7b..790b6ac5b0 100644 --- a/src/common/function-macro/src/lib.rs +++ b/src/common/function-macro/src/lib.rs @@ -15,11 +15,13 @@ mod range_fn; use proc_macro::TokenStream; -use quote::{quote, quote_spanned}; +use quote::{quote, quote_spanned, ToTokens}; use range_fn::process_range_fn; use syn::parse::Parser; use syn::spanned::Spanned; -use syn::{parse_macro_input, DeriveInput, ItemStruct}; +use syn::{ + parse_macro_input, AttributeArgs, DeriveInput, ItemFn, ItemStruct, Lit, Meta, NestedMeta, +}; /// Make struct implemented trait [AggrFuncTypeStore], which is necessary when writing UDAF. /// This derive macro is expect to be used along with attribute macro [as_aggr_func_creator]. @@ -114,3 +116,109 @@ pub fn as_aggr_func_creator(_args: TokenStream, input: TokenStream) -> TokenStre pub fn range_fn(args: TokenStream, input: TokenStream) -> TokenStream { process_range_fn(args, input) } + +/// Attribute macro to print the caller to the annotated function. +/// The caller is printed as its filename and the call site line number. +/// +/// This macro works like this: inject the tracking codes as the first statement to the annotated +/// function body. The tracking codes use [backtrace-rs](https://crates.io/crates/backtrace) to get +/// the callers. So you must dependent on the `backtrace-rs` crate. +/// +/// # Arguments +/// - `depth`: The max depth of call stack to print. Optional, defaults to 1. +/// +/// # Example +/// ```rust, ignore +/// +/// #[print_caller(depth = 3)] +/// fn foo() {} +/// ``` +#[proc_macro_attribute] +pub fn print_caller(args: TokenStream, input: TokenStream) -> TokenStream { + let mut depth = 1; + + let args = parse_macro_input!(args as AttributeArgs); + for meta in args.iter() { + if let NestedMeta::Meta(Meta::NameValue(name_value)) = meta { + let ident = name_value + .path + .get_ident() + .expect("Expected an ident!") + .to_string(); + if ident == "depth" { + let Lit::Int(i) = &name_value.lit else { panic!("Expected 'depth' to be a valid int!") }; + depth = i.base10_parse::().expect("Invalid 'depth' value"); + break; + } + } + } + + let tokens: TokenStream = quote! { + { + let curr_file = file!(); + + let bt = backtrace::Backtrace::new(); + let call_stack = bt + .frames() + .iter() + .skip_while(|f| { + !f.symbols().iter().any(|s| { + s.filename() + .map(|p| p.ends_with(curr_file)) + .unwrap_or(false) + }) + }) + .skip(1) + .take(#depth); + + let call_stack = call_stack + .map(|f| { + f.symbols() + .iter() + .map(|s| { + let filename = s + .filename() + .map(|p| format!("{:?}", p)) + .unwrap_or_else(|| "unknown".to_string()); + + let lineno = s + .lineno() + .map(|l| format!("{}", l)) + .unwrap_or_else(|| "unknown".to_string()); + + format!("filename: {}, lineno: {}", filename, lineno) + }) + .collect::>() + .join(", ") + }) + .collect::>(); + + match call_stack.len() { + 0 => common_telemetry::info!("unable to find call stack"), + 1 => common_telemetry::info!("caller: {}", call_stack[0]), + _ => { + let mut s = String::new(); + s.push_str("[\n"); + for e in call_stack { + s.push_str("\t"); + s.push_str(&e); + s.push_str("\n"); + } + s.push_str("]"); + common_telemetry::info!("call stack: {}", s) + } + } + } + } + .into(); + + let stmt = match syn::parse(tokens) { + Ok(stmt) => stmt, + Err(e) => return e.into_compile_error().into(), + }; + + let mut item = parse_macro_input!(input as ItemFn); + item.block.stmts.insert(0, stmt); + + item.into_token_stream().into() +} diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index 77bf287460..69898ea07a 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -8,11 +8,14 @@ license.workspace = true api = { path = "../../api" } arrow-flight.workspace = true async-trait = "0.1" +backtrace = "0.3" common-base = { path = "../base" } common-error = { path = "../error" } +common-function-macro = { path = "../function-macro" } common-query = { path = "../query" } common-recordbatch = { path = "../recordbatch" } common-runtime = { path = "../runtime" } +common-telemetry = { path = "../telemetry" } dashmap = "5.4" datafusion.workspace = true datatypes = { path = "../../datatypes" } diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index 439e3e2fe9..857aff8fd0 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; +use common_telemetry::info; use dashmap::mapref::entry::Entry; use dashmap::DashMap; use snafu::{OptionExt, ResultExt}; @@ -33,6 +34,7 @@ pub struct ChannelManager { config: ChannelConfig, client_tls_config: Option, pool: Arc, + channel_recycle_started: bool, } impl Default for ChannelManager { @@ -48,19 +50,28 @@ impl ChannelManager { pub fn with_config(config: ChannelConfig) -> Self { let pool = Arc::new(Pool::default()); - let cloned_pool = pool.clone(); - - common_runtime::spawn_bg(async { - recycle_channel_in_loop(cloned_pool, RECYCLE_CHANNEL_INTERVAL_SECS).await; - }); - Self { config, client_tls_config: None, pool, + channel_recycle_started: false, } } + pub fn start_channel_recycle(&mut self) { + if self.channel_recycle_started { + return; + } + + let pool = self.pool.clone(); + common_runtime::spawn_bg(async { + recycle_channel_in_loop(pool, RECYCLE_CHANNEL_INTERVAL_SECS).await; + }); + info!("Channel recycle is started, running in the background!"); + + self.channel_recycle_started = true; + } + pub fn with_tls_config(config: ChannelConfig) -> Result { let mut cm = Self::with_config(config.clone()); @@ -224,8 +235,8 @@ pub struct ChannelConfig { impl Default for ChannelConfig { fn default() -> Self { Self { - timeout: None, - connect_timeout: None, + timeout: Some(Duration::from_secs(2)), + connect_timeout: Some(Duration::from_secs(4)), concurrency_limit: None, rate_limit: None, initial_stream_window_size: None, @@ -455,13 +466,7 @@ mod tests { #[tokio::test] async fn test_access_count() { - let pool = Arc::new(Pool::default()); - let config = ChannelConfig::new(); - let mgr = Arc::new(ChannelManager { - pool, - config, - client_tls_config: None, - }); + let mgr = Arc::new(ChannelManager::new()); let addr = "test_uri"; let mut joins = Vec::with_capacity(10); @@ -491,8 +496,8 @@ mod tests { let default_cfg = ChannelConfig::new(); assert_eq!( ChannelConfig { - timeout: None, - connect_timeout: None, + timeout: Some(Duration::from_secs(2)), + connect_timeout: Some(Duration::from_secs(4)), concurrency_limit: None, rate_limit: None, initial_stream_window_size: None, @@ -553,7 +558,6 @@ mod tests { #[test] fn test_build_endpoint() { - let pool = Arc::new(Pool::default()); let config = ChannelConfig::new() .timeout(Duration::from_secs(3)) .connect_timeout(Duration::from_secs(5)) @@ -567,11 +571,7 @@ mod tests { .http2_adaptive_window(true) .tcp_keepalive(Duration::from_secs(2)) .tcp_nodelay(true); - let mgr = ChannelManager { - pool, - config, - client_tls_config: None, - }; + let mgr = ChannelManager::with_config(config); let res = mgr.build_endpoint("test_addr"); @@ -580,18 +580,7 @@ mod tests { #[tokio::test] async fn test_channel_with_connector() { - let pool = Pool { - channels: DashMap::default(), - }; - - let pool = Arc::new(pool); - - let config = ChannelConfig::new(); - let mgr = ChannelManager { - pool, - config, - client_tls_config: None, - }; + let mgr = ChannelManager::new(); let addr = "test_addr"; let res = mgr.get(addr); diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index 98f9e9917b..51bad13107 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -16,7 +16,7 @@ use std::future::Future; use std::sync::{Mutex, Once}; -use common_telemetry::logging; +use common_telemetry::info; use once_cell::sync::Lazy; use paste::paste; @@ -26,13 +26,10 @@ const READ_WORKERS: usize = 8; const WRITE_WORKERS: usize = 8; const BG_WORKERS: usize = 8; -pub fn create_runtime(thread_name: &str, worker_threads: usize) -> Runtime { - logging::info!( - "Creating runtime, thread name: {}, work_threads: {}.", - thread_name, - worker_threads - ); +pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime { + info!("Creating runtime with runtime_name: {runtime_name}, thread_name: {thread_name}, work_threads: {worker_threads}."); Builder::default() + .runtime_name(runtime_name) .thread_name(thread_name) .worker_threads(worker_threads) .build() @@ -79,9 +76,12 @@ impl GlobalRuntimes { fn new(read: Option, write: Option, background: Option) -> Self { Self { - read_runtime: read.unwrap_or_else(|| create_runtime("read-worker", READ_WORKERS)), - write_runtime: write.unwrap_or_else(|| create_runtime("write-worker", WRITE_WORKERS)), - bg_runtime: background.unwrap_or_else(|| create_runtime("bg-worker", BG_WORKERS)), + read_runtime: read + .unwrap_or_else(|| create_runtime("global-read", "read-worker", READ_WORKERS)), + write_runtime: write + .unwrap_or_else(|| create_runtime("global-write", "write-worker", WRITE_WORKERS)), + bg_runtime: background + .unwrap_or_else(|| create_runtime("global-bg", "bg-worker", BG_WORKERS)), } } } diff --git a/src/common/runtime/src/runtime.rs b/src/common/runtime/src/runtime.rs index 2caabc327b..1112b9acb9 100644 --- a/src/common/runtime/src/runtime.rs +++ b/src/common/runtime/src/runtime.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::future::Future; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::thread; use std::time::Duration; @@ -26,9 +27,12 @@ pub use tokio::task::{JoinError, JoinHandle}; use crate::error::*; use crate::metrics::*; +static RUNTIME_ID: AtomicUsize = AtomicUsize::new(0); + /// A runtime to run future tasks #[derive(Clone, Debug)] pub struct Runtime { + name: String, handle: Handle, // Used to receive a drop signal when dropper is dropped, inspired by databend _dropper: Arc, @@ -73,9 +77,14 @@ impl Runtime { pub fn block_on(&self, future: F) -> F::Output { self.handle.block_on(future) } + + pub fn name(&self) -> &str { + &self.name + } } pub struct Builder { + runtime_name: String, thread_name: String, builder: RuntimeBuilder, } @@ -83,6 +92,7 @@ pub struct Builder { impl Default for Builder { fn default() -> Self { Self { + runtime_name: format!("runtime-{}", RUNTIME_ID.fetch_add(1, Ordering::Relaxed)), thread_name: "default-worker".to_string(), builder: RuntimeBuilder::new_multi_thread(), } @@ -116,6 +126,11 @@ impl Builder { self } + pub fn runtime_name(&mut self, val: impl Into) -> &mut Self { + self.runtime_name = val.into(); + self + } + /// Sets name of threads spawned by the Runtime thread pool pub fn thread_name(&mut self, val: impl Into) -> &mut Self { self.thread_name = val.into(); @@ -142,6 +157,7 @@ impl Builder { .spawn(move || runtime.block_on(recv_stop)); Ok(Runtime { + name: self.runtime_name.clone(), handle, _dropper: Arc::new(Dropper { close: Some(send_stop), diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index e0dcf5ab11..571185bba1 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -5,7 +5,7 @@ edition.workspace = true license.workspace = true [features] -console = ["console-subscriber"] +tokio-console = ["console-subscriber", "tokio/tracing"] deadlock_detection = ["parking_lot"] [dependencies] @@ -24,6 +24,7 @@ parking_lot = { version = "0.12", features = [ "deadlock_detection", ], optional = true } serde = "1.0" +tokio.workspace = true tracing = "0.1" tracing-appender = "0.2" tracing-bunyan-formatter = "0.3" diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index 5bc3a34a3e..96f8908138 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -50,6 +50,12 @@ impl Default for LoggingOptions { } } +#[derive(Default)] +pub struct TracingOptions { + #[cfg(feature = "tokio-console")] + pub tokio_console_addr: Option, +} + /// Init tracing for unittest. /// Write logs to file `unittest`. pub fn init_default_ut_logging() { @@ -71,7 +77,11 @@ pub fn init_default_ut_logging() { level, ..Default::default() }; - *g = Some(init_global_logging("unittest", &opts)); + *g = Some(init_global_logging( + "unittest", + &opts, + TracingOptions::default(), + )); info!("logs dir = {}", dir); }); @@ -80,7 +90,12 @@ pub fn init_default_ut_logging() { static GLOBAL_UT_LOG_GUARD: Lazy>>>> = Lazy::new(|| Arc::new(Mutex::new(None))); -pub fn init_global_logging(app_name: &str, opts: &LoggingOptions) -> Vec { +#[allow(clippy::print_stdout)] +pub fn init_global_logging( + app_name: &str, + opts: &LoggingOptions, + tracing_opts: TracingOptions, +) -> Vec { let mut guards = vec![]; let dir = &opts.dir; let level = &opts.level; @@ -127,6 +142,42 @@ pub fn init_global_logging(app_name: &str, opts: &LoggingOptions) -> Vec Vec Re .timeout(Duration::from_millis(meta_config.timeout_millis)) .connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis)) .tcp_nodelay(meta_config.tcp_nodelay); - let channel_manager = ChannelManager::with_config(config); + + let mut channel_manager = ChannelManager::with_config(config); + channel_manager.start_channel_recycle(); + let mut meta_client = MetaClientBuilder::new(cluster_id, member_id) .enable_heartbeat() .enable_router() diff --git a/src/frontend/src/datanode.rs b/src/frontend/src/datanode.rs index ea496d37d5..7be786f4a4 100644 --- a/src/frontend/src/datanode.rs +++ b/src/frontend/src/datanode.rs @@ -16,12 +16,14 @@ use std::time::Duration; use client::Client; use common_grpc::channel_manager::ChannelManager; +use common_telemetry::info; use meta_client::rpc::Peer; use moka::future::{Cache, CacheBuilder}; pub struct DatanodeClients { channel_manager: ChannelManager, clients: Cache, + started: bool, } impl Default for DatanodeClients { @@ -32,11 +34,23 @@ impl Default for DatanodeClients { .time_to_live(Duration::from_secs(30 * 60)) .time_to_idle(Duration::from_secs(5 * 60)) .build(), + started: false, } } } impl DatanodeClients { + pub(crate) fn start(&mut self) { + if self.started { + return; + } + + self.channel_manager.start_channel_recycle(); + + info!("Datanode clients manager is started!"); + self.started = true; + } + pub(crate) async fn get_client(&self, datanode: &Peer) -> Client { self.clients .get_with_by_ref(datanode, async move { diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 35f8697d4b..bec85dcb2d 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -128,7 +128,10 @@ impl Instance { }); let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); let partition_manager = Arc::new(PartitionRuleManager::new(table_routes)); - let datanode_clients = Arc::new(DatanodeClients::default()); + + let mut datanode_clients = DatanodeClients::default(); + datanode_clients.start(); + let datanode_clients = Arc::new(datanode_clients); let mut catalog_manager = FrontendCatalogManager::new(meta_backend, partition_manager, datanode_clients.clone()); @@ -186,7 +189,9 @@ impl Instance { .timeout(Duration::from_millis(meta_config.timeout_millis)) .connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis)) .tcp_nodelay(meta_config.tcp_nodelay); - let channel_manager = ChannelManager::with_config(channel_config); + + let mut channel_manager = ChannelManager::with_config(channel_config); + channel_manager.start_channel_recycle(); let mut meta_client = MetaClientBuilder::new(0, 0) .enable_router()