feat: convert different protocols to InsertRequest (#426)

* add line_writer and convert insert_stmt to InsertRequest

* support converting InfluxDB line protocol to InsertRequest

* support converting OpenTSDB to InsertRequest

* cr
Author: fys
Date: 2022-11-09 16:18:54 +08:00
Committed by: GitHub
Parent: 64a706d6f0
Commit: d08f8b87a6
11 changed files with 566 additions and 2 deletions
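All of the new conversion paths funnel into table::requests::InsertRequest; the InfluxDB and OpenTSDB ones go through the new servers::line_writer::LineWriter. A minimal sketch of that shared flow, assuming the crate layout this diff introduces:

use common_grpc::writer::Precision;
use servers::line_writer::LineWriter;

fn main() {
    // One ingested line becomes one committed row.
    let mut writer = LineWriter::with_lines("monitor1", 1);
    writer.write_tag("host", "host1");
    writer.write_f64("cpu", 66.6);
    writer.write_ts("ts", (1_663_840_496_100_023_100, Precision::NANOSECOND));
    writer.commit();

    let insert_request = writer.finish();
    assert_eq!("monitor1", insert_request.table_name);
    assert_eq!(3, insert_request.columns_values.len());
}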

Cargo.lock (generated)

@@ -2021,6 +2021,7 @@ dependencies = [
"servers",
"snafu",
"sql",
"sqlparser",
"store-api",
"table",
"tempdir",


@@ -12,6 +12,7 @@ use crate::error::{Result, TypeMismatchSnafu};
type ColumnName = String;
// TODO(fys): will remove in the future.
#[derive(Default)]
pub struct LinesWriter {
column_name_index: HashMap<ColumnName, usize>,
@@ -208,7 +209,7 @@ impl LinesWriter {
}
}
-fn to_ms_ts(p: Precision, ts: i64) -> i64 {
+pub fn to_ms_ts(p: Precision, ts: i64) -> i64 {
match p {
Precision::NANOSECOND => ts / 1_000_000,
Precision::MICROSECOND => ts / 1000,
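to_ms_ts is made pub so the new LineWriter can normalize every incoming timestamp to milliseconds; note the conversion is a truncating integer division. A small sanity check against the arms shown above:

use common_grpc::writer::{to_ms_ts, Precision};

fn main() {
    // Sub-millisecond digits are truncated, not rounded.
    assert_eq!(1_663_840_496_100, to_ms_ts(Precision::NANOSECOND, 1_663_840_496_100_023_100));
    assert_eq!(1_663_840_496_100, to_ms_ts(Precision::MICROSECOND, 1_663_840_496_100_023));
}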


@@ -26,6 +26,7 @@ openmetrics-parser = "0.4"
prost = "0.11"
query = { path = "../query" }
serde = "1.0"
sqlparser = "0.15"
servers = { path = "../servers" }
snafu = { version = "0.7", features = ["backtraces"] }
sql = { path = "../sql" }


@@ -142,6 +142,36 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to access catalog, source: {}", source))]
Catalog {
#[snafu(backtrace)]
source: catalog::error::Error,
},
#[snafu(display("Table not found: {}", table_name))]
TableNotFound {
table_name: String,
backtrace: Backtrace,
},
#[snafu(display("Column {} not found in table {}", column_name, table_name))]
ColumnNotFound {
column_name: String,
table_name: String,
backtrace: Backtrace,
},
#[snafu(display(
"Columns and values number mismatch, columns: {}, values: {}",
columns,
values,
))]
ColumnValuesNumberMismatch {
columns: usize,
values: usize,
backtrace: Backtrace,
},
#[snafu(display("Failed to join task, source: {}", source))]
JoinTask {
source: common_runtime::JoinError,
@@ -161,6 +191,7 @@ impl ErrorExt for Error {
| Error::FindRegions { .. }
| Error::InvalidInsertRequest { .. }
| Error::FindPartitionColumn { .. }
| Error::ColumnValuesNumberMismatch { .. }
| Error::RegionKeysSize { .. } => StatusCode::InvalidArguments,
Error::RuntimeResource { source, .. } => source.status_code(),
@@ -174,6 +205,8 @@ impl ErrorExt for Error {
Error::RequestDatanode { source } => source.status_code(),
Error::Catalog { source } => source.status_code(),
Error::ColumnDataType { .. }
| Error::FindDatanode { .. }
| Error::DatanodeInstance { .. } => StatusCode::Internal,
@@ -182,6 +215,10 @@ impl ErrorExt for Error {
StatusCode::Unexpected
}
Error::ExecOpentsdbPut { .. } => StatusCode::Internal,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::JoinTask { .. } => StatusCode::Unexpected,
}
}


@@ -13,6 +13,7 @@ pub mod postgres;
pub mod prometheus;
mod server;
pub mod spliter;
mod sql;
mod table;
#[cfg(test)]
mod tests;

src/frontend/src/sql.rs (new file, 102 lines)

@@ -0,0 +1,102 @@
use catalog::SchemaProviderRef;
use common_error::snafu::ensure;
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::VectorBuilder;
use snafu::{OptionExt, ResultExt};
use sql::ast::Value as SqlValue;
use sql::statements;
use sql::statements::insert::Insert;
use table::requests::InsertRequest;
use crate::error::{self, Result};
// TODO(fys): Extract the common logic in datanode and frontend in the future.
#[allow(dead_code)]
pub(crate) fn insert_to_request(
schema_provider: &SchemaProviderRef,
stmt: Insert,
) -> Result<InsertRequest> {
let columns = stmt.columns();
let values = stmt.values().context(error::ParseSqlSnafu)?;
let table_name = stmt.table_name();
let table = schema_provider
.table(&table_name)
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu {
table_name: &table_name,
})?;
let schema = table.schema();
let columns_num = if columns.is_empty() {
schema.column_schemas().len()
} else {
columns.len()
};
let rows_num = values.len();
let mut columns_builders: Vec<(&String, &ConcreteDataType, VectorBuilder)> =
Vec::with_capacity(columns_num);
if columns.is_empty() {
for column_schema in schema.column_schemas() {
let data_type = &column_schema.data_type;
columns_builders.push((
&column_schema.name,
data_type,
VectorBuilder::with_capacity(data_type.clone(), rows_num),
));
}
} else {
for column_name in columns {
let column_schema = schema.column_schema_by_name(column_name).with_context(|| {
error::ColumnNotFoundSnafu {
table_name: &table_name,
column_name: column_name.to_string(),
}
})?;
let data_type = &column_schema.data_type;
columns_builders.push((
column_name,
data_type,
VectorBuilder::with_capacity(data_type.clone(), rows_num),
));
}
}
for row in values {
ensure!(
row.len() == columns_num,
error::ColumnValuesNumberMismatchSnafu {
columns: columns_num,
values: row.len(),
}
);
for (sql_val, (column_name, data_type, builder)) in
row.iter().zip(columns_builders.iter_mut())
{
add_row_to_vector(column_name, data_type, sql_val, builder)?;
}
}
Ok(InsertRequest {
table_name,
columns_values: columns_builders
.into_iter()
.map(|(c, _, mut b)| (c.to_owned(), b.finish()))
.collect(),
})
}
fn add_row_to_vector(
column_name: &str,
data_type: &ConcreteDataType,
sql_val: &SqlValue,
builder: &mut VectorBuilder,
) -> Result<()> {
let value = statements::sql_value_to_value(column_name, data_type, sql_val)
.context(error::ParseSqlSnafu)?;
builder.push(&value);
Ok(())
}
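insert_to_request builds one VectorBuilder per target column and add_row_to_vector pushes each row's converted value into it. A minimal sketch of that per-column building block, using the same datatypes APIs as above:

use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::VectorBuilder;

fn main() {
    // A "cpu" column for two rows, with the second value absent.
    let mut builder = VectorBuilder::with_capacity(ConcreteDataType::float64_datatype(), 2);
    builder.push(&Value::Float64(66.6.into()));
    builder.push(&Value::Null);

    let cpu = builder.finish();
    assert_eq!(Value::Null, cpu.get(1));
}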


@@ -37,6 +37,7 @@ serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
snap = "1"
table = { path = "../table" }
tokio = { version = "1.20", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"


@@ -7,8 +7,10 @@ use api::v1::{
use common_grpc::writer::{LinesWriter, Precision};
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;
use table::requests::InsertRequest;
use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu};
use crate::line_writer::LineWriter;
pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
pub const DEFAULT_TIME_PRECISION: Precision = Precision::NANOSECOND;
@@ -20,6 +22,60 @@ pub struct InfluxdbRequest {
type TableName = String;
impl TryFrom<&InfluxdbRequest> for Vec<InsertRequest> {
type Error = Error;
fn try_from(value: &InfluxdbRequest) -> Result<Self, Self::Error> {
let lines = parse_lines(&value.lines)
.collect::<influxdb_line_protocol::Result<Vec<_>>>()
.context(InfluxdbLineProtocolSnafu)?;
let line_len = lines.len();
let mut writers: HashMap<TableName, LineWriter> = HashMap::new();
for line in lines {
let table_name = line.series.measurement;
let writer = writers
.entry(table_name.to_string())
.or_insert_with(|| LineWriter::with_lines(table_name, line_len));
let tags = line.series.tag_set;
if let Some(tags) = tags {
for (k, v) in tags {
writer.write_tag(k.as_str(), v.as_str());
}
}
let fields = line.field_set;
for (k, v) in fields {
let column_name = k.as_str();
match v {
FieldValue::I64(value) => writer.write_i64(column_name, value),
FieldValue::U64(value) => writer.write_u64(column_name, value),
FieldValue::F64(value) => writer.write_f64(column_name, value),
FieldValue::String(value) => writer.write_string(column_name, value.as_str()),
FieldValue::Boolean(value) => writer.write_bool(column_name, value),
}
}
if let Some(timestamp) = line.timestamp {
let precision = if let Some(val) = &value.precision {
*val
} else {
DEFAULT_TIME_PRECISION
};
writer.write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision));
}
writer.commit();
}
Ok(writers
.into_iter()
.map(|(_, writer)| writer.finish())
.collect())
}
}
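Lines are bucketed by measurement into one LineWriter per table, so a single InfluxDB write yields one InsertRequest per distinct measurement. A usage sketch, assuming InfluxdbRequest stays constructible the way the tests below build it:

use servers::influxdb::InfluxdbRequest;
use table::requests::InsertRequest;

fn main() {
    let lines = "monitor1,host=host1 cpu=66.6 1663840496100023100
monitor2,host=host3 cpu=66.5 1663840496100023102";
    let influxdb_req = InfluxdbRequest {
        precision: None,
        lines: lines.to_string(),
    };

    // Two distinct measurements produce two per-table InsertRequests.
    let insert_reqs: Vec<InsertRequest> = (&influxdb_req).try_into().unwrap();
    assert_eq!(2, insert_reqs.len());
}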
// TODO(fys): will remove in the future.
impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
type Error = Error;
@@ -106,7 +162,7 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
#[cfg(test)]
mod tests {
-use std::ops::Deref;
+use std::{ops::Deref, sync::Arc};
use api::v1::{
codec::InsertBatch,
@@ -115,6 +171,9 @@ mod tests {
Column, ColumnDataType, InsertExpr,
};
use common_base::BitVec;
use common_time::{timestamp::TimeUnit, Timestamp};
use datatypes::{value::Value, vectors::Vector};
use table::requests::InsertRequest;
use crate::influxdb::InfluxdbRequest;
@@ -124,6 +183,30 @@ mod tests {
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001
monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let influxdb_req = &InfluxdbRequest {
precision: None,
lines: lines.to_string(),
};
let insert_reqs: Vec<InsertRequest> = influxdb_req.try_into().unwrap();
for insert_req in insert_reqs {
match &insert_req.table_name[..] {
"monitor1" => assert_table_1(&insert_req),
"monitor2" => assert_table_2(&insert_req),
_ => panic!(),
}
}
}
// TODO(fys): will remove in the future.
#[test]
fn test_convert_influxdb_lines_1() {
let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001
monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let influxdb_req = &InfluxdbRequest {
@@ -150,6 +233,77 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
}
}
fn assert_table_1(insert_req: &InsertRequest) {
let table_name = &insert_req.table_name;
assert_eq!("monitor1", table_name);
let columns = &insert_req.columns_values;
let host = columns.get("host").unwrap();
let expected: Vec<Value> = vec!["host1".into(), "host2".into()];
assert_vector(&expected, host);
let cpu = columns.get("cpu").unwrap();
let expected: Vec<Value> = vec![66.6.into(), Value::Null];
assert_vector(&expected, cpu);
let memory = columns.get("memory").unwrap();
let expected: Vec<Value> = vec![1024.0.into(), 1027.0.into()];
assert_vector(&expected, memory);
let ts = columns.get("ts").unwrap();
let expected: Vec<Value> = vec![
Value::Timestamp(Timestamp::new(1663840496100, TimeUnit::Millisecond)),
Value::Timestamp(Timestamp::new(1663840496400, TimeUnit::Millisecond)),
];
assert_vector(&expected, ts);
}
fn assert_table_2(insert_req: &InsertRequest) {
let table_name = &insert_req.table_name;
assert_eq!("monitor2", table_name);
let columns = &insert_req.columns_values;
let host = columns.get("host").unwrap();
let expected: Vec<Value> = vec!["host3".into(), "host4".into()];
assert_vector(&expected, host);
let cpu = columns.get("cpu").unwrap();
let expected: Vec<Value> = vec![66.5.into(), 66.3.into()];
assert_vector(&expected, cpu);
let memory = columns.get("memory").unwrap();
let expected: Vec<Value> = vec![Value::Null, 1029.0.into()];
assert_vector(&expected, memory);
let ts = columns.get("ts").unwrap();
let expected: Vec<Value> = vec![
Value::Timestamp(Timestamp::new(1663840496100, TimeUnit::Millisecond)),
Value::Timestamp(Timestamp::new(1663840496400, TimeUnit::Millisecond)),
];
assert_vector(&expected, ts);
}
fn assert_vector(expected: &[Value], vector: &Arc<dyn Vector>) {
for (idx, expected) in expected.iter().enumerate() {
let val = vector.get(idx);
assert_eq!(*expected, val);
}
}
fn assert_monitor_1(insert_batch: &InsertBatch) {
let columns = &insert_batch.columns;
assert_eq!(4, columns.len());


@@ -5,6 +5,7 @@ pub mod error;
pub mod grpc;
pub mod http;
pub mod influxdb;
pub mod line_writer;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;


@@ -0,0 +1,202 @@
use std::collections::HashMap;
use common_grpc::writer::{to_ms_ts, Precision};
use common_time::{timestamp::TimeUnit::Millisecond, Timestamp};
use datatypes::{
prelude::ConcreteDataType,
types::TimestampType,
value::Value,
vectors::{VectorBuilder, VectorRef},
};
use table::requests::InsertRequest;
type ColumnLen = usize;
type ColumnName = String;
pub struct LineWriter {
table_name: String,
expected_rows: usize,
current_rows: usize,
columns_builders: HashMap<ColumnName, (VectorBuilder, ColumnLen)>,
}
impl LineWriter {
pub fn with_lines(table_name: impl Into<String>, lines: usize) -> Self {
Self {
table_name: table_name.into(),
expected_rows: lines,
current_rows: 0,
columns_builders: Default::default(),
}
}
pub fn write_ts(&mut self, column_name: &str, value: (i64, Precision)) {
let (val, precision) = value;
let datatype = ConcreteDataType::Timestamp(TimestampType { unit: Millisecond });
let ts_val = Value::Timestamp(Timestamp::new(to_ms_ts(precision, val), Millisecond));
self.write(column_name, datatype, ts_val);
}
pub fn write_tag(&mut self, column_name: &str, value: &str) {
self.write(
column_name,
ConcreteDataType::string_datatype(),
Value::String(value.into()),
);
}
pub fn write_u64(&mut self, column_name: &str, value: u64) {
self.write(
column_name,
ConcreteDataType::uint64_datatype(),
Value::UInt64(value),
);
}
pub fn write_i64(&mut self, column_name: &str, value: i64) {
self.write(
column_name,
ConcreteDataType::int64_datatype(),
Value::Int64(value),
);
}
pub fn write_f64(&mut self, column_name: &str, value: f64) {
self.write(
column_name,
ConcreteDataType::float64_datatype(),
Value::Float64(value.into()),
);
}
pub fn write_string(&mut self, column_name: &str, value: &str) {
self.write(
column_name,
ConcreteDataType::string_datatype(),
Value::String(value.into()),
);
}
pub fn write_bool(&mut self, column_name: &str, value: bool) {
self.write(
column_name,
ConcreteDataType::boolean_datatype(),
Value::Boolean(value),
);
}
fn write(&mut self, column_name: &str, datatype: ConcreteDataType, value: Value) {
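// A column seen for the first time is backfilled with one Null per
// already-committed row, so every builder stays the same length.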
let or_insert = || {
let rows = self.current_rows;
let mut builder = VectorBuilder::with_capacity(datatype, self.expected_rows);
(0..rows).for_each(|_| builder.push_null());
(builder, rows)
};
let (builder, column_len) = self
.columns_builders
.entry(column_name.to_string())
.or_insert_with(or_insert);
builder.push(&value);
*column_len += 1;
}
pub fn commit(&mut self) {
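// Close out the current row: any column that received no value this
// row is padded with Null to keep all columns aligned.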
self.current_rows += 1;
self.columns_builders
.values_mut()
.for_each(|(builder, len)| {
.for_each(|(builder, len)| {
if self.current_rows > *len {
builder.push(&Value::Null)
}
});
}
pub fn finish(self) -> InsertRequest {
let columns_values: HashMap<ColumnName, VectorRef> = self
.columns_builders
.into_iter()
.map(|(column_name, (mut builder, _))| (column_name, builder.finish()))
.collect();
InsertRequest {
table_name: self.table_name,
columns_values,
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_time::Timestamp;
use datatypes::{value::Value, vectors::Vector};
use crate::line_writer::{LineWriter, Precision};
#[test]
fn test_writer() {
let mut writer = LineWriter::with_lines("demo".to_string(), 4);
writer.write_ts("ts", (1665893727685, Precision::MILLISECOND));
writer.write_tag("host", "host-1");
writer.write_i64("memory", 10_i64);
writer.commit();
writer.write_ts("ts", (1665893727686, Precision::MILLISECOND));
writer.write_tag("host", "host-2");
writer.write_tag("region", "region-2");
writer.write_i64("memory", 9_i64);
writer.commit();
writer.write_ts("ts", (1665893727689, Precision::MILLISECOND));
writer.write_tag("host", "host-3");
writer.write_tag("region", "region-3");
writer.write_i64("cpu", 19_i64);
writer.commit();
let insert_request = writer.finish();
assert_eq!("demo", insert_request.table_name);
let columns = insert_request.columns_values;
assert_eq!(5, columns.len());
assert!(columns.contains_key("ts"));
assert!(columns.contains_key("host"));
assert!(columns.contains_key("memory"));
assert!(columns.contains_key("region"));
assert!(columns.contains_key("cpu"));
let ts = columns.get("ts").unwrap();
let host = columns.get("host").unwrap();
let memory = columns.get("memory").unwrap();
let region = columns.get("region").unwrap();
let cpu = columns.get("cpu").unwrap();
let expected: Vec<Value> = vec![
Value::Timestamp(Timestamp::from_millis(1665893727685_i64)),
Value::Timestamp(Timestamp::from_millis(1665893727686_i64)),
Value::Timestamp(Timestamp::from_millis(1665893727689_i64)),
];
assert_vector(&expected, ts);
let expected: Vec<Value> = vec!["host-1".into(), "host-2".into(), "host-3".into()];
assert_vector(&expected, host);
let expected: Vec<Value> = vec![10_i64.into(), 9_i64.into(), Value::Null];
assert_vector(&expected, memory);
let expected: Vec<Value> = vec![Value::Null, "region-2".into(), "region-3".into()];
assert_vector(&expected, region);
let expected: Vec<Value> = vec![Value::Null, Value::Null, 19_i64.into()];
assert_vector(&expected, cpu);
}
fn assert_vector(expected: &[Value], vector: &Arc<dyn Vector>) {
for (idx, expected) in expected.iter().enumerate() {
let val = vector.get(idx);
assert_eq!(*expected, val);
}
}
}


@@ -2,8 +2,11 @@ use std::collections::HashMap;
use api::v1::codec::InsertBatch;
use api::v1::{column, column::SemanticType, insert_expr, Column, ColumnDataType, InsertExpr};
use common_grpc::writer::Precision;
use table::requests::InsertRequest;
use crate::error::{self, Result};
use crate::line_writer::LineWriter;
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "greptime_value";
@@ -112,6 +115,23 @@ impl DataPoint {
self.value
}
pub fn as_insert_request(&self) -> InsertRequest {
let mut line_writer = LineWriter::with_lines(self.metric.clone(), 1);
line_writer.write_ts(
OPENTSDB_TIMESTAMP_COLUMN_NAME,
(self.ts_millis(), Precision::MILLISECOND),
);
line_writer.write_f64(OPENTSDB_VALUE_COLUMN_NAME, self.value);
for (tagk, tagv) in self.tags.iter() {
line_writer.write_tag(tagk, tagv);
}
line_writer.commit();
line_writer.finish()
}
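Each OpenTSDB data point therefore maps to a single-row InsertRequest: one timestamp column, one value column, and one column per tag, committed exactly once before finish.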
// TODO(fys): will remove in the future.
pub fn as_grpc_insert(&self) -> InsertExpr {
let mut columns = Vec::with_capacity(2 + self.tags.len());
@@ -180,6 +200,12 @@ impl DataPoint {
#[cfg(test)]
mod test {
use std::sync::Arc;
use common_time::{timestamp::TimeUnit, Timestamp};
use datatypes::value::Value;
use datatypes::vectors::Vector;
use super::*;
#[test]
@@ -239,6 +265,43 @@ mod test {
);
}
#[test]
fn test_as_insert_request() {
let data_point = DataPoint {
metric: "my_metric_1".to_string(),
ts_millis: 1000,
value: 1.0,
tags: vec![
("tagk1".to_string(), "tagv1".to_string()),
("tagk2".to_string(), "tagv2".to_string()),
],
};
let insert_request = data_point.as_insert_request();
assert_eq!("my_metric_1", insert_request.table_name);
let columns = insert_request.columns_values;
assert_eq!(4, columns.len());
let ts = columns.get(OPENTSDB_TIMESTAMP_COLUMN_NAME).unwrap();
let expected = vec![Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond))];
assert_vector(&expected, ts);
let val = columns.get(OPENTSDB_VALUE_COLUMN_NAME).unwrap();
assert_vector(&[1.0.into()], val);
let tagk1 = columns.get("tagk1").unwrap();
assert_vector(&["tagv1".into()], tagk1);
let tagk2 = columns.get("tagk2").unwrap();
assert_vector(&["tagv2".into()], tagk2);
}
fn assert_vector(expected: &[Value], vector: &Arc<dyn Vector>) {
for (idx, expected) in expected.iter().enumerate() {
let val = vector.get(idx);
assert_eq!(*expected, val);
}
}
// TODO(fys): will remove in the future.
#[test]
fn test_as_grpc_insert() {
let data_point = DataPoint {