Compare commits

...

5 Commits

Author SHA1 Message Date
WU Jingdi
91820a8006 fix: empty by in range query (#2770)
* fix: empty by in range query

* Apply suggestions from code review

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2023-11-17 11:18:14 +00:00
WU Jingdi
500e299e40 feat: Enable distributed tracing in greptimedb (#2755)
* feat: implement distributed tracing

* fix: change usage of span

* fix: use otlp as exporter

* chore: update dependence

* chore: add span info

* chore: add alias

* chore: use instrument instead of trace
2023-11-17 08:51:57 +00:00
Ruihang Xia
ac4b6cd7f0 feat: write logical region to metric engine (#2759)
* transform write request

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add tests for put request

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use table_id instead of metric_name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* CR sugg.

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* define random state as const

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2023-11-17 07:44:11 +00:00
Weny Xu
3ab494764f feat: add migration start step (#2756)
* feat: add migration start state

* refactor: move PersistentContext and VolatileContext into Context

* chore: apply suggestions from CR
2023-11-17 07:05:04 +00:00
Lanqing Yang
5608035074 fix!: improve user experience on setting compression type (#2765)
* fixes: https://github.com/GreptimeTeam/greptimedb/issues/2758
Chore: improve user experience on setting compression type

* Update src/common/datasource/src/compression.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2023-11-17 06:27:13 +00:00
79 changed files with 1671 additions and 549 deletions

180
Cargo.lock generated
View File

@@ -1940,18 +1940,18 @@ dependencies = [
"console-subscriber",
"lazy_static",
"once_cell",
"opentelemetry 0.17.0",
"opentelemetry-jaeger",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk 0.21.0",
"parking_lot 0.12.1",
"prometheus",
"rand",
"rs-snowflake",
"serde",
"tokio",
"tracing",
"tracing-appender",
"tracing-futures",
"tracing-log",
"tracing-log 0.1.4",
"tracing-opentelemetry",
"tracing-subscriber",
]
@@ -3315,7 +3315,7 @@ dependencies = [
"moka",
"object-store",
"openmetrics-parser",
"opentelemetry-proto",
"opentelemetry-proto 0.3.0",
"operator",
"partition",
"prometheus",
@@ -3599,7 +3599,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7eb2e78be7a104d2582fbea0bcb1e019407da702#7eb2e78be7a104d2582fbea0bcb1e019407da702"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=25429306d0379ad29211a062a81da2554a0208ab#25429306d0379ad29211a062a81da2554a0208ab"
dependencies = [
"prost 0.12.1",
"serde",
@@ -4772,6 +4772,7 @@ dependencies = [
name = "metric-engine"
version = "0.4.3"
dependencies = [
"ahash 0.8.6",
"api",
"async-trait",
"base64 0.21.5",
@@ -5418,23 +5419,18 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "opentelemetry"
version = "0.17.0"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8"
checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a"
dependencies = [
"async-trait",
"crossbeam-channel",
"futures-channel",
"futures-executor",
"futures-util",
"futures-core",
"futures-sink",
"indexmap 2.0.2",
"js-sys",
"lazy_static",
"percent-encoding",
"pin-project",
"rand",
"once_cell",
"pin-project-lite",
"thiserror",
"tokio",
"tokio-stream",
"urlencoding",
]
[[package]]
@@ -5453,18 +5449,22 @@ dependencies = [
]
[[package]]
name = "opentelemetry-jaeger"
version = "0.16.0"
name = "opentelemetry-otlp"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c0b12cd9e3f9b35b52f6e0dac66866c519b26f424f4bbf96e3fe8bfbdc5229"
checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930"
dependencies = [
"async-trait",
"lazy_static",
"opentelemetry 0.17.0",
"futures-core",
"http",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"opentelemetry-proto 0.4.0",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk 0.21.0",
"prost 0.11.9",
"thiserror",
"thrift 0.15.0",
"tokio",
"tonic 0.9.2",
]
[[package]]
@@ -5472,19 +5472,31 @@ name = "opentelemetry-proto"
version = "0.3.0"
source = "git+https://github.com/waynexia/opentelemetry-rust.git?rev=33841b38dda79b15f2024952be5f32533325ca02#33841b38dda79b15f2024952be5f32533325ca02"
dependencies = [
"opentelemetry 0.21.0",
"opentelemetry_sdk",
"opentelemetry 0.21.0 (git+https://github.com/waynexia/opentelemetry-rust.git?rev=33841b38dda79b15f2024952be5f32533325ca02)",
"opentelemetry_sdk 0.20.0",
"prost 0.12.1",
"tonic 0.10.2",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.9.0"
name = "opentelemetry-proto"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd"
checksum = "a2e155ce5cc812ea3d1dffbd1539aed653de4bf4882d60e6e04dcf0901d674e1"
dependencies = [
"opentelemetry 0.17.0",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"opentelemetry_sdk 0.21.0",
"prost 0.11.9",
"tonic 0.9.2",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84"
dependencies = [
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -5499,13 +5511,35 @@ dependencies = [
"futures-util",
"glob",
"once_cell",
"opentelemetry 0.21.0",
"opentelemetry 0.21.0 (git+https://github.com/waynexia/opentelemetry-rust.git?rev=33841b38dda79b15f2024952be5f32533325ca02)",
"ordered-float 4.1.1",
"percent-encoding",
"rand",
"thiserror",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5b3ce3f5705e2ae493be467a0b23be4bc563c193cdb7713e55372c89a906b34"
dependencies = [
"async-trait",
"crossbeam-channel",
"futures-channel",
"futures-executor",
"futures-util",
"glob",
"once_cell",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ordered-float 4.1.1",
"percent-encoding",
"rand",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]]
name = "operator"
version = "0.4.3"
@@ -5588,15 +5622,6 @@ dependencies = [
"zstd 0.12.4",
]
[[package]]
name = "ordered-float"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7"
dependencies = [
"num-traits",
]
[[package]]
name = "ordered-float"
version = "2.10.1"
@@ -5766,7 +5791,7 @@ dependencies = [
"paste",
"seq-macro",
"snap",
"thrift 0.17.0",
"thrift",
"tokio",
"twox-hash",
"zstd 0.12.4",
@@ -7211,12 +7236,6 @@ dependencies = [
"serde",
]
[[package]]
name = "rs-snowflake"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e60ef3b82994702bbe4e134d98aadca4b49ed04440148985678d415c68127666"
[[package]]
name = "rsa"
version = "0.6.1"
@@ -8262,7 +8281,7 @@ dependencies = [
"once_cell",
"openmetrics-parser",
"opensrv-mysql",
"opentelemetry-proto",
"opentelemetry-proto 0.3.0",
"parking_lot 0.12.1",
"pgwire",
"pin-project",
@@ -9306,7 +9325,7 @@ dependencies = [
"num_cpus",
"object-store",
"once_cell",
"opentelemetry-proto",
"opentelemetry-proto 0.3.0",
"operator",
"partition",
"paste",
@@ -9396,28 +9415,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]]
name = "thrift"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e"
dependencies = [
"byteorder",
"integer-encoding",
"log",
"ordered-float 1.1.1",
"threadpool",
]
[[package]]
name = "thrift"
version = "0.17.0"
@@ -9952,17 +9949,32 @@ dependencies = [
]
[[package]]
name = "tracing-opentelemetry"
version = "0.17.4"
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbbe89715c1dbbb790059e2565353978564924ee85017b5fff365c872ff6721f"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"opentelemetry 0.17.0",
"tracing-core",
]
[[package]]
name = "tracing-opentelemetry"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c67ac25c5407e7b961fafc6f7e9aa5958fd297aada2d20fa2ae1737357e55596"
dependencies = [
"js-sys",
"once_cell",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
"opentelemetry_sdk 0.21.0",
"smallvec",
"tracing",
"tracing-core",
"tracing-log",
"tracing-log 0.2.0",
"tracing-subscriber",
"web-time",
]
[[package]]
@@ -9980,7 +9992,7 @@ dependencies = [
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-log 0.1.4",
]
[[package]]
@@ -10544,6 +10556,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57099a701fb3a8043f993e8228dc24229c7b942e2b009a1b962e54489ba1d3bf"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.4"

View File

@@ -63,6 +63,7 @@ edition = "2021"
license = "Apache-2.0"
[workspace.dependencies]
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.3"
arrow = { version = "47.0" }
arrow-array = "47.0"
@@ -84,7 +85,7 @@ derive_builder = "0.12"
etcd-client = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7eb2e78be7a104d2582fbea0bcb1e019407da702" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "25429306d0379ad29211a062a81da2554a0208ab" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"

View File

@@ -92,7 +92,7 @@ worker_request_batch_size = 64
# Number of meta action updated to trigger a new checkpoint for the manifest
manifest_checkpoint_distance = 10
# Manifest compression type
manifest_compress_type = "Uncompressed"
manifest_compress_type = "uncompressed"
# Max number of running background jobs
max_background_jobs = 4
# Interval to auto flush a region if it has not flushed yet.
@@ -109,9 +109,7 @@ vector_cache_size = "512MB"
sst_write_buffer_size = "8MB"
# Log options
# Log options, see `standalone.example.toml`
# [logging]
# Specify logs directory.
# dir = "/tmp/greptimedb/logs"
# Specify the log level [info | debug | error | warn]
# level = "info"

View File

@@ -158,3 +158,7 @@ global_write_buffer_size = "1GB"
# dir = "/tmp/greptimedb/logs"
# Specify the log level [info | debug | error | warn]
# level = "info"
# Whether to enable tracing, default is false
# enable_otlp_tracing = false
# Tracing exporter endpoint with format `ip:port`; we use gRPC OTLP as the exporter, default endpoint is `localhost:4317`
# otlp_endpoint = "localhost:4317"

View File

@@ -78,7 +78,7 @@ async fn run() {
let logical = mock_logical_plan();
event!(Level::INFO, "plan size: {:#?}", logical.len());
let result = db.logical_plan(logical, 0).await.unwrap();
let result = db.logical_plan(logical).await.unwrap();
event!(Level::INFO, "result: {:#?}", result);
}

View File

@@ -29,6 +29,7 @@ use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamAdaptor;
use common_telemetry::logging;
use common_telemetry::tracing_context::W3cTrace;
use futures_util::StreamExt;
use prost::Message;
use snafu::{ensure, ResultExt};
@@ -147,21 +148,21 @@ impl Database {
async fn handle(&self, request: Request) -> Result<u32> {
let mut client = self.client.make_database_client()?.inner;
let request = self.to_rpc_request(request, 0);
let request = self.to_rpc_request(request);
let response = client.handle(request).await?.into_inner();
from_grpc_response(response)
}
#[inline]
fn to_rpc_request(&self, request: Request, trace_id: u64) -> GreptimeRequest {
fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
GreptimeRequest {
header: Some(RequestHeader {
catalog: self.catalog.clone(),
schema: self.schema.clone(),
authorization: self.ctx.auth_header.clone(),
dbname: self.dbname.clone(),
trace_id,
span_id: 0,
// TODO(Taylor-lagrange): add client grpc tracing
tracing_context: W3cTrace::new(),
}),
request: Some(request),
}
@@ -172,23 +173,17 @@ impl Database {
S: AsRef<str>,
{
let _timer = metrics::METRIC_GRPC_SQL.start_timer();
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::Sql(sql.as_ref().to_string())),
}),
0,
)
self.do_get(Request::Query(QueryRequest {
query: Some(Query::Sql(sql.as_ref().to_string())),
}))
.await
}
pub async fn logical_plan(&self, logical_plan: Vec<u8>, trace_id: u64) -> Result<Output> {
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_LOGICAL_PLAN.start_timer();
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
}),
trace_id,
)
self.do_get(Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
}))
.await
}
@@ -200,68 +195,53 @@ impl Database {
step: &str,
) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_PROMQL_RANGE_QUERY.start_timer();
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::PromRangeQuery(PromRangeQuery {
query: promql.to_string(),
start: start.to_string(),
end: end.to_string(),
step: step.to_string(),
})),
}),
0,
)
self.do_get(Request::Query(QueryRequest {
query: Some(Query::PromRangeQuery(PromRangeQuery {
query: promql.to_string(),
start: start.to_string(),
end: end.to_string(),
step: step.to_string(),
})),
}))
.await
}
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_CREATE_TABLE.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
}),
0,
)
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
}))
.await
}
pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_ALTER.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
}),
0,
)
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
}))
.await
}
pub async fn drop_table(&self, expr: DropTableExpr) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_DROP_TABLE.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
}),
0,
)
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
}))
.await
}
pub async fn truncate_table(&self, expr: TruncateTableExpr) -> Result<Output> {
let _timer = metrics::METRIC_GRPC_TRUNCATE_TABLE.start_timer();
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::TruncateTable(expr)),
}),
0,
)
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::TruncateTable(expr)),
}))
.await
}
async fn do_get(&self, request: Request, trace_id: u64) -> Result<Output> {
async fn do_get(&self, request: Request) -> Result<Output> {
// FIXME(paomian): some labels should be added for metrics
let _timer = metrics::METRIC_GRPC_DO_GET.start_timer();
let request = self.to_rpc_request(request, trace_id);
let request = self.to_rpc_request(request);
let request = Ticket {
ticket: request.encode_to_vec().into(),
};

View File

@@ -208,7 +208,8 @@ async fn main() -> Result<()> {
};
common_telemetry::set_panic_hook();
let _guard = common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts);
let _guard =
common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts, opts.node_id());
// Report app version as gauge.
APP_VERSION

View File

@@ -176,7 +176,7 @@ impl Repl {
.encode(&plan)
.context(SubstraitEncodeLogicalPlanSnafu)?;
self.database.logical_plan(plan.to_vec(), 0).await
self.database.logical_plan(plan.to_vec()).await
} else {
self.database.sql(&sql).await
}

View File

@@ -133,6 +133,15 @@ impl Options {
Ok(opts)
}
pub fn node_id(&self) -> Option<String> {
match self {
Options::Metasrv(_) | Options::Cli(_) => None,
Options::Datanode(opt) => opt.node_id.map(|x| x.to_string()),
Options::Frontend(opt) => opt.node_id.clone(),
Options::Standalone(opt) => opt.frontend.node_id.clone(),
}
}
}
#[cfg(test)]

View File

@@ -26,7 +26,9 @@ use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};
use tokio_util::io::{ReaderStream, StreamReader};
use crate::error::{self, Error, Result};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompressionType {
/// Gzip-ed file
Gzip,

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use api::v1::meta::Partition;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
@@ -34,6 +35,7 @@ pub mod utils;
#[derive(Debug, Default)]
pub struct ExecutorContext {
pub cluster_id: Option<u64>,
pub tracing_context: Option<W3cTrace>,
}
#[async_trait::async_trait]

View File

@@ -26,6 +26,7 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status,
};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, info};
use futures::future;
use serde::{Deserialize, Serialize};
@@ -207,7 +208,7 @@ impl AlterTableProcedure {
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),

View File

@@ -21,6 +21,7 @@ use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
@@ -199,7 +200,7 @@ impl CreateTableProcedure {
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(request),

View File

@@ -22,6 +22,7 @@ use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, info};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
@@ -157,7 +158,7 @@ impl DropTableProcedure {
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {

View File

@@ -21,6 +21,7 @@ use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -154,7 +155,7 @@ impl TruncateTableProcedure {
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Truncate(PbTruncateRegionRequest {

View File

@@ -15,7 +15,8 @@
use std::sync::Arc;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::info;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, tracing};
use snafu::{OptionExt, ResultExt};
use crate::cache_invalidator::CacheInvalidatorRef;
@@ -140,6 +141,7 @@ impl DdlManager {
})
}
#[tracing::instrument(skip_all)]
pub async fn submit_alter_table_task(
&self,
cluster_id: u64,
@@ -156,6 +158,7 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
pub async fn submit_create_table_task(
&self,
cluster_id: u64,
@@ -172,6 +175,7 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
pub async fn submit_drop_table_task(
&self,
cluster_id: u64,
@@ -194,6 +198,7 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
pub async fn submit_truncate_table_task(
&self,
cluster_id: u64,
@@ -383,21 +388,31 @@ impl DdlTaskExecutor for DdlManager {
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let cluster_id = ctx.cluster_id.unwrap_or_default();
info!("Submitting Ddl task: {:?}", request.task);
match request.task {
CreateTable(create_table_task) => {
handle_create_table_task(self, cluster_id, create_table_task).await
}
DropTable(drop_table_task) => {
handle_drop_table_task(self, cluster_id, drop_table_task).await
}
AlterTable(alter_table_task) => {
handle_alter_table_task(self, cluster_id, alter_table_task).await
}
TruncateTable(truncate_table_task) => {
handle_truncate_table_task(self, cluster_id, truncate_table_task).await
let span = ctx
.tracing_context
.as_ref()
.map(TracingContext::from_w3c)
.unwrap_or(TracingContext::from_current_span())
.attach(tracing::info_span!("DdlManager::submit_ddl_task"));
async move {
let cluster_id = ctx.cluster_id.unwrap_or_default();
info!("Submitting Ddl task: {:?}", request.task);
match request.task {
CreateTable(create_table_task) => {
handle_create_table_task(self, cluster_id, create_table_task).await
}
DropTable(drop_table_task) => {
handle_drop_table_task(self, cluster_id, drop_table_task).await
}
AlterTable(alter_table_task) => {
handle_alter_table_task(self, cluster_id, alter_table_task).await
}
TruncateTable(truncate_table_task) => {
handle_truncate_table_task(self, cluster_id, truncate_table_task).await
}
}
}
.trace(span)
.await
}
}

View File

@@ -23,7 +23,8 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::{info, logging};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, logging, tracing};
use snafu::{ensure, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
@@ -452,9 +453,19 @@ impl LocalManager {
DuplicateProcedureSnafu { procedure_id },
);
let tracing_context = TracingContext::from_current_span();
let _handle = common_runtime::spawn_bg(async move {
// Run the root procedure.
runner.run().await;
// The task was moved to another runtime for execution.
// In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
runner
.run()
.trace(
tracing_context
.attach(tracing::info_span!("LocalManager::submit_root_procedure")),
)
.await;
});
Ok(watcher)

View File

@@ -14,20 +14,19 @@ common-error.workspace = true
console-subscriber = { version = "0.1", optional = true }
lazy_static.workspace = true
once_cell.workspace = true
opentelemetry = { version = "0.17", default-features = false, features = [
opentelemetry = { version = "0.21.0", default-features = false, features = [
"trace",
"rt-tokio",
] }
opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] }
opentelemetry-semantic-conventions = "0.13.0"
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
parking_lot = { version = "0.12" }
prometheus.workspace = true
rand.workspace = true
rs-snowflake = "0.6"
serde.workspace = true
tokio.workspace = true
tracing = "0.1"
tracing-appender = "0.2"
tracing-futures = { version = "0.2", features = ["futures-03"] }
tracing-log = "0.1"
tracing-opentelemetry = "0.17"
tracing-opentelemetry = "0.22.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@@ -16,45 +16,9 @@ pub mod logging;
mod macros;
pub mod metric;
mod panic_hook;
pub mod tracing_context;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
pub use logging::{init_default_ut_logging, init_global_logging, trace_id, TRACE_ID};
pub use logging::{init_default_ut_logging, init_global_logging};
pub use metric::dump_metrics;
use once_cell::sync::OnceCell;
pub use panic_hook::set_panic_hook;
use parking_lot::Mutex;
use rand::random;
use snowflake::SnowflakeIdBucket;
pub use {common_error, tracing, tracing_appender, tracing_futures, tracing_subscriber};
static NODE_ID: OnceCell<u64> = OnceCell::new();
static TRACE_BUCKET: OnceCell<Mutex<SnowflakeIdBucket>> = OnceCell::new();
pub fn gen_trace_id() -> u64 {
let mut bucket = TRACE_BUCKET
.get_or_init(|| {
// if node_id is not initialized, how about random one?
let node_id = NODE_ID.get_or_init(|| 0);
info!("initializing bucket with node_id: {}", node_id);
let bucket = SnowflakeIdBucket::new(1, (*node_id) as i32);
Mutex::new(bucket)
})
.lock();
(*bucket).get_id() as u64
}
pub fn init_node_id(node_id: Option<String>) {
let node_id = node_id.map(|id| calculate_hash(&id)).unwrap_or(random());
match NODE_ID.set(node_id) {
Ok(_) => {}
Err(_) => warn!("node_id is already initialized"),
}
}
fn calculate_hash<T: Hash>(t: &T) -> u64 {
let mut s = DefaultHasher::new();
t.hash(&mut s);
s.finish()
}
pub use {common_error, tracing};

View File

@@ -17,10 +17,11 @@ use std::env;
use std::sync::{Arc, Mutex, Once};
use once_cell::sync::Lazy;
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_semantic_conventions::resource;
use serde::{Deserialize, Serialize};
pub use tracing::{event, span, Level};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_log::LogTracer;
@@ -29,38 +30,17 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter, EnvFilter, Registry};
pub use crate::{debug, error, info, log, trace, warn};
pub use crate::{debug, error, info, trace, warn};
tokio::task_local! {
/// Task local trace id. See [trace_id](crate::trace_id) for more details.
pub static TRACE_ID: u64;
}
/// Get current [TRACE_ID] from tokio [task_local](tokio::task_local) storage.
///
/// # Usage
/// To set current trace id, wrap your async code like this:
/// ```rust, no_run
/// common_telemetry::TRACE_ID
/// .scope(id, async move {
/// query_handler
/// .do_query(query, self.session.context())
/// .await
/// })
/// .await
/// ```
/// Then all functions called from this stack will be able to retrieve the trace id
/// via this method.
pub fn trace_id() -> Option<u64> {
TRACE_ID.try_with(|id| Some(*id)).unwrap_or(None)
}
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingOptions {
pub dir: String,
pub level: Option<String>,
pub enable_jaeger_tracing: bool,
pub enable_otlp_tracing: bool,
pub otlp_endpoint: Option<String>,
}
impl Default for LoggingOptions {
@@ -68,7 +48,8 @@ impl Default for LoggingOptions {
Self {
dir: "/tmp/greptimedb/logs".to_string(),
level: None,
enable_jaeger_tracing: false,
enable_otlp_tracing: false,
otlp_endpoint: None,
}
}
}
@@ -106,9 +87,10 @@ pub fn init_default_ut_logging() {
"unittest",
&opts,
TracingOptions::default(),
None
));
info!("logs dir = {}", dir);
crate::info!("logs dir = {}", dir);
});
}
@@ -122,11 +104,12 @@ pub fn init_global_logging(
app_name: &str,
opts: &LoggingOptions,
tracing_opts: TracingOptions,
node_id: Option<String>,
) -> Vec<WorkerGuard> {
let mut guards = vec![];
let dir = &opts.dir;
let level = &opts.level;
let enable_jaeger_tracing = opts.enable_jaeger_tracing;
let enable_otlp_tracing = opts.enable_otlp_tracing;
// Enable log compatible layer to convert log record to tracing span.
LogTracer::init().expect("log tracer must be valid");
@@ -204,15 +187,34 @@ pub fn init_global_logging(
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR));
if enable_jaeger_tracing {
// Jaeger layer.
if enable_otlp_tracing {
global::set_text_map_propagator(TraceContextPropagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name(app_name)
.install_batch(opentelemetry::runtime::Tokio)
.expect("install");
let jaeger_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(jaeger_layer);
// otlp exporter
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| format!("http://{}", e))
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
),
)
.with_trace_config(opentelemetry_sdk::trace::config().with_resource(
opentelemetry_sdk::Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
KeyValue::new(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
]),
))
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(tracing_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
} else {

View File

@@ -17,14 +17,12 @@
macro_rules! log {
// log!(target: "my_target", Level::INFO, "a {} event", "log");
(target: $target:expr, $lvl:expr, $($arg:tt)+) => {{
let _trace_id = $crate::trace_id();
$crate::logging::event!(target: $target, $lvl, trace_id = _trace_id, $($arg)+)
$crate::tracing::event!(target: $target, $lvl, $($arg)+)
}};
// log!(Level::INFO, "a log event")
($lvl:expr, $($arg:tt)+) => {{
let _trace_id = $crate::trace_id();
$crate::logging::event!($lvl, trace_id = _trace_id, $($arg)+)
$crate::tracing::event!($lvl, $($arg)+)
}};
}
@@ -33,14 +31,14 @@ macro_rules! log {
macro_rules! error {
// error!(target: "my_target", "a {} event", "log")
(target: $target:expr, $($arg:tt)+) => ({
$crate::log!(target: $target, $crate::logging::Level::ERROR, $($arg)+)
$crate::log!(target: $target, $crate::tracing::Level::ERROR, $($arg)+)
});
// error!(e; target: "my_target", "a {} event", "log")
($e:expr; target: $target:expr, $($arg:tt)+) => ({
$crate::log!(
target: $target,
$crate::logging::Level::ERROR,
$crate::tracing::Level::ERROR,
err = ?$e,
$($arg)+
)
@@ -50,7 +48,7 @@ macro_rules! error {
(%$e:expr; target: $target:expr, $($arg:tt)+) => ({
$crate::log!(
target: $target,
$crate::logging::Level::ERROR,
$crate::tracing::Level::ERROR,
err = %$e,
$($arg)+
)
@@ -59,7 +57,7 @@ macro_rules! error {
// error!(e; "a {} event", "log")
($e:expr; $($arg:tt)+) => ({
$crate::log!(
$crate::logging::Level::ERROR,
$crate::tracing::Level::ERROR,
err = ?$e,
$($arg)+
)
@@ -68,7 +66,7 @@ macro_rules! error {
// error!(%e; "a {} event", "log")
(%$e:expr; $($arg:tt)+) => ({
$crate::log!(
$crate::logging::Level::ERROR,
$crate::tracing::Level::ERROR,
err = %$e,
$($arg)+
)
@@ -76,7 +74,7 @@ macro_rules! error {
// error!("a {} event", "log")
($($arg:tt)+) => ({
$crate::log!($crate::logging::Level::ERROR, $($arg)+)
$crate::log!($crate::tracing::Level::ERROR, $($arg)+)
});
}
@@ -85,13 +83,13 @@ macro_rules! error {
macro_rules! warn {
// warn!(target: "my_target", "a {} event", "log")
(target: $target:expr, $($arg:tt)+) => {
$crate::log!(target: $target, $crate::logging::Level::WARN, $($arg)+)
$crate::log!(target: $target, $crate::tracing::Level::WARN, $($arg)+)
};
// warn!(e; "a {} event", "log")
($e:expr; $($arg:tt)+) => ({
$crate::log!(
$crate::logging::Level::WARN,
$crate::tracing::Level::WARN,
err = ?$e,
$($arg)+
)
@@ -100,7 +98,7 @@ macro_rules! warn {
// warn!(%e; "a {} event", "log")
(%$e:expr; $($arg:tt)+) => ({
$crate::log!(
$crate::logging::Level::WARN,
$crate::tracing::Level::WARN,
err = %$e,
$($arg)+
)
@@ -108,7 +106,7 @@ macro_rules! warn {
// warn!("a {} event", "log")
($($arg:tt)+) => {
$crate::log!($crate::logging::Level::WARN, $($arg)+)
$crate::log!($crate::tracing::Level::WARN, $($arg)+)
};
}
@@ -117,12 +115,12 @@ macro_rules! warn {
macro_rules! info {
// info!(target: "my_target", "a {} event", "log")
(target: $target:expr, $($arg:tt)+) => {
$crate::log!(target: $target, $crate::logging::Level::INFO, $($arg)+)
$crate::log!(target: $target, $crate::tracing::Level::INFO, $($arg)+)
};
// info!("a {} event", "log")
($($arg:tt)+) => {
$crate::log!($crate::logging::Level::INFO, $($arg)+)
$crate::log!($crate::tracing::Level::INFO, $($arg)+)
};
}
@@ -131,12 +129,12 @@ macro_rules! info {
macro_rules! debug {
// debug!(target: "my_target", "a {} event", "log")
(target: $target:expr, $($arg:tt)+) => {
$crate::log!(target: $target, $crate::logging::Level::DEBUG, $($arg)+)
$crate::log!(target: $target, $crate::tracing::Level::DEBUG, $($arg)+)
};
// debug!("a {} event", "log")
($($arg:tt)+) => {
$crate::log!($crate::logging::Level::DEBUG, $($arg)+)
$crate::log!($crate::tracing::Level::DEBUG, $($arg)+)
};
}
@@ -145,12 +143,12 @@ macro_rules! debug {
macro_rules! trace {
// trace!(target: "my_target", "a {} event", "log")
(target: $target:expr, $($arg:tt)+) => {
$crate::log!(target: $target, $crate::logging::Level::TRACE, $($arg)+)
$crate::log!(target: $target, $crate::tracing::Level::TRACE, $($arg)+)
};
// trace!("a {} event", "log")
($($arg:tt)+) => {
$crate::log!($crate::logging::Level::TRACE, $($arg)+)
$crate::log!($crate::tracing::Level::TRACE, $($arg)+)
};
}
@@ -158,8 +156,7 @@ macro_rules! trace {
mod tests {
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use crate::logging::Level;
use tracing::Level;
macro_rules! all_log_macros {
($($arg:tt)*) => {

View File

@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! tracing stuffs, inspired by RisingWave
use std::collections::HashMap;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;
// An wapper for `Futures` that provides tracing instrument adapters.
pub trait FutureExt: std::future::Future + Sized {
fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented<Self>;
}
impl<T: std::future::Future> FutureExt for T {
#[inline]
fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented<Self> {
tracing::instrument::Instrument::instrument(self, span)
}
}
/// Context for tracing used for propagating tracing information in a distributed system.
///
/// Generally, the caller of a service should create a tracing context from the current tracing span
/// and pass it to the callee through the network. The callee will then attach its local tracing
/// span as a child of the tracing context, so that the external tracing service can associate them
/// in a single trace.
///
/// The tracing context must be serialized into the W3C trace context format and passed in rpc
/// message headers when communication of frontend, datanode and meta.
///
/// See [Trace Context](https://www.w3.org/TR/trace-context/) for more information.
#[derive(Debug, Clone)]
pub struct TracingContext(opentelemetry::Context);
pub type W3cTrace = HashMap<String, String>;
impl Default for TracingContext {
fn default() -> Self {
Self::new()
}
}
type Propagator = TraceContextPropagator;
impl TracingContext {
/// Create a new tracing context from a tracing span.
pub fn from_span(span: &tracing::Span) -> Self {
Self(span.context())
}
/// Create a new tracing context from the current tracing span considered by the subscriber.
pub fn from_current_span() -> Self {
Self::from_span(&tracing::Span::current())
}
/// Create a no-op tracing context.
pub fn new() -> Self {
Self(opentelemetry::Context::new())
}
/// Attach the given span as a child of the context. Returns the attached span.
pub fn attach(&self, span: tracing::Span) -> tracing::Span {
span.set_parent(self.0.clone());
span
}
/// Convert the tracing context to the W3C trace context format.
pub fn to_w3c(&self) -> W3cTrace {
let mut fields = HashMap::new();
Propagator::new().inject_context(&self.0, &mut fields);
fields
}
/// Create a new tracing context from the W3C trace context format.
pub fn from_w3c(fields: &W3cTrace) -> Self {
let context = Propagator::new().extract(fields);
Self(context)
}
}

View File

@@ -28,6 +28,8 @@ use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, Output};
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, warn};
use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
@@ -99,6 +101,7 @@ impl RegionServer {
self.inner.handle_request(region_id, request).await
}
#[tracing::instrument(skip_all)]
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
self.inner.handle_read(request).await
}
@@ -154,9 +157,19 @@ impl RegionServerHandler for RegionServer {
.context(BuildRegionRequestsSnafu)
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
let tracing_context = TracingContext::from_current_span();
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
async move { self_to_move.handle_request(region_id, req).await }
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
async move {
self_to_move
.handle_request(region_id, req)
.trace(span)
.await
}
});
let results = try_join_all(join_tasks)
@@ -198,15 +211,18 @@ impl FlightCraft for RegionServer {
let ticket = request.into_inner().ticket;
let request = QueryRequest::decode(ticket.as_ref())
.context(servers_error::InvalidFlightTicketSnafu)?;
let trace_id = request
let tracing_context = request
.header
.as_ref()
.map(|h| h.trace_id)
.map(|h| TracingContext::from_w3c(&h.tracing_context))
.unwrap_or_default();
let result = self.handle_read(request).await?;
let result = self
.handle_read(request)
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
.await?;
let stream = Box::pin(FlightRecordBatchStream::new(result, trace_id));
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
Ok(Response::new(stream))
}
}
@@ -283,6 +299,10 @@ impl RegionServerInner {
let result = engine
.handle_request(region_id, request)
.trace(info_span!(
"RegionEngine::handle_region_request",
engine_type
))
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?;

View File

@@ -207,8 +207,6 @@ impl Instance {
Arc::new(handlers_executor),
));
common_telemetry::init_node_id(opts.node_id.clone());
Ok(Instance {
catalog_manager,
script_executor,

View File

@@ -27,6 +27,8 @@ use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::{Sequence, SequenceRef};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use snafu::{OptionExt, ResultExt};
@@ -71,8 +73,15 @@ impl RegionInvoker {
#[async_trait]
impl Datanode for RegionInvoker {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
let span = request
.header
.as_ref()
.map(|h| TracingContext::from_w3c(&h.tracing_context))
.unwrap_or_default()
.attach(tracing::info_span!("RegionInvoker::handle_region_request"));
let response = self
.handle_inner(request)
.trace(span)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
@@ -83,8 +92,15 @@ impl Datanode for RegionInvoker {
}
async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
let span = request
.header
.as_ref()
.map(|h| TracingContext::from_w3c(&h.tracing_context))
.unwrap_or_default()
.attach(tracing::info_span!("RegionInvoker::handle_query"));
self.region_server
.handle_read(request)
.trace(span)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)

View File

@@ -19,6 +19,7 @@ use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::{OptionExt, ResultExt};
@@ -77,7 +78,11 @@ impl AskLeader {
peers.shuffle(&mut rand::thread_rng());
let req = AskLeaderRequest {
header: Some(RequestHeader::new(self.id, self.role)),
header: Some(RequestHeader::new(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
)),
};
let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use api::v1::meta::ddl_task_client::DdlTaskClient;
use api::v1::meta::{ErrorCode, ResponseHeader, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
@@ -133,7 +134,11 @@ impl Inner {
}
);
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let ask_leader = self.ask_leader.as_ref().unwrap();
let mut times = 0;

View File

@@ -19,6 +19,7 @@ use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::rpc::util;
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
@@ -49,7 +50,11 @@ impl HeartbeatSender {
#[inline]
pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> {
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
self.sender.send(req).await.map_err(|e| {
error::SendHeartbeatSnafu {
err_msg: e.to_string(),
@@ -207,7 +212,11 @@ impl Inner {
let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
let header = RequestHeader::new(self.id, self.role);
let header = RequestHeader::new(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let handshake = HeartbeatRequest {
header: Some(header),
..Default::default()

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use api::v1::meta::lock_client::LockClient;
use api::v1::meta::{LockRequest, LockResponse, Role, UnlockRequest, UnlockResponse};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
@@ -127,7 +128,11 @@ impl Inner {
async fn lock(&self, mut req: LockRequest) -> Result<LockResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.lock(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
@@ -135,7 +140,11 @@ impl Inner {
async fn unlock(&self, mut req: UnlockRequest) -> Result<UnlockResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.unlock(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())

View File

@@ -22,6 +22,7 @@ use api::v1::meta::{
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, Role,
};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
@@ -134,7 +135,11 @@ impl Inner {
async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.range(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
@@ -142,7 +147,11 @@ impl Inner {
async fn put(&self, mut req: PutRequest) -> Result<PutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.put(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
@@ -150,7 +159,11 @@ impl Inner {
async fn batch_get(&self, mut req: BatchGetRequest) -> Result<BatchGetResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.batch_get(req).await.map_err(error::Error::from)?;
@@ -159,7 +172,11 @@ impl Inner {
async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.batch_put(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
@@ -167,7 +184,11 @@ impl Inner {
async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.batch_delete(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
@@ -178,7 +199,11 @@ impl Inner {
mut req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client
.compare_and_put(req)
.await
@@ -189,7 +214,11 @@ impl Inner {
async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.delete_range(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())

View File

@@ -551,6 +551,13 @@ pub enum Error {
},
}
impl Error {
/// Returns `true` if the error is retryable.
pub fn is_retryable(&self) -> bool {
matches!(self, Error::RetryLater { .. })
}
}
pub type Result<T> = std::result::Result<T, Error>;
define_into_tonic_status!(Error);

View File

@@ -53,6 +53,7 @@ mod tests {
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::Sequence;
use common_telemetry::tracing_context::W3cTrace;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
@@ -89,7 +90,7 @@ mod tests {
};
let req = HeartbeatRequest {
header: Some(RequestHeader::new((1, 2), Role::Datanode)),
header: Some(RequestHeader::new((1, 2), Role::Datanode, W3cTrace::new())),
..Default::default()
};
let mut acc = HeartbeatAccumulator::default();

View File

@@ -12,12 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod downgrade_leader_region;
pub(crate) mod migration_end;
pub(crate) mod migration_start;
pub(crate) mod open_candidate_region;
#[cfg(test)]
pub(crate) mod test_util;
use std::any::Any;
use std::fmt::Debug;
use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
use common_meta::ClusterId;
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
@@ -37,10 +44,12 @@ use crate::procedure::utils::region_lock_key;
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistentContext {
/// The Id of the cluster.
cluster_id: ClusterId,
/// The [Peer] of migration source.
from_peer: Peer,
/// The [Peer] of migration destination.
to_peer: Option<Peer>,
to_peer: Peer,
/// The [RegionId] of migration region.
region_id: RegionId,
}
@@ -59,39 +68,75 @@ impl PersistentContext {
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {}
/// Used to generate new [Context].
pub trait ContextFactory {
fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}
/// Default implementation.
pub struct ContextFactoryImpl {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
}
impl ContextFactory for ContextFactoryImpl {
fn new_context(self, persistent_ctx: PersistentContext) -> Context {
Context {
persistent_ctx,
volatile_ctx: self.volatile_ctx,
table_metadata_manager: self.table_metadata_manager,
}
}
}
// TODO(weny): remove it.
#[allow(dead_code)]
/// The context of procedure execution.
#[derive(Debug, Clone)]
pub struct Context {}
pub struct Context {
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
}
impl Context {
/// Returns address of meta server.
pub fn server_addr(&self) -> &str {
todo!()
}
}
#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
trait State: Sync + Send + Debug {
/// Yields the next state.
async fn next(
&mut self,
ctx: &Context,
pc: &mut PersistentContext,
vc: &mut VolatileContext,
) -> Result<Box<dyn State>>;
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>>;
/// Indicates the procedure execution status of the `State`.
fn status(&self) -> Status {
Status::Executing { persist: true }
}
/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
}
/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationData {
context: PersistentContext,
pub struct RegionMigrationDataOwned {
persistent_ctx: PersistentContext,
state: Box<dyn State>,
}
#[derive(Debug)]
/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
persistent_ctx: &'a PersistentContext,
state: &'a dyn State,
}
pub struct RegionMigrationProcedure {
data: RegionMigrationData,
state: Box<dyn State>,
context: Context,
volatile_context: VolatileContext,
}
// TODO(weny): remove it.
@@ -99,34 +144,34 @@ pub struct RegionMigrationProcedure {
impl RegionMigrationProcedure {
const TYPE_NAME: &str = "metasrv-procedure::RegionMigration";
pub fn new(persistent_context: PersistentContext, context: Context) -> Self {
pub fn new(
persistent_context: PersistentContext,
context_factory: impl ContextFactory,
) -> Self {
let state = Box::new(RegionMigrationStart {});
Self::new_inner(state, persistent_context, context)
Self::new_inner(state, persistent_context, context_factory)
}
fn new_inner(
state: Box<dyn State>,
persistent_context: PersistentContext,
context: Context,
context_factory: impl ContextFactory,
) -> Self {
Self {
data: RegionMigrationData {
context: persistent_context,
state,
},
context,
volatile_context: VolatileContext::default(),
state,
context: context_factory.new_context(persistent_context),
}
}
fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?;
fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult<Self> {
let RegionMigrationDataOwned {
persistent_ctx,
state,
} = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self {
data,
context,
volatile_context: VolatileContext::default(),
})
let context = context_factory.new_context(persistent_ctx);
Ok(Self { state, context })
}
}
@@ -137,69 +182,56 @@ impl Procedure for RegionMigrationProcedure {
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let data = &mut self.data;
let state = &mut data.state;
let persistent_context = &mut data.context;
let volatile_context = &mut self.volatile_context;
let state = &mut self.state;
*state = state
.next(&self.context, persistent_context, volatile_context)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
*state = state.next(&mut self.context).await.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
let data = RegionMigrationData {
state: self.state.as_ref(),
persistent_ctx: &self.context.persistent_ctx,
};
serde_json::to_string(&data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
LockKey::single(self.data.context.lock_key())
let key = self.context.persistent_ctx.lock_key();
LockKey::single(key)
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_procedure::ProcedureId;
use common_procedure_test::MockContextProvider;
use super::migration_end::RegionMigrationEnd;
use super::*;
use crate::procedure::region_migration::test_util::TestingEnv;
fn persistent_context_factory() -> PersistentContext {
fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: None,
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
}
}
fn context_factory() -> Context {
Context {}
}
fn procedure_context_factory() -> ProcedureContext {
ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
cluster_id: 0,
}
}
#[test]
fn test_lock_key() {
let persistent_context = persistent_context_factory();
let persistent_context = new_persistent_context();
let expected_key = persistent_context.lock_key();
let context = context_factory();
let env = TestingEnv::new();
let context = env.context_factory();
let procedure = RegionMigrationProcedure::new(persistent_context, context);
@@ -211,72 +243,75 @@ mod tests {
#[test]
fn test_data_serialization() {
let persistent_context = persistent_context_factory();
let persistent_context = new_persistent_context();
let context = context_factory();
let env = TestingEnv::new();
let context = env.context_factory();
let procedure = RegionMigrationProcedure::new(persistent_context, context);
let serialized = procedure.dump().unwrap();
let expected = r#"{"context":{"from_peer":{"id":1,"addr":""},"to_peer":null,"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct MockState {
count: usize,
}
pub struct MockState;
#[async_trait::async_trait]
#[typetag::serde]
impl State for MockState {
async fn next(
&mut self,
_: &Context,
_: &mut PersistentContext,
_: &mut VolatileContext,
) -> Result<Box<dyn State>> {
if self.count == 2 {
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
let pc = &mut ctx.persistent_ctx;
if pc.cluster_id == 2 {
Ok(Box::new(RegionMigrationEnd))
} else {
Ok(Box::new(MockState {
count: self.count + 1,
}))
pc.cluster_id += 1;
Ok(Box::new(MockState))
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[tokio::test]
async fn test_execution_after_deserialized() {
fn new_mock_procedure() -> RegionMigrationProcedure {
let persistent_context = persistent_context_factory();
let context = context_factory();
let env = TestingEnv::new();
fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
let persistent_context = new_persistent_context();
let context_factory = env.context_factory();
let state = Box::<MockState>::default();
RegionMigrationProcedure::new_inner(state, persistent_context, context)
RegionMigrationProcedure::new_inner(state, persistent_context, context_factory)
}
let ctx = procedure_context_factory();
let mut procedure = new_mock_procedure();
let ctx = TestingEnv::procedure_context();
let mut procedure = new_mock_procedure(&env);
let mut status = None;
for _ in 0..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_matches!(status.unwrap(), Status::Done);
let ctx = procedure_context_factory();
let mut procedure = new_mock_procedure();
let ctx = TestingEnv::procedure_context();
let mut procedure = new_mock_procedure(&env);
status = Some(procedure.execute(&ctx).await.unwrap());
let serialized = procedure.dump().unwrap();
let context = context_factory();
let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap();
let context_factory = env.context_factory();
let mut procedure =
RegionMigrationProcedure::from_json(&serialized, context_factory).unwrap();
for _ in 1..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_eq!(procedure.context.persistent_ctx.cluster_id, 2);
assert_matches!(status.unwrap(), Status::Done);
}
}

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::procedure::region_migration::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> {
todo!()
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::Status;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};
use crate::procedure::region_migration::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationEnd;
@@ -24,16 +26,15 @@ pub struct RegionMigrationEnd;
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationEnd {
async fn next(
&mut self,
_: &Context,
_: &mut PersistentContext,
_: &mut VolatileContext,
) -> Result<Box<dyn State>> {
async fn next(&mut self, _: &mut Context) -> Result<Box<dyn State>> {
Ok(Box::new(RegionMigrationEnd))
}
fn status(&self) -> Status {
Status::Done
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -12,24 +12,289 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use std::any::Any;
use crate::error::Result;
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use serde::{Deserialize, Serialize};
use snafu::{location, Location, OptionExt, ResultExt};
use store_api::storage::RegionId;
use super::downgrade_leader_region::DowngradeLeaderRegion;
use super::migration_end::RegionMigrationEnd;
use super::open_candidate_region::OpenCandidateRegion;
use crate::error::{self, Result};
use crate::procedure::region_migration::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationStart {}
pub struct RegionMigrationStart;
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationStart {
async fn next(
&mut self,
_ctx: &Context,
_pc: &mut PersistentContext,
_vc: &mut VolatileContext,
) -> Result<Box<dyn State>> {
// TODO(weny): It will be added in the following PRs.
todo!()
/// Yields next [State].
///
/// If the expected leader region has been opened on `to_peer`, go to the MigrationEnd state.
///
/// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state.
///
/// Otherwise go to the OpenCandidateRegion state.
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
let region_id = ctx.persistent_ctx.region_id;
let to_peer = &ctx.persistent_ctx.to_peer;
let region_route = self.retrieve_region_route(ctx, region_id).await?;
if self.check_leader_region_on_peer(&region_route, to_peer)? {
Ok(Box::new(RegionMigrationEnd))
} else if self.check_candidate_region_on_peer(&region_route, to_peer) {
Ok(Box::new(DowngradeLeaderRegion))
} else {
Ok(Box::new(OpenCandidateRegion))
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl RegionMigrationStart {
    /// Retrieves the [RegionRoute] of `region_id` from the table metadata.
    ///
    /// Abort(non-retry):
    /// - TableRoute is not found.
    /// - RegionRoute is not found.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    async fn retrieve_region_route(
        &self,
        ctx: &Context,
        region_id: RegionId,
    ) -> Result<RegionRoute> {
        let table_id = region_id.table_id();
        // Fetching the table route may fail transiently (e.g. metadata store
        // hiccup), so map that failure to a retryable error. A missing route,
        // by contrast, is a non-retryable abort.
        let table_route = ctx
            .table_metadata_manager
            .table_route_manager()
            .get(table_id)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(|e| error::Error::RetryLater {
                reason: e.to_string(),
                location: location!(),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        let region_route = table_route
            .region_routes
            .iter()
            .find(|route| route.region.id == region_id)
            .cloned()
            .context(error::UnexpectedSnafu {
                violated: format!(
                    "RegionRoute({}) is not found in TableRoute({})",
                    region_id, table_id
                ),
            })?;

        Ok(region_route)
    }

    /// Checks whether the candidate region has been opened on `to_peer`.
    /// Returns true if it's been opened.
    fn check_candidate_region_on_peer(&self, region_route: &RegionRoute, to_peer: &Peer) -> bool {
        // The candidate counts as opened once `to_peer` appears among the followers.
        // (Return the expression directly instead of a needless let-binding.)
        region_route
            .follower_peers
            .iter()
            .any(|peer| peer.id == to_peer.id)
    }

    /// Checks whether the leader region has been opened on `to_peer`.
    /// Returns true if it's been opened.
    ///
    /// Abort(non-retry):
    /// - Leader peer of RegionRoute is not found.
    fn check_leader_region_on_peer(
        &self,
        region_route: &RegionRoute,
        to_peer: &Peer,
    ) -> Result<bool> {
        let region_id = region_route.region.id;
        let leader_id = region_route
            .leader_peer
            .as_ref()
            .context(error::UnexpectedSnafu {
                violated: format!("Leader peer is not found in TableRoute({})", region_id),
            })?
            .id;
        Ok(leader_id == to_peer.id)
    }
}
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};

    /// Builds a [PersistentContext] with from_peer=1, to_peer=2 and
    /// region 1 of table 1024, shared by the tests below.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            cluster_id: 0,
        }
    }

    // A missing table route must abort the procedure (non-retryable).
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let state = RegionMigrationStart;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let ctx = env.context_factory().new_context(persistent_context);

        // No table metadata was created, so the route lookup must fail.
        let err = state
            .retrieve_region_route(&ctx, RegionId::new(1024, 1))
            .await
            .unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });

        assert!(!err.is_retryable());
    }

    // A table route that exists but lacks the requested region must abort
    // with an Unexpected (non-retryable) error.
    #[tokio::test]
    async fn test_region_route_is_not_found_error() {
        let state = RegionMigrationStart;
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();

        let env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_route = RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(from_peer.clone()),
            ..Default::default()
        };

        env.table_metadata_manager()
            .create_table_metadata(table_info, vec![region_route])
            .await
            .unwrap();

        // Region 3 was never registered in the table route above.
        let err = state
            .retrieve_region_route(&ctx, RegionId::new(1024, 3))
            .await
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });

        assert!(!err.is_retryable());
    }

    // Candidate already opened on to_peer (as a follower) but leader is
    // elsewhere: the next state must be DowngradeLeaderRegion.
    #[tokio::test]
    async fn test_next_downgrade_leader_region_state() {
        let mut state = Box::new(RegionMigrationStart);
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let to_peer = persistent_context.to_peer.clone();
        let region_id = persistent_context.region_id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            // Leader on an unrelated peer; to_peer only holds the candidate.
            leader_peer: Some(Peer::empty(3)),
            follower_peers: vec![to_peer],
            ..Default::default()
        }];

        env.table_metadata_manager()
            .create_table_metadata(table_info, region_routes)
            .await
            .unwrap();

        let next = state.next(&mut ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<DowngradeLeaderRegion>()
            .unwrap();
    }

    // Leader already on to_peer: migration is done, next state must be
    // RegionMigrationEnd.
    #[tokio::test]
    async fn test_next_migration_end_state() {
        let mut state = Box::new(RegionMigrationStart);
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let to_peer = persistent_context.to_peer.clone();
        let from_peer = persistent_context.from_peer.clone();
        let region_id = persistent_context.region_id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(to_peer),
            follower_peers: vec![from_peer],
            ..Default::default()
        }];

        env.table_metadata_manager()
            .create_table_metadata(table_info, region_routes)
            .await
            .unwrap();

        let next = state.next(&mut ctx).await.unwrap();

        let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
    }

    // Neither leader nor candidate on to_peer: next state must be
    // OpenCandidateRegion.
    #[tokio::test]
    async fn test_next_open_candidate_region_state() {
        let mut state = Box::new(RegionMigrationStart);
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.table_metadata_manager()
            .create_table_metadata(table_info, region_routes)
            .await
            .unwrap();

        let next = state.next(&mut ctx).await.unwrap();

        let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
    }
}

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::procedure::region_migration::{Context, State};
/// The state that opens the candidate region on the target peer.
///
/// NOTE(review): the transition logic is not implemented yet (`todo!()` below);
/// presumably it will be added in a follow-up PR — confirm against the
/// migration procedure roadmap.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;

#[async_trait::async_trait]
#[typetag::serde]
impl State for OpenCandidateRegion {
    // Placeholder: panics via `todo!()` until the open-candidate step lands.
    async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> {
        todo!()
    }

    // Upcasts to `&dyn Any` so callers can downcast to the concrete state.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

View File

@@ -0,0 +1,59 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_procedure::{Context as ProcedureContext, ProcedureId};
use common_procedure_test::MockContextProvider;
use super::ContextFactoryImpl;
/// `TestingEnv` provides components during the tests.
pub struct TestingEnv {
    // Table metadata manager backed by an in-memory kv store; shared
    // between the context factory and test assertions.
    table_metadata_manager: TableMetadataManagerRef,
}

impl TestingEnv {
    /// Returns an empty [TestingEnv] backed by an in-memory kv backend.
    pub fn new() -> Self {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

        Self {
            table_metadata_manager,
        }
    }

    /// Returns a context factory for the region migration procedure,
    /// sharing this env's table metadata manager.
    pub fn context_factory(&self) -> ContextFactoryImpl {
        ContextFactoryImpl {
            table_metadata_manager: self.table_metadata_manager.clone(),
            volatile_ctx: Default::default(),
        }
    }

    /// Returns the shared [TableMetadataManagerRef], for seeding or
    /// inspecting table metadata in tests.
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    /// Returns a [ProcedureContext] with a random [ProcedureId] and a [MockContextProvider].
    pub fn procedure_context() -> ProcedureContext {
        ProcedureContext {
            procedure_id: ProcedureId::random(),
            provider: Arc::new(MockContextProvider::default()),
        }
    }
}

View File

@@ -45,6 +45,7 @@ impl ddl_task_server::DdlTask for MetaSrv {
.submit_ddl_task(
&ExecutorContext {
cluster_id: Some(cluster_id),
tracing_context: Some(header.tracing_context),
},
SubmitDdlTaskRequest { task },
)

View File

@@ -168,6 +168,7 @@ mod tests {
use api::v1::meta::heartbeat_server::Heartbeat;
use api::v1::meta::*;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_telemetry::tracing_context::W3cTrace;
use tonic::IntoRequest;
use super::get_node_id;
@@ -184,7 +185,7 @@ mod tests {
.unwrap();
let req = AskLeaderRequest {
header: Some(RequestHeader::new((1, 1), Role::Datanode)),
header: Some(RequestHeader::new((1, 1), Role::Datanode, W3cTrace::new())),
};
let res = meta_srv.ask_leader(req.into_request()).await.unwrap();

View File

@@ -257,6 +257,7 @@ mod tests {
use api::v1::meta::store_server::Store;
use api::v1::meta::*;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_telemetry::tracing_context::W3cTrace;
use tonic::IntoRequest;
use crate::metasrv::builder::MetaSrvBuilder;
@@ -275,7 +276,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = RangeRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.range(req.into_request()).await;
let _ = res.unwrap();
@@ -286,7 +287,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = PutRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.put(req.into_request()).await;
let _ = res.unwrap();
@@ -297,7 +298,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = BatchGetRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.batch_get(req.into_request()).await;
let _ = res.unwrap();
@@ -308,7 +309,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = BatchPutRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.batch_put(req.into_request()).await;
let _ = res.unwrap();
@@ -319,7 +320,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = BatchDeleteRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.batch_delete(req.into_request()).await;
let _ = res.unwrap();
@@ -330,7 +331,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = CompareAndPutRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.compare_and_put(req.into_request()).await;
let _ = res.unwrap();
@@ -341,7 +342,7 @@ mod tests {
let meta_srv = new_meta_srv().await;
let mut req = DeleteRangeRequest::default();
req.set_header((1, 1), Role::Datanode);
req.set_header((1, 1), Role::Datanode, W3cTrace::new());
let res = meta_srv.delete_range(req.into_request()).await;
let _ = res.unwrap();

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
ahash.workspace = true
api.workspace = true
async-trait.workspace = true
base64.workspace = true

View File

@@ -13,12 +13,15 @@
// limitations under the License.
use api::v1::SemanticType;
use common_query::Output;
use common_telemetry::tracing::warn;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AddColumn, AlterKind, RegionAlterRequest, RegionRequest};
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;
use crate::error::{
@@ -122,6 +125,18 @@ impl DataRegion {
Ok(())
}
pub async fn write_data(
&self,
region_id: RegionId,
request: RegionPutRequest,
) -> Result<Output> {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Put(request))
.await
.context(MitoWriteOperationSnafu)
}
}
#[cfg(test)]
@@ -184,7 +199,7 @@ mod test {
let expected = vec![
"greptime_timestamp",
"greptime_value",
"__metric",
"__table_id",
"__tsid",
"job",
"tag2",

View File

@@ -13,9 +13,13 @@
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::Arc;
use api::v1::SemanticType;
use ahash::{AHasher, RandomState};
use api::helper::to_column_data_type;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, Rows, SemanticType};
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
@@ -31,23 +35,23 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest,
AlterKind, RegionAlterRequest, RegionCreateRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionGroup, RegionId, ScanRequest};
use store_api::storage::{RegionGroup, RegionId, ScanRequest, TableId};
use tokio::sync::RwLock;
use crate::data_region::DataRegion;
use crate::error::{
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, ForbiddenPhysicalAlterSnafu,
InternalColumnOccupiedSnafu, LogicalRegionNotFoundSnafu, MissingRegionOptionSnafu,
ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, Result,
ColumnNotFoundSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
ForbiddenPhysicalAlterSnafu, InternalColumnOccupiedSnafu, LogicalRegionNotFoundSnafu,
MissingRegionOptionSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, Result,
};
use crate::metadata_region::MetadataRegion;
use crate::metrics::{
FORBIDDEN_OPERATION_COUNT, LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT,
};
use crate::utils::{self, to_data_region_id};
use crate::utils::{to_data_region_id, to_metadata_region_id};
/// region group value for data region inside a metric region
pub const METRIC_DATA_REGION_GROUP: RegionGroup = 0;
@@ -64,7 +68,7 @@ pub const METADATA_SCHEMA_KEY_COLUMN_INDEX: usize = 1;
pub const METADATA_SCHEMA_VALUE_COLUMN_INDEX: usize = 2;
/// Column name of internal column `__metric` that stores the original metric name
pub const DATA_SCHEMA_METRIC_NAME_COLUMN_NAME: &str = "__metric";
pub const DATA_SCHEMA_TABLE_ID_COLUMN_NAME: &str = "__table_id";
pub const DATA_SCHEMA_TSID_COLUMN_NAME: &str = "__tsid";
pub const METADATA_REGION_SUBDIR: &str = "metadata";
@@ -100,6 +104,9 @@ pub const PHYSICAL_TABLE_METADATA_KEY: &str = "physical_metric_table";
/// And this key will be translated to corresponding physical **REGION** id in metasrv.
pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table";
/// Fixed random state for generating tsid
const RANDOM_STATE: ahash::RandomState = ahash::RandomState::with_seeds(1, 2, 3, 4);
#[derive(Clone)]
pub struct MetricEngine {
inner: Arc<MetricEngineInner>,
@@ -121,7 +128,7 @@ impl RegionEngine for MetricEngine {
request: RegionRequest,
) -> std::result::Result<Output, BoxedError> {
let result = match request {
RegionRequest::Put(_) => todo!(),
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Delete(_) => todo!(),
RegionRequest::Create(create) => self
.inner
@@ -399,6 +406,16 @@ impl MetricEngineInner {
self.metadata_region
.add_logical_region(metadata_region_id, logical_region_id)
.await?;
for col in &request.column_metadatas {
self.metadata_region
.add_column(
metadata_region_id,
logical_region_id,
&col.column_schema.name,
col.semantic_type,
)
.await?;
}
// update the mapping
// Safety: previous steps ensure the physical region exist
@@ -463,9 +480,9 @@ impl MetricEngineInner {
// check if internal columns are not occupied
ensure!(
!name_to_index.contains_key(DATA_SCHEMA_METRIC_NAME_COLUMN_NAME),
!name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
InternalColumnOccupiedSnafu {
column: DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
}
);
ensure!(
@@ -495,8 +512,8 @@ impl MetricEngineInner {
/// Return value: (data_region_id, metadata_region_id)
fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
(
utils::to_data_region_id(region_id),
utils::to_metadata_region_id(region_id),
to_data_region_id(region_id),
to_metadata_region_id(region_id),
)
}
@@ -581,12 +598,25 @@ impl MetricEngineInner {
});
// add internal columns
let [table_id_col, tsid_col] = Self::internal_column_metadata();
data_region_request.column_metadatas.push(table_id_col);
data_region_request.column_metadatas.push(tsid_col);
data_region_request.primary_key =
vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
data_region_request
}
/// Generate internal column metadata.
///
/// Return `[table_id_col, tsid_col]`
fn internal_column_metadata() -> [ColumnMetadata; 2] {
let metric_name_col = ColumnMetadata {
column_id: ReservedColumnId::metric_name(),
column_id: ReservedColumnId::table_id(),
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
ConcreteDataType::string_datatype(),
DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
ConcreteDataType::uint32_datatype(),
false,
),
};
@@ -595,16 +625,11 @@ impl MetricEngineInner {
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_TSID_COLUMN_NAME,
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint64_datatype(),
false,
),
};
data_region_request.column_metadatas.push(metric_name_col);
data_region_request.column_metadatas.push(tsid_col);
data_region_request.primary_key =
vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()];
data_region_request
[metric_name_col, tsid_col]
}
}
@@ -646,9 +671,9 @@ impl MetricEngineInner {
return Ok(());
};
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let metadata_region_id = to_metadata_region_id(physical_region_id);
let mut columns_to_add = vec![];
for col in columns {
for col in &columns {
if self
.metadata_region
.column_semantic_type(
@@ -659,11 +684,12 @@ impl MetricEngineInner {
.await?
.is_none()
{
columns_to_add.push(col.column_metadata);
columns_to_add.push(col.column_metadata.clone());
}
}
let data_region_id = utils::to_data_region_id(physical_region_id);
// alter data region
let data_region_id = to_data_region_id(physical_region_id);
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
@@ -672,6 +698,18 @@ impl MetricEngineInner {
)
.await?;
// register columns to logical region
for col in columns {
self.metadata_region
.add_column(
metadata_region_id,
region_id,
&col.column_metadata.column_schema.name,
col.column_metadata.semantic_type,
)
.await?;
}
Ok(())
}
@@ -687,14 +725,191 @@ impl MetricEngineInner {
}
}
impl MetricEngineInner {
/// Dispatch region put request
pub async fn put_region(
&self,
region_id: RegionId,
request: RegionPutRequest,
) -> Result<Output> {
let is_putting_physical_region = self
.state
.read()
.await
.physical_regions
.contains_key(&region_id);
if is_putting_physical_region {
info!(
"Metric region received put request {request:?} on physical region {region_id:?}"
);
FORBIDDEN_OPERATION_COUNT.inc();
ForbiddenPhysicalAlterSnafu.fail()
} else {
self.put_logical_region(region_id, request).await
}
}
async fn put_logical_region(
&self,
logical_region_id: RegionId,
mut request: RegionPutRequest,
) -> Result<Output> {
let physical_region_id = *self
.state
.read()
.await
.logical_regions
.get(&logical_region_id)
.with_context(|| LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
})?;
let data_region_id = to_data_region_id(physical_region_id);
self.verify_put_request(logical_region_id, physical_region_id, &request)
.await?;
// write to data region
// TODO: retrieve table name
self.modify_rows(logical_region_id.table_id(), &mut request.rows)?;
self.data_region.write_data(data_region_id, request).await
}
/// Verifies a put request for a logical region against its corresponding metadata region.
///
/// Includes:
/// - Check if the logical region exists
/// - Check if the columns exist
async fn verify_put_request(
&self,
logical_region_id: RegionId,
physical_region_id: RegionId,
request: &RegionPutRequest,
) -> Result<()> {
// check if the region exists
let metadata_region_id = to_metadata_region_id(physical_region_id);
if !self
.metadata_region
.is_logical_region_exists(metadata_region_id, logical_region_id)
.await?
{
error!("Trying to write to an nonexistent region {logical_region_id}");
return LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
}
.fail();
}
// check if the columns exist
for col in &request.rows.schema {
if self
.metadata_region
.column_semantic_type(metadata_region_id, logical_region_id, &col.column_name)
.await?
.is_none()
{
return ColumnNotFoundSnafu {
name: col.column_name.clone(),
region_id: logical_region_id,
}
.fail();
}
}
Ok(())
}
/// Perform metric engine specific logic to incoming rows.
/// - Change the semantic type of tag columns to field
/// - Add table_id column
/// - Generate tsid
fn modify_rows(&self, table_id: TableId, rows: &mut Rows) -> Result<()> {
// gather tag column indices
let mut tag_col_indices = rows
.schema
.iter()
.enumerate()
.filter_map(|(idx, col)| {
if col.semantic_type == SemanticType::Tag as i32 {
Some((idx, col.column_name.clone()))
} else {
None
}
})
.collect::<Vec<_>>();
// generate new schema
rows.schema = rows
.schema
.clone()
.into_iter()
.map(|mut col| {
if col.semantic_type == SemanticType::Tag as i32 {
col.semantic_type = SemanticType::Field as i32;
}
col
})
.collect::<Vec<_>>();
// add table_name column
rows.schema.push(PbColumnSchema {
column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
datatype: to_column_data_type(&ConcreteDataType::uint32_datatype())
.unwrap()
.into(),
semantic_type: SemanticType::Tag as _,
});
// add tsid column
rows.schema.push(PbColumnSchema {
column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
datatype: to_column_data_type(&ConcreteDataType::uint64_datatype())
.unwrap()
.into(),
semantic_type: SemanticType::Tag as _,
});
// fill internal columns
let mut random_state = RANDOM_STATE.clone();
for row in &mut rows.rows {
Self::fill_internal_columns(&mut random_state, table_id, &tag_col_indices, row);
}
Ok(())
}
/// Fills internal columns of a row with table name and a hash of tag values.
fn fill_internal_columns(
random_state: &mut RandomState,
table_id: TableId,
tag_col_indices: &[(usize, String)],
row: &mut Row,
) {
let mut hasher = random_state.build_hasher();
for (idx, name) in tag_col_indices {
let tag = row.values[*idx].clone();
name.hash(&mut hasher);
// The type is checked before. So only null is ignored.
if let Some(ValueData::StringValue(string)) = tag.value_data {
string.hash(&mut hasher);
}
}
let hash = hasher.finish();
// fill table id and tsid
row.values.push(ValueData::U32Value(table_id).into());
row.values.push(ValueData::U64Value(hash).into());
}
}
#[cfg(test)]
mod tests {
use std::hash::Hash;
use api::v1::region::alter_request;
use store_api::region_request::AddColumn;
use super::*;
use crate::test_util::TestEnv;
use crate::test_util::{self, TestEnv};
#[test]
fn test_verify_region_create_request() {
@@ -714,8 +929,8 @@ mod tests {
column_id: 1,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
ConcreteDataType::string_datatype(),
DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
ConcreteDataType::uint32_datatype(),
false,
),
},
@@ -729,7 +944,7 @@ mod tests {
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Internal column __metric is reserved".to_string()
"Internal column __table_id is reserved".to_string()
);
// valid request
@@ -835,7 +1050,7 @@ mod tests {
assert_eq!(data_region_request.column_metadatas.len(), 4);
assert_eq!(
data_region_request.primary_key,
vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()]
vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()]
);
}
@@ -896,5 +1111,82 @@ mod tests {
.unwrap()
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
let timestamp_index = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
.await
.unwrap()
.unwrap();
assert_eq!(timestamp_index, SemanticType::Timestamp);
}
#[tokio::test]
async fn test_write_logical_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let engine = env.metric();
// add columns
let logical_region_id = env.default_logical_region_id();
let columns = &["odd", "even", "Ev_En"];
let alter_request = test_util::alter_logical_region_add_tag_columns(columns);
engine
.handle_request(logical_region_id, RegionRequest::Alter(alter_request))
.await
.unwrap();
// prepare data
let schema = test_util::row_schema_with_tags(columns);
let rows = test_util::build_rows(3, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
});
// write data
let Output::AffectedRows(count) = engine
.handle_request(logical_region_id, request)
.await
.unwrap()
else {
panic!()
};
assert_eq!(100, count);
}
#[tokio::test]
async fn test_write_physical_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let engine = env.metric();
let physical_region_id = env.default_physical_region_id();
let schema = test_util::row_schema_with_tags(&["abc"]);
let rows = test_util::build_rows(1, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
});
engine
.handle_request(physical_region_id, request)
.await
.unwrap_err();
}
#[tokio::test]
async fn test_write_nonexist_logical_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let engine = env.metric();
let logical_region_id = RegionId::new(175, 8345);
let schema = test_util::row_schema_with_tags(&["def"]);
let rows = test_util::build_rows(1, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
});
engine
.handle_request(logical_region_id, request)
.await
.unwrap_err();
}
}

View File

@@ -109,6 +109,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Column {} not found in logical region {}", name, region_id))]
ColumnNotFound {
name: String,
region_id: RegionId,
location: Location,
},
#[snafu(display("Alter request to physical region is forbidden"))]
ForbiddenPhysicalAlter { location: Location },
}
@@ -136,6 +143,8 @@ impl ErrorExt for Error {
StatusCode::RegionNotFound
}
ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
CreateMitoRegion { source, .. }
| MitoReadOperation { source, .. }
| MitoWriteOperation { source, .. } => source.status_code(),

View File

@@ -14,7 +14,9 @@
//! Utilities for testing.
use api::v1::SemanticType;
use api::helper::to_column_data_type;
use api::v1::value::ValueData;
use api::v1::{ColumnSchema as PbColumnSchema, Row, SemanticType, Value};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::config::MitoConfig;
@@ -23,7 +25,9 @@ use mito2::test_util::TestEnv as MitoTestEnv;
use object_store::util::join_dir;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest,
};
use store_api::storage::RegionId;
use crate::data_region::DataRegion;
@@ -181,6 +185,87 @@ impl TestEnv {
}
}
/// Generate a [RegionAlterRequest] for adding tag columns.
pub fn alter_logical_region_add_tag_columns(new_tags: &[&str]) -> RegionAlterRequest {
let mut new_columns = vec![];
for (i, tag) in new_tags.iter().enumerate() {
new_columns.push(AddColumn {
column_metadata: ColumnMetadata {
column_id: i as u32,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
tag.to_string(),
ConcreteDataType::string_datatype(),
false,
),
},
location: None,
});
}
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: new_columns,
},
}
}
/// Generate a row schema with given tag columns.
///
/// The result will also contains default timestamp and value column at beginning.
pub fn row_schema_with_tags(tags: &[&str]) -> Vec<PbColumnSchema> {
let mut schema = vec![
PbColumnSchema {
column_name: "greptime_timestamp".to_string(),
datatype: to_column_data_type(&ConcreteDataType::timestamp_millisecond_datatype())
.unwrap()
.into(),
semantic_type: SemanticType::Timestamp as _,
},
PbColumnSchema {
column_name: "greptime_value".to_string(),
datatype: to_column_data_type(&ConcreteDataType::float64_datatype())
.unwrap()
.into(),
semantic_type: SemanticType::Field as _,
},
];
for tag in tags {
schema.push(PbColumnSchema {
column_name: tag.to_string(),
datatype: to_column_data_type(&ConcreteDataType::string_datatype())
.unwrap()
.into(),
semantic_type: SemanticType::Tag as _,
});
}
schema
}
/// Build [Rows] for assembling [RegionPutRequest](store_api::region_request::RegionPutRequest).
///
/// The schema is generated by [row_schema_with_tags].
pub fn build_rows(num_tags: usize, num_rows: usize) -> Vec<Row> {
let mut rows = vec![];
for i in 0..num_rows {
let mut values = vec![
Value {
value_data: Some(ValueData::TimestampMillisecondValue(i as _)),
},
Value {
value_data: Some(ValueData::F64Value(i as f64)),
},
];
for j in 0..num_tags {
values.push(Value {
value_data: Some(ValueData::StringValue(format!("tag_{}", j))),
});
}
rows.push(Row { values });
}
rows
}
#[cfg(test)]
mod test {

View File

@@ -25,8 +25,7 @@ use api::helper::{
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
use common_query::Output;
use common_query::Output::AffectedRows;
use common_telemetry::tracing::log::info;
use common_telemetry::warn;
use common_telemetry::{info, warn};
use datatypes::prelude::DataType;
use prometheus::HistogramTimer;
use prost::Message;

View File

@@ -17,8 +17,7 @@
use std::time::Duration;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use common_telemetry::{info, warn};
use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, ObjectStore};

View File

@@ -22,6 +22,7 @@ use catalog::CatalogManagerRef;
use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use futures_util::future;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
@@ -119,8 +120,10 @@ impl Deleter {
requests: RegionDeleteRequests,
ctx: &QueryContextRef,
) -> Result<AffectedRows> {
let header: RegionRequestHeader = ctx.as_ref().into();
let request_factory = RegionRequestFactory::new(header);
let request_factory = RegionRequestFactory::new(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
dbname: ctx.get_db_string(),
});
let tasks = self
.group_requests_by_peer(requests)

View File

@@ -26,6 +26,7 @@ use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info};
use datatypes::schema::Schema;
use futures_util::future;
@@ -152,8 +153,10 @@ impl Inserter {
ctx: &QueryContextRef,
) -> Result<AffectedRows> {
write_meter!(ctx.current_catalog(), ctx.current_schema(), requests);
let header: RegionRequestHeader = ctx.as_ref().into();
let request_factory = RegionRequestFactory::new(header);
let request_factory = RegionRequestFactory::new(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
dbname: ctx.get_db_string(),
});
let tasks = self
.group_requests_by_peer(requests)

View File

@@ -32,6 +32,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
@@ -88,6 +89,7 @@ impl StatementExecutor {
}
}
#[tracing::instrument(skip_all)]
pub async fn execute_stmt(
&self,
stmt: QueryStatement,
@@ -201,6 +203,7 @@ impl StatementExecutor {
.context(PlanStatementSnafu)
}
#[tracing::instrument(skip_all)]
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan(stmt, query_ctx.clone()).await?;
self.query_engine

View File

@@ -14,7 +14,7 @@
use common_datasource::file_format::Format;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::{info, tracing};
use session::context::QueryContextBuilder;
use snafu::{ensure, ResultExt};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
@@ -27,6 +27,7 @@ pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";
impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result<Output> {
// location must end with / so that every table is exported to a file.
ensure!(

View File

@@ -29,7 +29,7 @@ use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::debug;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream};
@@ -237,6 +237,7 @@ impl StatementExecutor {
}
}
#[tracing::instrument(skip_all)]
pub async fn copy_table_from(
&self,
req: CopyTableRequest,

View File

@@ -23,7 +23,7 @@ use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_telemetry::{debug, tracing};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
@@ -84,6 +84,7 @@ impl StatementExecutor {
}
}
#[tracing::instrument(skip_all)]
pub(crate) async fn copy_table_to(
&self,
req: CopyTableRequest,

View File

@@ -28,7 +28,7 @@ use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::{info, tracing};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use partition::partition::{PartitionBound, PartitionDef};
@@ -58,11 +58,13 @@ impl StatementExecutor {
self.catalog_manager.clone()
}
#[tracing::instrument(skip_all)]
pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
let create_expr = &mut expr_factory::create_to_expr(&stmt, ctx)?;
self.create_table_inner(create_expr, stmt.partitions).await
}
#[tracing::instrument(skip_all)]
pub async fn create_external_table(
&self,
create_expr: CreateExternalTable,
@@ -151,6 +153,7 @@ impl StatementExecutor {
Ok(table)
}
#[tracing::instrument(skip_all)]
pub async fn drop_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
@@ -181,6 +184,7 @@ impl StatementExecutor {
Ok(Output::AffectedRows(0))
}
#[tracing::instrument(skip_all)]
pub async fn truncate_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
@@ -221,6 +225,7 @@ impl StatementExecutor {
Ok(())
}
#[tracing::instrument(skip_all)]
pub async fn alter_table(
&self,
alter_table: AlterTable,
@@ -347,6 +352,7 @@ impl StatementExecutor {
.context(error::ExecuteDdlSnafu)
}
#[tracing::instrument(skip_all)]
pub async fn create_database(
&self,
catalog: &str,

View File

@@ -14,6 +14,7 @@
use common_error::ext::BoxedError;
use common_query::Output;
use common_telemetry::tracing;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::describe::DescribeTable;
@@ -26,6 +27,7 @@ use crate::statement::StatementExecutor;
use crate::table::table_idents_to_full_name;
impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(super) async fn describe_table(
&self,
stmt: DescribeTable,

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use common_query::Output;
use common_telemetry::tracing;
use query::parser::QueryStatement;
use session::context::QueryContextRef;
use sql::statements::insert::Insert;
@@ -22,6 +23,7 @@ use super::StatementExecutor;
use crate::error::Result;
impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub async fn insert(&self, insert: Box<Insert>, query_ctx: QueryContextRef) -> Result<Output> {
if insert.can_extract_values() {
// Fast path: plain insert ("insert with literal values") is executed directly

View File

@@ -14,6 +14,7 @@
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use partition::manager::PartitionInfo;
use partition::partition::PartitionBound;
use session::context::QueryContextRef;
@@ -28,6 +29,7 @@ use crate::error::{self, ExecuteStatementSnafu, Result};
use crate::statement::StatementExecutor;
impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(super) async fn show_databases(
&self,
stmt: ShowDatabases,
@@ -38,6 +40,7 @@ impl StatementExecutor {
.context(ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]
pub(super) async fn show_tables(
&self,
stmt: ShowTables,
@@ -48,6 +51,7 @@ impl StatementExecutor {
.context(ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]
pub async fn show_create_table(
&self,
table_name: TableName,

View File

@@ -15,6 +15,7 @@
use std::collections::HashMap;
use common_query::Output;
use common_telemetry::tracing;
use query::parser::{PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, EXPLAIN_NODE_NAME};
use session::context::QueryContextRef;
use snafu::ResultExt;
@@ -24,6 +25,7 @@ use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Re
use crate::statement::StatementExecutor;
impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = match tql {
Tql::Eval(eval) => {

View File

@@ -5,7 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
ahash = { version = "0.8", features = ["compile-time-rng"] }
ahash.workspace = true
api.workspace = true
arc-swap = "1.0"
arrow-schema.workspace = true

View File

@@ -34,6 +34,7 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{
EmptyRecordBatchStream, RecordBatch, RecordBatches, SendableRecordBatchStream,
};
use common_telemetry::tracing;
use datafusion::common::Column;
use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -77,6 +78,7 @@ impl DatafusionQueryEngine {
Self { state, plugins }
}
#[tracing::instrument(skip_all)]
async fn exec_query_plan(
&self,
plan: LogicalPlan,
@@ -97,6 +99,7 @@ impl DatafusionQueryEngine {
Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?))
}
#[tracing::instrument(skip_all)]
async fn exec_dml_statement(
&self,
dml: DmlStatement,
@@ -147,6 +150,7 @@ impl DatafusionQueryEngine {
Ok(Output::AffectedRows(affected_rows))
}
#[tracing::instrument(skip_all)]
async fn delete<'a>(
&self,
table_name: &ResolvedTableReference<'a>,
@@ -189,6 +193,7 @@ impl DatafusionQueryEngine {
.await
}
#[tracing::instrument(skip_all)]
async fn insert<'a>(
&self,
table_name: &ResolvedTableReference<'a>,
@@ -285,6 +290,7 @@ impl QueryEngine for DatafusionQueryEngine {
}
impl LogicalOptimizer for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let _timer = metrics::METRIC_OPTIMIZE_LOGICAL_ELAPSED.start_timer();
match plan {
@@ -305,6 +311,7 @@ impl LogicalOptimizer for DatafusionQueryEngine {
#[async_trait::async_trait]
impl PhysicalPlanner for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
async fn create_physical_plan(
&self,
ctx: &mut QueryEngineContext,
@@ -338,6 +345,7 @@ impl PhysicalPlanner for DatafusionQueryEngine {
}
impl PhysicalOptimizer for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn optimize_physical_plan(
&self,
ctx: &mut QueryEngineContext,
@@ -385,6 +393,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
}
impl QueryExecutor for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn execute_stream(
&self,
ctx: &QueryEngineContext,

View File

@@ -27,7 +27,8 @@ use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream,
};
use common_telemetry::trace_id;
use common_telemetry::tracing;
use common_telemetry::tracing_context::TracingContext;
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
};
@@ -155,6 +156,7 @@ impl MergeScanExec {
})
}
#[tracing::instrument(skip_all)]
pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
let substrait_plan = self.substrait_plan.to_vec();
let regions = self.regions.clone();
@@ -163,7 +165,8 @@ impl MergeScanExec {
let schema = Self::arrow_schema_to_schema(self.schema())?;
let dbname = context.task_id().unwrap_or_default();
let trace_id = trace_id().unwrap_or_default();
let tracing_context = TracingContext::from_current_span().to_w3c();
let stream = Box::pin(stream!({
METRIC_MERGE_SCAN_REGIONS.observe(regions.len() as f64);
@@ -174,8 +177,7 @@ impl MergeScanExec {
for region_id in regions {
let request = QueryRequest {
header: Some(RegionRequestHeader {
trace_id,
span_id: 0,
tracing_context: tracing_context.clone(),
dbname: dbname.clone(),
}),
region_id: region_id.into(),

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use datafusion::execution::context::SessionState;
use datafusion_sql::planner::{ParserOptions, SqlToRel};
use promql::planner::PromPlanner;
@@ -51,6 +52,7 @@ impl DfLogicalPlanner {
}
}
#[tracing::instrument(skip_all)]
async fn plan_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let df_stmt = (&stmt).try_into().context(SqlSnafu)?;
@@ -85,6 +87,7 @@ impl DfLogicalPlanner {
Ok(LogicalPlan::DfPlan(plan))
}
#[tracing::instrument(skip_all)]
async fn plan_pql(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let table_provider = DfTableSourceProvider::new(
self.engine_state.catalog_manager().clone(),
@@ -101,6 +104,7 @@ impl DfLogicalPlanner {
#[async_trait]
impl LogicalPlanner for DfLogicalPlanner {
#[tracing::instrument(skip_all)]
async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
match stmt {
QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,

View File

@@ -342,6 +342,12 @@ impl RangePlanRewriter {
.row_key_column_names()
.map(|key| Expr::Column(Column::new(Some(table_ref.clone()), key)))
.collect();
// If the user does not specify a primary key when creating a table,
// then by default all data will be aggregated into one time series,
// which is equivalent to using `by(1)` in SQL
if default_by.is_empty() {
default_by = vec![Expr::Literal(ScalarValue::Int64(Some(1)))];
}
time_index_expr = Expr::Column(Column::new(
Some(table_ref.clone()),
time_index_column.name.clone(),

View File

@@ -26,6 +26,7 @@ use arrow_flight::{
use async_trait::async_trait;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use futures::Stream;
use prost::Message;
use snafu::ResultExt;
@@ -150,28 +151,26 @@ impl FlightCraft for GreptimeRequestHandler {
let ticket = request.into_inner().ticket;
let request =
GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?;
let trace_id = request
.header
.as_ref()
.map(|h| h.trace_id)
.unwrap_or_default();
let output = self.handle_request(request).await?;
let stream: Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync>> =
to_flight_data_stream(output, trace_id);
to_flight_data_stream(output, TracingContext::new());
Ok(Response::new(stream))
}
}
fn to_flight_data_stream(output: Output, trace_id: u64) -> TonicStream<FlightData> {
fn to_flight_data_stream(
output: Output,
tracing_context: TracingContext,
) -> TonicStream<FlightData> {
match output {
Output::Stream(stream) => {
let stream = FlightRecordBatchStream::new(stream, trace_id);
let stream = FlightRecordBatchStream::new(stream, tracing_context);
Box::pin(stream) as _
}
Output::RecordBatches(x) => {
let stream = FlightRecordBatchStream::new(x.as_stream(), trace_id);
let stream = FlightRecordBatchStream::new(x.as_stream(), tracing_context);
Box::pin(stream) as _
}
Output::AffectedRows(rows) => {

View File

@@ -18,7 +18,9 @@ use std::task::{Context, Poll};
use arrow_flight::FlightData;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{warn, TRACE_ID};
use common_telemetry::tracing::info_span;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::warn;
use futures::channel::mpsc;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, Stream, StreamExt};
@@ -39,11 +41,13 @@ pub struct FlightRecordBatchStream {
}
impl FlightRecordBatchStream {
pub fn new(recordbatches: SendableRecordBatchStream, trace_id: u64) -> Self {
pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self {
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
let join_handle = common_runtime::spawn_read(TRACE_ID.scope(trace_id, async move {
Self::flight_data_stream(recordbatches, tx).await
}));
let join_handle = common_runtime::spawn_read(async move {
Self::flight_data_stream(recordbatches, tx)
.trace(tracing_context.attach(info_span!("flight_data_stream")))
.await
});
Self {
rx,
join_handle,
@@ -145,7 +149,7 @@ mod test {
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()])
.unwrap()
.as_stream();
let mut stream = FlightRecordBatchStream::new(recordbatches, 0);
let mut stream = FlightRecordBatchStream::new(recordbatches, TracingContext::default());
let mut raw_data = Vec::with_capacity(2);
raw_data.push(stream.next().await.unwrap().unwrap());

View File

@@ -27,7 +27,7 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_runtime::Runtime;
use common_telemetry::{logging, TRACE_ID};
use common_telemetry::logging;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
@@ -70,7 +70,6 @@ impl GreptimeRequestHandler {
let request_type = request_type(&query).to_string();
let db = query_ctx.get_db_string();
let timer = RequestTimer::new(db.clone(), request_type);
let trace_id = query_ctx.trace_id();
// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpected by Tonic runtime;
@@ -79,7 +78,7 @@ impl GreptimeRequestHandler {
// - Obtaining a `JoinHandle` to get the panic message (if there's any).
// From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped.
// 2. avoid the handler blocks the gRPC runtime incidentally.
let handle = self.runtime.spawn(TRACE_ID.scope(trace_id, async move {
let handle = self.runtime.spawn(async move {
handler.do_query(query, query_ctx).await.map_err(|e| {
if e.status_code().should_log_error() {
logging::error!(e; "Failed to handle request");
@@ -89,7 +88,7 @@ impl GreptimeRequestHandler {
}
e
})
}));
});
handle.await.context(JoinTaskSnafu).map_err(|e| {
timer.record(e.status_code());
@@ -166,7 +165,6 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.try_trace_id(header.map(|h| h.trace_id))
.build()
}

View File

@@ -19,7 +19,9 @@ use api::v1::region::{region_request, RegionRequest, RegionResponse};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_runtime::Runtime;
use common_telemetry::{debug, error, TRACE_ID};
use common_telemetry::tracing::info_span;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, error};
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};
@@ -45,12 +47,14 @@ impl RegionServerRequestHandler {
}
async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
let trace_id = request
.header
.context(InvalidQuerySnafu {
reason: "Expecting non-empty region request header.",
})?
.trace_id;
let tracing_context = TracingContext::from_w3c(
&request
.header
.context(InvalidQuerySnafu {
reason: "Expecting non-empty region request header.",
})?
.tracing_context,
);
let query = request.body.context(InvalidQuerySnafu {
reason: "Expecting non-empty region request body.",
})?;
@@ -64,17 +68,21 @@ impl RegionServerRequestHandler {
// - Obtaining a `JoinHandle` to get the panic message (if there's any).
// From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped.
// 2. avoid the handler blocks the gRPC runtime incidentally.
let handle = self.runtime.spawn(TRACE_ID.scope(trace_id, async move {
handler.handle(query).await.map_err(|e| {
if e.status_code().should_log_error() {
error!(e; "Failed to handle request");
} else {
// Currently, we still print a debug log.
debug!("Failed to handle request, err: {:?}", e);
}
e
})
}));
let handle = self.runtime.spawn(async move {
handler
.handle(query)
.trace(tracing_context.attach(info_span!("RegionServerRequestHandler::handle")))
.await
.map_err(|e| {
if e.status_code().should_log_error() {
error!(e; "Failed to handle request");
} else {
// Currently, we still print a debug log.
debug!("Failed to handle request, err: {}", e);
}
e
})
});
handle.await.context(JoinTaskSnafu)?
}

View File

@@ -24,7 +24,7 @@ use chrono::{NaiveDate, NaiveDateTime};
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_query::Output;
use common_telemetry::{error, logging, warn};
use common_telemetry::{error, logging, tracing, warn};
use datatypes::prelude::ConcreteDataType;
use opensrv_mysql::{
AsyncMysqlShim, Column, ErrorKind, InitWriter, ParamParser, ParamValue, QueryResultWriter,
@@ -91,18 +91,14 @@ impl MysqlInstanceShim {
}
}
#[tracing::instrument(skip_all)]
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
if let Some(output) =
crate::mysql::federated::check(query, query_ctx.clone(), self.session.clone())
{
vec![Ok(output)]
} else {
let trace_id = query_ctx.trace_id();
common_telemetry::TRACE_ID
.scope(trace_id, async move {
self.query_handler.do_query(query, query_ctx).await
})
.await
self.query_handler.do_query(query, query_ctx).await
}
}

View File

@@ -37,8 +37,6 @@ pub struct QueryContext {
current_user: ArcSwap<Option<UserInfoRef>>,
time_zone: Option<TimeZone>,
sql_dialect: Box<dyn Dialect + Send + Sync>,
trace_id: u64,
span_id: u64,
}
impl Display for QueryContext {
@@ -61,18 +59,6 @@ impl From<&RegionRequestHeader> for QueryContext {
current_user: Default::default(),
time_zone: Default::default(),
sql_dialect: Box::new(GreptimeDbDialect {}),
trace_id: value.trace_id,
span_id: value.span_id,
}
}
}
impl From<&QueryContext> for RegionRequestHeader {
fn from(value: &QueryContext) -> Self {
RegionRequestHeader {
trace_id: value.trace_id,
span_id: value.span_id,
dbname: value.get_db_string(),
}
}
}
@@ -142,16 +128,6 @@ impl QueryContext {
pub fn set_current_user(&self, user: Option<UserInfoRef>) {
let _ = self.current_user.swap(Arc::new(user));
}
#[inline]
pub fn trace_id(&self) -> u64 {
self.trace_id
}
#[inline]
pub fn span_id(&self) -> u64 {
self.span_id
}
}
impl QueryContextBuilder {
@@ -170,15 +146,8 @@ impl QueryContextBuilder {
sql_dialect: self
.sql_dialect
.unwrap_or_else(|| Box::new(GreptimeDbDialect {})),
trace_id: self.trace_id.unwrap_or_else(common_telemetry::gen_trace_id),
span_id: self.span_id.unwrap_or_default(),
})
}
pub fn try_trace_id(mut self, trace_id: Option<u64>) -> Self {
self.trace_id = trace_id;
self
}
}
#[derive(Debug)]

View File

@@ -21,7 +21,7 @@ mod writer;
use std::sync::Arc;
use common_telemetry::tracing::log::warn;
use common_telemetry::warn;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
pub use picker::{LeveledTimeWindowPicker, Picker, PickerContext};

View File

@@ -17,8 +17,7 @@ use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::time::Duration;
use common_telemetry::tracing::log::warn;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, warn};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;

View File

@@ -18,8 +18,7 @@ use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_telemetry::{debug, info, warn};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;

View File

@@ -38,7 +38,7 @@ enum ReservedColumnType {
Sequence,
OpType,
Tsid,
MetricName,
TableId,
}
/// Column id reserved by the engine.
@@ -76,11 +76,11 @@ impl ReservedColumnId {
Self::BASE | ReservedColumnType::Tsid as ColumnId
}
/// Id for storing metric name column.
/// Id for storing logical table id column.
///
/// Used by: metric engine
pub const fn metric_name() -> ColumnId {
Self::BASE | ReservedColumnType::MetricName as ColumnId
pub const fn table_id() -> ColumnId {
Self::BASE | ReservedColumnType::TableId as ColumnId
}
}

View File

@@ -677,7 +677,7 @@ enable = true
enable = true
[frontend.logging]
enable_jaeger_tracing = false
enable_otlp_tracing = false
[frontend.datanode.client]
timeout = "10s"
@@ -736,7 +736,7 @@ num_workers = {}
worker_channel_size = 128
worker_request_batch_size = 64
manifest_checkpoint_distance = 10
manifest_compress_type = "Uncompressed"
manifest_compress_type = "uncompressed"
max_background_jobs = 4
auto_flush_interval = "30m"
global_write_buffer_size = "1GiB"
@@ -750,10 +750,10 @@ sst_write_buffer_size = "8MiB"
[datanode.region_engine.file]
[datanode.logging]
enable_jaeger_tracing = false
enable_otlp_tracing = false
[logging]
enable_jaeger_tracing = false"#,
enable_otlp_tracing = false"#,
store_type,
num_cpus::get() / 2
);

View File

@@ -69,3 +69,39 @@ DROP TABLE host;
Affected Rows: 0
-- Test no primary key and by keyword
CREATE TABLE host (
ts timestamp(3) time index,
host STRING,
val BIGINT,
);
Affected Rows: 0
INSERT INTO TABLE host VALUES
(0, 'host1', 0),
(5000, 'host1', null),
(10000, 'host1', 1),
(15000, 'host1', null),
(20000, 'host1', 2),
(0, 'host2', 3),
(5000, 'host2', null),
(10000, 'host2', 4),
(15000, 'host2', null),
(20000, 'host2', 5);
Affected Rows: 10
SELECT ts, max(val) RANGE '5s' FROM host ALIGN '20s' ORDER BY ts;
+---------------------+----------------------------------+
| ts | MAX(host.val) RANGE 5s FILL NULL |
+---------------------+----------------------------------+
| 1970-01-01T00:00:00 | 3 |
| 1970-01-01T00:00:20 | 5 |
+---------------------+----------------------------------+
DROP TABLE host;
Affected Rows: 0

View File

@@ -34,3 +34,27 @@ SELECT ts, CAST(length(host) as INT64) + 2, max(val) RANGE '5s' FROM host ALIGN
SELECT ts, host, max(val) RANGE '5s' FROM host ALIGN '20s' BY () ORDER BY ts;
DROP TABLE host;
-- Test no primary key and by keyword
CREATE TABLE host (
ts timestamp(3) time index,
host STRING,
val BIGINT,
);
INSERT INTO TABLE host VALUES
(0, 'host1', 0),
(5000, 'host1', null),
(10000, 'host1', 1),
(15000, 'host1', null),
(20000, 'host1', 2),
(0, 'host2', 3),
(5000, 'host2', null),
(10000, 'host2', 4),
(15000, 'host2', null),
(20000, 'host2', 5);
SELECT ts, max(val) RANGE '5s' FROM host ALIGN '20s' ORDER BY ts;
DROP TABLE host;

View File

@@ -35,6 +35,10 @@ SELECT min(val) FROM host ALIGN '5s';
Error: 2000(InvalidSyntax), sql parser error: Illegal Range select, no RANGE keyword found in any SelectItem
SELECT 1 FROM host ALIGN '5s';
Error: 2000(InvalidSyntax), sql parser error: Illegal Range select, no RANGE keyword found in any SelectItem
SELECT min(val) RANGE '10s', max(val) FROM host ALIGN '5s';
Error: 3001(EngineExecuteQuery), No field named "MAX(host.val)". Valid fields are "MIN(host.val) RANGE 10s FILL NULL", host.ts, host.host.

View File

@@ -28,6 +28,8 @@ SELECT min(val) RANGE '5s' FROM host ALIGN 'not_time';
SELECT min(val) FROM host ALIGN '5s';
SELECT 1 FROM host ALIGN '5s';
SELECT min(val) RANGE '10s', max(val) FROM host ALIGN '5s';
SELECT min(val) * 2 RANGE '10s' FROM host ALIGN '5s';