refactor(otlp_metric): make otlp metric compatible with promql (#6543)

* chore: tmp save

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor update

* chore: remove metric metadata and introduce shared attrs

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: replace . with _ in metric name

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor update & fix tests

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add legacy mode param to otlp metrics

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update test & fix

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add automatic legacy check for otlp metrics

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fix clippy

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: typos

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: insert table options in compat mode & add test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: check table options consistency

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update test and add comments

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor tags update

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor update about scope labeling

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update opts using header & update test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor code refactor

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fix cr issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2025-07-30 19:20:03 +08:00
committed by GitHub
parent 1df605ec4b
commit 2b4fb2f32a
23 changed files with 1060 additions and 182 deletions

View File

@@ -39,6 +39,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
dashmap.workspace = true
datafusion.workspace = true
datafusion-expr.workspace = true
datanode.workspace = true

View File

@@ -42,6 +42,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
@@ -53,8 +54,10 @@ use common_query::Output;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::{debug, error, info, tracing};
use dashmap::DashMap;
use datafusion_expr::LogicalPlan;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
@@ -68,11 +71,14 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::QueryEngineRef;
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::error::{
self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu,
};
use servers::interceptor::{
PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::otlp::metrics::legacy_normalize_otlp_name;
use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{Channel, QueryContextRef};
@@ -85,6 +91,7 @@ use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
@@ -95,6 +102,10 @@ use crate::limiter::LimiterRef;
use crate::slow_query_recorder::SlowQueryRecorder;
use crate::stream_wrapper::CancellableStreamWrapper;
// Compat-mode value assumed for an existing table whose extra options do not
// contain `OTLP_METRIC_COMPAT_KEY`; `check_otlp_legacy` falls back to it and
// compares table options against it to detect legacy tables.
lazy_static! {
static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}
/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
@@ -111,6 +122,12 @@ pub struct Instance {
slow_query_recorder: Option<SlowQueryRecorder>,
limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef,
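// Cache of the ingestion mode chosen for OTLP metrics.
// first layer key: db-string
// second layer key: the metric name exactly as received in the request
// value: `true` if the metric is written in legacy mode, `false` otherwise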
otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}
impl Instance {
@@ -331,6 +348,112 @@ impl Instance {
.await
.context(TableOperationSnafu)
}
async fn check_otlp_legacy(
&self,
names: &[&String],
ctx: QueryContextRef,
) -> server_error::Result<bool> {
let db_string = ctx.get_db_string();
let cache = self
.otlp_metrics_table_legacy_cache
.entry(db_string)
.or_default();
// check cache
let hit_cache = names
.iter()
.filter_map(|name| cache.get(*name))
.collect::<Vec<_>>();
if !hit_cache.is_empty() {
let hit_legacy = hit_cache.iter().any(|en| *en.value());
let hit_prom = hit_cache.iter().any(|en| !*en.value());
// hit but have true and false, means both legacy and new mode are used
// we cannot handle this case, so return error
// add doc links in err msg later
ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
let flag = hit_legacy;
// set cache for all names
names.iter().for_each(|name| {
if !cache.contains_key(*name) {
cache.insert(name.to_string(), flag);
}
});
return Ok(flag);
}
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
// query legacy table names
let normalized_names = names
.iter()
.map(|n| legacy_normalize_otlp_name(n))
.collect::<Vec<_>>();
let table_names = normalized_names
.iter()
.map(|n| TableNameKey::new(catalog, &schema, n))
.collect::<Vec<_>>();
let table_values = self
.table_metadata_manager()
.table_name_manager()
.batch_get(table_names)
.await
.context(CommonMetaSnafu)?;
let table_ids = table_values
.into_iter()
.filter_map(|v| v.map(|vi| vi.table_id()))
.collect::<Vec<_>>();
// means no existing table is found, use new mode
if table_ids.is_empty() {
// set cache
names.iter().for_each(|name| {
cache.insert(name.to_string(), false);
});
return Ok(false);
}
// has existing table, check table options
let table_infos = self
.table_metadata_manager()
.table_info_manager()
.batch_get(&table_ids)
.await
.context(CommonMetaSnafu)?;
let options = table_infos
.values()
.map(|info| {
info.table_info
.meta
.options
.extra_options
.get(OTLP_METRIC_COMPAT_KEY)
.unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
})
.collect::<Vec<_>>();
if !options.is_empty() {
// check value consistency
let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
let has_legacy = options
.iter()
.any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
let flag = has_legacy;
names.iter().for_each(|name| {
cache.insert(name.to_string(), flag);
});
Ok(flag)
} else {
// no table info, use new mode
names.iter().for_each(|name| {
cache.insert(name.to_string(), false);
});
Ok(false)
}
}
}
/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.

View File

@@ -25,6 +25,7 @@ use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use dashmap::DashMap;
use operator::delete::Deleter;
use operator::flow::FlowServiceOperator;
use operator::insert::Inserter;
@@ -223,6 +224,7 @@ impl FrontendBuilder {
slow_query_recorder,
limiter,
process_manager,
otlp_metrics_table_legacy_cache: DashMap::new(),
})
}
}

View File

@@ -12,21 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_telemetry::tracing;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
use session::context::QueryContextRef;
use snafu::ResultExt;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
use crate::instance::Instance;
use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
@@ -50,9 +55,37 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
let input_names = request
.resource_metrics
.iter()
.flat_map(|r| r.scope_metrics.iter())
.flat_map(|s| s.metrics.iter().map(|m| &m.name))
.collect::<Vec<_>>();
// If the user uses OTLP metrics ingestion before v0.16, it uses the old path.
// So we call this path 'legacy'.
// After v0.16, we store the OTLP metrics using prometheus compatible format, the new path.
// The difference is how we convert the input data into the final table schema.
let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?;
let mut metric_ctx = ctx
.protocol_ctx()
.get_otlp_metric_ctx()
.cloned()
.unwrap_or_default();
metric_ctx.is_legacy = is_legacy;
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &metric_ctx)?;
OTLP_METRICS_ROWS.inc_by(rows as u64);
let ctx = if !is_legacy {
let mut c = (*ctx).clone();
c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
Arc::new(c)
} else {
ctx
};
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
@@ -63,10 +96,22 @@ impl OpenTelemetryProtocolHandler for Instance {
None
};
self.handle_row_inserts(requests, ctx, false, false)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
// If the user uses the legacy path, it is by default without metric engine.
if metric_ctx.is_legacy || !metric_ctx.with_metric_engine {
self.handle_row_inserts(requests, ctx, false, false)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
} else {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
.to_string();
self.handle_metric_row_inserts(requests, ctx, physical_table.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}
}
#[tracing::instrument(skip_all)]

View File

@@ -106,7 +106,8 @@ where
}
if opts.otlp.enable {
builder = builder.with_otlp_handler(self.instance.clone());
builder = builder
.with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
}
if opts.jaeger.enable {
@@ -158,7 +159,7 @@ where
let grpc_server = builder
.database_handler(greptime_request_handler.clone())
.prometheus_handler(self.instance.clone(), user_provider.clone())
.otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone()))
.otel_arrow_handler(OtelArrowServiceHandler::new(self.instance.clone()))
.flight_handler(Arc::new(greptime_request_handler))
.frontend_grpc_handler(frontend_grpc_handler)
.build();