1use std::sync::Arc;
16
17use api::helper::ColumnDataTypeWrapper;
18use api::v1::{ColumnDataType, RowInsertRequests};
19use async_trait::async_trait;
20use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
21use client::Output;
22use common_error::ext::BoxedError;
23use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
24use common_telemetry::tracing;
25use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
26use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
27use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
28use pipeline::{GreptimePipelineParams, PipelineWay};
29use servers::error::{self, AuthSnafu, Result as ServerResult};
30use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
31use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
32use servers::otlp;
33use servers::otlp::trace::coerce::{
34 coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type,
35 trace_value_datatype,
36};
37use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
38use session::context::QueryContextRef;
39use snafu::ResultExt;
40use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
41
42use crate::instance::Instance;
43use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
44
45#[async_trait]
46impl OpenTelemetryProtocolHandler for Instance {
47 #[tracing::instrument(skip_all)]
48 async fn metrics(
49 &self,
50 request: ExportMetricsServiceRequest,
51 ctx: QueryContextRef,
52 ) -> ServerResult<Output> {
53 self.plugins
54 .get::<PermissionCheckerRef>()
55 .as_ref()
56 .check_permission(ctx.current_user(), PermissionReq::Otlp)
57 .context(AuthSnafu)?;
58
59 let interceptor_ref = self
60 .plugins
61 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
62 interceptor_ref.pre_execute(ctx.clone())?;
63
64 let input_names = request
65 .resource_metrics
66 .iter()
67 .flat_map(|r| r.scope_metrics.iter())
68 .flat_map(|s| s.metrics.iter().map(|m| &m.name))
69 .collect::<Vec<_>>();
70
71 let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?;
73
74 let mut metric_ctx = ctx
75 .protocol_ctx()
76 .get_otlp_metric_ctx()
77 .cloned()
78 .unwrap_or_default();
79 metric_ctx.is_legacy = is_legacy;
80
81 let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
82 OTLP_METRICS_ROWS.inc_by(rows as u64);
83
84 let ctx = if !is_legacy {
85 let mut c = (*ctx).clone();
86 c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
87 Arc::new(c)
88 } else {
89 ctx
90 };
91
92 if metric_ctx.is_legacy || !metric_ctx.with_metric_engine {
94 self.handle_row_inserts(requests, ctx, false, false)
95 .await
96 .map_err(BoxedError::new)
97 .context(error::ExecuteGrpcQuerySnafu)
98 } else {
99 let physical_table = ctx
100 .extension(PHYSICAL_TABLE_PARAM)
101 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
102 .to_string();
103 self.handle_metric_row_inserts(requests, ctx, physical_table.clone())
104 .await
105 .map_err(BoxedError::new)
106 .context(error::ExecuteGrpcQuerySnafu)
107 }
108 }
109
110 #[tracing::instrument(skip_all)]
111 async fn traces(
112 &self,
113 pipeline_handler: PipelineHandlerRef,
114 request: ExportTraceServiceRequest,
115 pipeline: PipelineWay,
116 pipeline_params: GreptimePipelineParams,
117 table_name: String,
118 ctx: QueryContextRef,
119 ) -> ServerResult<Output> {
120 self.plugins
121 .get::<PermissionCheckerRef>()
122 .as_ref()
123 .check_permission(ctx.current_user(), PermissionReq::Otlp)
124 .context(AuthSnafu)?;
125
126 let interceptor_ref = self
127 .plugins
128 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
129 interceptor_ref.pre_execute(ctx.clone())?;
130
131 let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
132
133 let (mut requests, rows) = otlp::trace::to_grpc_insert_requests(
134 request,
135 pipeline,
136 pipeline_params,
137 table_name,
138 &ctx,
139 pipeline_handler,
140 )?;
141
142 OTLP_TRACES_ROWS.inc_by(rows as u64);
143
144 if is_trace_v1_model {
145 self.reconcile_trace_column_types(&mut requests, &ctx)
146 .await?;
147 self.handle_trace_inserts(requests, ctx)
148 .await
149 .map_err(BoxedError::new)
150 .context(error::ExecuteGrpcQuerySnafu)
151 } else {
152 self.handle_log_inserts(requests, ctx)
153 .await
154 .map_err(BoxedError::new)
155 .context(error::ExecuteGrpcQuerySnafu)
156 }
157 }
158
159 #[tracing::instrument(skip_all)]
160 async fn logs(
161 &self,
162 pipeline_handler: PipelineHandlerRef,
163 request: ExportLogsServiceRequest,
164 pipeline: PipelineWay,
165 pipeline_params: GreptimePipelineParams,
166 table_name: String,
167 ctx: QueryContextRef,
168 ) -> ServerResult<Vec<Output>> {
169 self.plugins
170 .get::<PermissionCheckerRef>()
171 .as_ref()
172 .check_permission(ctx.current_user(), PermissionReq::Otlp)
173 .context(AuthSnafu)?;
174
175 let interceptor_ref = self
176 .plugins
177 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
178 interceptor_ref.pre_execute(ctx.clone())?;
179
180 let opt_req = otlp::logs::to_grpc_insert_requests(
181 request,
182 pipeline,
183 pipeline_params,
184 table_name,
185 &ctx,
186 pipeline_handler,
187 )
188 .await?;
189
190 let mut outputs = vec![];
191
192 for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {
193 let cnt = requests
194 .inserts
195 .iter()
196 .filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
197 .sum::<usize>();
198
199 let o = self
200 .handle_log_inserts(requests, temp_ctx)
201 .await
202 .inspect(|_| OTLP_LOGS_ROWS.inc_by(cnt as u64))
203 .map_err(BoxedError::new)
204 .context(error::ExecuteGrpcQuerySnafu)?;
205 outputs.push(o);
206 }
207
208 Ok(outputs)
209 }
210}
211
212impl Instance {
213 fn choose_trace_target_type(
218 observed_types: &[ColumnDataType],
219 existing_type: Option<ColumnDataType>,
220 ) -> ServerResult<Option<ColumnDataType>> {
221 let Some(existing_type) = existing_type else {
222 return resolve_new_trace_column_type(observed_types.iter().copied()).map_err(|_| {
223 error::InvalidParameterSnafu {
224 reason: "unsupported trace type mix".to_string(),
225 }
226 .build()
227 });
228 };
229
230 if observed_types.iter().copied().all(|request_type| {
231 request_type == existing_type
232 || is_supported_trace_coercion(request_type, existing_type)
233 }) {
234 Ok(Some(existing_type))
235 } else {
236 error::InvalidParameterSnafu {
237 reason: "unsupported trace type mix".to_string(),
238 }
239 .fail()
240 }
241 }
242
243 async fn reconcile_trace_column_types(
247 &self,
248 requests: &mut RowInsertRequests,
249 ctx: &QueryContextRef,
250 ) -> ServerResult<()> {
251 let catalog = ctx.current_catalog();
252 let schema = ctx.current_schema();
253
254 for req in &mut requests.inserts {
255 let table = self
256 .catalog_manager
257 .table(catalog, &schema, &req.table_name, None)
258 .await?;
259
260 let Some(rows) = req.rows.as_mut() else {
261 continue;
262 };
263
264 let table_schema = table.map(|table| table.schema());
265 let mut pending_coercions = Vec::new();
266
267 for (col_idx, col_schema) in rows.schema.iter().enumerate() {
268 let Some(current_type) = ColumnDataType::try_from(col_schema.datatype).ok() else {
269 continue;
270 };
271
272 let mut observed_types = Vec::new();
273 push_observed_trace_type(&mut observed_types, current_type);
274
275 for row in &rows.rows {
278 let Some(value) = row
279 .values
280 .get(col_idx)
281 .and_then(|value| value.value_data.as_ref())
282 else {
283 continue;
284 };
285
286 let Some(value_type) = trace_value_datatype(value) else {
287 continue;
288 };
289 push_observed_trace_type(&mut observed_types, value_type);
290 }
291
292 let existing_type = table_schema
293 .as_ref()
294 .and_then(|schema| schema.column_schema_by_name(&col_schema.column_name))
295 .and_then(|table_col| {
296 ColumnDataTypeWrapper::try_from(table_col.data_type.clone())
297 .ok()
298 .map(|wrapper| wrapper.datatype())
299 });
300
301 if !observed_types
302 .iter()
303 .copied()
304 .any(is_trace_reconcile_candidate_type)
305 && existing_type
306 .map(|datatype| !is_trace_reconcile_candidate_type(datatype))
307 .unwrap_or(true)
308 {
309 continue;
310 }
311
312 let Some(target_type) =
315 Self::choose_trace_target_type(&observed_types, existing_type).map_err(
316 |_| {
317 enrich_trace_reconcile_error(
318 &req.table_name,
319 &col_schema.column_name,
320 &observed_types,
321 existing_type,
322 )
323 },
324 )?
325 else {
326 continue;
327 };
328
329 if observed_types
330 .iter()
331 .all(|observed| *observed == target_type)
332 && col_schema.datatype == target_type as i32
333 {
334 continue;
335 }
336
337 pending_coercions.push((col_idx, target_type, col_schema.column_name.clone()));
338 }
339
340 if pending_coercions.is_empty() {
341 continue;
342 }
343
344 for (col_idx, target_type, ..) in &pending_coercions {
346 rows.schema[*col_idx].datatype = *target_type as i32;
347 }
348
349 for row in &mut rows.rows {
351 for (col_idx, target_type, column_name) in &pending_coercions {
352 let Some(value) = row.values.get_mut(*col_idx) else {
353 continue;
354 };
355 let Some(request_type) =
356 value.value_data.as_ref().and_then(trace_value_datatype)
357 else {
358 continue;
359 };
360 if request_type == *target_type {
361 continue;
362 }
363
364 value.value_data = coerce_value_data(
365 &value.value_data,
366 *target_type,
367 request_type,
368 )
369 .map_err(|_| {
370 error::InvalidParameterSnafu {
371 reason: format!(
372 "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
373 column_name, req.table_name, request_type, target_type
374 ),
375 }
376 .build()
377 })?;
378 }
379 }
380 }
381
382 Ok(())
383 }
384}
385
386fn enrich_trace_reconcile_error(
387 table_name: &str,
388 column_name: &str,
389 observed_types: &[ColumnDataType],
390 existing_type: Option<ColumnDataType>,
391) -> servers::error::Error {
392 let observed_types = observed_types
393 .iter()
394 .map(|datatype| format!("{datatype:?}"))
395 .collect::<Vec<_>>()
396 .join(", ");
397
398 error::InvalidParameterSnafu {
399 reason: match existing_type {
400 Some(existing_type) => format!(
401 "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}",
402 column_name, table_name, observed_types, existing_type
403 ),
404 None => format!(
405 "failed to reconcile trace column '{}' in table '{}' with observed types [{}]",
406 column_name, table_name, observed_types
407 ),
408 },
409 }
410 .build()
411}
412
413fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool {
416 matches!(
417 datatype,
418 ColumnDataType::String
419 | ColumnDataType::Boolean
420 | ColumnDataType::Int64
421 | ColumnDataType::Float64
422 )
423}
424
425fn push_observed_trace_type(observed_types: &mut Vec<ColumnDataType>, datatype: ColumnDataType) {
427 if !observed_types.contains(&datatype) {
428 observed_types.push(datatype);
429 }
430}