feat: Enable distributed tracing in greptimedb (#2755)

* feat: implement distributed tracing

* fix: change usage of span

* fix: use otlp as exporter

* chore: update dependencies

* chore: add span info

* chore: add alias

* chore: use instrument instead of trace
Author: WU Jingdi
Date: 2023-11-17 16:51:57 +08:00
Committed by: GitHub
Parent: ac4b6cd7f0
Commit: 500e299e40
58 changed files with 602 additions and 394 deletions

Cargo.lock (generated)
View File

@@ -1940,18 +1940,18 @@ dependencies = [
"console-subscriber",
"lazy_static",
"once_cell",
"opentelemetry 0.17.0",
"opentelemetry-jaeger",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk 0.21.0",
"parking_lot 0.12.1",
"prometheus",
"rand",
"rs-snowflake",
"serde",
"tokio",
"tracing",
"tracing-appender",
"tracing-futures",
"tracing-log",
"tracing-log 0.1.4",
"tracing-opentelemetry",
"tracing-subscriber",
]
@@ -3315,7 +3315,7 @@ dependencies = [
"moka",
"object-store",
"openmetrics-parser",
"opentelemetry-proto",
"opentelemetry-proto 0.3.0",
"operator",
"partition",
"prometheus",
@@ -3599,7 +3599,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7eb2e78be7a104d2582fbea0bcb1e019407da702#7eb2e78be7a104d2582fbea0bcb1e019407da702"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=25429306d0379ad29211a062a81da2554a0208ab#25429306d0379ad29211a062a81da2554a0208ab"
dependencies = [
"prost 0.12.1",
"serde",
@@ -5419,23 +5419,18 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "opentelemetry"
version = "0.17.0"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8"
checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a"
dependencies = [
"async-trait",
"crossbeam-channel",
"futures-channel",
"futures-executor",
"futures-util",
"futures-core",
"futures-sink",
"indexmap 2.0.2",
"js-sys",
"lazy_static",
"percent-encoding",
"pin-project",
"rand",
"once_cell",
"pin-project-lite",
"thiserror",
"tokio",
"tokio-stream",
"urlencoding",
]
[[package]]
@@ -5454,18 +5449,22 @@ dependencies = [
]
[[package]]
name = "opentelemetry-jaeger"
version = "0.16.0"
name = "opentelemetry-otlp"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c0b12cd9e3f9b35b52f6e0dac66866c519b26f424f4bbf96e3fe8bfbdc5229"
checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930"
dependencies = [
"async-trait",
"lazy_static",
"opentelemetry 0.17.0",
"futures-core",
"http",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"opentelemetry-proto 0.4.0",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk 0.21.0",
"prost 0.11.9",
"thiserror",
"thrift 0.15.0",
"tokio",
"tonic 0.9.2",
]
[[package]]
@@ -5473,19 +5472,31 @@ name = "opentelemetry-proto"
version = "0.3.0"
source = "git+https://github.com/waynexia/opentelemetry-rust.git?rev=33841b38dda79b15f2024952be5f32533325ca02#33841b38dda79b15f2024952be5f32533325ca02"
dependencies = [
"opentelemetry 0.21.0",
"opentelemetry_sdk",
"opentelemetry 0.21.0 (git+https://github.com/waynexia/opentelemetry-rust.git?rev=33841b38dda79b15f2024952be5f32533325ca02)",
"opentelemetry_sdk 0.20.0",
"prost 0.12.1",
"tonic 0.10.2",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.9.0"
name = "opentelemetry-proto"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd"
checksum = "a2e155ce5cc812ea3d1dffbd1539aed653de4bf4882d60e6e04dcf0901d674e1"
dependencies = [
"opentelemetry 0.17.0",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"opentelemetry_sdk 0.21.0",
"prost 0.11.9",
"tonic 0.9.2",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84"
dependencies = [
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -5500,13 +5511,35 @@ dependencies = [
"futures-util",
"glob",
"once_cell",
"opentelemetry 0.21.0",
"opentelemetry 0.21.0 (git+https://github.com/waynexia/opentelemetry-rust.git?rev=33841b38dda79b15f2024952be5f32533325ca02)",
"ordered-float 4.1.1",
"percent-encoding",
"rand",
"thiserror",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5b3ce3f5705e2ae493be467a0b23be4bc563c193cdb7713e55372c89a906b34"
dependencies = [
"async-trait",
"crossbeam-channel",
"futures-channel",
"futures-executor",
"futures-util",
"glob",
"once_cell",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ordered-float 4.1.1",
"percent-encoding",
"rand",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]]
name = "operator"
version = "0.4.3"
@@ -5589,15 +5622,6 @@ dependencies = [
"zstd 0.12.4",
]
[[package]]
name = "ordered-float"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7"
dependencies = [
"num-traits",
]
[[package]]
name = "ordered-float"
version = "2.10.1"
@@ -5767,7 +5791,7 @@ dependencies = [
"paste",
"seq-macro",
"snap",
"thrift 0.17.0",
"thrift",
"tokio",
"twox-hash",
"zstd 0.12.4",
@@ -7212,12 +7236,6 @@ dependencies = [
"serde",
]
[[package]]
name = "rs-snowflake"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e60ef3b82994702bbe4e134d98aadca4b49ed04440148985678d415c68127666"
[[package]]
name = "rsa"
version = "0.6.1"
@@ -8263,7 +8281,7 @@ dependencies = [
"once_cell",
"openmetrics-parser",
"opensrv-mysql",
"opentelemetry-proto",
"opentelemetry-proto 0.3.0",
"parking_lot 0.12.1",
"pgwire",
"pin-project",
@@ -9307,7 +9325,7 @@ dependencies = [
"num_cpus",
"object-store",
"once_cell",
"opentelemetry-proto",
"opentelemetry-proto 0.3.0",
"operator",
"partition",
"paste",
@@ -9397,28 +9415,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]]
name = "thrift"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e"
dependencies = [
"byteorder",
"integer-encoding",
"log",
"ordered-float 1.1.1",
"threadpool",
]
[[package]]
name = "thrift"
version = "0.17.0"
@@ -9953,17 +9949,32 @@ dependencies = [
]
[[package]]
name = "tracing-opentelemetry"
version = "0.17.4"
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbbe89715c1dbbb790059e2565353978564924ee85017b5fff365c872ff6721f"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"opentelemetry 0.17.0",
"tracing-core",
]
[[package]]
name = "tracing-opentelemetry"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c67ac25c5407e7b961fafc6f7e9aa5958fd297aada2d20fa2ae1737357e55596"
dependencies = [
"js-sys",
"once_cell",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"opentelemetry_sdk 0.21.0",
"smallvec",
"tracing",
"tracing-core",
"tracing-log",
"tracing-log 0.2.0",
"tracing-subscriber",
"web-time",
]
[[package]]
@@ -9981,7 +9992,7 @@ dependencies = [
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-log 0.1.4",
]
[[package]]
@@ -10545,6 +10556,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57099a701fb3a8043f993e8228dc24229c7b942e2b009a1b962e54489ba1d3bf"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.4"

View File

@@ -85,7 +85,7 @@ derive_builder = "0.12"
etcd-client = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7eb2e78be7a104d2582fbea0bcb1e019407da702" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "25429306d0379ad29211a062a81da2554a0208ab" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"

View File

@@ -109,9 +109,7 @@ vector_cache_size = "512MB"
sst_write_buffer_size = "8MB"
# Log options
# Log options, see `standalone.example.toml`
# [logging]
# Specify logs directory.
# dir = "/tmp/greptimedb/logs"
# Specify the log level [info | debug | error | warn]
# level = "info"

View File

@@ -158,3 +158,7 @@ global_write_buffer_size = "1GB"
# dir = "/tmp/greptimedb/logs"
# Specify the log level [info | debug | error | warn]
# level = "info"
# Whether to enable tracing. Default is false.
# enable_otlp_tracing = false
# The tracing exporter endpoint in `ip:port` format; gRPC OTLP is used as the exporter. Default endpoint is `localhost:4317`.
# otlp_endpoint = "localhost:4317"
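For example, a node with tracing enabled would uncomment the section like so (a sketch; point the endpoint at wherever your OTLP collector actually runs):

[logging]
dir = "/tmp/greptimedb/logs"
level = "info"
enable_otlp_tracing = true
otlp_endpoint = "localhost:4317"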

View File

@@ -78,7 +78,7 @@ async fn run() {
let logical = mock_logical_plan();
event!(Level::INFO, "plan size: {:#?}", logical.len());
let result = db.logical_plan(logical, 0).await.unwrap();
let result = db.logical_plan(logical).await.unwrap();
event!(Level::INFO, "result: {:#?}", result);
}

View File

@@ -29,6 +29,7 @@ use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamAdaptor;
use common_telemetry::logging;
use common_telemetry::tracing_context::W3cTrace;
use futures_util::StreamExt;
use prost::Message;
use snafu::{ensure, ResultExt};
@@ -147,21 +148,21 @@ impl Database {
async fn handle(&self, request: Request) -> Result<u32> {
let mut client = self.client.make_database_client()?.inner;
let request = self.to_rpc_request(request, 0);
let request = self.to_rpc_request(request);
let response = client.handle(request).await?.into_inner();
from_grpc_response(response)
}
#[inline]
fn to_rpc_request(&self, request: Request, trace_id: u64) -> GreptimeRequest {
fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
GreptimeRequest {
header: Some(RequestHeader {
catalog: self.catalog.clone(),
schema: self.schema.clone(),
authorization: self.ctx.auth_header.clone(),
dbname: self.dbname.clone(),
trace_id,
span_id: 0,
// TODO(Taylor-lagrange): add client grpc tracing
tracing_context: W3cTrace::new(),
}),
request: Some(request),
}
@@ -172,23 +173,17 @@ impl Database {
S: AsRef<str>,
{
let _timer = metrics::METRIC_GRPC_SQL.start_timer();
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::Sql(sql.as_ref().to_string())),
}),
0,
)
self.do_get(Request::Query(QueryRequest {
query: Some(Query::Sql(sql.as_ref().to_string())),
}))
.await
}
pub async fn logical_plan(&self, logical_plan: Vec<u8>, trace_id: u64) -> Result<Output> {
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_LOGICAL_PLAN.start_timer();
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
}),
trace_id,
)
self.do_get(Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
}))
.await
}
@@ -200,68 +195,53 @@ impl Database {
step: &str,
) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_PROMQL_RANGE_QUERY.start_timer();
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::PromRangeQuery(PromRangeQuery {
query: promql.to_string(),
start: start.to_string(),
end: end.to_string(),
step: step.to_string(),
})),
}),
0,
)
self.do_get(Request::Query(QueryRequest {
query: Some(Query::PromRangeQuery(PromRangeQuery {
query: promql.to_string(),
start: start.to_string(),
end: end.to_string(),
step: step.to_string(),
})),
}))
.await
}
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_CREATE_TABLE.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
}),
0,
)
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
}))
.await
}
pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_ALTER.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
}),
0,
)
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
}))
.await
}
pub async fn drop_table(&self, expr: DropTableExpr) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_DROP_TABLE.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
}),
0,
)
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
}))
.await
}
pub async fn truncate_table(&self, expr: TruncateTableExpr) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_TRUNCATE_TABLE.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::TruncateTable(expr)),
}),
0,
)
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::TruncateTable(expr)),
}))
.await
}
async fn do_get(&self, request: Request, trace_id: u64) -> Result<Output> {
async fn do_get(&self, request: Request) -> Result<Output> {
// FIXME(paomian): some labels should be added for metrics
let _timer = metrics::METRIC_GRPC_DO_GET.start_timer();
let request = self.to_rpc_request(request, trace_id);
let request = self.to_rpc_request(request);
let request = Ticket {
ticket: request.encode_to_vec().into(),
};

View File

@@ -208,7 +208,8 @@ async fn main() -> Result<()> {
};
common_telemetry::set_panic_hook();
let _guard = common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts);
let _guard =
common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts, opts.node_id());
// Report app version as gauge.
APP_VERSION

View File

@@ -176,7 +176,7 @@ impl Repl {
.encode(&plan)
.context(SubstraitEncodeLogicalPlanSnafu)?;
self.database.logical_plan(plan.to_vec(), 0).await
self.database.logical_plan(plan.to_vec()).await
} else {
self.database.sql(&sql).await
}

View File

@@ -133,6 +133,15 @@ impl Options {
Ok(opts)
}
pub fn node_id(&self) -> Option<String> {
match self {
Options::Metasrv(_) | Options::Cli(_) => None,
Options::Datanode(opt) => opt.node_id.map(|x| x.to_string()),
Options::Frontend(opt) => opt.node_id.clone(),
Options::Standalone(opt) => opt.frontend.node_id.clone(),
}
}
}
#[cfg(test)]

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use api::v1::meta::Partition;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
@@ -34,6 +35,7 @@ pub mod utils;
#[derive(Debug, Default)]
pub struct ExecutorContext {
pub cluster_id: Option<u64>,
pub tracing_context: Option<W3cTrace>,
}
#[async_trait::async_trait]

View File

@@ -26,6 +26,7 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status,
};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, info};
use futures::future;
use serde::{Deserialize, Serialize};
@@ -207,7 +208,7 @@ impl AlterTableProcedure {
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),

View File

@@ -21,6 +21,7 @@ use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
@@ -199,7 +200,7 @@ impl CreateTableProcedure {
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(request),

View File

@@ -22,6 +22,7 @@ use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, info};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
@@ -157,7 +158,7 @@ impl DropTableProcedure {
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {

View File

@@ -21,6 +21,7 @@ use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -154,7 +155,7 @@ impl TruncateTableProcedure {
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Truncate(PbTruncateRegionRequest {

View File

@@ -15,7 +15,8 @@
use std::sync::Arc;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::info;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, tracing};
use snafu::{OptionExt, ResultExt};
use crate::cache_invalidator::CacheInvalidatorRef;
@@ -140,6 +141,7 @@ impl DdlManager {
})
}
#[tracing::instrument(skip_all)]
pub async fn submit_alter_table_task(
&self,
cluster_id: u64,
@@ -156,6 +158,7 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
pub async fn submit_create_table_task(
&self,
cluster_id: u64,
@@ -172,6 +175,7 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
pub async fn submit_drop_table_task(
&self,
cluster_id: u64,
@@ -194,6 +198,7 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
pub async fn submit_truncate_table_task(
&self,
cluster_id: u64,
@@ -383,21 +388,31 @@ impl DdlTaskExecutor for DdlManager {
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let cluster_id = ctx.cluster_id.unwrap_or_default();
info!("Submitting Ddl task: {:?}", request.task);
match request.task {
CreateTable(create_table_task) => {
handle_create_table_task(self, cluster_id, create_table_task).await
}
DropTable(drop_table_task) => {
handle_drop_table_task(self, cluster_id, drop_table_task).await
}
AlterTable(alter_table_task) => {
handle_alter_table_task(self, cluster_id, alter_table_task).await
}
TruncateTable(truncate_table_task) => {
handle_truncate_table_task(self, cluster_id, truncate_table_task).await
let span = ctx
.tracing_context
.as_ref()
.map(TracingContext::from_w3c)
.unwrap_or(TracingContext::from_current_span())
.attach(tracing::info_span!("DdlManager::submit_ddl_task"));
async move {
let cluster_id = ctx.cluster_id.unwrap_or_default();
info!("Submitting Ddl task: {:?}", request.task);
match request.task {
CreateTable(create_table_task) => {
handle_create_table_task(self, cluster_id, create_table_task).await
}
DropTable(drop_table_task) => {
handle_drop_table_task(self, cluster_id, drop_table_task).await
}
AlterTable(alter_table_task) => {
handle_alter_table_task(self, cluster_id, alter_table_task).await
}
TruncateTable(truncate_table_task) => {
handle_truncate_table_task(self, cluster_id, truncate_table_task).await
}
}
}
.trace(span)
.await
}
}
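A minimal sketch of what the `#[tracing::instrument(skip_all)]` annotations added throughout this commit do: the attribute wraps the function body in a span named after the function, and `skip_all` keeps arguments from being recorded as span fields (many of them are large or not `Debug`-friendly). The function name below is illustrative:

use common_telemetry::tracing;

#[tracing::instrument(skip_all)]
async fn submit_task(task_name: &str) {
    // Events and child spans created here are recorded under the
    // `submit_task` span, which in turn nests under the caller's span.
    tracing::info!("submitting {}", task_name);
}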

View File

@@ -23,7 +23,8 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::{info, logging};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, logging, tracing};
use snafu::{ensure, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
@@ -452,9 +453,19 @@ impl LocalManager {
DuplicateProcedureSnafu { procedure_id },
);
let tracing_context = TracingContext::from_current_span();
let _handle = common_runtime::spawn_bg(async move {
// Run the root procedure.
runner.run().await;
// The task is moved to another runtime for execution.
// To keep the trace unbroken, create a span that continues tracing the current task.
runner
.run()
.trace(
tracing_context
.attach(tracing::info_span!("LocalManager::submit_root_procedure")),
)
.await;
});
Ok(watcher)

View File

@@ -14,20 +14,19 @@ common-error.workspace = true
console-subscriber = { version = "0.1", optional = true }
lazy_static.workspace = true
once_cell.workspace = true
opentelemetry = { version = "0.17", default-features = false, features = [
opentelemetry = { version = "0.21.0", default-features = false, features = [
"trace",
"rt-tokio",
] }
opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] }
opentelemetry-semantic-conventions = "0.13.0"
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
parking_lot = { version = "0.12" }
prometheus.workspace = true
rand.workspace = true
rs-snowflake = "0.6"
serde.workspace = true
tokio.workspace = true
tracing = "0.1"
tracing-appender = "0.2"
tracing-futures = { version = "0.2", features = ["futures-03"] }
tracing-log = "0.1"
tracing-opentelemetry = "0.17"
tracing-opentelemetry = "0.22.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@@ -16,45 +16,9 @@ pub mod logging;
mod macros;
pub mod metric;
mod panic_hook;
pub mod tracing_context;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
pub use logging::{init_default_ut_logging, init_global_logging, trace_id, TRACE_ID};
pub use logging::{init_default_ut_logging, init_global_logging};
pub use metric::dump_metrics;
use once_cell::sync::OnceCell;
pub use panic_hook::set_panic_hook;
use parking_lot::Mutex;
use rand::random;
use snowflake::SnowflakeIdBucket;
pub use {common_error, tracing, tracing_appender, tracing_futures, tracing_subscriber};
static NODE_ID: OnceCell<u64> = OnceCell::new();
static TRACE_BUCKET: OnceCell<Mutex<SnowflakeIdBucket>> = OnceCell::new();
pub fn gen_trace_id() -> u64 {
let mut bucket = TRACE_BUCKET
.get_or_init(|| {
// if node_id is not initialized, how about random one?
let node_id = NODE_ID.get_or_init(|| 0);
info!("initializing bucket with node_id: {}", node_id);
let bucket = SnowflakeIdBucket::new(1, (*node_id) as i32);
Mutex::new(bucket)
})
.lock();
(*bucket).get_id() as u64
}
pub fn init_node_id(node_id: Option<String>) {
let node_id = node_id.map(|id| calculate_hash(&id)).unwrap_or(random());
match NODE_ID.set(node_id) {
Ok(_) => {}
Err(_) => warn!("node_id is already initialized"),
}
}
fn calculate_hash<T: Hash>(t: &T) -> u64 {
let mut s = DefaultHasher::new();
t.hash(&mut s);
s.finish()
}
pub use {common_error, tracing};

View File

@@ -17,10 +17,11 @@ use std::env;
use std::sync::{Arc, Mutex, Once};
use once_cell::sync::Lazy;
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_semantic_conventions::resource;
use serde::{Deserialize, Serialize};
pub use tracing::{event, span, Level};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_log::LogTracer;
@@ -29,38 +30,17 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter, EnvFilter, Registry};
pub use crate::{debug, error, info, log, trace, warn};
pub use crate::{debug, error, info, trace, warn};
tokio::task_local! {
/// Task local trace id. See [trace_id](crate::trace_id) for more details.
pub static TRACE_ID: u64;
}
/// Get current [TRACE_ID] from tokio [task_local](tokio::task_local) storage.
///
/// # Usage
/// To set current trace id, wrap your async code like this:
/// ```rust, no_run
/// common_telemetry::TRACE_ID
/// .scope(id, async move {
/// query_handler
/// .do_query(query, self.session.context())
/// .await
/// })
/// .await
/// ```
/// Then all functions called from this stack will be able to retrieve the trace id
/// via this method.
pub fn trace_id() -> Option<u64> {
TRACE_ID.try_with(|id| Some(*id)).unwrap_or(None)
}
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingOptions {
pub dir: String,
pub level: Option<String>,
pub enable_jaeger_tracing: bool,
pub enable_otlp_tracing: bool,
pub otlp_endpoint: Option<String>,
}
impl Default for LoggingOptions {
@@ -68,7 +48,8 @@ impl Default for LoggingOptions {
Self {
dir: "/tmp/greptimedb/logs".to_string(),
level: None,
enable_jaeger_tracing: false,
enable_otlp_tracing: false,
otlp_endpoint: None,
}
}
}
@@ -106,9 +87,10 @@ pub fn init_default_ut_logging() {
"unittest",
&opts,
TracingOptions::default(),
None
));
info!("logs dir = {}", dir);
crate::info!("logs dir = {}", dir);
});
}
@@ -122,11 +104,12 @@ pub fn init_global_logging(
app_name: &str,
opts: &LoggingOptions,
tracing_opts: TracingOptions,
node_id: Option<String>,
) -> Vec<WorkerGuard> {
let mut guards = vec![];
let dir = &opts.dir;
let level = &opts.level;
let enable_jaeger_tracing = opts.enable_jaeger_tracing;
let enable_otlp_tracing = opts.enable_otlp_tracing;
// Enable log compatible layer to convert log record to tracing span.
LogTracer::init().expect("log tracer must be valid");
@@ -204,15 +187,34 @@ pub fn init_global_logging(
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR));
if enable_jaeger_tracing {
// Jaeger layer.
if enable_otlp_tracing {
global::set_text_map_propagator(TraceContextPropagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name(app_name)
.install_batch(opentelemetry::runtime::Tokio)
.expect("install");
let jaeger_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(jaeger_layer);
// otlp exporter
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| format!("http://{}", e))
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
),
)
.with_trace_config(opentelemetry_sdk::trace::config().with_resource(
opentelemetry_sdk::Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
KeyValue::new(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
]),
))
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(tracing_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
} else {
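Outside GreptimeDB, the same pipeline boils down to a few lines. This is a sketch assuming an OTLP gRPC collector listening on port 4317 (e.g. Jaeger with OTLP ingestion enabled); the returned tracer is then handed to `tracing_opentelemetry::layer().with_tracer(...)` and stacked onto the subscriber, as above:

use opentelemetry_otlp::WithExportConfig;

fn init_otlp_tracer() -> opentelemetry_sdk::trace::Tracer {
    opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint("http://localhost:4317"),
        )
        // Batch export needs a Tokio runtime to flush spans in the background.
        .install_batch(opentelemetry_sdk::runtime::Tokio)
        .expect("failed to install OTLP tracer")
}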

View File

@@ -17,14 +17,12 @@
macro_rules! log {
// log!(target: "my_target", Level::INFO, "a {} event", "log");
(target: $target:expr, $lvl:expr, $($arg:tt)+) => {{
let _trace_id = $crate::trace_id();
$crate::logging::event!(target: $target, $lvl, trace_id = _trace_id, $($arg)+)
$crate::tracing::event!(target: $target, $lvl, $($arg)+)
}};
// log!(Level::INFO, "a log event")
($lvl:expr, $($arg:tt)+) => {{
let _trace_id = $crate::trace_id();
$crate::logging::event!($lvl, trace_id = _trace_id, $($arg)+)
$crate::tracing::event!($lvl, $($arg)+)
}};
}
@@ -33,14 +31,14 @@ macro_rules! log {
macro_rules! error {
// error!(target: "my_target", "a {} event", "log")
(target: $target:expr, $($arg:tt)+) => ({
$crate::log!(target: $target, $crate::logging::Level::ERROR, $($arg)+)
$crate::log!(target: $target, $crate::tracing::Level::ERROR, $($arg)+)
});
// error!(e; target: "my_target", "a {} event", "log")
($e:expr; target: $target:expr, $($arg:tt)+) => ({
$crate::log!(
target: $target,
$crate::logging::Level::ERROR,
$crate::tracing::Level::ERROR,
err = ?$e,
$($arg)+
)
@@ -50,7 +48,7 @@ macro_rules! error {
(%$e:expr; target: $target:expr, $($arg:tt)+) => ({
$crate::log!(
target: $target,
$crate::logging::Level::ERROR,
$crate::tracing::Level::ERROR,
err = %$e,
$($arg)+
)
@@ -59,7 +57,7 @@ macro_rules! error {
// error!(e; "a {} event", "log")
($e:expr; $($arg:tt)+) => ({
$crate::log!(
$crate::logging::Level::ERROR,
$crate::tracing::Level::ERROR,
err = ?$e,
$($arg)+
)
@@ -68,7 +66,7 @@ macro_rules! error {
// error!(%e; "a {} event", "log")
(%$e:expr; $($arg:tt)+) => ({
$crate::log!(
$crate::logging::Level::ERROR,
$crate::tracing::Level::ERROR,
err = %$e,
$($arg)+
)
@@ -76,7 +74,7 @@ macro_rules! error {
// error!("a {} event", "log")
($($arg:tt)+) => ({
$crate::log!($crate::logging::Level::ERROR, $($arg)+)
$crate::log!($crate::tracing::Level::ERROR, $($arg)+)
});
}
@@ -85,13 +83,13 @@ macro_rules! error {
macro_rules! warn {
// warn!(target: "my_target", "a {} event", "log")
(target: $target:expr, $($arg:tt)+) => {
$crate::log!(target: $target, $crate::logging::Level::WARN, $($arg)+)
$crate::log!(target: $target, $crate::tracing::Level::WARN, $($arg)+)
};
// warn!(e; "a {} event", "log")
($e:expr; $($arg:tt)+) => ({
$crate::log!(
$crate::logging::Level::WARN,
$crate::tracing::Level::WARN,
err = ?$e,
$($arg)+
)
@@ -100,7 +98,7 @@ macro_rules! warn {
// warn!(%e; "a {} event", "log")
(%$e:expr; $($arg:tt)+) => ({
$crate::log!(
$crate::logging::Level::WARN,
$crate::tracing::Level::WARN,
err = %$e,
$($arg)+
)
@@ -108,7 +106,7 @@ macro_rules! warn {
// warn!("a {} event", "log")
($($arg:tt)+) => {
$crate::log!($crate::logging::Level::WARN, $($arg)+)
$crate::log!($crate::tracing::Level::WARN, $($arg)+)
};
}
@@ -117,12 +115,12 @@ macro_rules! warn {
macro_rules! info {
// info!(target: "my_target", "a {} event", "log")
(target: $target:expr, $($arg:tt)+) => {
$crate::log!(target: $target, $crate::logging::Level::INFO, $($arg)+)
$crate::log!(target: $target, $crate::tracing::Level::INFO, $($arg)+)
};
// info!("a {} event", "log")
($($arg:tt)+) => {
$crate::log!($crate::logging::Level::INFO, $($arg)+)
$crate::log!($crate::tracing::Level::INFO, $($arg)+)
};
}
@@ -131,12 +129,12 @@ macro_rules! info {
macro_rules! debug {
// debug!(target: "my_target", "a {} event", "log")
(target: $target:expr, $($arg:tt)+) => {
$crate::log!(target: $target, $crate::logging::Level::DEBUG, $($arg)+)
$crate::log!(target: $target, $crate::tracing::Level::DEBUG, $($arg)+)
};
// debug!("a {} event", "log")
($($arg:tt)+) => {
$crate::log!($crate::logging::Level::DEBUG, $($arg)+)
$crate::log!($crate::tracing::Level::DEBUG, $($arg)+)
};
}
@@ -145,12 +143,12 @@ macro_rules! debug {
macro_rules! trace {
// trace!(target: "my_target", "a {} event", "log")
(target: $target:expr, $($arg:tt)+) => {
$crate::log!(target: $target, $crate::logging::Level::TRACE, $($arg)+)
$crate::log!(target: $target, $crate::tracing::Level::TRACE, $($arg)+)
};
// trace!("a {} event", "log")
($($arg:tt)+) => {
$crate::log!($crate::logging::Level::TRACE, $($arg)+)
$crate::log!($crate::tracing::Level::TRACE, $($arg)+)
};
}
@@ -158,8 +156,7 @@ macro_rules! trace {
mod tests {
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use crate::logging::Level;
use tracing::Level;
macro_rules! all_log_macros {
($($arg:tt)*) => {
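Call sites are unchanged by this rewrite; the macros now forward straight to `tracing::event!` without injecting a `trace_id` field. A small usage sketch (function and messages are illustrative):

use common_telemetry::{error, info};

fn report(e: &std::io::Error) {
    info!("starting compaction");
    // The `e;` form records the error as a structured `err` field on the event.
    error!(e; "compaction failed");
}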

View File

@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Tracing utilities, inspired by RisingWave
use std::collections::HashMap;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;
// A wrapper for `Future`s that provides tracing instrument adapters.
pub trait FutureExt: std::future::Future + Sized {
fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented<Self>;
}
impl<T: std::future::Future> FutureExt for T {
#[inline]
fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented<Self> {
tracing::instrument::Instrument::instrument(self, span)
}
}
/// A tracing context used to propagate tracing information across a distributed system.
///
/// Generally, the caller of a service should create a tracing context from the current tracing span
/// and pass it to the callee through the network. The callee will then attach its local tracing
/// span as a child of the tracing context, so that the external tracing service can associate them
/// in a single trace.
///
/// The tracing context must be serialized into the W3C trace context format and passed in RPC
/// message headers during communication between frontend, datanode and metasrv.
///
/// See [Trace Context](https://www.w3.org/TR/trace-context/) for more information.
#[derive(Debug, Clone)]
pub struct TracingContext(opentelemetry::Context);
pub type W3cTrace = HashMap<String, String>;
impl Default for TracingContext {
fn default() -> Self {
Self::new()
}
}
type Propagator = TraceContextPropagator;
impl TracingContext {
/// Create a new tracing context from a tracing span.
pub fn from_span(span: &tracing::Span) -> Self {
Self(span.context())
}
/// Create a new tracing context from the current tracing span considered by the subscriber.
pub fn from_current_span() -> Self {
Self::from_span(&tracing::Span::current())
}
/// Create a no-op tracing context.
pub fn new() -> Self {
Self(opentelemetry::Context::new())
}
/// Attach the given span as a child of the context. Returns the attached span.
pub fn attach(&self, span: tracing::Span) -> tracing::Span {
span.set_parent(self.0.clone());
span
}
/// Convert the tracing context to the W3C trace context format.
pub fn to_w3c(&self) -> W3cTrace {
let mut fields = HashMap::new();
Propagator::new().inject_context(&self.0, &mut fields);
fields
}
/// Create a new tracing context from the W3C trace context format.
pub fn from_w3c(fields: &W3cTrace) -> Self {
let context = Propagator::new().extract(fields);
Self(context)
}
}
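Putting the pieces together, the propagation flow described in the doc comment looks roughly like this (function and span names are illustrative):

use common_telemetry::tracing;
use common_telemetry::tracing_context::{FutureExt, TracingContext, W3cTrace};

// Caller: serialize the current span into W3C headers for the RPC message.
fn caller_side() -> W3cTrace {
    TracingContext::from_current_span().to_w3c()
}

// Callee: rebuild the context and attach a local span as its child, so the
// tracing backend links both services into a single trace.
async fn callee_side(w3c: &W3cTrace) {
    let span = TracingContext::from_w3c(w3c).attach(tracing::info_span!("callee_handler"));
    handle_request().trace(span).await;
}

async fn handle_request() { /* ... */ }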

View File

@@ -28,6 +28,8 @@ use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, Output};
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, warn};
use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
@@ -99,6 +101,7 @@ impl RegionServer {
self.inner.handle_request(region_id, request).await
}
#[tracing::instrument(skip_all)]
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
self.inner.handle_read(request).await
}
@@ -154,9 +157,19 @@ impl RegionServerHandler for RegionServer {
.context(BuildRegionRequestsSnafu)
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
let tracing_context = TracingContext::from_current_span();
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
async move { self_to_move.handle_request(region_id, req).await }
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
async move {
self_to_move
.handle_request(region_id, req)
.trace(span)
.await
}
});
let results = try_join_all(join_tasks)
@@ -198,15 +211,18 @@ impl FlightCraft for RegionServer {
let ticket = request.into_inner().ticket;
let request = QueryRequest::decode(ticket.as_ref())
.context(servers_error::InvalidFlightTicketSnafu)?;
let trace_id = request
let tracing_context = request
.header
.as_ref()
.map(|h| h.trace_id)
.map(|h| TracingContext::from_w3c(&h.tracing_context))
.unwrap_or_default();
let result = self.handle_read(request).await?;
let result = self
.handle_read(request)
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
.await?;
let stream = Box::pin(FlightRecordBatchStream::new(result, trace_id));
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
Ok(Response::new(stream))
}
}
@@ -283,6 +299,10 @@ impl RegionServerInner {
let result = engine
.handle_request(region_id, request)
.trace(info_span!(
"RegionEngine::handle_region_request",
engine_type
))
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?;

View File

@@ -207,8 +207,6 @@ impl Instance {
Arc::new(handlers_executor),
));
common_telemetry::init_node_id(opts.node_id.clone());
Ok(Instance {
catalog_manager,
script_executor,

View File

@@ -27,6 +27,8 @@ use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::{Sequence, SequenceRef};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use snafu::{OptionExt, ResultExt};
@@ -71,8 +73,15 @@ impl RegionInvoker {
#[async_trait]
impl Datanode for RegionInvoker {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
let span = request
.header
.as_ref()
.map(|h| TracingContext::from_w3c(&h.tracing_context))
.unwrap_or_default()
.attach(tracing::info_span!("RegionInvoker::handle_region_request"));
let response = self
.handle_inner(request)
.trace(span)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
@@ -83,8 +92,15 @@ impl Datanode for RegionInvoker {
}
async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
let span = request
.header
.as_ref()
.map(|h| TracingContext::from_w3c(&h.tracing_context))
.unwrap_or_default()
.attach(tracing::info_span!("RegionInvoker::handle_query"));
self.region_server
.handle_read(request)
.trace(span)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)

View File

@@ -19,6 +19,7 @@ use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::{OptionExt, ResultExt};
@@ -77,7 +78,11 @@ impl AskLeader {
peers.shuffle(&mut rand::thread_rng());
let req = AskLeaderRequest {
header: Some(RequestHeader::new(self.id, self.role)),
header: Some(RequestHeader::new(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
)),
};
let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use api::v1::meta::ddl_task_client::DdlTaskClient;
use api::v1::meta::{ErrorCode, ResponseHeader, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
@@ -133,7 +134,11 @@ impl Inner {
}
);
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let ask_leader = self.ask_leader.as_ref().unwrap();
let mut times = 0;

View File

@@ -19,6 +19,7 @@ use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::rpc::util;
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
@@ -49,7 +50,11 @@ impl HeartbeatSender {
#[inline]
pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> {
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
self.sender.send(req).await.map_err(|e| {
error::SendHeartbeatSnafu {
err_msg: e.to_string(),
@@ -207,7 +212,11 @@ impl Inner {
let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
let header = RequestHeader::new(self.id, self.role);
let header = RequestHeader::new(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let handshake = HeartbeatRequest {
header: Some(header),
..Default::default()

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use api::v1::meta::lock_client::LockClient;
use api::v1::meta::{LockRequest, LockResponse, Role, UnlockRequest, UnlockResponse};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
@@ -127,7 +128,11 @@ impl Inner {
async fn lock(&self, mut req: LockRequest) -> Result<LockResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.lock(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
@@ -135,7 +140,11 @@ impl Inner {
async fn unlock(&self, mut req: UnlockRequest) -> Result<UnlockResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.unlock(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())

View File

@@ -22,6 +22,7 @@ use api::v1::meta::{
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, Role,
};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
@@ -134,7 +135,11 @@ impl Inner {
async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.range(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
@@ -142,7 +147,11 @@ impl Inner {
async fn put(&self, mut req: PutRequest) -> Result<PutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.put(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
@@ -150,7 +159,11 @@ impl Inner {
async fn batch_get(&self, mut req: BatchGetRequest) -> Result<BatchGetResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.batch_get(req).await.map_err(error::Error::from)?;
@@ -159,7 +172,11 @@ impl Inner {
async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.batch_put(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
@@ -167,7 +184,11 @@ impl Inner {
async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.batch_delete(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
@@ -178,7 +199,11 @@ impl Inner {
mut req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client
.compare_and_put(req)
.await
@@ -189,7 +214,11 @@ impl Inner {
async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.delete_range(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())

View File

@@ -53,6 +53,7 @@ mod tests {
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::Sequence;
use common_telemetry::tracing_context::W3cTrace;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
@@ -89,7 +90,7 @@ mod tests {
};
let req = HeartbeatRequest {
header: Some(RequestHeader::new((1, 2), Role::Datanode)),
header: Some(RequestHeader::new((1, 2), Role::Datanode, W3cTrace::new())),
..Default::default()
};
let mut acc = HeartbeatAccumulator::default();

View File

@@ -45,6 +45,7 @@ impl ddl_task_server::DdlTask for MetaSrv {
.submit_ddl_task(
&ExecutorContext {
cluster_id: Some(cluster_id),
tracing_context: Some(header.tracing_context),
},
SubmitDdlTaskRequest { task },
)

View File

@@ -168,6 +168,7 @@ mod tests {
use api::v1::meta::heartbeat_server::Heartbeat;
use api::v1::meta::*;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_telemetry::tracing_context::W3cTrace;
use tonic::IntoRequest;
use super::get_node_id;
@@ -184,7 +185,7 @@ mod tests {
.unwrap();
let req = AskLeaderRequest {
header: Some(RequestHeader::new((1, 1), Role::Datanode)),
header: Some(RequestHeader::new((1, 1), Role::Datanode, W3cTrace::new())),
};
let res = meta_srv.ask_leader(req.into_request()).await.unwrap();

View File

@@ -257,6 +257,7 @@ mod tests {
use api::v1::meta::store_server::Store;
use api::v1::meta::*;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_telemetry::tracing_context::W3cTrace;
use tonic::IntoRequest;
use crate::metasrv::builder::MetaSrvBuilder;
@@ -275,7 +276,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = RangeRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.range(req.into_request()).await;
let _ = res.unwrap();
@@ -286,7 +287,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = PutRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.put(req.into_request()).await;
let _ = res.unwrap();
@@ -297,7 +298,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = BatchGetRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.batch_get(req.into_request()).await;
let _ = res.unwrap();
@@ -308,7 +309,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = BatchPutRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.batch_put(req.into_request()).await;
let _ = res.unwrap();
@@ -319,7 +320,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = BatchDeleteRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.batch_delete(req.into_request()).await;
let _ = res.unwrap();
@@ -330,7 +331,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = CompareAndPutRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.compare_and_put(req.into_request()).await;
let _ = res.unwrap();
@@ -341,7 +342,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = DeleteRangeRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.delete_range(req.into_request()).await;
let _ = res.unwrap();

View File

@@ -17,8 +17,7 @@
use std::time::Duration;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use common_telemetry::{info, warn};
use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, ObjectStore};

View File

@@ -22,6 +22,7 @@ use catalog::CatalogManagerRef;
use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use futures_util::future;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
@@ -119,8 +120,10 @@ impl Deleter {
requests: RegionDeleteRequests,
ctx: &QueryContextRef,
) -> Result<AffectedRows> {
let header: RegionRequestHeader = ctx.as_ref().into();
let request_factory = RegionRequestFactory::new(header);
let request_factory = RegionRequestFactory::new(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
dbname: ctx.get_db_string(),
});
let tasks = self
.group_requests_by_peer(requests)

View File

@@ -26,6 +26,7 @@ use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info};
use datatypes::schema::Schema;
use futures_util::future;
@@ -152,8 +153,10 @@ impl Inserter {
ctx: &QueryContextRef,
) -> Result<AffectedRows> {
write_meter!(ctx.current_catalog(), ctx.current_schema(), requests);
let header: RegionRequestHeader = ctx.as_ref().into();
let request_factory = RegionRequestFactory::new(header);
let request_factory = RegionRequestFactory::new(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
dbname: ctx.get_db_string(),
});
let tasks = self
.group_requests_by_peer(requests)

View File

@@ -32,6 +32,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
@@ -88,6 +89,7 @@ impl StatementExecutor {
}
}
#[tracing::instrument(skip_all)]
pub async fn execute_stmt(
&self,
stmt: QueryStatement,
@@ -201,6 +203,7 @@ impl StatementExecutor {
.context(PlanStatementSnafu)
}
#[tracing::instrument(skip_all)]
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan(stmt, query_ctx.clone()).await?;
self.query_engine

View File

@@ -14,7 +14,7 @@
use common_datasource::file_format::Format;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::{info, tracing};
use session::context::QueryContextBuilder;
use snafu::{ensure, ResultExt};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
@@ -27,6 +27,7 @@ pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";
impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result<Output> {
// location must end with / so that every table is exported to a file.
ensure!(

View File

@@ -29,7 +29,7 @@ use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::debug;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream};
@@ -237,6 +237,7 @@ impl StatementExecutor {
}
}
#[tracing::instrument(skip_all)]
pub async fn copy_table_from(
&self,
req: CopyTableRequest,

View File

@@ -23,7 +23,7 @@ use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_telemetry::{debug, tracing};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
@@ -84,6 +84,7 @@ impl StatementExecutor {
}
}
#[tracing::instrument(skip_all)]
pub(crate) async fn copy_table_to(
&self,
req: CopyTableRequest,

View File

@@ -28,7 +28,7 @@ use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::{info, tracing};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use partition::partition::{PartitionBound, PartitionDef};
@@ -58,11 +58,13 @@ impl StatementExecutor {
self.catalog_manager.clone()
}
#[tracing::instrument(skip_all)]
pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
let create_expr = &mut expr_factory::create_to_expr(&stmt, ctx)?;
self.create_table_inner(create_expr, stmt.partitions).await
}
#[tracing::instrument(skip_all)]
pub async fn create_external_table(
&self,
create_expr: CreateExternalTable,
@@ -151,6 +153,7 @@ impl StatementExecutor {
Ok(table)
}
#[tracing::instrument(skip_all)]
pub async fn drop_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
@@ -181,6 +184,7 @@ impl StatementExecutor {
Ok(Output::AffectedRows(0))
}
#[tracing::instrument(skip_all)]
pub async fn truncate_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
@@ -221,6 +225,7 @@ impl StatementExecutor {
Ok(())
}
#[tracing::instrument(skip_all)]
pub async fn alter_table(
&self,
alter_table: AlterTable,
@@ -347,6 +352,7 @@ impl StatementExecutor {
.context(error::ExecuteDdlSnafu)
}
#[tracing::instrument(skip_all)]
pub async fn create_database(
&self,
catalog: &str,

View File

@@ -14,6 +14,7 @@
use common_error::ext::BoxedError;
use common_query::Output;
use common_telemetry::tracing;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::describe::DescribeTable;
@@ -26,6 +27,7 @@ use crate::statement::StatementExecutor;
use crate::table::table_idents_to_full_name;
impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(super) async fn describe_table(
&self,
stmt: DescribeTable,

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use common_query::Output;
use common_telemetry::tracing;
use query::parser::QueryStatement;
use session::context::QueryContextRef;
use sql::statements::insert::Insert;
@@ -22,6 +23,7 @@ use super::StatementExecutor;
use crate::error::Result;
impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub async fn insert(&self, insert: Box<Insert>, query_ctx: QueryContextRef) -> Result<Output> {
if insert.can_extract_values() {
// Fast path: plain insert ("insert with literal values") is executed directly

View File

@@ -14,6 +14,7 @@
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use partition::manager::PartitionInfo;
use partition::partition::PartitionBound;
use session::context::QueryContextRef;
@@ -28,6 +29,7 @@ use crate::error::{self, ExecuteStatementSnafu, Result};
use crate::statement::StatementExecutor;
impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(super) async fn show_databases(
&self,
stmt: ShowDatabases,
@@ -38,6 +40,7 @@ impl StatementExecutor {
.context(ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]
pub(super) async fn show_tables(
&self,
stmt: ShowTables,
@@ -48,6 +51,7 @@ impl StatementExecutor {
.context(ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]
pub async fn show_create_table(
&self,
table_name: TableName,

View File

@@ -15,6 +15,7 @@
use std::collections::HashMap;
use common_query::Output;
use common_telemetry::tracing;
use query::parser::{PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, EXPLAIN_NODE_NAME};
use session::context::QueryContextRef;
use snafu::ResultExt;
@@ -24,6 +25,7 @@ use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Re
use crate::statement::StatementExecutor;
impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = match tql {
Tql::Eval(eval) => {

View File

@@ -34,6 +34,7 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{
EmptyRecordBatchStream, RecordBatch, RecordBatches, SendableRecordBatchStream,
};
use common_telemetry::tracing;
use datafusion::common::Column;
use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -77,6 +78,7 @@ impl DatafusionQueryEngine {
Self { state, plugins }
}
#[tracing::instrument(skip_all)]
async fn exec_query_plan(
&self,
plan: LogicalPlan,
@@ -97,6 +99,7 @@ impl DatafusionQueryEngine {
Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?))
}
#[tracing::instrument(skip_all)]
async fn exec_dml_statement(
&self,
dml: DmlStatement,
@@ -147,6 +150,7 @@ impl DatafusionQueryEngine {
Ok(Output::AffectedRows(affected_rows))
}
#[tracing::instrument(skip_all)]
async fn delete<'a>(
&self,
table_name: &ResolvedTableReference<'a>,
@@ -189,6 +193,7 @@ impl DatafusionQueryEngine {
.await
}
#[tracing::instrument(skip_all)]
async fn insert<'a>(
&self,
table_name: &ResolvedTableReference<'a>,
@@ -285,6 +290,7 @@ impl QueryEngine for DatafusionQueryEngine {
}
impl LogicalOptimizer for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let _timer = metrics::METRIC_OPTIMIZE_LOGICAL_ELAPSED.start_timer();
match plan {
@@ -305,6 +311,7 @@ impl LogicalOptimizer for DatafusionQueryEngine {
#[async_trait::async_trait]
impl PhysicalPlanner for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
async fn create_physical_plan(
&self,
ctx: &mut QueryEngineContext,
@@ -338,6 +345,7 @@ impl PhysicalPlanner for DatafusionQueryEngine {
}
impl PhysicalOptimizer for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn optimize_physical_plan(
&self,
ctx: &mut QueryEngineContext,
@@ -385,6 +393,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
}
impl QueryExecutor for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn execute_stream(
&self,
ctx: &QueryEngineContext,

View File

@@ -27,7 +27,8 @@ use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream,
};
use common_telemetry::trace_id;
use common_telemetry::tracing;
use common_telemetry::tracing_context::TracingContext;
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
};
@@ -155,6 +156,7 @@ impl MergeScanExec {
})
}
#[tracing::instrument(skip_all)]
pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
let substrait_plan = self.substrait_plan.to_vec();
let regions = self.regions.clone();
@@ -163,7 +165,8 @@ impl MergeScanExec {
let schema = Self::arrow_schema_to_schema(self.schema())?;
let dbname = context.task_id().unwrap_or_default();
let trace_id = trace_id().unwrap_or_default();
let tracing_context = TracingContext::from_current_span().to_w3c();
let stream = Box::pin(stream!({
METRIC_MERGE_SCAN_REGIONS.observe(regions.len() as f64);
@@ -174,8 +177,7 @@ impl MergeScanExec {
for region_id in regions {
let request = QueryRequest {
header: Some(RegionRequestHeader {
trace_id,
span_id: 0,
tracing_context: tracing_context.clone(),
dbname: dbname.clone(),
}),
region_id: region_id.into(),
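This is the producer side of context propagation: `TracingContext::from_current_span().to_w3c()` serializes the active span into W3C trace-context entries that ride in the `RegionRequestHeader`, replacing the old raw `trace_id`/`span_id` pair. A sketch of what that serialization amounts to, assuming a W3C `TraceContextPropagator` is installed as the global text-map propagator:

use std::collections::HashMap;

use opentelemetry::global;
use tracing_opentelemetry::OpenTelemetrySpanExt;

// Inject the current tracing span's OpenTelemetry context into a string map
// ("traceparent"/"tracestate" entries) that can be carried in a request header.
fn current_context_to_w3c() -> HashMap<String, String> {
    let context = tracing::Span::current().context();
    let mut carrier = HashMap::new();
    global::get_text_map_propagator(|propagator| {
        propagator.inject_context(&context, &mut carrier)
    });
    carrier
}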

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use datafusion::execution::context::SessionState;
use datafusion_sql::planner::{ParserOptions, SqlToRel};
use promql::planner::PromPlanner;
@@ -51,6 +52,7 @@ impl DfLogicalPlanner {
}
}
#[tracing::instrument(skip_all)]
async fn plan_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let df_stmt = (&stmt).try_into().context(SqlSnafu)?;
@@ -85,6 +87,7 @@ impl DfLogicalPlanner {
Ok(LogicalPlan::DfPlan(plan))
}
#[tracing::instrument(skip_all)]
async fn plan_pql(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let table_provider = DfTableSourceProvider::new(
self.engine_state.catalog_manager().clone(),
@@ -101,6 +104,7 @@ impl DfLogicalPlanner {
#[async_trait]
impl LogicalPlanner for DfLogicalPlanner {
#[tracing::instrument(skip_all)]
async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
match stmt {
QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,

View File

@@ -26,6 +26,7 @@ use arrow_flight::{
use async_trait::async_trait;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use futures::Stream;
use prost::Message;
use snafu::ResultExt;
@@ -150,28 +151,26 @@ impl FlightCraft for GreptimeRequestHandler {
let ticket = request.into_inner().ticket;
let request =
GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?;
let trace_id = request
.header
.as_ref()
.map(|h| h.trace_id)
.unwrap_or_default();
let output = self.handle_request(request).await?;
let stream: Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync>> =
to_flight_data_stream(output, trace_id);
to_flight_data_stream(output, TracingContext::new());
Ok(Response::new(stream))
}
}
fn to_flight_data_stream(output: Output, trace_id: u64) -> TonicStream<FlightData> {
fn to_flight_data_stream(
output: Output,
tracing_context: TracingContext,
) -> TonicStream<FlightData> {
match output {
Output::Stream(stream) => {
let stream = FlightRecordBatchStream::new(stream, trace_id);
let stream = FlightRecordBatchStream::new(stream, tracing_context);
Box::pin(stream) as _
}
Output::RecordBatches(x) => {
let stream = FlightRecordBatchStream::new(x.as_stream(), trace_id);
let stream = FlightRecordBatchStream::new(x.as_stream(), tracing_context);
Box::pin(stream) as _
}
Output::AffectedRows(rows) => {

View File

@@ -18,7 +18,9 @@ use std::task::{Context, Poll};
use arrow_flight::FlightData;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{warn, TRACE_ID};
use common_telemetry::tracing::info_span;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::warn;
use futures::channel::mpsc;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, Stream, StreamExt};
@@ -39,11 +41,13 @@ pub struct FlightRecordBatchStream {
}
impl FlightRecordBatchStream {
pub fn new(recordbatches: SendableRecordBatchStream, trace_id: u64) -> Self {
pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self {
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
let join_handle = common_runtime::spawn_read(TRACE_ID.scope(trace_id, async move {
Self::flight_data_stream(recordbatches, tx).await
}));
let join_handle = common_runtime::spawn_read(async move {
Self::flight_data_stream(recordbatches, tx)
.trace(tracing_context.attach(info_span!("flight_data_stream")))
.await
});
Self {
rx,
join_handle,
@@ -145,7 +149,7 @@ mod test {
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()])
.unwrap()
.as_stream();
let mut stream = FlightRecordBatchStream::new(recordbatches, 0);
let mut stream = FlightRecordBatchStream::new(recordbatches, TracingContext::default());
let mut raw_data = Vec::with_capacity(2);
raw_data.push(stream.next().await.unwrap().unwrap());

View File

@@ -27,7 +27,7 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_runtime::Runtime;
use common_telemetry::{logging, TRACE_ID};
use common_telemetry::logging;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
@@ -70,7 +70,6 @@ impl GreptimeRequestHandler {
let request_type = request_type(&query).to_string();
let db = query_ctx.get_db_string();
let timer = RequestTimer::new(db.clone(), request_type);
let trace_id = query_ctx.trace_id();
// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpectedly by the Tonic runtime;
@@ -79,7 +78,7 @@ impl GreptimeRequestHandler {
// - Obtaining a `JoinHandle` to get the panic message (if there's any).
// From its docs, `JoinHandle` is cancel safe. The task keeps running even if its handle has been dropped.
// 2. avoid the handler blocks the gRPC runtime incidentally.
let handle = self.runtime.spawn(TRACE_ID.scope(trace_id, async move {
let handle = self.runtime.spawn(async move {
handler.do_query(query, query_ctx).await.map_err(|e| {
if e.status_code().should_log_error() {
logging::error!(e; "Failed to handle request");
@@ -89,7 +88,7 @@ impl GreptimeRequestHandler {
}
e
})
}));
});
handle.await.context(JoinTaskSnafu).map_err(|e| {
timer.record(e.status_code());
@@ -166,7 +165,6 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.try_trace_id(header.map(|h| h.trace_id))
.build()
}

View File

@@ -19,7 +19,9 @@ use api::v1::region::{region_request, RegionRequest, RegionResponse};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_runtime::Runtime;
use common_telemetry::{debug, error, TRACE_ID};
use common_telemetry::tracing::info_span;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, error};
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};
@@ -45,12 +47,14 @@ impl RegionServerRequestHandler {
}
async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
let trace_id = request
.header
.context(InvalidQuerySnafu {
reason: "Expecting non-empty region request header.",
})?
.trace_id;
let tracing_context = TracingContext::from_w3c(
&request
.header
.context(InvalidQuerySnafu {
reason: "Expecting non-empty region request header.",
})?
.tracing_context,
);
let query = request.body.context(InvalidQuerySnafu {
reason: "Expecting non-empty region request body.",
})?;
@@ -64,17 +68,21 @@ impl RegionServerRequestHandler {
// - Obtaining a `JoinHandle` to get the panic message (if there's any).
// From its docs, `JoinHandle` is cancel safe. The task keeps running even if its handle has been dropped.
// 2. avoid the handler blocks the gRPC runtime incidentally.
let handle = self.runtime.spawn(TRACE_ID.scope(trace_id, async move {
handler.handle(query).await.map_err(|e| {
if e.status_code().should_log_error() {
error!(e; "Failed to handle request");
} else {
// Currently, we still print a debug log.
debug!("Failed to handle request, err: {:?}", e);
}
e
})
}));
let handle = self.runtime.spawn(async move {
handler
.handle(query)
.trace(tracing_context.attach(info_span!("RegionServerRequestHandler::handle")))
.await
.map_err(|e| {
if e.status_code().should_log_error() {
error!(e; "Failed to handle request");
} else {
// Currently, we still print a debug log.
debug!("Failed to handle request, err: {}", e);
}
e
})
});
handle.await.context(JoinTaskSnafu)?
}
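On the receiving side, `TracingContext::from_w3c` rebuilds the remote parent from the header map before the handler is spawned, and `attach(info_span!(...))` parents the new server-side span under it. A sketch of what that extract-and-attach step amounts to with the OpenTelemetry propagator API directly (the span name is taken from the code above; the rest is an assumption about the helper's internals):

use std::collections::HashMap;

use opentelemetry::global;
use tracing_opentelemetry::OpenTelemetrySpanExt;

// Extract the remote parent from the W3C carrier and make it the parent of a
// new server-side span, so datanode work joins the frontend's trace.
fn span_from_w3c(carrier: &HashMap<String, String>) -> tracing::Span {
    let parent = global::get_text_map_propagator(|propagator| propagator.extract(carrier));
    let span = tracing::info_span!("RegionServerRequestHandler::handle");
    span.set_parent(parent);
    span
}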

View File

@@ -24,7 +24,7 @@ use chrono::{NaiveDate, NaiveDateTime};
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_query::Output;
use common_telemetry::{error, logging, warn};
use common_telemetry::{error, logging, tracing, warn};
use datatypes::prelude::ConcreteDataType;
use opensrv_mysql::{
AsyncMysqlShim, Column, ErrorKind, InitWriter, ParamParser, ParamValue, QueryResultWriter,
@@ -91,18 +91,14 @@ impl MysqlInstanceShim {
}
}
#[tracing::instrument(skip_all)]
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
if let Some(output) =
crate::mysql::federated::check(query, query_ctx.clone(), self.session.clone())
{
vec![Ok(output)]
} else {
let trace_id = query_ctx.trace_id();
common_telemetry::TRACE_ID
.scope(trace_id, async move {
self.query_handler.do_query(query, query_ctx).await
})
.await
self.query_handler.do_query(query, query_ctx).await
}
}

View File

@@ -37,8 +37,6 @@ pub struct QueryContext {
current_user: ArcSwap<Option<UserInfoRef>>,
time_zone: Option<TimeZone>,
sql_dialect: Box<dyn Dialect + Send + Sync>,
trace_id: u64,
span_id: u64,
}
impl Display for QueryContext {
@@ -61,18 +59,6 @@ impl From<&RegionRequestHeader> for QueryContext {
current_user: Default::default(),
time_zone: Default::default(),
sql_dialect: Box::new(GreptimeDbDialect {}),
trace_id: value.trace_id,
span_id: value.span_id,
}
}
}
impl From<&QueryContext> for RegionRequestHeader {
fn from(value: &QueryContext) -> Self {
RegionRequestHeader {
trace_id: value.trace_id,
span_id: value.span_id,
dbname: value.get_db_string(),
}
}
}
@@ -142,16 +128,6 @@ impl QueryContext {
pub fn set_current_user(&self, user: Option<UserInfoRef>) {
let _ = self.current_user.swap(Arc::new(user));
}
#[inline]
pub fn trace_id(&self) -> u64 {
self.trace_id
}
#[inline]
pub fn span_id(&self) -> u64 {
self.span_id
}
}
impl QueryContextBuilder {
@@ -170,15 +146,8 @@ impl QueryContextBuilder {
sql_dialect: self
.sql_dialect
.unwrap_or_else(|| Box::new(GreptimeDbDialect {})),
trace_id: self.trace_id.unwrap_or_else(common_telemetry::gen_trace_id),
span_id: self.span_id.unwrap_or_default(),
})
}
pub fn try_trace_id(mut self, trace_id: Option<u64>) -> Self {
self.trace_id = trace_id;
self
}
}
#[derive(Debug)]

View File

@@ -21,7 +21,7 @@ mod writer;
use std::sync::Arc;
use common_telemetry::tracing::log::warn;
use common_telemetry::warn;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
pub use picker::{LeveledTimeWindowPicker, Picker, PickerContext};

View File

@@ -17,8 +17,7 @@ use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::time::Duration;
use common_telemetry::tracing::log::warn;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, warn};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;

View File

@@ -18,8 +18,7 @@ use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_telemetry::{debug, info, warn};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;

View File

@@ -677,7 +677,7 @@ enable = true
enable = true
[frontend.logging]
enable_jaeger_tracing = false
enable_otlp_tracing = false
[frontend.datanode.client]
timeout = "10s"
@@ -750,10 +750,10 @@ sst_write_buffer_size = "8MiB"
[datanode.region_engine.file]
[datanode.logging]
enable_jaeger_tracing = false
enable_otlp_tracing = false
[logging]
enable_jaeger_tracing = false"#,
enable_otlp_tracing = false"#,
store_type,
num_cpus::get() / 2
);
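The expected config output changes accordingly: the Jaeger-specific switch gives way to `enable_otlp_tracing`, with traces exported over OTLP/gRPC. A sketch of turning it on in a node's logging section (the `otlp_endpoint` key name is an assumption here; check `LoggingOptions` for the exact field):

[logging]
dir = "/tmp/greptimedb/logs"
level = "info"
enable_otlp_tracing = true
# Assumed key: gRPC endpoint of an OTLP collector (otel-collector, Jaeger, ...).
otlp_endpoint = "localhost:4317"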