feat: implement storage for OTLP histogram (#2282)

* feat: implement new histogram data model

* feat:  use prometheus table format for histogram

* refactor: remove duplicated code

* fix: histogram tag column

* fix: use accumulated count in buckets

* refactor: using row based protocol for otlp WIP

* refactor: use row based writer for otlp.

Also updated row writer for owned keys

* refactor: use row writers for otlp

* test: add integration tests for histogram

* refactor: change le column name
This commit is contained in:
Ning Sun
2023-09-23 15:59:14 +08:00
committed by GitHub
parent 9d0de25bff
commit ffa729cdf5
6 changed files with 536 additions and 272 deletions

View File

@@ -42,7 +42,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.context(AuthSnafu)?;
let (requests, rows) = otlp::to_grpc_insert_requests(request)?;
let _ = self
.handle_inserts(requests, ctx)
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;

View File

@@ -53,7 +53,7 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
// tags
if let Some(tags) = tags {
let kvs = tags.iter().map(|(k, v)| (k.as_str(), v.as_str()));
let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.as_str()));
row_writer::write_tags(table_data, kvs, &mut one_row)?;
}
@@ -69,7 +69,7 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
),
FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)),
};
(k.as_str(), datatype, value)
(k.to_string(), datatype, value)
});
row_writer::write_fields(table_data, fields, &mut one_row)?;

View File

@@ -12,17 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{InsertRequest, InsertRequests};
use common_grpc::writer::{LinesWriter, Precision};
use api::v1::{RowInsertRequests, Value};
use common_grpc::writer::Precision;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *};
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::error::Result;
use crate::row_writer::{self, MultiTableData, TableData};
const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
const GREPTIME_VALUE: &str = "greptime_value";
const GREPTIME_COUNT: &str = "greptime_count";
/// the default column count for table writer
const APPROXIMATE_COLUMN_COUNT: usize = 8;
/// Normalize otlp instrumentation, metric and attribute names
///
@@ -43,313 +46,350 @@ fn normalize_otlp_name(name: &str) -> String {
/// Returns `InsertRequests` and total number of rows to ingest
pub fn to_grpc_insert_requests(
request: ExportMetricsServiceRequest,
) -> Result<(InsertRequests, usize)> {
let mut insert_batch = Vec::new();
let mut rows = 0;
) -> Result<(RowInsertRequests, usize)> {
let mut table_writer = MultiTableData::default();
for resource in request.resource_metrics {
let resource_attrs = resource.resource.map(|r| r.attributes);
for scope in resource.scope_metrics {
let scope_attrs = scope.scope.map(|s| s.attributes);
for metric in scope.metrics {
if let Some(insert) =
encode_metrics(&metric, resource_attrs.as_ref(), scope_attrs.as_ref())?
{
rows += insert.row_count;
insert_batch.push(insert);
}
for resource in &request.resource_metrics {
let resource_attrs = resource.resource.as_ref().map(|r| &r.attributes);
for scope in &resource.scope_metrics {
let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes);
for metric in &scope.metrics {
encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?;
}
}
}
let inserts = InsertRequests {
inserts: insert_batch,
};
Ok((inserts, rows as usize))
Ok(table_writer.into_row_insert_requests())
}
fn encode_metrics(
table_writer: &mut MultiTableData,
metric: &Metric,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<Option<InsertRequest>> {
) -> Result<()> {
let name = &metric.name;
// note that we don't store description or unit, we might want to deal with
// these fields in the future.
if let Some(data) = &metric.data {
match data {
metric::Data::Gauge(gauge) => {
encode_gauge(name, gauge, resource_attrs, scope_attrs).map(Some)
encode_gauge(table_writer, name, gauge, resource_attrs, scope_attrs)?;
}
metric::Data::Sum(sum) => {
encode_sum(table_writer, name, sum, resource_attrs, scope_attrs)?;
}
metric::Data::Sum(sum) => encode_sum(name, sum, resource_attrs, scope_attrs).map(Some),
metric::Data::Summary(summary) => {
encode_summary(name, summary, resource_attrs, scope_attrs).map(Some)
encode_summary(table_writer, name, summary, resource_attrs, scope_attrs)?;
}
// TODO(sunng87) leave histogram for next release
metric::Data::Histogram(_hist) => Ok(None),
metric::Data::ExponentialHistogram(_hist) => Ok(None),
metric::Data::Histogram(hist) => {
encode_histogram(table_writer, name, hist, resource_attrs, scope_attrs)?;
}
// TODO(sunng87) leave ExponentialHistogram for next release
metric::Data::ExponentialHistogram(_hist) => {}
}
} else {
Ok(None)
}
Ok(())
}
fn write_attributes(lines: &mut LinesWriter, attrs: Option<&Vec<KeyValue>>) -> Result<()> {
fn write_attributes(
writer: &mut TableData,
row: &mut Vec<Value>,
attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
if let Some(attrs) = attrs {
for attr in attrs {
write_attribute(lines, attr)?;
}
let table_tags = attrs.iter().filter_map(|attr| {
if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
let key = normalize_otlp_name(&attr.key);
match val {
any_value::Value::StringValue(s) => Some((key, s.to_string())),
any_value::Value::IntValue(v) => Some((key, v.to_string())),
any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
_ => None, // TODO(sunng87): allow different type of values
}
} else {
None
}
});
row_writer::write_tags(writer, table_tags, row)?;
}
Ok(())
}
fn write_attribute(lines: &mut LinesWriter, attr: &KeyValue) -> Result<()> {
if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
match val {
any_value::Value::StringValue(s) => lines
.write_tag(&normalize_otlp_name(&attr.key), s)
.context(error::OtlpMetricsWriteSnafu)?,
any_value::Value::IntValue(v) => lines
.write_tag(&normalize_otlp_name(&attr.key), &v.to_string())
.context(error::OtlpMetricsWriteSnafu)?,
any_value::Value::DoubleValue(v) => lines
.write_tag(&normalize_otlp_name(&attr.key), &v.to_string())
.context(error::OtlpMetricsWriteSnafu)?,
// TODO(sunng87): allow different type of values
_ => {}
}
}
Ok(())
}
fn write_timestamp(lines: &mut LinesWriter, time_nano: i64) -> Result<()> {
lines
.write_ts(GREPTIME_TIMESTAMP, (time_nano, Precision::Nanosecond))
.context(error::OtlpMetricsWriteSnafu)?;
Ok(())
fn write_timestamp(table: &mut TableData, row: &mut Vec<Value>, time_nano: i64) -> Result<()> {
row_writer::write_ts_precision(
table,
GREPTIME_TIMESTAMP,
Some(time_nano),
Precision::Nanosecond,
row,
)
}
fn write_data_point_value(
lines: &mut LinesWriter,
table: &mut TableData,
row: &mut Vec<Value>,
field: &str,
value: &Option<number_data_point::Value>,
) -> Result<()> {
match value {
Some(number_data_point::Value::AsInt(val)) => {
// we coerce all values to f64
lines
.write_f64(field, *val as f64)
.context(error::OtlpMetricsWriteSnafu)?
row_writer::write_f64(table, field, *val as f64, row)?;
}
Some(number_data_point::Value::AsDouble(val)) => {
row_writer::write_f64(table, field, *val, row)?;
}
Some(number_data_point::Value::AsDouble(val)) => lines
.write_f64(field, *val)
.context(error::OtlpMetricsWriteSnafu)?,
_ => {}
}
Ok(())
}
fn write_tags_and_timestamp(
table: &mut TableData,
row: &mut Vec<Value>,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
data_point_attrs: Option<&Vec<KeyValue>>,
timestamp_nanos: i64,
) -> Result<()> {
write_attributes(table, row, resource_attrs)?;
write_attributes(table, row, scope_attrs)?;
write_attributes(table, row, data_point_attrs)?;
write_timestamp(table, row, timestamp_nanos)?;
Ok(())
}
/// encode this gauge metric
///
/// note that there can be multiple data points in the request, it's going to be
/// stored as multiple rows
fn encode_gauge(
table_writer: &mut MultiTableData,
name: &str,
gauge: &Gauge,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<InsertRequest> {
let mut lines = LinesWriter::with_lines(gauge.data_points.len());
for data_point in &gauge.data_points {
write_attributes(&mut lines, resource_attrs)?;
write_attributes(&mut lines, scope_attrs)?;
write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?;
write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
write_data_point_value(&mut lines, GREPTIME_VALUE, &data_point.value)?;
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
gauge.data_points.len(),
);
lines.commit();
for data_point in &gauge.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
table.add_row(row);
}
let (columns, row_count) = lines.finish();
Ok(InsertRequest {
table_name: normalize_otlp_name(name),
columns,
row_count,
})
Ok(())
}
/// encode this sum metric
///
/// `aggregation_temporality` and `monotonic` are ignored for now
fn encode_sum(
table_writer: &mut MultiTableData,
name: &str,
sum: &Sum,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<InsertRequest> {
let mut lines = LinesWriter::with_lines(sum.data_points.len());
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
sum.data_points.len(),
);
for data_point in &sum.data_points {
write_attributes(&mut lines, resource_attrs)?;
write_attributes(&mut lines, scope_attrs)?;
write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?;
write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
write_data_point_value(&mut lines, GREPTIME_VALUE, &data_point.value)?;
lines.commit();
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
table.add_row(row);
}
let (columns, row_count) = lines.finish();
Ok(InsertRequest {
table_name: normalize_otlp_name(name),
columns,
row_count,
})
Ok(())
}
// TODO(sunng87): we may need better implementation for histogram
#[allow(dead_code)]
fn encode_histogram(name: &str, hist: &Histogram) -> Result<InsertRequest> {
let mut lines = LinesWriter::with_lines(hist.data_points.len());
const HISTOGRAM_LE_COLUMN: &str = "le";
/// Encode histogram data. This function returns 3 insert requests for 3 tables.
///
/// The implementation has been following Prometheus histogram table format:
///
/// - A `%metric%_bucket` table including `greptime_le` tag that stores bucket upper
/// limit, and `greptime_value` for bucket count
/// - A `%metric%_sum` table storing sum of samples
/// - A `%metric%_count` table storing count of samples.
///
/// By its Prometheus compatibility, we hope to be able to use prometheus
/// quantile functions on this table.
fn encode_histogram(
table_writer: &mut MultiTableData,
name: &str,
hist: &Histogram,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let normalized_name = normalize_otlp_name(name);
let bucket_table_name = format!("{}_bucket", normalized_name);
let sum_table_name = format!("{}_sum", normalized_name);
let count_table_name = format!("{}_count", normalized_name);
let data_points_len = hist.data_points.len();
// Note that the row and columns number here is approximate
let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
for data_point in &hist.data_points {
for attr in &data_point.attributes {
write_attribute(&mut lines, attr)?;
}
write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
let mut accumulated_count = 0;
for (idx, count) in data_point.bucket_counts.iter().enumerate() {
// here we don't store bucket boundary
lines
.write_u64(&format!("bucket_{}", idx), *count)
.context(error::OtlpMetricsWriteSnafu)?;
let mut bucket_row = bucket_table.alloc_one_row();
write_tags_and_timestamp(
&mut bucket_table,
&mut bucket_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
row_writer::write_tag(
&mut bucket_table,
HISTOGRAM_LE_COLUMN,
upper_bounds,
&mut bucket_row,
)?;
} else if idx == data_point.explicit_bounds.len() {
// The last bucket
row_writer::write_tag(
&mut bucket_table,
HISTOGRAM_LE_COLUMN,
f64::INFINITY,
&mut bucket_row,
)?;
}
accumulated_count += count;
row_writer::write_f64(
&mut bucket_table,
GREPTIME_VALUE,
accumulated_count as f64,
&mut bucket_row,
)?;
bucket_table.add_row(bucket_row);
}
if let Some(min) = data_point.min {
lines
.write_f64("min", min)
.context(error::OtlpMetricsWriteSnafu)?;
if let Some(sum) = data_point.sum {
let mut sum_row = sum_table.alloc_one_row();
write_tags_and_timestamp(
&mut sum_table,
&mut sum_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
sum_table.add_row(sum_row);
}
if let Some(max) = data_point.max {
lines
.write_f64("max", max)
.context(error::OtlpMetricsWriteSnafu)?;
}
let mut count_row = count_table.alloc_one_row();
write_tags_and_timestamp(
&mut count_table,
&mut count_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
lines.commit();
row_writer::write_f64(
&mut count_table,
GREPTIME_VALUE,
data_point.count as f64,
&mut count_row,
)?;
count_table.add_row(count_row);
}
let (columns, row_count) = lines.finish();
Ok(InsertRequest {
table_name: normalize_otlp_name(name),
columns,
row_count,
})
table_writer.add_table_data(bucket_table_name, bucket_table);
table_writer.add_table_data(sum_table_name, sum_table);
table_writer.add_table_data(count_table_name, count_table);
Ok(())
}
#[allow(dead_code)]
fn encode_exponential_histogram(name: &str, hist: &ExponentialHistogram) -> Result<InsertRequest> {
let mut lines = LinesWriter::with_lines(hist.data_points.len());
for data_point in &hist.data_points {
for attr in &data_point.attributes {
write_attribute(&mut lines, attr)?;
}
write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
// TODO(sunng87): confirm if this working
if let Some(positive_buckets) = &data_point.positive {
for (idx, count) in positive_buckets.bucket_counts.iter().enumerate() {
// here we don't store bucket boundary
lines
.write_u64(
&format!("bucket_{}", idx + positive_buckets.offset as usize),
*count,
)
.context(error::OtlpMetricsWriteSnafu)?;
}
}
if let Some(negative_buckets) = &data_point.negative {
for (idx, count) in negative_buckets.bucket_counts.iter().enumerate() {
lines
.write_u64(
&format!("bucket_{}", idx + negative_buckets.offset as usize),
*count,
)
.context(error::OtlpMetricsWriteSnafu)?;
}
}
if let Some(min) = data_point.min {
lines
.write_f64("min", min)
.context(error::OtlpMetricsWriteSnafu)?;
}
if let Some(max) = data_point.max {
lines
.write_f64("max", max)
.context(error::OtlpMetricsWriteSnafu)?;
}
lines.commit();
}
let (columns, row_count) = lines.finish();
Ok(InsertRequest {
table_name: normalize_otlp_name(name),
columns,
row_count,
})
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
// TODO(sunng87): implement this using a prometheus compatible way
Ok(())
}
fn encode_summary(
table_writer: &mut MultiTableData,
name: &str,
summary: &Summary,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<InsertRequest> {
let mut lines = LinesWriter::with_lines(summary.data_points.len());
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
summary.data_points.len(),
);
for data_point in &summary.data_points {
write_attributes(&mut lines, resource_attrs)?;
write_attributes(&mut lines, scope_attrs)?;
write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?;
write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
for quantile in &data_point.quantile_values {
// here we don't store bucket boundary
lines
.write_f64(
&format!("greptime_p{:02}", quantile.quantile * 100f64),
quantile.value,
)
.context(error::OtlpMetricsWriteSnafu)?;
row_writer::write_f64(
table,
&format!("greptime_p{:02}", quantile.quantile * 100f64),
quantile.value,
&mut row,
)?;
}
lines
.write_u64("greptime_count", data_point.count)
.context(error::OtlpMetricsWriteSnafu)?;
lines.commit();
row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
table.add_row(row);
}
let (columns, row_count) = lines.finish();
Ok(InsertRequest {
table_name: normalize_otlp_name(name),
columns,
row_count,
})
Ok(())
}
#[cfg(test)]
@@ -358,7 +398,7 @@ mod tests {
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
use opentelemetry_proto::tonic::metrics::v1::summary_data_point::ValueAtQuantile;
use opentelemetry_proto::tonic::metrics::v1::NumberDataPoint;
use opentelemetry_proto::tonic::metrics::v1::{HistogramDataPoint, NumberDataPoint};
use super::*;
@@ -382,6 +422,8 @@ mod tests {
#[test]
fn test_encode_gauge() {
let mut tables = MultiTableData::default();
let data_points = vec![
NumberDataPoint {
attributes: vec![keyvalue("host", "testsevrer")],
@@ -397,7 +439,8 @@ mod tests {
},
];
let gauge = Gauge { data_points };
let inserts = encode_gauge(
encode_gauge(
&mut tables,
"datamon",
&gauge,
Some(&vec![keyvalue("resource", "app")]),
@@ -405,12 +448,12 @@ mod tests {
)
.unwrap();
assert_eq!(inserts.table_name, "datamon");
assert_eq!(inserts.row_count, 2);
assert_eq!(inserts.columns.len(), 5);
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 2);
assert_eq!(table.num_columns(), 5);
assert_eq!(
inserts
.columns
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
@@ -426,6 +469,8 @@ mod tests {
#[test]
fn test_encode_sum() {
let mut tables = MultiTableData::default();
let data_points = vec![
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
@@ -444,7 +489,8 @@ mod tests {
data_points,
..Default::default()
};
let inserts = encode_sum(
encode_sum(
&mut tables,
"datamon",
&sum,
Some(&vec![keyvalue("resource", "app")]),
@@ -452,12 +498,12 @@ mod tests {
)
.unwrap();
assert_eq!(inserts.table_name, "datamon");
assert_eq!(inserts.row_count, 2);
assert_eq!(inserts.columns.len(), 5);
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 2);
assert_eq!(table.num_columns(), 5);
assert_eq!(
inserts
.columns
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
@@ -473,6 +519,8 @@ mod tests {
#[test]
fn test_encode_summary() {
let mut tables = MultiTableData::default();
let data_points = vec![SummaryDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
@@ -491,7 +539,8 @@ mod tests {
..Default::default()
}];
let summary = Summary { data_points };
let inserts = encode_summary(
encode_summary(
&mut tables,
"datamon",
&summary,
Some(&vec![keyvalue("resource", "app")]),
@@ -499,12 +548,12 @@ mod tests {
)
.unwrap();
assert_eq!(inserts.table_name, "datamon");
assert_eq!(inserts.row_count, 1);
assert_eq!(inserts.columns.len(), 7);
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 1);
assert_eq!(table.num_columns(), 7);
assert_eq!(
inserts
.columns
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
@@ -519,4 +568,93 @@ mod tests {
]
);
}
#[test]
fn test_encode_histogram() {
let mut tables = MultiTableData::default();
let data_points = vec![HistogramDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
start_time_unix_nano: 23,
count: 25,
sum: Some(100.),
max: Some(200.),
min: Some(0.03),
bucket_counts: vec![2, 4, 6, 9, 4],
explicit_bounds: vec![0.1, 1., 10., 100.],
..Default::default()
}];
let histogram = Histogram {
data_points,
aggregation_temporality: AggregationTemporality::Delta.into(),
};
encode_histogram(
&mut tables,
"histo",
&histogram,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
assert_eq!(3, tables.num_tables());
// bucket table
let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
assert_eq!(bucket_table.num_rows(), 5);
assert_eq!(bucket_table.num_columns(), 6);
assert_eq!(
bucket_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"le",
"greptime_value",
]
);
let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
assert_eq!(sum_table.num_rows(), 1);
assert_eq!(sum_table.num_columns(), 5);
assert_eq!(
sum_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value",
]
);
let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
assert_eq!(count_table.num_rows(), 1);
assert_eq!(count_table.num_columns(), 5);
assert_eq!(
count_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value",
]
);
}
}

View File

@@ -334,7 +334,7 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe
if label.name == METRIC_NAME_LABEL {
None
} else {
Some((label.name.as_str(), label.value.as_str()))
Some((label.name.to_string(), label.value.as_str()))
}
});
row_writer::write_tags(table_data, kvs, &mut one_row)?;

View File

@@ -27,13 +27,13 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{IncompatibleSchemaSnafu, InfluxdbLinesWriteSnafu, Result, TimePrecisionSnafu};
pub struct TableData<'a> {
pub struct TableData {
schema: Vec<ColumnSchema>,
rows: Vec<Row>,
column_indexes: HashMap<&'a str, usize>,
column_indexes: HashMap<String, usize>,
}
impl TableData<'_> {
impl TableData {
pub fn new(num_columns: usize, num_rows: usize) -> Self {
Self {
schema: Vec::with_capacity(num_columns),
@@ -62,16 +62,27 @@ impl TableData<'_> {
self.rows.push(Row { values })
}
#[allow(dead_code)]
pub fn columns(&self) -> &Vec<ColumnSchema> {
&self.schema
}
pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
(self.schema, self.rows)
}
}
pub struct MultiTableData<'a> {
table_data_map: HashMap<&'a str, TableData<'a>>,
pub struct MultiTableData {
table_data_map: HashMap<String, TableData>,
}
impl<'a> MultiTableData<'a> {
impl Default for MultiTableData {
fn default() -> Self {
Self::new()
}
}
impl MultiTableData {
pub fn new() -> Self {
Self {
table_data_map: HashMap::new(),
@@ -80,15 +91,25 @@ impl<'a> MultiTableData<'a> {
pub fn get_or_default_table_data(
&mut self,
table_name: &'a str,
table_name: impl ToString,
num_columns: usize,
num_rows: usize,
) -> &mut TableData<'a> {
) -> &mut TableData {
self.table_data_map
.entry(table_name)
.entry(table_name.to_string())
.or_insert_with(|| TableData::new(num_columns, num_rows))
}
pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
self.table_data_map
.insert(table_name.to_string(), table_data);
}
#[allow(dead_code)]
pub fn num_tables(&self) -> usize {
self.table_data_map.len()
}
/// Returns the request and number of rows in it.
pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
let mut total_rows = 0;
@@ -106,7 +127,7 @@ impl<'a> MultiTableData<'a> {
}
RowInsertRequest {
table_name: table_name.to_string(),
table_name,
rows: Some(Rows { schema, rows }),
}
})
@@ -117,9 +138,9 @@ impl<'a> MultiTableData<'a> {
}
}
pub fn write_tags<'a>(
table_data: &mut TableData<'a>,
kvs: impl Iterator<Item = (&'a str, &'a str)>,
pub fn write_tags(
table_data: &mut TableData,
kvs: impl Iterator<Item = (String, impl ToString)>,
one_row: &mut Vec<Value>,
) -> Result<()> {
let ktv_iter = kvs.map(|(k, v)| {
@@ -132,31 +153,53 @@ pub fn write_tags<'a>(
write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
}
pub fn write_fields<'a>(
table_data: &mut TableData<'a>,
fields: impl Iterator<Item = (&'a str, ColumnDataType, ValueData)>,
pub fn write_fields(
table_data: &mut TableData,
fields: impl Iterator<Item = (String, ColumnDataType, ValueData)>,
one_row: &mut Vec<Value>,
) -> Result<()> {
write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
}
pub fn write_f64<'a>(
table_data: &mut TableData<'a>,
name: &'a str,
pub fn write_tag(
table_data: &mut TableData,
name: impl ToString,
value: impl ToString,
one_row: &mut Vec<Value>,
) -> Result<()> {
write_by_semantic_type(
table_data,
SemanticType::Tag,
std::iter::once((
name.to_string(),
ColumnDataType::String,
ValueData::StringValue(value.to_string()),
)),
one_row,
)
}
pub fn write_f64(
table_data: &mut TableData,
name: impl ToString,
value: f64,
one_row: &mut Vec<Value>,
) -> Result<()> {
write_fields(
table_data,
std::iter::once((name, ColumnDataType::Float64, ValueData::F64Value(value))),
std::iter::once((
name.to_string(),
ColumnDataType::Float64,
ValueData::F64Value(value),
)),
one_row,
)
}
fn write_by_semantic_type<'a>(
table_data: &mut TableData<'a>,
fn write_by_semantic_type(
table_data: &mut TableData,
semantic_type: SemanticType,
ktv_iter: impl Iterator<Item = (&'a str, ColumnDataType, ValueData)>,
ktv_iter: impl Iterator<Item = (String, ColumnDataType, ValueData)>,
one_row: &mut Vec<Value>,
) -> Result<()> {
let TableData {
@@ -166,7 +209,7 @@ fn write_by_semantic_type<'a>(
} = table_data;
for (name, datatype, value) in ktv_iter {
let index = column_indexes.entry(name).or_insert(schema.len());
let index = column_indexes.entry(name.clone()).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: name.to_string(),
@@ -183,18 +226,18 @@ fn write_by_semantic_type<'a>(
Ok(())
}
pub fn write_ts_millis<'a>(
table_data: &mut TableData<'a>,
name: &'a str,
pub fn write_ts_millis(
table_data: &mut TableData,
name: impl ToString,
ts: Option<i64>,
one_row: &mut Vec<Value>,
) -> Result<()> {
write_ts_precision(table_data, name, ts, Precision::Millisecond, one_row)
}
pub fn write_ts_precision<'a>(
table_data: &mut TableData<'a>,
name: &'a str,
pub fn write_ts_precision(
table_data: &mut TableData,
name: impl ToString,
ts: Option<i64>,
precision: Precision,
one_row: &mut Vec<Value>,
@@ -204,6 +247,7 @@ pub fn write_ts_precision<'a>(
column_indexes,
..
} = table_data;
let name = name.to_string();
let ts = match ts {
Some(timestamp) => writer::to_ms_ts(precision, timestamp),
@@ -219,10 +263,10 @@ pub fn write_ts_precision<'a>(
}
};
let index = column_indexes.entry(name).or_insert(schema.len());
let index = column_indexes.entry(name.clone()).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: name.to_string(),
column_name: name,
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
});

View File

@@ -87,6 +87,65 @@ mod test {
+------------+-------+--------------------+------------+---------------------+----------------+
| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 105.0 |
| greptimedb | otel | java | testsevrer | 1970-01-01T00:00:00 | 100.0 |
+------------+-------+--------------------+------------+---------------------+----------------+",
);
let mut output = instance
.do_query(
"SELECT le, greptime_value FROM my_test_histo_bucket order by le",
ctx.clone(),
)
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+-----+----------------+
| le | greptime_value |
+-----+----------------+
| 1 | 1.0 |
| 5 | 3.0 |
| inf | 4.0 |
+-----+----------------+",
);
let mut output = instance
.do_query("SELECT * FROM my_test_histo_sum", ctx.clone())
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+------------+-------+--------------------+------------+---------------------+----------------+
| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value |
+------------+-------+--------------------+------------+---------------------+----------------+
| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 51.0 |
+------------+-------+--------------------+------------+---------------------+----------------+",
);
let mut output = instance
.do_query("SELECT * FROM my_test_histo_count", ctx.clone())
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+------------+-------+--------------------+------------+---------------------+----------------+
| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value |
+------------+-------+--------------------+------------+---------------------+----------------+
| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 4.0 |
+------------+-------+--------------------+------------+---------------------+----------------+",
);
}
@@ -108,15 +167,38 @@ mod test {
];
let gauge = Gauge { data_points };
let histo_data_points = vec![HistogramDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
count: 4,
bucket_counts: vec![1, 2, 1],
explicit_bounds: vec![1.0f64, 5.0f64],
sum: Some(51f64),
..Default::default()
}];
let histo = Histogram {
data_points: histo_data_points,
aggregation_temporality: 0,
};
ExportMetricsServiceRequest {
resource_metrics: vec![ResourceMetrics {
scope_metrics: vec![ScopeMetrics {
metrics: vec![Metric {
name: "my.test.metric".into(),
description: "my ignored desc".into(),
unit: "my ignored unit".into(),
data: Some(metric::Data::Gauge(gauge)),
}],
metrics: vec![
Metric {
name: "my.test.metric".into(),
description: "my ignored desc".into(),
unit: "my ignored unit".into(),
data: Some(metric::Data::Gauge(gauge)),
},
Metric {
name: "my.test.histo".into(),
description: "my ignored desc".into(),
unit: "my ignored unit".into(),
data: Some(metric::Data::Histogram(histo)),
},
],
scope: Some(InstrumentationScope {
attributes: vec![
keyvalue("scope", "otel"),