diff --git a/Cargo.lock b/Cargo.lock index f5c2bda0d2..5703faee5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2021,6 +2021,7 @@ dependencies = [ "servers", "snafu", "sql", + "sqlparser", "store-api", "table", "tempdir", diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs index 2f77cbe85d..904b710fde 100644 --- a/src/common/grpc/src/writer.rs +++ b/src/common/grpc/src/writer.rs @@ -12,6 +12,7 @@ use crate::error::{Result, TypeMismatchSnafu}; type ColumnName = String; +// TODO(fys): will remove in the future. #[derive(Default)] pub struct LinesWriter { column_name_index: HashMap, @@ -208,7 +209,7 @@ impl LinesWriter { } } -fn to_ms_ts(p: Precision, ts: i64) -> i64 { +pub fn to_ms_ts(p: Precision, ts: i64) -> i64 { match p { Precision::NANOSECOND => ts / 1_000_000, Precision::MICROSECOND => ts / 1000, diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index c367686402..63e924c18d 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -26,6 +26,7 @@ openmetrics-parser = "0.4" prost = "0.11" query = { path = "../query" } serde = "1.0" +sqlparser = "0.15" servers = { path = "../servers" } snafu = { version = "0.7", features = ["backtraces"] } sql = { path = "../sql" } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index f0a8495d4d..ff1a5bee89 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -142,6 +142,36 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display("Failed to access catalog, source: {}", source))] + Catalog { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + + #[snafu(display("Table not found: {}", table_name))] + TableNotFound { + table_name: String, + backtrace: Backtrace, + }, + + #[snafu(display("Column {} not found in table {}", column_name, table_name))] + ColumnNotFound { + column_name: String, + table_name: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Columns and values number mismatch, columns: {}, values: {}", + columns, + values, + ))] + ColumnValuesNumberMismatch { + columns: usize, + values: usize, + backtrace: Backtrace, + }, + #[snafu(display("Failed to join task, source: {}", source))] JoinTask { source: common_runtime::JoinError, @@ -161,6 +191,7 @@ impl ErrorExt for Error { | Error::FindRegions { .. } | Error::InvalidInsertRequest { .. } | Error::FindPartitionColumn { .. } + | Error::ColumnValuesNumberMismatch { .. } | Error::RegionKeysSize { .. } => StatusCode::InvalidArguments, Error::RuntimeResource { source, .. } => source.status_code(), @@ -174,6 +205,8 @@ impl ErrorExt for Error { Error::RequestDatanode { source } => source.status_code(), + Error::Catalog { source } => source.status_code(), + Error::ColumnDataType { .. } | Error::FindDatanode { .. } | Error::DatanodeInstance { .. } => StatusCode::Internal, @@ -182,6 +215,10 @@ impl ErrorExt for Error { StatusCode::Unexpected } Error::ExecOpentsdbPut { .. } => StatusCode::Internal, + + Error::TableNotFound { .. } => StatusCode::TableNotFound, + Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, + Error::JoinTask { .. } => StatusCode::Unexpected, } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 0bc53567ea..5a7c36226b 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -13,6 +13,7 @@ pub mod postgres; pub mod prometheus; mod server; pub mod spliter; +mod sql; mod table; #[cfg(test)] mod tests; diff --git a/src/frontend/src/sql.rs b/src/frontend/src/sql.rs new file mode 100644 index 0000000000..78b8003ab6 --- /dev/null +++ b/src/frontend/src/sql.rs @@ -0,0 +1,102 @@ +use catalog::SchemaProviderRef; +use common_error::snafu::ensure; +use datatypes::prelude::ConcreteDataType; +use datatypes::vectors::VectorBuilder; +use snafu::{OptionExt, ResultExt}; +use sql::ast::Value as SqlValue; +use sql::statements; +use sql::statements::insert::Insert; +use table::requests::InsertRequest; + +use crate::error::{self, Result}; + +// TODO(fys): Extract the common logic in datanode and frontend in the future. +#[allow(dead_code)] +pub(crate) fn insert_to_request( + schema_provider: &SchemaProviderRef, + stmt: Insert, +) -> Result { + let columns = stmt.columns(); + let values = stmt.values().context(error::ParseSqlSnafu)?; + let table_name = stmt.table_name(); + + let table = schema_provider + .table(&table_name) + .context(error::CatalogSnafu)? + .context(error::TableNotFoundSnafu { + table_name: &table_name, + })?; + let schema = table.schema(); + let columns_num = if columns.is_empty() { + schema.column_schemas().len() + } else { + columns.len() + }; + let rows_num = values.len(); + + let mut columns_builders: Vec<(&String, &ConcreteDataType, VectorBuilder)> = + Vec::with_capacity(columns_num); + + if columns.is_empty() { + for column_schema in schema.column_schemas() { + let data_type = &column_schema.data_type; + columns_builders.push(( + &column_schema.name, + data_type, + VectorBuilder::with_capacity(data_type.clone(), rows_num), + )); + } + } else { + for column_name in columns { + let column_schema = schema.column_schema_by_name(column_name).with_context(|| { + error::ColumnNotFoundSnafu { + table_name: &table_name, + column_name: column_name.to_string(), + } + })?; + let data_type = &column_schema.data_type; + columns_builders.push(( + column_name, + data_type, + VectorBuilder::with_capacity(data_type.clone(), rows_num), + )); + } + } + + for row in values { + ensure!( + row.len() == columns_num, + error::ColumnValuesNumberMismatchSnafu { + columns: columns_num, + values: row.len(), + } + ); + + for (sql_val, (column_name, data_type, builder)) in + row.iter().zip(columns_builders.iter_mut()) + { + add_row_to_vector(column_name, data_type, sql_val, builder)?; + } + } + + Ok(InsertRequest { + table_name, + columns_values: columns_builders + .into_iter() + .map(|(c, _, mut b)| (c.to_owned(), b.finish())) + .collect(), + }) +} + +fn add_row_to_vector( + column_name: &str, + data_type: &ConcreteDataType, + sql_val: &SqlValue, + builder: &mut VectorBuilder, +) -> Result<()> { + let value = statements::sql_value_to_value(column_name, data_type, sql_val) + .context(error::ParseSqlSnafu)?; + builder.push(&value); + + Ok(()) +} diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index d5b3582868..f17575a234 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -37,6 +37,7 @@ serde = "1.0" serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } snap = "1" +table = { path = "../table" } tokio = { version = "1.20", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index fb85e79b7b..a8262e4bef 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -7,8 +7,10 @@ use api::v1::{ use common_grpc::writer::{LinesWriter, Precision}; use influxdb_line_protocol::{parse_lines, FieldValue}; use snafu::ResultExt; +use table::requests::InsertRequest; use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu}; +use crate::line_writer::LineWriter; pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts"; pub const DEFAULT_TIME_PRECISION: Precision = Precision::NANOSECOND; @@ -20,6 +22,60 @@ pub struct InfluxdbRequest { type TableName = String; +impl TryFrom<&InfluxdbRequest> for Vec { + type Error = Error; + + fn try_from(value: &InfluxdbRequest) -> Result { + let lines = parse_lines(&value.lines) + .collect::>>() + .context(InfluxdbLineProtocolSnafu)?; + let line_len = lines.len(); + let mut writers: HashMap = HashMap::new(); + + for line in lines { + let table_name = line.series.measurement; + let writer = writers + .entry(table_name.to_string()) + .or_insert_with(|| LineWriter::with_lines(table_name, line_len)); + + let tags = line.series.tag_set; + if let Some(tags) = tags { + for (k, v) in tags { + writer.write_tag(k.as_str(), v.as_str()); + } + } + + let fields = line.field_set; + for (k, v) in fields { + let column_name = k.as_str(); + match v { + FieldValue::I64(value) => writer.write_i64(column_name, value), + FieldValue::U64(value) => writer.write_u64(column_name, value), + FieldValue::F64(value) => writer.write_f64(column_name, value), + FieldValue::String(value) => writer.write_string(column_name, value.as_str()), + FieldValue::Boolean(value) => writer.write_bool(column_name, value), + } + } + + if let Some(timestamp) = line.timestamp { + let precision = if let Some(val) = &value.precision { + *val + } else { + DEFAULT_TIME_PRECISION + }; + writer.write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision)); + } + + writer.commit(); + } + Ok(writers + .into_iter() + .map(|(_, writer)| writer.finish()) + .collect()) + } +} + +// TODO(fys): will remove in the future. impl TryFrom<&InfluxdbRequest> for Vec { type Error = Error; @@ -106,7 +162,7 @@ impl TryFrom<&InfluxdbRequest> for Vec { #[cfg(test)] mod tests { - use std::ops::Deref; + use std::{ops::Deref, sync::Arc}; use api::v1::{ codec::InsertBatch, @@ -115,6 +171,9 @@ mod tests { Column, ColumnDataType, InsertExpr, }; use common_base::BitVec; + use common_time::{timestamp::TimeUnit, Timestamp}; + use datatypes::{value::Value, vectors::Vector}; + use table::requests::InsertRequest; use crate::influxdb::InfluxdbRequest; @@ -124,6 +183,30 @@ mod tests { monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100 monitor1,host=host2 memory=1027 1663840496400340001 monitor2,host=host3 cpu=66.5 1663840496100023102 +monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; + + let influxdb_req = &InfluxdbRequest { + precision: None, + lines: lines.to_string(), + }; + let insert_reqs: Vec = influxdb_req.try_into().unwrap(); + + for insert_req in insert_reqs { + match &insert_req.table_name[..] { + "monitor1" => assert_table_1(&insert_req), + "monitor2" => assert_table_2(&insert_req), + _ => panic!(), + } + } + } + + // TODO(fys): will remove in the future. + #[test] + fn test_convert_influxdb_lines_1() { + let lines = r" +monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100 +monitor1,host=host2 memory=1027 1663840496400340001 +monitor2,host=host3 cpu=66.5 1663840496100023102 monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; let influxdb_req = &InfluxdbRequest { @@ -150,6 +233,77 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; } } + fn assert_table_1(insert_req: &InsertRequest) { + let table_name = &insert_req.table_name; + assert_eq!("monitor1", table_name); + + let columns = &insert_req.columns_values; + + let host = columns.get("host").unwrap(); + let expetcd: Vec = vec!["host1".into(), "host2".into()]; + assert_vector(&expetcd, host); + + let cpu = columns.get("cpu").unwrap(); + let expetcd: Vec = vec![66.6.into(), Value::Null]; + assert_vector(&expetcd, cpu); + + let memory = columns.get("memory").unwrap(); + let expetcd: Vec = vec![1024.0.into(), 1027.0.into()]; + assert_vector(&expetcd, memory); + + let ts = columns.get("ts").unwrap(); + let expetcd: Vec = vec![ + datatypes::prelude::Value::Timestamp(Timestamp::new( + 1663840496100, + TimeUnit::Millisecond, + )), + datatypes::prelude::Value::Timestamp(Timestamp::new( + 1663840496400, + TimeUnit::Millisecond, + )), + ]; + assert_vector(&expetcd, ts); + } + + fn assert_table_2(insert_req: &InsertRequest) { + let table_name = &insert_req.table_name; + assert_eq!("monitor2", table_name); + + let columns = &insert_req.columns_values; + + let host = columns.get("host").unwrap(); + let expetcd: Vec = vec!["host3".into(), "host4".into()]; + assert_vector(&expetcd, host); + + let cpu = columns.get("cpu").unwrap(); + let expetcd: Vec = vec![66.5.into(), 66.3.into()]; + assert_vector(&expetcd, cpu); + + let memory = columns.get("memory").unwrap(); + let expetcd: Vec = vec![Value::Null, 1029.0.into()]; + assert_vector(&expetcd, memory); + + let ts = columns.get("ts").unwrap(); + let expetcd: Vec = vec![ + datatypes::prelude::Value::Timestamp(Timestamp::new( + 1663840496100, + TimeUnit::Millisecond, + )), + datatypes::prelude::Value::Timestamp(Timestamp::new( + 1663840496400, + TimeUnit::Millisecond, + )), + ]; + assert_vector(&expetcd, ts); + } + + fn assert_vector(expected: &[Value], vector: &Arc) { + for (idx, expected) in expected.iter().enumerate() { + let val = vector.get(idx); + assert_eq!(*expected, val); + } + } + fn assert_monitor_1(insert_batch: &InsertBatch) { let columns = &insert_batch.columns; assert_eq!(4, columns.len()); diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 2f44f5830e..d7ad934502 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -5,6 +5,7 @@ pub mod error; pub mod grpc; pub mod http; pub mod influxdb; +pub mod line_writer; pub mod mysql; pub mod opentsdb; pub mod postgres; diff --git a/src/servers/src/line_writer.rs b/src/servers/src/line_writer.rs new file mode 100644 index 0000000000..0b9927a5dd --- /dev/null +++ b/src/servers/src/line_writer.rs @@ -0,0 +1,202 @@ +use std::collections::HashMap; + +use common_grpc::writer::{to_ms_ts, Precision}; +use common_time::{timestamp::TimeUnit::Millisecond, Timestamp}; +use datatypes::{ + prelude::ConcreteDataType, + types::TimestampType, + value::Value, + vectors::{VectorBuilder, VectorRef}, +}; +use table::requests::InsertRequest; + +type ColumnLen = usize; +type ColumnName = String; + +pub struct LineWriter { + table_name: String, + expected_rows: usize, + current_rows: usize, + columns_builders: HashMap, +} + +impl LineWriter { + pub fn with_lines(table_name: impl Into, lines: usize) -> Self { + Self { + table_name: table_name.into(), + expected_rows: lines, + current_rows: 0, + columns_builders: Default::default(), + } + } + + pub fn write_ts(&mut self, column_name: &str, value: (i64, Precision)) { + let (val, precision) = value; + let datatype = ConcreteDataType::Timestamp(TimestampType { unit: Millisecond }); + let ts_val = Value::Timestamp(Timestamp::new(to_ms_ts(precision, val), Millisecond)); + self.write(column_name, datatype, ts_val); + } + + pub fn write_tag(&mut self, column_name: &str, value: &str) { + self.write( + column_name, + ConcreteDataType::string_datatype(), + Value::String(value.into()), + ); + } + + pub fn write_u64(&mut self, column_name: &str, value: u64) { + self.write( + column_name, + ConcreteDataType::uint64_datatype(), + Value::UInt64(value), + ); + } + + pub fn write_i64(&mut self, column_name: &str, value: i64) { + self.write( + column_name, + ConcreteDataType::int64_datatype(), + Value::Int64(value), + ); + } + + pub fn write_f64(&mut self, column_name: &str, value: f64) { + self.write( + column_name, + ConcreteDataType::float64_datatype(), + Value::Float64(value.into()), + ); + } + + pub fn write_string(&mut self, column_name: &str, value: &str) { + self.write( + column_name, + ConcreteDataType::string_datatype(), + Value::String(value.into()), + ); + } + + pub fn write_bool(&mut self, column_name: &str, value: bool) { + self.write( + column_name, + ConcreteDataType::boolean_datatype(), + Value::Boolean(value), + ); + } + + fn write(&mut self, column_name: &str, datatype: ConcreteDataType, value: Value) { + let or_insert = || { + let rows = self.current_rows; + let mut builder = VectorBuilder::with_capacity(datatype, self.expected_rows); + (0..rows).into_iter().for_each(|_| builder.push_null()); + (builder, rows) + }; + let (builder, column_len) = self + .columns_builders + .entry(column_name.to_string()) + .or_insert_with(or_insert); + + builder.push(&value); + *column_len += 1; + } + + pub fn commit(&mut self) { + self.current_rows += 1; + self.columns_builders + .values_mut() + .into_iter() + .for_each(|(builder, len)| { + if self.current_rows > *len { + builder.push(&Value::Null) + } + }); + } + + pub fn finish(self) -> InsertRequest { + let columns_values: HashMap = self + .columns_builders + .into_iter() + .map(|(column_name, (mut builder, _))| (column_name, builder.finish())) + .collect(); + InsertRequest { + table_name: self.table_name, + columns_values, + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_time::Timestamp; + use datatypes::{value::Value, vectors::Vector}; + + use crate::line_writer::{LineWriter, Precision}; + + #[test] + fn test_writer() { + let mut writer = LineWriter::with_lines("demo".to_string(), 4); + writer.write_ts("ts", (1665893727685, Precision::MILLISECOND)); + writer.write_tag("host", "host-1"); + writer.write_i64("memory", 10_i64); + writer.commit(); + + writer.write_ts("ts", (1665893727686, Precision::MILLISECOND)); + writer.write_tag("host", "host-2"); + writer.write_tag("region", "region-2"); + writer.write_i64("memory", 9_i64); + writer.commit(); + + writer.write_ts("ts", (1665893727689, Precision::MILLISECOND)); + writer.write_tag("host", "host-3"); + writer.write_tag("region", "region-3"); + writer.write_i64("cpu", 19_i64); + writer.commit(); + + let insert_request = writer.finish(); + + assert_eq!("demo", insert_request.table_name); + + let columns = insert_request.columns_values; + assert_eq!(5, columns.len()); + assert!(columns.contains_key("ts")); + assert!(columns.contains_key("host")); + assert!(columns.contains_key("memory")); + assert!(columns.contains_key("region")); + assert!(columns.contains_key("cpu")); + + let ts = columns.get("ts").unwrap(); + let host = columns.get("host").unwrap(); + let memory = columns.get("memory").unwrap(); + let region = columns.get("region").unwrap(); + let cpu = columns.get("cpu").unwrap(); + + let expected: Vec = vec![ + Value::Timestamp(Timestamp::from_millis(1665893727685_i64)), + Value::Timestamp(Timestamp::from_millis(1665893727686_i64)), + Value::Timestamp(Timestamp::from_millis(1665893727689_i64)), + ]; + assert_vector(&expected, ts); + + let expected: Vec = vec!["host-1".into(), "host-2".into(), "host-3".into()]; + assert_vector(&expected, host); + + let expected: Vec = vec![10_i64.into(), 9_i64.into(), Value::Null]; + assert_vector(&expected, memory); + + let expected: Vec = vec![Value::Null, "region-2".into(), "region-3".into()]; + assert_vector(&expected, region); + + let expected: Vec = vec![Value::Null, Value::Null, 19_i64.into()]; + assert_vector(&expected, cpu); + } + + fn assert_vector(expected: &[Value], vector: &Arc) { + for (idx, expected) in expected.iter().enumerate() { + let val = vector.get(idx); + assert_eq!(*expected, val); + } + } +} diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index f02c44f135..7edbf63e60 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -2,8 +2,11 @@ use std::collections::HashMap; use api::v1::codec::InsertBatch; use api::v1::{column, column::SemanticType, insert_expr, Column, ColumnDataType, InsertExpr}; +use common_grpc::writer::Precision; +use table::requests::InsertRequest; use crate::error::{self, Result}; +use crate::line_writer::LineWriter; pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "greptime_value"; @@ -112,6 +115,23 @@ impl DataPoint { self.value } + pub fn as_insert_request(&self) -> InsertRequest { + let mut line_writer = LineWriter::with_lines(self.metric.clone(), 1); + line_writer.write_ts( + OPENTSDB_TIMESTAMP_COLUMN_NAME, + (self.ts_millis(), Precision::MILLISECOND), + ); + + line_writer.write_f64(OPENTSDB_VALUE_COLUMN_NAME, self.value); + + for (tagk, tagv) in self.tags.iter() { + line_writer.write_tag(tagk, tagv); + } + line_writer.commit(); + line_writer.finish() + } + + // TODO(fys): will remove in the future. pub fn as_grpc_insert(&self) -> InsertExpr { let mut columns = Vec::with_capacity(2 + self.tags.len()); @@ -180,6 +200,12 @@ impl DataPoint { #[cfg(test)] mod test { + use std::sync::Arc; + + use common_time::{timestamp::TimeUnit, Timestamp}; + use datatypes::value::Value; + use datatypes::vectors::Vector; + use super::*; #[test] @@ -239,6 +265,43 @@ mod test { ); } + #[test] + fn test_as_insert_request() { + let data_point = DataPoint { + metric: "my_metric_1".to_string(), + ts_millis: 1000, + value: 1.0, + tags: vec![ + ("tagk1".to_string(), "tagv1".to_string()), + ("tagk2".to_string(), "tagv2".to_string()), + ], + }; + let insert_request = data_point.as_insert_request(); + assert_eq!("my_metric_1", insert_request.table_name); + let columns = insert_request.columns_values; + assert_eq!(4, columns.len()); + let ts = columns.get(OPENTSDB_TIMESTAMP_COLUMN_NAME).unwrap(); + let expected = vec![datatypes::prelude::Value::Timestamp(Timestamp::new( + 1000, + TimeUnit::Millisecond, + ))]; + assert_vector(&expected, ts); + let val = columns.get(OPENTSDB_VALUE_COLUMN_NAME).unwrap(); + assert_vector(&[1.0.into()], val); + let tagk1 = columns.get("tagk1").unwrap(); + assert_vector(&["tagv1".into()], tagk1); + let tagk2 = columns.get("tagk2").unwrap(); + assert_vector(&["tagv2".into()], tagk2); + } + + fn assert_vector(expected: &[Value], vector: &Arc) { + for (idx, expected) in expected.iter().enumerate() { + let val = vector.get(idx); + assert_eq!(*expected, val); + } + } + + // TODO(fys): will remove in the future. #[test] fn test_as_grpc_insert() { let data_point = DataPoint {