From deb7d5fc2c5a92b61d33c170d281d47e70650c8e Mon Sep 17 00:00:00 2001 From: "Lei, Huang" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 15 Nov 2022 11:06:51 +0800 Subject: [PATCH] fix: opentsdb/influxdb tags are not put to primary key indices (#506) --- Cargo.lock | 1 + src/client/Cargo.toml | 1 + src/client/src/database.rs | 110 +++--------------------- src/client/src/error.rs | 24 ++---- src/common/insert/src/error.rs | 13 +++ src/common/insert/src/insert.rs | 98 ++++++++++++++++++++- src/common/insert/src/lib.rs | 4 +- src/frontend/src/error.rs | 7 ++ src/frontend/src/instance/influxdb.rs | 84 ++++++++++++++---- src/frontend/src/instance/opentsdb.rs | 2 +- src/frontend/src/instance/prometheus.rs | 2 +- 11 files changed, 205 insertions(+), 141 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 562878d9ff..3b5d98857d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1026,6 +1026,7 @@ dependencies = [ "common-base", "common-error", "common-grpc", + "common-insert", "common-query", "common-recordbatch", "common-time", diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 541b6d2dd4..6706bfd203 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -12,6 +12,7 @@ common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } +common-insert = { path = "../common/insert" } common-time = { path = "../common/time" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ "simd", diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 17e0908a48..2a49babbc2 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -1,27 +1,24 @@ use std::sync::Arc; -use api::helper::ColumnDataTypeWrapper; use api::v1::codec::SelectResult as GrpcSelectResult; use api::v1::{ - column::Values, object_expr, object_result, select_expr, Column, ColumnDataType, - DatabaseRequest, ExprHeader, InsertExpr, MutateResult as GrpcMutateResult, ObjectExpr, - ObjectResult as GrpcObjectResult, PhysicalPlan, SelectExpr, + object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr, + MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan, + SelectExpr, }; -use common_base::BitVec; use common_error::status_code::StatusCode; use common_grpc::AsExcutionPlan; use common_grpc::DefaultAsPlanImpl; +use common_insert::column_to_vector; use common_query::Output; use common_recordbatch::{RecordBatch, RecordBatches}; -use common_time::date::Date; -use common_time::datetime::DateTime; -use common_time::timestamp::Timestamp; use datafusion::physical_plan::ExecutionPlan; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; use snafu::{ensure, OptionExt, ResultExt}; use crate::error; +use crate::error::ColumnToVectorSnafu; use crate::{ error::{ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu}, Client, Result, @@ -199,7 +196,9 @@ impl TryFrom for Output { let vectors = select .columns .iter() - .map(|column| column_to_vector(column, select.row_count)) + .map(|column| { + column_to_vector(column, select.row_count).context(ColumnToVectorSnafu) + }) .collect::>>()?; let column_schemas = select @@ -237,99 +236,10 @@ impl TryFrom for Output { } } -fn column_to_vector(column: &Column, rows: u32) -> Result { - let wrapper = - ColumnDataTypeWrapper::try_new(column.datatype).context(error::ColumnDataTypeSnafu)?; - let column_datatype = wrapper.datatype(); - - let rows = rows as usize; - let mut vector = VectorBuilder::with_capacity(wrapper.into(), rows); - - if let Some(values) = &column.values { - let values = collect_column_values(column_datatype, values); - let mut values_iter = values.into_iter(); - - let null_mask = BitVec::from_slice(&column.null_mask); - let mut nulls_iter = null_mask.iter().by_vals().fuse(); - - for i in 0..rows { - if let Some(true) = nulls_iter.next() { - vector.push_null(); - } else { - let value_ref = values_iter.next().context(error::InvalidColumnProtoSnafu { - err_msg: format!( - "value not found at position {} of column {}", - i, &column.column_name - ), - })?; - vector - .try_push_ref(value_ref) - .context(error::CreateVectorSnafu)?; - } - } - } else { - (0..rows).for_each(|_| vector.push_null()); - } - Ok(vector.finish()) -} - -fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Vec { - macro_rules! collect_values { - ($value: expr, $mapper: expr) => { - $value.iter().map($mapper).collect::>() - }; - } - - match column_datatype { - ColumnDataType::Boolean => collect_values!(values.bool_values, |v| ValueRef::from(*v)), - ColumnDataType::Int8 => collect_values!(values.i8_values, |v| ValueRef::from(*v as i8)), - ColumnDataType::Int16 => { - collect_values!(values.i16_values, |v| ValueRef::from(*v as i16)) - } - ColumnDataType::Int32 => { - collect_values!(values.i32_values, |v| ValueRef::from(*v)) - } - ColumnDataType::Int64 => { - collect_values!(values.i64_values, |v| ValueRef::from(*v as i64)) - } - ColumnDataType::Uint8 => { - collect_values!(values.u8_values, |v| ValueRef::from(*v as u8)) - } - ColumnDataType::Uint16 => { - collect_values!(values.u16_values, |v| ValueRef::from(*v as u16)) - } - ColumnDataType::Uint32 => { - collect_values!(values.u32_values, |v| ValueRef::from(*v)) - } - ColumnDataType::Uint64 => { - collect_values!(values.u64_values, |v| ValueRef::from(*v as u64)) - } - ColumnDataType::Float32 => collect_values!(values.f32_values, |v| ValueRef::from(*v)), - ColumnDataType::Float64 => collect_values!(values.f64_values, |v| ValueRef::from(*v)), - ColumnDataType::Binary => { - collect_values!(values.binary_values, |v| ValueRef::from(v.as_slice())) - } - ColumnDataType::String => { - collect_values!(values.string_values, |v| ValueRef::from(v.as_str())) - } - ColumnDataType::Date => { - collect_values!(values.date_values, |v| ValueRef::Date(Date::new(*v))) - } - ColumnDataType::Datetime => { - collect_values!(values.datetime_values, |v| ValueRef::DateTime( - DateTime::new(*v) - )) - } - ColumnDataType::Timestamp => { - collect_values!(values.ts_millis_values, |v| ValueRef::Timestamp( - Timestamp::from_millis(*v) - )) - } - } -} - #[cfg(test)] mod tests { + use api::helper::ColumnDataTypeWrapper; + use api::v1::Column; use datanode::server::grpc::select::{null_mask, values}; use datatypes::vectors::{ BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector, diff --git a/src/client/src/error.rs b/src/client/src/error.rs index 1ca36f1c24..042949f60a 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -48,24 +48,12 @@ pub enum Error { #[snafu(display("Mutate result has failure {}", failure))] MutateFailure { failure: u32, backtrace: Backtrace }, - #[snafu(display("Invalid column proto: {}", err_msg))] - InvalidColumnProto { - err_msg: String, - backtrace: Backtrace, - }, - #[snafu(display("Column datatype error, source: {}", source))] ColumnDataType { #[snafu(backtrace)] source: api::error::Error, }, - #[snafu(display("Failed to create vector, source: {}", source))] - CreateVector { - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display("Failed to create RecordBatches, source: {}", source))] CreateRecordBatches { #[snafu(backtrace)] @@ -97,6 +85,12 @@ pub enum Error { #[snafu(backtrace)] source: common_grpc::error::Error, }, + + #[snafu(display("Failed to convert column to vector, source: {}", source))] + ColumnToVector { + #[snafu(backtrace)] + source: common_insert::error::Error, + }, } pub type Result = std::result::Result; @@ -112,15 +106,13 @@ impl ErrorExt for Error { | Error::Datanode { .. } | Error::EncodePhysical { .. } | Error::MutateFailure { .. } - | Error::InvalidColumnProto { .. } | Error::ColumnDataType { .. } | Error::MissingField { .. } => StatusCode::Internal, - Error::ConvertSchema { source } | Error::CreateVector { source } => { - source.status_code() - } + Error::ConvertSchema { source } => source.status_code(), Error::CreateRecordBatches { source } => source.status_code(), Error::CreateChannel { source, .. } => source.status_code(), Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected, + Error::ColumnToVector { source, .. } => source.status_code(), } } diff --git a/src/common/insert/src/error.rs b/src/common/insert/src/error.rs index 1702be8539..d06568ff14 100644 --- a/src/common/insert/src/error.rs +++ b/src/common/insert/src/error.rs @@ -45,6 +45,17 @@ pub enum Error { #[snafu(display("Missing timestamp column in request"))] MissingTimestampColumn { backtrace: Backtrace }, + + #[snafu(display("Invalid column proto: {}", err_msg))] + InvalidColumnProto { + err_msg: String, + backtrace: Backtrace, + }, + #[snafu(display("Failed to create vector, source: {}", source))] + CreateVector { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -60,6 +71,8 @@ impl ErrorExt for Error { Error::CreateSchema { .. } | Error::DuplicatedTimestampColumn { .. } | Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments, + Error::InvalidColumnProto { .. } => StatusCode::InvalidArguments, + Error::CreateVector { .. } => StatusCode::InvalidArguments, } } fn backtrace_opt(&self) -> Option<&Backtrace> { diff --git a/src/common/insert/src/insert.rs b/src/common/insert/src/insert.rs index 18d0520f2a..57247b213a 100644 --- a/src/common/insert/src/insert.rs +++ b/src/common/insert/src/insert.rs @@ -5,14 +5,18 @@ use std::{ sync::Arc, }; +use api::helper::ColumnDataTypeWrapper; use api::v1::{ codec::InsertBatch, column::{SemanticType, Values}, - AddColumns, Column, + AddColumns, Column, ColumnDataType, }; use api::v1::{AddColumn, ColumnDef, CreateExpr}; use common_base::BitVec; use common_time::timestamp::Timestamp; +use common_time::Date; +use common_time::DateTime; +use datatypes::prelude::{ValueRef, VectorRef}; use datatypes::schema::SchemaRef; use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder}; use snafu::{ensure, OptionExt, ResultExt}; @@ -23,10 +27,10 @@ use table::{ }; use crate::error::{ - ColumnNotFoundSnafu, DecodeInsertSnafu, DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, + ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateVectorSnafu, DecodeInsertSnafu, + DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, InvalidColumnProtoSnafu, MissingTimestampColumnSnafu, Result, }; - const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32; const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32; @@ -95,6 +99,94 @@ pub fn build_alter_table_request( } } +pub fn column_to_vector(column: &Column, rows: u32) -> Result { + let wrapper = ColumnDataTypeWrapper::try_new(column.datatype).context(ColumnDataTypeSnafu)?; + let column_datatype = wrapper.datatype(); + + let rows = rows as usize; + let mut vector = VectorBuilder::with_capacity(wrapper.into(), rows); + + if let Some(values) = &column.values { + let values = collect_column_values(column_datatype, values); + let mut values_iter = values.into_iter(); + + let null_mask = BitVec::from_slice(&column.null_mask); + let mut nulls_iter = null_mask.iter().by_vals().fuse(); + + for i in 0..rows { + if let Some(true) = nulls_iter.next() { + vector.push_null(); + } else { + let value_ref = values_iter.next().context(InvalidColumnProtoSnafu { + err_msg: format!( + "value not found at position {} of column {}", + i, &column.column_name + ), + })?; + vector.try_push_ref(value_ref).context(CreateVectorSnafu)?; + } + } + } else { + (0..rows).for_each(|_| vector.push_null()); + } + Ok(vector.finish()) +} + +fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Vec { + macro_rules! collect_values { + ($value: expr, $mapper: expr) => { + $value.iter().map($mapper).collect::>() + }; + } + + match column_datatype { + ColumnDataType::Boolean => collect_values!(values.bool_values, |v| ValueRef::from(*v)), + ColumnDataType::Int8 => collect_values!(values.i8_values, |v| ValueRef::from(*v as i8)), + ColumnDataType::Int16 => { + collect_values!(values.i16_values, |v| ValueRef::from(*v as i16)) + } + ColumnDataType::Int32 => { + collect_values!(values.i32_values, |v| ValueRef::from(*v)) + } + ColumnDataType::Int64 => { + collect_values!(values.i64_values, |v| ValueRef::from(*v as i64)) + } + ColumnDataType::Uint8 => { + collect_values!(values.u8_values, |v| ValueRef::from(*v as u8)) + } + ColumnDataType::Uint16 => { + collect_values!(values.u16_values, |v| ValueRef::from(*v as u16)) + } + ColumnDataType::Uint32 => { + collect_values!(values.u32_values, |v| ValueRef::from(*v)) + } + ColumnDataType::Uint64 => { + collect_values!(values.u64_values, |v| ValueRef::from(*v as u64)) + } + ColumnDataType::Float32 => collect_values!(values.f32_values, |v| ValueRef::from(*v)), + ColumnDataType::Float64 => collect_values!(values.f64_values, |v| ValueRef::from(*v)), + ColumnDataType::Binary => { + collect_values!(values.binary_values, |v| ValueRef::from(v.as_slice())) + } + ColumnDataType::String => { + collect_values!(values.string_values, |v| ValueRef::from(v.as_str())) + } + ColumnDataType::Date => { + collect_values!(values.date_values, |v| ValueRef::Date(Date::new(*v))) + } + ColumnDataType::Datetime => { + collect_values!(values.datetime_values, |v| ValueRef::DateTime( + DateTime::new(*v) + )) + } + ColumnDataType::Timestamp => { + collect_values!(values.ts_millis_values, |v| ValueRef::Timestamp( + Timestamp::from_millis(*v) + )) + } + } +} + /// Try to build create table request from insert data. pub fn build_create_expr_from_insertion( catalog_name: &str, diff --git a/src/common/insert/src/lib.rs b/src/common/insert/src/lib.rs index 7ad7542e54..eea4a6837b 100644 --- a/src/common/insert/src/lib.rs +++ b/src/common/insert/src/lib.rs @@ -1,6 +1,6 @@ pub mod error; mod insert; pub use insert::{ - build_alter_table_request, build_create_expr_from_insertion, find_new_columns, insert_batches, - insertion_expr_to_request, + build_alter_table_request, build_create_expr_from_insertion, column_to_vector, + find_new_columns, insert_batches, insertion_expr_to_request, }; diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index e209c626a6..448d7aeecb 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -260,6 +260,12 @@ pub enum Error { source: common_insert::error::Error, }, + #[snafu(display("Failed to deserialize insert batching: {}", source))] + InsertBatchToRequest { + #[snafu(backtrace)] + source: common_insert::error::Error, + }, + #[snafu(display("Failed to find catalog by name: {}", catalog_name))] CatalogNotFound { catalog_name: String, @@ -427,6 +433,7 @@ impl ErrorExt for Error { Error::DeserializeInsertBatch { source, .. } => source.status_code(), Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments, Error::ExecuteSql { source, .. } => source.status_code(), + Error::InsertBatchToRequest { source, .. } => source.status_code(), } } diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 4178ab0667..601a1750f4 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -1,13 +1,19 @@ +use std::collections::HashMap; + +use api::v1::codec::InsertBatch; +use api::v1::insert_expr::Expr; use api::v1::InsertExpr; use async_trait::async_trait; +use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::prelude::BoxedError; +use common_insert::column_to_vector; use servers::influxdb::InfluxdbRequest; use servers::{error as server_error, query_handler::InfluxdbLineProtocolHandler}; use snafu::{OptionExt, ResultExt}; use table::requests::InsertRequest; use crate::error; -use crate::error::Result; +use crate::error::{DeserializeInsertBatchSnafu, InsertBatchToRequestSnafu, Result}; use crate::frontend::Mode; use crate::instance::Instance; @@ -39,33 +45,55 @@ impl InfluxdbLineProtocolHandler for Instance { } impl Instance { - pub(crate) async fn dist_insert(&self, inserts: Vec) -> Result { + pub(crate) async fn dist_insert(&self, inserts: Vec) -> Result { let mut joins = Vec::with_capacity(inserts.len()); + let catalog_name = DEFAULT_CATALOG_NAME.to_string(); for insert in inserts { let self_clone = self.clone(); - let insert_batch = crate::table::insert::insert_request_to_insert_batch(&insert)?; + let insert_batches = match &insert.expr.unwrap() { + Expr::Values(values) => common_insert::insert_batches(&values.values) + .context(DeserializeInsertBatchSnafu)?, + Expr::Sql(_) => unreachable!(), + }; + self.create_or_alter_table_on_demand( - &insert.catalog_name, + DEFAULT_CATALOG_NAME, &insert.schema_name, &insert.table_name, - &[insert_batch], + &insert_batches, ) .await?; - // TODO(fys): need a separate runtime here - let join = tokio::spawn(async move { - let catalog = self_clone.get_catalog(&insert.catalog_name)?; - let schema = Self::get_schema(catalog, &insert.schema_name)?; - let table = schema - .table(&insert.table_name) - .context(error::CatalogSnafu)? - .context(error::TableNotFoundSnafu { - table_name: &insert.table_name, - })?; - table.insert(insert).await.context(error::TableSnafu) - }); - joins.push(join); + let schema_name = insert.schema_name.clone(); + let table_name = insert.table_name.clone(); + + for insert_batch in &insert_batches { + let catalog_name = catalog_name.clone(); + let schema_name = schema_name.clone(); + let table_name = table_name.clone(); + let request = Self::insert_batch_to_request( + DEFAULT_CATALOG_NAME, + &schema_name, + &table_name, + insert_batch, + )?; + // TODO(fys): need a separate runtime here + let self_clone = self_clone.clone(); + let join = tokio::spawn(async move { + let catalog = self_clone.get_catalog(&catalog_name)?; + let schema = Self::get_schema(catalog, &schema_name)?; + let table = schema + .table(&table_name) + .context(error::CatalogSnafu)? + .context(error::TableNotFoundSnafu { + table_name: &table_name, + })?; + + table.insert(request).await.context(error::TableSnafu) + }); + joins.push(join); + } } let mut affected = 0; @@ -76,4 +104,24 @@ impl Instance { Ok(affected) } + + fn insert_batch_to_request( + catalog_name: &str, + schema_name: &str, + table_name: &str, + batches: &InsertBatch, + ) -> Result { + let mut vectors = HashMap::with_capacity(batches.columns.len()); + for col in &batches.columns { + let vector = + column_to_vector(col, batches.row_count).context(InsertBatchToRequestSnafu)?; + vectors.insert(col.column_name.clone(), vector); + } + Ok(InsertRequest { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + columns_values: vectors, + }) + } } diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 24e89ae256..273a7957e7 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -24,7 +24,7 @@ impl OpentsdbProtocolHandler for Instance { })?; } Mode::Distributed(_) => { - self.dist_insert(vec![data_point.as_insert_request()]) + self.dist_insert(vec![data_point.as_grpc_insert()]) .await .map_err(BoxedError::new) .context(server_error::ExecuteInsertSnafu { diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index 6c2ec04f00..babc91ef33 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -108,7 +108,7 @@ impl PrometheusProtocolHandler for Instance { })?; } Mode::Distributed(_) => { - let inserts = prometheus::write_request_to_insert_reqs(database, request)?; + let inserts = prometheus::write_request_to_insert_exprs(database, request)?; self.dist_insert(inserts) .await