feat: prometheus row inserter (#2263)

* feat: prometheus row inserter

* chore: add unit test

* refactor: to row_insert_requests

* chore: typo

* chore: alloc row by TableData

* chore: apply review comments
Author: JeremyHi
Date: 2023-08-28 11:22:23 +08:00 (committed by GitHub)
parent c56f5e39cd · commit 63b22b2403
9 changed files with 534 additions and 215 deletions
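
The net effect: Prometheus remote write is now converted straight into row-based gRPC inserts. A minimal sketch of the new call path (handler names are taken from the diff below; `instance`, `ctx`, and `write_request` are placeholders, error handling elided):

    // write_request: a decoded api::prom_store::remote::WriteRequest
    let (requests, _sample_count) = prom_store::to_grpc_row_insert_requests(write_request)?;
    // Row-based inserts go through the new handler instead of handle_inserts.
    instance.handle_row_inserts(requests, ctx).await?;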

Cargo.lock

@@ -1774,6 +1774,7 @@ dependencies = [
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-time",
"criterion 0.4.0",
"dashmap",
"datafusion",


@@ -14,6 +14,7 @@ common-error = { workspace = true }
common-recordbatch = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
common-time = { workspace = true }
dashmap = "5.4"
datafusion.workspace = true
datatypes = { workspace = true }


@@ -75,6 +75,9 @@ pub enum Error {
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Not supported: {}", feat))]
NotSupported { feat: String },
}
impl ErrorExt for Error {
@@ -83,7 +86,8 @@ impl ErrorExt for Error {
Error::InvalidTlsConfig { .. }
| Error::InvalidConfigFilePath { .. }
| Error::TypeMismatch { .. }
| Error::InvalidFlightData { .. } => StatusCode::InvalidArguments,
| Error::InvalidFlightData { .. }
| Error::NotSupported { .. } => StatusCode::InvalidArguments,
Error::CreateChannel { .. }
| Error::Conversion { .. }


@@ -18,9 +18,11 @@ use std::fmt::Display;
use api::helper::values_with_capacity;
use api::v1::{Column, ColumnDataType, SemanticType};
use common_base::BitVec;
use common_time::timestamp::TimeUnit;
use snafu::ensure;
use crate::error::{Result, TypeMismatchSnafu};
use crate::Error;
type ColumnName = String;
@@ -259,6 +261,24 @@ impl Display for Precision {
}
}
impl TryFrom<Precision> for TimeUnit {
type Error = Error;
fn try_from(precision: Precision) -> std::result::Result<Self, Self::Error> {
Ok(match precision {
Precision::Second => TimeUnit::Second,
Precision::Millisecond => TimeUnit::Millisecond,
Precision::Microsecond => TimeUnit::Microsecond,
Precision::Nanosecond => TimeUnit::Nanosecond,
_ => {
return Err(Error::NotSupported {
feat: format!("convert {precision} into TimeUnit"),
})
}
})
}
}
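
A quick sketch of this conversion in use (hypothetical snippet, not part of the diff):

    use common_grpc::writer::Precision;
    use common_time::timestamp::TimeUnit;

    // Precision -> TimeUnit is fallible: Precision variants without a TimeUnit
    // counterpart (the `_` arm above) map to Error::NotSupported.
    let unit = TimeUnit::try_from(Precision::Millisecond).unwrap();
    assert!(matches!(unit, TimeUnit::Millisecond));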
#[cfg(test)]
mod tests {
use api::v1::{ColumnDataType, SemanticType};


@@ -154,9 +154,9 @@ impl PromStoreProtocolHandler for Instance {
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
.context(AuthSnafu)?;
let (requests, samples) = prom_store::to_grpc_insert_requests(request)?;
let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?;
let _ = self
.handle_inserts(requests, ctx)
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;


@@ -16,20 +16,16 @@ use std::collections::HashMap;
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema, InsertRequest as GrpcInsertRequest, InsertRequests, Row,
RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests, RowInsertRequests,
};
use common_grpc::writer;
use common_grpc::writer::{LinesWriter, Precision};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use influxdb_line_protocol::{parse_lines, FieldSet, FieldValue, TagSet};
use snafu::{ensure, OptionExt, ResultExt};
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::{OptionExt, ResultExt};
use crate::error::{
Error, IncompatibleSchemaSnafu, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu,
TimePrecisionSnafu,
};
use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu, TimePrecisionSnafu};
use crate::row_writer::{self, MultiTableData};
pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
@@ -107,7 +103,7 @@ impl TryFrom<InfluxdbRequest> for InsertRequests {
} else {
let precision = unwrap_or_default_precision(value.precision);
let timestamp = Timestamp::current_millis();
let unit = get_time_unit(precision)?;
let unit: TimeUnit = precision.try_into().context(InfluxdbLinesWriteSnafu)?;
let timestamp = timestamp
.convert_to(unit)
.with_context(|| TimePrecisionSnafu {
@@ -147,13 +143,7 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
.collect::<influxdb_line_protocol::Result<Vec<_>>>()
.context(InfluxdbLineProtocolSnafu)?;
struct TableData<'a> {
schema: Vec<ColumnSchema>,
rows: Vec<Row>,
column_indexes: HashMap<&'a str, usize>,
}
let mut table_data_map = HashMap::new();
let mut multi_table_data = MultiTableData::new();
for line in &lines {
let table_name = line.series.measurement.as_str();
@@ -163,194 +153,48 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
// tags.len + fields.len + timestamp(+1)
let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1;
let TableData {
schema,
rows,
column_indexes,
} = table_data_map
.entry(table_name)
.or_insert_with(|| TableData {
schema: Vec::with_capacity(num_columns),
rows: Vec::new(),
column_indexes: HashMap::with_capacity(num_columns),
});
let mut one_row = vec![Value { value_data: None }; schema.len()];
let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0);
let mut one_row = table_data.alloc_one_row();
// tags
parse_tags(tags, column_indexes, schema, &mut one_row)?;
if let Some(tags) = tags {
let kvs = tags.iter().map(|(k, v)| (k.as_str(), v.as_str()));
row_writer::write_tags(table_data, kvs, &mut one_row)?;
}
// fields
parse_fields(fields, column_indexes, schema, &mut one_row)?;
let fields = fields.iter().map(|(k, v)| {
let (datatype, value) = match v {
FieldValue::I64(v) => (ColumnDataType::Int64, ValueData::I64Value(*v)),
FieldValue::U64(v) => (ColumnDataType::Uint64, ValueData::U64Value(*v)),
FieldValue::F64(v) => (ColumnDataType::Float64, ValueData::F64Value(*v)),
FieldValue::String(v) => (
ColumnDataType::String,
ValueData::StringValue(v.to_string()),
),
FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)),
};
(k.as_str(), datatype, value)
});
row_writer::write_fields(table_data, fields, &mut one_row)?;
// timestamp
parse_ts(ts, value.precision, column_indexes, schema, &mut one_row)?;
let precision = unwrap_or_default_precision(value.precision);
row_writer::write_ts_precision(
table_data,
INFLUXDB_TIMESTAMP_COLUMN_NAME,
ts,
precision,
&mut one_row,
)?;
rows.push(Row { values: one_row });
table_data.add_row(one_row);
}
let inserts = table_data_map
.into_iter()
.map(
|(
table_name,
TableData {
schema, mut rows, ..
},
)| {
let num_columns = schema.len();
for row in rows.iter_mut() {
if num_columns > row.values.len() {
row.values.resize(num_columns, Value { value_data: None });
}
}
RowInsertRequest {
table_name: table_name.to_string(),
rows: Some(Rows { schema, rows }),
..Default::default()
}
},
)
.collect::<Vec<_>>();
Ok(RowInsertRequests { inserts })
Ok(multi_table_data.into_row_insert_requests().0)
}
}
fn parse_tags<'a>(
tags: &'a Option<TagSet>,
column_indexes: &mut HashMap<&'a str, usize>,
schema: &mut Vec<ColumnSchema>,
one_row: &mut Vec<Value>,
) -> Result<(), Error> {
let Some(tags) = tags else {
return Ok(());
};
for (k, v) in tags {
let index = column_indexes.entry(k.as_str()).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: k.to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
});
one_row.push(ValueData::StringValue(v.to_string()).into());
} else {
check_schema(ColumnDataType::String, SemanticType::Tag, &schema[*index])?;
one_row[*index].value_data = Some(ValueData::StringValue(v.to_string()));
}
}
Ok(())
}
fn parse_fields<'a>(
fields: &'a FieldSet,
column_indexes: &mut HashMap<&'a str, usize>,
schema: &mut Vec<ColumnSchema>,
one_row: &mut Vec<Value>,
) -> Result<(), Error> {
for (k, v) in fields {
let index = column_indexes.entry(k.as_str()).or_insert(schema.len());
let (datatype, value) = match v {
FieldValue::I64(v) => (ColumnDataType::Int64, ValueData::I64Value(*v)),
FieldValue::U64(v) => (ColumnDataType::Uint64, ValueData::U64Value(*v)),
FieldValue::F64(v) => (ColumnDataType::Float64, ValueData::F64Value(*v)),
FieldValue::String(v) => (
ColumnDataType::String,
ValueData::StringValue(v.to_string()),
),
FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)),
};
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: k.to_string(),
datatype: datatype as i32,
semantic_type: SemanticType::Field as i32,
});
one_row.push(value.into());
} else {
check_schema(datatype, SemanticType::Field, &schema[*index])?;
one_row[*index].value_data = Some(value);
}
}
Ok(())
}
fn parse_ts(
ts: Option<i64>,
precision: Option<Precision>,
column_indexes: &mut HashMap<&str, usize>,
schema: &mut Vec<ColumnSchema>,
one_row: &mut Vec<Value>,
) -> Result<(), Error> {
let precision = unwrap_or_default_precision(precision);
let ts = match ts {
Some(timestamp) => writer::to_ms_ts(precision, timestamp),
None => {
let timestamp = Timestamp::current_millis();
let unit = get_time_unit(precision)?;
let timestamp = timestamp
.convert_to(unit)
.with_context(|| TimePrecisionSnafu {
name: precision.to_string(),
})?;
writer::to_ms_ts(precision, timestamp.into())
}
};
let column_name = INFLUXDB_TIMESTAMP_COLUMN_NAME;
let index = column_indexes.entry(column_name).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: column_name.to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
});
one_row.push(ValueData::TsMillisecondValue(ts).into())
} else {
check_schema(
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
&schema[*index],
)?;
one_row[*index].value_data = Some(ValueData::TsMillisecondValue(ts));
}
Ok(())
}
#[inline]
fn check_schema(
datatype: ColumnDataType,
semantic_type: SemanticType,
schema: &ColumnSchema,
) -> Result<(), Error> {
ensure!(
schema.datatype == datatype as i32,
IncompatibleSchemaSnafu {
column_name: &schema.column_name,
datatype: "datatype",
expected: schema.datatype,
actual: datatype as i32,
}
);
ensure!(
schema.semantic_type == semantic_type as i32,
IncompatibleSchemaSnafu {
column_name: &schema.column_name,
datatype: "semantic_type",
expected: schema.semantic_type,
actual: semantic_type as i32,
}
);
Ok(())
}
#[inline]
fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
if let Some(val) = precision {
@@ -360,25 +204,11 @@ fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
}
}
#[inline]
fn get_time_unit(precision: Precision) -> Result<TimeUnit, Error> {
Ok(match precision {
Precision::Second => TimeUnit::Second,
Precision::Millisecond => TimeUnit::Millisecond,
Precision::Microsecond => TimeUnit::Microsecond,
Precision::Nanosecond => TimeUnit::Nanosecond,
_ => {
return Err(Error::NotSupported {
feat: format!("convert {precision} into TimeUnit"),
})
}
})
}
#[cfg(test)]
mod tests {
use api::v1::column::Values;
use api::v1::{Column, ColumnDataType, SemanticType};
use api::v1::value::ValueData;
use api::v1::{Column, ColumnDataType, Rows, SemanticType};
use common_base::BitVec;
use super::*;


@@ -37,6 +37,7 @@ pub mod postgres;
pub mod prom_store;
pub mod prometheus_handler;
pub mod query_handler;
mod row_writer;
pub mod server;
mod shutdown;
pub mod tls;


@@ -20,7 +20,7 @@ use std::hash::{Hash, Hasher};
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests};
use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests, RowInsertRequests};
use common_grpc::writer::{LinesWriter, Precision};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timestamp::TimeUnit;
@@ -34,6 +34,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};
use crate::error::{self, Result};
use crate::row_writer::{self, MultiTableData};
pub const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
pub const FIELD_COLUMN_NAME: &str = "greptime_value";
@@ -300,6 +301,61 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
Ok(timeseries_map.into_values().collect())
}
pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRequests, usize)> {
let mut multi_table_data = MultiTableData::new();
for series in &request.timeseries {
let table_name = &series
.labels
.iter()
.find(|label| {
// The metric name is a special label
label.name == METRIC_NAME_LABEL
})
.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in time-series",
})?
.value;
// The metric name is a special label,
// num_columns = labels.len() - 1 + 1 (value) + 1 (timestamp)
let num_columns = series.labels.len() + 1;
let table_data = multi_table_data.get_or_default_table_data(
table_name,
num_columns,
series.samples.len(),
);
for Sample { value, timestamp } in &series.samples {
let mut one_row = table_data.alloc_one_row();
// labels
let kvs = series.labels.iter().filter_map(|label| {
if label.name == METRIC_NAME_LABEL {
None
} else {
Some((label.name.as_str(), label.value.as_str()))
}
});
row_writer::write_tags(table_data, kvs, &mut one_row)?;
// value
row_writer::write_f64(table_data, FIELD_COLUMN_NAME, *value, &mut one_row)?;
// timestamp
row_writer::write_ts_millis(
table_data,
TIMESTAMP_COLUMN_NAME,
Some(*timestamp),
&mut one_row,
)?;
table_data.add_row(one_row);
}
}
Ok(multi_table_data.into_row_insert_requests())
}
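
For reference, a hedged usage sketch mirroring the test further down (`mock_timeseries()` is the existing test helper; error handling elided):

    let write_request = WriteRequest {
        timeseries: mock_timeseries(),
        ..Default::default()
    };
    let (row_inserts, num_rows) = to_grpc_row_insert_requests(write_request)?;
    // row_inserts.inserts holds one RowInsertRequest per metric name;
    // num_rows counts all samples across all time series.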
pub fn to_grpc_insert_requests(request: WriteRequest) -> Result<(InsertRequests, usize)> {
let mut writers: HashMap<String, LinesWriter> = HashMap::new();
for timeseries in &request.timeseries {
@@ -450,6 +506,7 @@ mod tests {
use std::sync::Arc;
use api::prom_store::remote::LabelMatcher;
use api::v1::{ColumnDataType, Row, SemanticType};
use datafusion::prelude::SessionContext;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
@@ -564,6 +621,141 @@ mod tests {
assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", display_string);
}
fn column_schemas_with(
mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
) -> Vec<api::v1::ColumnSchema> {
kts_iter.push((
"greptime_value",
ColumnDataType::Float64,
SemanticType::Field,
));
kts_iter.push((
"greptime_timestamp",
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
));
kts_iter
.into_iter()
.map(|(k, t, s)| api::v1::ColumnSchema {
column_name: k.to_string(),
datatype: t as i32,
semantic_type: s as i32,
})
.collect()
}
fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
Row {
values: vec![
api::v1::Value {
value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::F64Value(value)),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::TsMillisecondValue(timestamp)),
},
],
}
}
fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
Row {
values: vec![
api::v1::Value {
value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::F64Value(value)),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::TsMillisecondValue(timestamp)),
},
],
}
}
#[test]
fn test_write_request_to_row_insert_exprs() {
let write_request = WriteRequest {
timeseries: mock_timeseries(),
..Default::default()
};
let mut exprs = to_grpc_row_insert_requests(write_request)
.unwrap()
.0
.inserts;
exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
assert_eq!(3, exprs.len());
assert_eq!("metric1", exprs[0].table_name);
assert_eq!("metric2", exprs[1].table_name);
assert_eq!("metric3", exprs[2].table_name);
let rows = exprs[0].rows.as_ref().unwrap();
let schema = &rows.schema;
let rows = &rows.rows;
assert_eq!(2, rows.len());
assert_eq!(3, schema.len());
assert_eq!(
column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
*schema
);
assert_eq!(
&vec![
make_row_with_label("spark", 1.0, 1000),
make_row_with_label("spark", 2.0, 2000),
],
rows
);
let rows = exprs[1].rows.as_ref().unwrap();
let schema = &rows.schema;
let rows = &rows.rows;
assert_eq!(2, rows.len());
assert_eq!(4, schema.len());
assert_eq!(
column_schemas_with(vec![
("instance", ColumnDataType::String, SemanticType::Tag),
("idc", ColumnDataType::String, SemanticType::Tag)
]),
*schema
);
assert_eq!(
&vec![
make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
],
rows
);
let rows = exprs[2].rows.as_ref().unwrap();
let schema = &rows.schema;
let rows = &rows.rows;
assert_eq!(3, rows.len());
assert_eq!(4, schema.len());
assert_eq!(
column_schemas_with(vec![
("idc", ColumnDataType::String, SemanticType::Tag),
("app", ColumnDataType::String, SemanticType::Tag)
]),
*schema
);
assert_eq!(
&vec![
make_row_with_2_labels("z002", "biz", 5.0, 1000),
make_row_with_2_labels("z002", "biz", 6.0, 2000),
make_row_with_2_labels("z002", "biz", 7.0, 3000),
],
rows
);
}
#[test]
fn test_write_request_to_insert_exprs() {
let write_request = WriteRequest {


@@ -0,0 +1,270 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
Value,
};
use common_grpc::writer;
use common_grpc::writer::Precision;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{IncompatibleSchemaSnafu, InfluxdbLinesWriteSnafu, Result, TimePrecisionSnafu};
pub struct TableData<'a> {
schema: Vec<ColumnSchema>,
rows: Vec<Row>,
column_indexes: HashMap<&'a str, usize>,
}
impl TableData<'_> {
pub fn new(num_columns: usize, num_rows: usize) -> Self {
Self {
schema: Vec::with_capacity(num_columns),
rows: Vec::with_capacity(num_rows),
column_indexes: HashMap::with_capacity(num_columns),
}
}
#[inline]
pub fn num_columns(&self) -> usize {
self.schema.len()
}
#[inline]
pub fn num_rows(&self) -> usize {
self.rows.len()
}
#[inline]
pub fn alloc_one_row(&self) -> Vec<Value> {
vec![Value { value_data: None }; self.num_columns()]
}
#[inline]
pub fn add_row(&mut self, values: Vec<Value>) {
self.rows.push(Row { values })
}
pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
(self.schema, self.rows)
}
}
pub struct MultiTableData<'a> {
table_data_map: HashMap<&'a str, TableData<'a>>,
}
impl<'a> MultiTableData<'a> {
pub fn new() -> Self {
Self {
table_data_map: HashMap::new(),
}
}
pub fn get_or_default_table_data(
&mut self,
table_name: &'a str,
num_columns: usize,
num_rows: usize,
) -> &mut TableData<'a> {
self.table_data_map
.entry(table_name)
.or_insert_with(|| TableData::new(num_columns, num_rows))
}
/// Returns the request and number of rows in it.
pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
let mut total_rows = 0;
let inserts = self
.table_data_map
.into_iter()
.map(|(table_name, table_data)| {
total_rows += table_data.num_rows();
let num_columns = table_data.num_columns();
let (schema, mut rows) = table_data.into_schema_and_rows();
for row in &mut rows {
if num_columns > row.values.len() {
row.values.resize(num_columns, Value { value_data: None });
}
}
RowInsertRequest {
table_name: table_name.to_string(),
rows: Some(Rows { schema, rows }),
..Default::default()
}
})
.collect::<Vec<_>>();
let row_insert_requests = RowInsertRequests { inserts };
(row_insert_requests, total_rows)
}
}
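
One behavior worth calling out: columns are registered lazily, so a row added before a later column first appears is shorter than the final schema, and into_row_insert_requests pads such rows with null values. A hypothetical sketch (error handling elided):

    let mut multi = MultiTableData::new();
    let table = multi.get_or_default_table_data("demo", 2, 2);

    let mut row1 = table.alloc_one_row();
    write_f64(table, "greptime_value", 1.0, &mut row1)?;
    table.add_row(row1); // schema has 1 column at this point

    let mut row2 = table.alloc_one_row();
    write_f64(table, "greptime_value", 2.0, &mut row2)?;
    write_tags(table, std::iter::once(("idc", "z001")), &mut row2)?;
    table.add_row(row2); // "idc" appended; schema now has 2 columns

    let (requests, total_rows) = multi.into_row_insert_requests();
    // row1 was resized to 2 values; its "idc" slot is Value { value_data: None }.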
pub fn write_tags<'a>(
table_data: &mut TableData<'a>,
kvs: impl Iterator<Item = (&'a str, &'a str)>,
one_row: &mut Vec<Value>,
) -> Result<()> {
let ktv_iter = kvs.map(|(k, v)| {
(
k,
ColumnDataType::String,
ValueData::StringValue(v.to_string()),
)
});
write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
}
pub fn write_fields<'a>(
table_data: &mut TableData<'a>,
fields: impl Iterator<Item = (&'a str, ColumnDataType, ValueData)>,
one_row: &mut Vec<Value>,
) -> Result<()> {
write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
}
pub fn write_f64<'a>(
table_data: &mut TableData<'a>,
name: &'a str,
value: f64,
one_row: &mut Vec<Value>,
) -> Result<()> {
write_fields(
table_data,
std::iter::once((name, ColumnDataType::Float64, ValueData::F64Value(value))),
one_row,
)
}
fn write_by_semantic_type<'a>(
table_data: &mut TableData<'a>,
semantic_type: SemanticType,
ktv_iter: impl Iterator<Item = (&'a str, ColumnDataType, ValueData)>,
one_row: &mut Vec<Value>,
) -> Result<()> {
let TableData {
schema,
column_indexes,
..
} = table_data;
for (name, datatype, value) in ktv_iter {
let index = column_indexes.entry(name).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: name.to_string(),
datatype: datatype as i32,
semantic_type: semantic_type as i32,
});
one_row.push(value.into());
} else {
check_schema(datatype, semantic_type, &schema[*index])?;
one_row[*index].value_data = Some(value);
}
}
Ok(())
}
pub fn write_ts_millis<'a>(
table_data: &mut TableData<'a>,
name: &'a str,
ts: Option<i64>,
one_row: &mut Vec<Value>,
) -> Result<()> {
write_ts_precision(table_data, name, ts, Precision::Millisecond, one_row)
}
pub fn write_ts_precision<'a>(
table_data: &mut TableData<'a>,
name: &'a str,
ts: Option<i64>,
precision: Precision,
one_row: &mut Vec<Value>,
) -> Result<()> {
let TableData {
schema,
column_indexes,
..
} = table_data;
let ts = match ts {
Some(timestamp) => writer::to_ms_ts(precision, timestamp),
None => {
let timestamp = Timestamp::current_millis();
let unit: TimeUnit = precision.try_into().context(InfluxdbLinesWriteSnafu)?;
let timestamp = timestamp
.convert_to(unit)
.with_context(|| TimePrecisionSnafu {
name: precision.to_string(),
})?;
writer::to_ms_ts(precision, timestamp.into())
}
};
let index = column_indexes.entry(name).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: name.to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
});
one_row.push(ValueData::TsMillisecondValue(ts).into())
} else {
check_schema(
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
&schema[*index],
)?;
one_row[*index].value_data = Some(ValueData::TsMillisecondValue(ts));
}
Ok(())
}
#[inline]
fn check_schema(
datatype: ColumnDataType,
semantic_type: SemanticType,
schema: &ColumnSchema,
) -> Result<()> {
ensure!(
schema.datatype == datatype as i32,
IncompatibleSchemaSnafu {
column_name: &schema.column_name,
datatype: "datatype",
expected: schema.datatype,
actual: datatype as i32,
}
);
ensure!(
schema.semantic_type == semantic_type as i32,
IncompatibleSchemaSnafu {
column_name: &schema.column_name,
datatype: "semantic_type",
expected: schema.semantic_type,
actual: semantic_type as i32,
}
);
Ok(())
}
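
And a sketch of check_schema rejecting a column reused with mismatched types (hypothetical, error handling elided):

    let mut multi = MultiTableData::new();
    let table = multi.get_or_default_table_data("demo", 2, 1);

    let mut row = table.alloc_one_row();
    write_f64(table, "val", 1.0, &mut row)?; // registers "val" as a Float64 field

    // Re-writing "val" as a String tag hits check_schema and fails with
    // IncompatibleSchema (Float64 expected, String given).
    let result = write_tags(table, std::iter::once(("val", "oops")), &mut row);
    assert!(result.is_err());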