feat(otlp): initial OTLP trace support (#2627)

* feat: otlp tracing framework via http

* feat: otlp trace transformer plugin

* feat: successfully write traces into db

* chore: plugin to parse request

* test: helper functions

* feat: parse_request_to_spans function

* chore: remove implicit parse call in TraceParser

* chore: fix clippy

* chore: add TODO marker for span fields

* refactor TraceParser trait

* refactor TraceParser trait

* table_name method in OTLP TraceParser trait

* fix: approximate row, column count

* chore: function signature without row

* chore: avoid a clone by moving span.kind up

* docs for parse and to_grpc_insert_requests

---------

Co-authored-by: fys <fengys1996@gmail.com>
Co-authored-by: fys <40801205+fengys1996@users.noreply.github.com>
Author: yuanbohan
Date:   2023-10-23 14:37:43 +08:00
Committed by: GitHub
Parent: 0fbde48655
Commit: 44280f7c9d

11 changed files with 1210 additions and 653 deletions


@@ -87,7 +87,7 @@ meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev =
metrics = "0.20"
moka = "0.12"
once_cell = "1.18"
-opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
+opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics", "traces"] }
parquet = "43.0"
paste = "1.0"
prost = "0.11"


@@ -19,14 +19,18 @@ use metrics::counter;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::otlp;
use servers::otlp::plugin::TraceParserRef;
use servers::query_handler::OpenTelemetryProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::instance::Instance;
-use crate::metrics::OTLP_METRICS_ROWS;
+use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
#[async_trait]
impl OpenTelemetryProtocolHandler for Instance {
@@ -40,7 +44,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Otlp)
.context(AuthSnafu)?;
-    let (requests, rows) = otlp::to_grpc_insert_requests(request)?;
+    let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
let _ = self
.handle_row_inserts(requests, ctx)
.await
@@ -55,4 +59,40 @@ impl OpenTelemetryProtocolHandler for Instance {
};
Ok(resp)
}
async fn traces(
&self,
request: ExportTraceServiceRequest,
ctx: QueryContextRef,
) -> ServerResult<ExportTraceServiceResponse> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Otlp)
.context(AuthSnafu)?;
let (table_name, spans) = match self.plugins.get::<TraceParserRef>() {
Some(parser) => (parser.table_name(), parser.parse(request)),
None => (
otlp::trace::TRACE_TABLE_NAME.to_string(),
otlp::trace::parse(request),
),
};
let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;
let _ = self
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
counter!(OTLP_TRACES_ROWS, rows as u64);
let resp = ExportTraceServiceResponse {
// TODO(fys): add support for partial_success in future patch
partial_success: None,
};
Ok(resp)
}
}


@@ -22,3 +22,4 @@ pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed"
pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples";
pub const OTLP_METRICS_ROWS: &str = "frontend.otlp.metrics.rows";
pub const OTLP_TRACES_ROWS: &str = "frontend.otlp.traces.rows";


@@ -660,6 +660,7 @@ impl HttpServer {
fn route_otlp<S>(&self, otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router<S> {
Router::new()
.route("/v1/metrics", routing::post(otlp::metrics))
.route("/v1/traces", routing::post(otlp::traces))
.with_state(otlp_handler)
}
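With this route registered, OTLP/HTTP clients can POST protobuf-encoded trace batches to `/v1/traces`, mirroring the existing `/v1/metrics` endpoint. A minimal client sketch, assuming `reqwest` as an extra dependency (not part of this patch) and the same `opentelemetry-proto`/`prost` crates used above:

use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use prost::Message;

// Encode the request body exactly the way parse_traces_body below decodes it,
// and send it with the protobuf content type the handler responds with.
async fn export_traces(
    base_url: &str,
    request: ExportTraceServiceRequest,
) -> reqwest::Result<reqwest::Response> {
    reqwest::Client::new()
        .post(format!("{base_url}/v1/traces"))
        .header("content-type", "application/x-protobuf")
        .body(request.encode_to_vec())
        .send()
        .await
}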


@@ -21,6 +21,9 @@ use hyper::Body;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use prost::Message;
use session::context::QueryContextRef;
use snafu::prelude::*;
@@ -33,16 +36,19 @@ pub async fn metrics(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(query_ctx): Extension<QueryContextRef>,
RawBody(body): RawBody,
-) -> Result<OtlpResponse> {
+) -> Result<OtlpMetricsResponse> {
let _timer = timer!(
-        crate::metrics::METRIC_HTTP_OPENTELEMETRY_ELAPSED,
+        crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, query_ctx.get_db_string())]
);
-    let request = parse_body(body).await?;
-    handler.metrics(request, query_ctx).await.map(OtlpResponse)
+    let request = parse_metrics_body(body).await?;
+    handler
+        .metrics(request, query_ctx)
+        .await
+        .map(OtlpMetricsResponse)
}
-async fn parse_body(body: Body) -> Result<ExportMetricsServiceRequest> {
+async fn parse_metrics_body(body: Body) -> Result<ExportMetricsServiceRequest> {
hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)
@@ -51,9 +57,47 @@ async fn parse_body(body: Body) -> Result<ExportMetricsServiceRequest> {
})
}
-pub struct OtlpResponse(ExportMetricsServiceResponse);
+pub struct OtlpMetricsResponse(ExportMetricsServiceResponse);
-impl IntoResponse for OtlpResponse {
+impl IntoResponse for OtlpMetricsResponse {
fn into_response(self) -> axum::response::Response {
(
[(header::CONTENT_TYPE, "application/x-protobuf")],
self.0.encode_to_vec(),
)
.into_response()
}
}
#[axum_macros::debug_handler]
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(query_ctx): Extension<QueryContextRef>,
RawBody(body): RawBody,
) -> Result<OtlpTracesResponse> {
let _timer = timer!(
crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, query_ctx.get_db_string())]
);
let request = parse_traces_body(body).await?;
handler
.traces(request, query_ctx)
.await
.map(OtlpTracesResponse)
}
async fn parse_traces_body(body: Body) -> Result<ExportTraceServiceRequest> {
hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)
.and_then(|buf| {
ExportTraceServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu)
})
}
pub struct OtlpTracesResponse(ExportTraceServiceResponse);
impl IntoResponse for OtlpTracesResponse {
    fn into_response(self) -> axum::response::Response {
        (
            [(header::CONTENT_TYPE, "application/x-protobuf")],
            self.0.encode_to_vec(),
        )
            .into_response()
    }
}

@@ -37,7 +37,10 @@ pub(crate) const METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: &str = "servers.http_influx
pub(crate) const METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: &str =
"servers.http_prometheus_write_elapsed";
pub(crate) const METRIC_HTTP_PROM_STORE_READ_ELAPSED: &str = "servers.http_prometheus_read_elapsed";
-pub(crate) const METRIC_HTTP_OPENTELEMETRY_ELAPSED: &str = "servers.http_otlp_elapsed";
+pub(crate) const METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED: &str =
+    "servers.http_otlp_metrics_elapsed";
+pub(crate) const METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED: &str =
+    "servers.http_otlp_traces_elapsed";
pub(crate) const METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED: &str =
"servers.opentsdb_line_write_elapsed";
pub(crate) const METRIC_HTTP_PROMQL_INSTANT_QUERY_ELAPSED: &str =


@@ -12,649 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use api::v1::{RowInsertRequests, Value};
-use common_grpc::writer::Precision;
-use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
-use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue};
-use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *};
-use crate::error::Result;
-use crate::row_writer::{self, MultiTableData, TableData};
+pub mod metrics;
+pub mod plugin;
+pub mod trace;
const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
const GREPTIME_VALUE: &str = "greptime_value";
const GREPTIME_COUNT: &str = "greptime_count";
[The remaining ~580 deleted lines of this hunk are the old metrics implementation, which moves verbatim into the new otlp::metrics module in the next file.]


@@ -0,0 +1,658 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{RowInsertRequests, Value};
use common_grpc::writer::Precision;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *};
use super::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use crate::error::Result;
use crate::row_writer::{self, MultiTableData, TableData};
/// the default column count for table writer
const APPROXIMATE_COLUMN_COUNT: usize = 8;
/// Normalize otlp instrumentation, metric and attribute names
///
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax>
/// - since the names are case-insensitive, we transform them to lowercase for
///   better SQL usability
/// - replace `.` and `-` with `_`
fn normalize_otlp_name(name: &str) -> String {
name.to_lowercase().replace(|c| c == '.' || c == '-', "_")
}
/// Convert OpenTelemetry metrics to GreptimeDB insert requests
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
/// for data structure of OTLP metrics.
///
/// Returns `InsertRequests` and total number of rows to ingest
pub fn to_grpc_insert_requests(
request: ExportMetricsServiceRequest,
) -> Result<(RowInsertRequests, usize)> {
let mut table_writer = MultiTableData::default();
for resource in &request.resource_metrics {
let resource_attrs = resource.resource.as_ref().map(|r| &r.attributes);
for scope in &resource.scope_metrics {
let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes);
for metric in &scope.metrics {
encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?;
}
}
}
Ok(table_writer.into_row_insert_requests())
}
fn encode_metrics(
table_writer: &mut MultiTableData,
metric: &Metric,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let name = &metric.name;
// note that we don't store description or unit; we might want to handle
// these fields in the future.
if let Some(data) = &metric.data {
match data {
metric::Data::Gauge(gauge) => {
encode_gauge(table_writer, name, gauge, resource_attrs, scope_attrs)?;
}
metric::Data::Sum(sum) => {
encode_sum(table_writer, name, sum, resource_attrs, scope_attrs)?;
}
metric::Data::Summary(summary) => {
encode_summary(table_writer, name, summary, resource_attrs, scope_attrs)?;
}
metric::Data::Histogram(hist) => {
encode_histogram(table_writer, name, hist, resource_attrs, scope_attrs)?;
}
// TODO(sunng87) leave ExponentialHistogram for next release
metric::Data::ExponentialHistogram(_hist) => {}
}
}
Ok(())
}
fn write_attributes(
writer: &mut TableData,
row: &mut Vec<Value>,
attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
if let Some(attrs) = attrs {
let table_tags = attrs.iter().filter_map(|attr| {
if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
let key = normalize_otlp_name(&attr.key);
match val {
any_value::Value::StringValue(s) => Some((key, s.to_string())),
any_value::Value::IntValue(v) => Some((key, v.to_string())),
any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
_ => None, // TODO(sunng87): allow different type of values
}
} else {
None
}
});
row_writer::write_tags(writer, table_tags, row)?;
}
Ok(())
}
fn write_timestamp(table: &mut TableData, row: &mut Vec<Value>, time_nano: i64) -> Result<()> {
row_writer::write_ts_precision(
table,
GREPTIME_TIMESTAMP,
Some(time_nano),
Precision::Nanosecond,
row,
)
}
fn write_data_point_value(
table: &mut TableData,
row: &mut Vec<Value>,
field: &str,
value: &Option<number_data_point::Value>,
) -> Result<()> {
match value {
Some(number_data_point::Value::AsInt(val)) => {
// we coerce all values to f64
row_writer::write_f64(table, field, *val as f64, row)?;
}
Some(number_data_point::Value::AsDouble(val)) => {
row_writer::write_f64(table, field, *val, row)?;
}
_ => {}
}
Ok(())
}
fn write_tags_and_timestamp(
table: &mut TableData,
row: &mut Vec<Value>,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
data_point_attrs: Option<&Vec<KeyValue>>,
timestamp_nanos: i64,
) -> Result<()> {
write_attributes(table, row, resource_attrs)?;
write_attributes(table, row, scope_attrs)?;
write_attributes(table, row, data_point_attrs)?;
write_timestamp(table, row, timestamp_nanos)?;
Ok(())
}
/// encode this gauge metric
///
/// note that there can be multiple data points in the request; they are
/// stored as multiple rows
fn encode_gauge(
table_writer: &mut MultiTableData,
name: &str,
gauge: &Gauge,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
gauge.data_points.len(),
);
for data_point in &gauge.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
table.add_row(row);
}
Ok(())
}
/// encode this sum metric
///
/// `aggregation_temporality` and `monotonic` are ignored for now
fn encode_sum(
table_writer: &mut MultiTableData,
name: &str,
sum: &Sum,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
sum.data_points.len(),
);
for data_point in &sum.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
table.add_row(row);
}
Ok(())
}
const HISTOGRAM_LE_COLUMN: &str = "le";
/// Encode histogram data. This function returns 3 insert requests for 3 tables.
///
/// The implementation follows the Prometheus histogram table format:
///
/// - A `%metric%_bucket` table with an `le` tag that stores the bucket's upper
///   limit, and `greptime_value` for the cumulative bucket count
/// - A `%metric%_sum` table storing the sum of samples
/// - A `%metric%_count` table storing the count of samples.
///
/// Thanks to this Prometheus compatibility, we hope to be able to use
/// Prometheus quantile functions on these tables.
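///
/// As a worked example (matching the values in the test below): a data point
/// with `bucket_counts = [2, 4, 6, 9, 4]` and `explicit_bounds = [0.1, 1.0, 10.0, 100.0]`
/// produces five `%metric%_bucket` rows with cumulative counts le=0.1 -> 2,
/// le=1 -> 6, le=10 -> 12, le=100 -> 21, le=+Inf -> 25, plus one
/// `%metric%_sum` row (100.0) and one `%metric%_count` row (25).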
fn encode_histogram(
table_writer: &mut MultiTableData,
name: &str,
hist: &Histogram,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let normalized_name = normalize_otlp_name(name);
let bucket_table_name = format!("{}_bucket", normalized_name);
let sum_table_name = format!("{}_sum", normalized_name);
let count_table_name = format!("{}_count", normalized_name);
let data_points_len = hist.data_points.len();
// Note that the row and column counts here are approximate
let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
for data_point in &hist.data_points {
let mut accumulated_count = 0;
for (idx, count) in data_point.bucket_counts.iter().enumerate() {
let mut bucket_row = bucket_table.alloc_one_row();
write_tags_and_timestamp(
&mut bucket_table,
&mut bucket_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
row_writer::write_tag(
&mut bucket_table,
HISTOGRAM_LE_COLUMN,
upper_bounds,
&mut bucket_row,
)?;
} else if idx == data_point.explicit_bounds.len() {
// The last bucket
row_writer::write_tag(
&mut bucket_table,
HISTOGRAM_LE_COLUMN,
f64::INFINITY,
&mut bucket_row,
)?;
}
accumulated_count += count;
row_writer::write_f64(
&mut bucket_table,
GREPTIME_VALUE,
accumulated_count as f64,
&mut bucket_row,
)?;
bucket_table.add_row(bucket_row);
}
if let Some(sum) = data_point.sum {
let mut sum_row = sum_table.alloc_one_row();
write_tags_and_timestamp(
&mut sum_table,
&mut sum_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
sum_table.add_row(sum_row);
}
let mut count_row = count_table.alloc_one_row();
write_tags_and_timestamp(
&mut count_table,
&mut count_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
row_writer::write_f64(
&mut count_table,
GREPTIME_VALUE,
data_point.count as f64,
&mut count_row,
)?;
count_table.add_row(count_row);
}
table_writer.add_table_data(bucket_table_name, bucket_table);
table_writer.add_table_data(sum_table_name, sum_table);
table_writer.add_table_data(count_table_name, count_table);
Ok(())
}
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
// TODO(sunng87): implement this using a prometheus compatible way
Ok(())
}
fn encode_summary(
table_writer: &mut MultiTableData,
name: &str,
summary: &Summary,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
summary.data_points.len(),
);
for data_point in &summary.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
for quantile in &data_point.quantile_values {
row_writer::write_f64(
table,
&format!("greptime_p{:02}", quantile.quantile * 100f64),
quantile.value,
&mut row,
)?;
}
row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
table.add_row(row);
}
Ok(())
}
#[cfg(test)]
mod tests {
use opentelemetry_proto::tonic::common::v1::any_value::Value as Val;
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
use opentelemetry_proto::tonic::metrics::v1::summary_data_point::ValueAtQuantile;
use opentelemetry_proto::tonic::metrics::v1::{HistogramDataPoint, NumberDataPoint};
use super::*;
#[test]
fn test_normalize_otlp_name() {
assert_eq!(normalize_otlp_name("jvm.memory.free"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("jvm-memory-free"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("jvm_memory_free"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("JVM_MEMORY_FREE"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("JVM_memory_FREE"), "jvm_memory_free");
}
fn keyvalue(key: &str, value: &str) -> KeyValue {
KeyValue {
key: key.into(),
value: Some(AnyValue {
value: Some(Val::StringValue(value.into())),
}),
}
}
#[test]
fn test_encode_gauge() {
let mut tables = MultiTableData::default();
let data_points = vec![
NumberDataPoint {
attributes: vec![keyvalue("host", "testsevrer")],
time_unix_nano: 100,
value: Some(Value::AsInt(100)),
..Default::default()
},
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 105,
value: Some(Value::AsInt(105)),
..Default::default()
},
];
let gauge = Gauge { data_points };
encode_gauge(
&mut tables,
"datamon",
&gauge,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 2);
assert_eq!(table.num_columns(), 5);
assert_eq!(
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value"
]
);
}
#[test]
fn test_encode_sum() {
let mut tables = MultiTableData::default();
let data_points = vec![
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
value: Some(Value::AsInt(100)),
..Default::default()
},
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 105,
value: Some(Value::AsInt(0)),
..Default::default()
},
];
let sum = Sum {
data_points,
..Default::default()
};
encode_sum(
&mut tables,
"datamon",
&sum,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 2);
assert_eq!(table.num_columns(), 5);
assert_eq!(
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value"
]
);
}
#[test]
fn test_encode_summary() {
let mut tables = MultiTableData::default();
let data_points = vec![SummaryDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
count: 25,
sum: 5400.0,
quantile_values: vec![
ValueAtQuantile {
quantile: 0.90,
value: 1000.0,
},
ValueAtQuantile {
quantile: 0.95,
value: 3030.0,
},
],
..Default::default()
}];
let summary = Summary { data_points };
encode_summary(
&mut tables,
"datamon",
&summary,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 1);
assert_eq!(table.num_columns(), 7);
assert_eq!(
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_p90",
"greptime_p95",
"greptime_count"
]
);
}
#[test]
fn test_encode_histogram() {
let mut tables = MultiTableData::default();
let data_points = vec![HistogramDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
start_time_unix_nano: 23,
count: 25,
sum: Some(100.),
max: Some(200.),
min: Some(0.03),
bucket_counts: vec![2, 4, 6, 9, 4],
explicit_bounds: vec![0.1, 1., 10., 100.],
..Default::default()
}];
let histogram = Histogram {
data_points,
aggregation_temporality: AggregationTemporality::Delta.into(),
};
encode_histogram(
&mut tables,
"histo",
&histogram,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
assert_eq!(3, tables.num_tables());
// bucket table
let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
assert_eq!(bucket_table.num_rows(), 5);
assert_eq!(bucket_table.num_columns(), 6);
assert_eq!(
bucket_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"le",
"greptime_value",
]
);
let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
assert_eq!(sum_table.num_rows(), 1);
assert_eq!(sum_table.num_columns(), 5);
assert_eq!(
sum_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value",
]
);
let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
assert_eq!(count_table.num_rows(), 1);
assert_eq!(count_table.num_columns(), 5);
assert_eq!(
count_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value",
]
);
}
}


@@ -0,0 +1,28 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use super::trace::TraceSpans;
/// A transformer that reshapes an `ExportTraceServiceRequest` with custom logic, for example:
/// - uplifting some fields from the span attributes (a map type) into dedicated columns
pub trait TraceParser: Send + Sync {
fn parse(&self, request: ExportTraceServiceRequest) -> TraceSpans;
fn table_name(&self) -> String;
}
pub type TraceParserRef = Arc<dyn TraceParser>;
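As an illustration of the hook, here is a hypothetical parser that reuses the default `otlp::trace::parse` and uplifts the `http.status_code` span attribute into its own column; the struct, table, and column names are illustrative only, not part of this patch, and crate-internal paths are assumed. Registered as a `TraceParserRef` plugin, it would be picked up by the frontend's `traces` handler shown earlier.

use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::ColumnDataType;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;

use crate::otlp::plugin::{TraceParser, TraceParserRef};
use crate::otlp::trace::{self, TraceSpans};

/// Hypothetical parser: default parsing plus one uplifted column.
struct StatusCodeUplifter;

impl TraceParser for StatusCodeUplifter {
    fn parse(&self, request: ExportTraceServiceRequest) -> TraceSpans {
        let mut spans = trace::parse(request);
        for span in &mut spans {
            // span_attributes is serialized as a JSON map of strings,
            // so the attribute can be fished back out of it.
            if let Ok(attrs) = serde_json::from_str::<serde_json::Value>(&span.span_attributes) {
                if let Some(code) = attrs.get("http.status_code").and_then(|v| v.as_str()) {
                    span.uplifted_fields.push((
                        "http_status_code".to_string(),
                        ColumnDataType::String,
                        ValueData::StringValue(code.to_string()),
                    ));
                }
            }
        }
        spans
    }

    fn table_name(&self) -> String {
        "traces_preview_v01".to_string()
    }
}

// Wrapped as a TraceParserRef for plugin registration:
fn _as_plugin() -> TraceParserRef {
    Arc::new(StatusCodeUplifter)
}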


@@ -0,0 +1,411 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::writer::Precision;
use common_time::time::Time;
use itertools::Itertools;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
use opentelemetry_proto::tonic::common::v1::{
AnyValue, ArrayValue, InstrumentationScope, KeyValue, KeyValueList,
};
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
use opentelemetry_proto::tonic::trace::v1::{Span, Status};
use serde_json::json;
use super::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use crate::error::Result;
use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 24;
pub const TRACE_TABLE_NAME: &str = "traces_preview_v01";
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TraceSpan {
// the following are tags
pub trace_id: String,
pub span_id: String,
pub parent_span_id: String,
// the following are fields
pub resource_attributes: String, // TODO(yuanbohan): Map in the future
pub scope_name: String,
pub scope_version: String,
pub scope_attributes: String, // TODO(yuanbohan): Map in the future
pub trace_state: String,
pub span_name: String,
pub span_kind: String,
pub span_status_code: String,
pub span_status_message: String,
pub span_attributes: String, // TODO(yuanbohan): Map in the future
pub span_events: String, // TODO(yuanbohan): List in the future
pub span_links: String, // TODO(yuanbohan): List in the future
pub start_in_nanosecond: u64, // this is also the Timestamp Index
pub end_in_nanosecond: u64,
pub uplifted_fields: Vec<(String, ColumnDataType, ValueData)>,
}
pub type TraceSpans = Vec<TraceSpan>;
/// Convert TraceSpans to GreptimeDB row insert requests.
/// Returns `InsertRequests` and the total number of rows to ingest
pub fn to_grpc_insert_requests(
table_name: String,
spans: TraceSpans,
) -> Result<(RowInsertRequests, usize)> {
let mut multi_table_writer = MultiTableData::default();
let one_table_writer = multi_table_writer.get_or_default_table_data(
table_name,
APPROXIMATE_COLUMN_COUNT,
spans.len(),
);
for span in spans {
write_span_to_row(one_table_writer, span)?;
}
Ok(multi_table_writer.into_row_insert_requests())
}
pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
let mut row = writer.alloc_one_row();
{
// tags
let iter = vec![
("trace_id", span.trace_id),
("span_id", span.span_id),
("parent_span_id", span.parent_span_id),
]
.into_iter()
.map(|(col, val)| (col.to_string(), val));
row_writer::write_tags(writer, iter, &mut row)?;
}
{
// fields
let str_fields_iter = vec![
("resource_attributes", span.resource_attributes),
("scope_name", span.scope_name),
("scope_version", span.scope_version),
("scope_attributes", span.scope_attributes),
("trace_state", span.trace_state),
("span_name", span.span_name),
("span_kind", span.span_kind),
("span_status_code", span.span_status_code),
("span_status_message", span.span_status_message),
("span_attributes", span.span_attributes),
("span_events", span.span_events),
("span_links", span.span_links),
]
.into_iter()
.map(|(col, val)| {
(
col.into(),
ColumnDataType::String,
ValueData::StringValue(val),
)
});
let time_fields_iter = vec![
("start", span.start_in_nanosecond),
("end", span.end_in_nanosecond),
]
.into_iter()
.map(|(col, val)| {
(
col.into(),
ColumnDataType::TimestampNanosecond,
ValueData::TimestampNanosecondValue(val as i64),
)
});
row_writer::write_fields(writer, str_fields_iter, &mut row)?;
row_writer::write_fields(writer, time_fields_iter, &mut row)?;
row_writer::write_fields(writer, span.uplifted_fields.into_iter(), &mut row)?;
}
row_writer::write_f64(
writer,
GREPTIME_VALUE,
(span.end_in_nanosecond - span.start_in_nanosecond) as f64 / 1_000_000.0, // duration in milliseconds
&mut row,
)?;
row_writer::write_ts_precision(
writer,
GREPTIME_TIMESTAMP,
Some(span.start_in_nanosecond as i64),
Precision::Nanosecond,
&mut row,
)?;
writer.add_row(row);
Ok(())
}
pub fn parse_span(
resource_attrs: &[KeyValue],
scope: &InstrumentationScope,
span: Span,
) -> TraceSpan {
let (span_status_code, span_status_message) = status_to_string(&span.status);
let span_kind = span.kind().as_str_name().into();
TraceSpan {
trace_id: bytes_to_hex_string(&span.trace_id),
span_id: bytes_to_hex_string(&span.span_id),
parent_span_id: bytes_to_hex_string(&span.parent_span_id),
resource_attributes: vec_kv_to_string(resource_attrs),
trace_state: span.trace_state,
scope_name: scope.name.clone(),
scope_version: scope.version.clone(),
scope_attributes: vec_kv_to_string(&scope.attributes),
span_name: span.name,
span_kind,
span_status_code,
span_status_message,
span_attributes: vec_kv_to_string(&span.attributes),
span_events: events_to_string(&span.events),
span_links: links_to_string(&span.links),
start_in_nanosecond: span.start_time_unix_nano,
end_in_nanosecond: span.end_time_unix_nano,
uplifted_fields: vec![],
}
}
/// Convert OpenTelemetry traces to TraceSpans
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
/// for data structure of OTLP traces.
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
let mut spans = vec![];
for resource_spans in request.resource_spans {
let resource_attrs = resource_spans
.resource
.map(|r| r.attributes)
.unwrap_or_default();
for scope_spans in resource_spans.scope_spans {
let scope = scope_spans.scope.unwrap_or_default();
for span in scope_spans.spans {
spans.push(parse_span(&resource_attrs, &scope, span));
}
}
}
spans
}
pub fn bytes_to_hex_string(bs: &[u8]) -> String {
bs.iter().map(|b| format!("{:02x}", b)).join("")
}
pub fn arr_vals_to_string(arr: &ArrayValue) -> String {
let vs: Vec<String> = arr
.values
.iter()
.filter_map(|val| any_value_to_string(val.clone()))
.collect();
serde_json::to_string(&vs).unwrap_or_else(|_| "[]".into())
}
pub fn vec_kv_to_string(vec: &[KeyValue]) -> String {
let vs: HashMap<String, String> = vec
.iter()
.map(|kv| {
let val = kv
.value
.clone()
.and_then(any_value_to_string)
.unwrap_or_default();
(kv.key.clone(), val)
})
.collect();
serde_json::to_string(&vs).unwrap_or_else(|_| "{}".into())
}
pub fn kvlist_to_string(kvlist: &KeyValueList) -> String {
vec_kv_to_string(&kvlist.values)
}
pub fn any_value_to_string(val: AnyValue) -> Option<String> {
val.value.map(|value| match value {
OtlpValue::StringValue(s) => s,
OtlpValue::BoolValue(b) => b.to_string(),
OtlpValue::IntValue(i) => i.to_string(),
OtlpValue::DoubleValue(d) => d.to_string(),
OtlpValue::ArrayValue(arr) => arr_vals_to_string(&arr),
OtlpValue::KvlistValue(kv) => kvlist_to_string(&kv),
OtlpValue::BytesValue(bs) => bytes_to_hex_string(&bs),
})
}
pub fn event_to_string(event: &Event) -> String {
json!({
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
})
.to_string()
}
pub fn events_to_string(events: &[Event]) -> String {
let v: Vec<String> = events.iter().map(event_to_string).collect();
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
}
pub fn link_to_string(link: &Link) -> String {
json!({
"trace_id": link.trace_id,
"span_id": link.span_id,
"trace_state": link.trace_state,
"attributes": vec_kv_to_string(&link.attributes),
})
.to_string()
}
pub fn links_to_string(links: &[Link]) -> String {
let v: Vec<String> = links.iter().map(link_to_string).collect();
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
}
pub fn status_to_string(status: &Option<Status>) -> (String, String) {
match status {
Some(status) => (status.code().as_str_name().into(), status.message.clone()),
None => ("".into(), "".into()),
}
}
#[cfg(test)]
mod tests {
use common_time::time::Time;
use opentelemetry_proto::tonic::common::v1::{
any_value, AnyValue, ArrayValue, KeyValue, KeyValueList,
};
use opentelemetry_proto::tonic::trace::v1::span::Event;
use opentelemetry_proto::tonic::trace::v1::Status;
use serde_json::json;
use crate::otlp::trace::{
arr_vals_to_string, bytes_to_hex_string, event_to_string, kvlist_to_string,
status_to_string, vec_kv_to_string,
};
#[test]
fn test_bytes_to_hex_string() {
assert_eq!(
"24fe79948641b110a29bc27859307e8d",
bytes_to_hex_string(&[
36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
])
);
assert_eq!(
"baffeedd7b8debc0",
bytes_to_hex_string(&[186, 255, 238, 221, 123, 141, 235, 192,])
);
}
#[test]
fn test_arr_vals_to_string() {
assert_eq!("[]", arr_vals_to_string(&ArrayValue { values: vec![] }));
let arr = ArrayValue {
values: vec![
AnyValue {
value: Some(any_value::Value::StringValue("string_value".into())),
},
AnyValue {
value: Some(any_value::Value::BoolValue(true)),
},
AnyValue {
value: Some(any_value::Value::IntValue(1)),
},
AnyValue {
value: Some(any_value::Value::DoubleValue(1.2)),
},
],
};
let expect = json!(["string_value", "true", "1", "1.2"]).to_string();
assert_eq!(expect, arr_vals_to_string(&arr));
}
#[test]
fn test_kv_list_to_string() {
let kvlist = KeyValueList {
values: vec![KeyValue {
key: "str_key".into(),
value: Some(AnyValue {
value: Some(any_value::Value::StringValue("val1".into())),
}),
}],
};
let expect = json!({
"str_key": "val1",
})
.to_string();
assert_eq!(expect, kvlist_to_string(&kvlist))
}
#[test]
fn test_event_to_string() {
let attributes = vec![KeyValue {
key: "str_key".into(),
value: Some(AnyValue {
value: Some(any_value::Value::StringValue("val1".into())),
}),
}];
let event = Event {
time_unix_nano: 1697620662450128000_u64,
name: "event_name".into(),
attributes,
dropped_attributes_count: 0,
};
let event_string = event_to_string(&event);
let expect = json!({
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
});
assert_eq!(
expect,
serde_json::from_str::<serde_json::value::Value>(event_string.as_str()).unwrap()
);
}
#[test]
fn test_status_to_string() {
let message = String::from("status message");
let status = Status {
code: 1,
message: message.clone(),
};
assert_eq!(
("STATUS_CODE_OK".into(), message),
status_to_string(&Some(status)),
);
}
}


@@ -34,6 +34,9 @@ use common_query::Output;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use session::context::QueryContextRef;
use crate::error::Result;
@@ -101,4 +104,11 @@ pub trait OpenTelemetryProtocolHandler {
request: ExportMetricsServiceRequest,
ctx: QueryContextRef,
) -> Result<ExportMetricsServiceResponse>;
/// Handling OpenTelemetry traces request
async fn traces(
&self,
request: ExportTraceServiceRequest,
ctx: QueryContextRef,
) -> Result<ExportTraceServiceResponse>;
}
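For reference, a minimal `ExportTraceServiceRequest` that exercises this handler can be assembled from the generated protobuf types; a sketch, where the IDs reuse the byte sequences from the trace tests above and all other values are illustrative:

use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span};

// Build a one-span request; prost-generated types all implement Default.
fn sample_request() -> ExportTraceServiceRequest {
    let span = Span {
        trace_id: vec![
            0x24, 0xfe, 0x79, 0x94, 0x86, 0x41, 0xb1, 0x10,
            0xa2, 0x9b, 0xc2, 0x78, 0x59, 0x30, 0x7e, 0x8d,
        ],
        span_id: vec![0xba, 0xff, 0xee, 0xdd, 0x7b, 0x8d, 0xeb, 0xc0],
        name: "sample_span".to_string(),
        start_time_unix_nano: 1_697_620_662_450_128_000,
        end_time_unix_nano: 1_697_620_662_450_130_000,
        ..Default::default()
    };
    ExportTraceServiceRequest {
        resource_spans: vec![ResourceSpans {
            scope_spans: vec![ScopeSpans {
                spans: vec![span],
                ..Default::default()
            }],
            ..Default::default()
        }],
    }
}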