fix: opentsdb/influxdb tags are not added to primary key indices (#506)

This commit is contained in:
Lei, Huang
2022-11-15 11:06:51 +08:00
committed by GitHub
parent 3f12f5443d
commit deb7d5fc2c
11 changed files with 205 additions and 141 deletions

1
Cargo.lock generated
View File

@@ -1026,6 +1026,7 @@ dependencies = [
"common-base",
"common-error",
"common-grpc",
"common-insert",
"common-query",
"common-recordbatch",
"common-time",

View File

@@ -12,6 +12,7 @@ common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-insert = { path = "../common/insert" }
common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",

View File

@@ -1,27 +1,24 @@
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::codec::SelectResult as GrpcSelectResult;
use api::v1::{
column::Values, object_expr, object_result, select_expr, Column, ColumnDataType,
DatabaseRequest, ExprHeader, InsertExpr, MutateResult as GrpcMutateResult, ObjectExpr,
ObjectResult as GrpcObjectResult, PhysicalPlan, SelectExpr,
object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr,
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan,
SelectExpr,
};
use common_base::BitVec;
use common_error::status_code::StatusCode;
use common_grpc::AsExcutionPlan;
use common_grpc::DefaultAsPlanImpl;
use common_insert::column_to_vector;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::date::Date;
use common_time::datetime::DateTime;
use common_time::timestamp::Timestamp;
use datafusion::physical_plan::ExecutionPlan;
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error;
use crate::error::ColumnToVectorSnafu;
use crate::{
error::{ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu},
Client, Result,
@@ -199,7 +196,9 @@ impl TryFrom<ObjectResult> for Output {
let vectors = select
.columns
.iter()
.map(|column| column_to_vector(column, select.row_count))
.map(|column| {
column_to_vector(column, select.row_count).context(ColumnToVectorSnafu)
})
.collect::<Result<Vec<VectorRef>>>()?;
let column_schemas = select
@@ -237,99 +236,10 @@ impl TryFrom<ObjectResult> for Output {
}
}
/// Converts a gRPC `Column` into a `VectorRef` holding exactly `rows` values,
/// honoring the column's null bitmask.
///
/// Values in `column.values` are dense: only non-null entries are present, in
/// row order. A set bit in `column.null_mask` marks the corresponding row as
/// null; rows past the end of the mask iterator (the `.fuse()`d tail) are
/// treated as non-null.
///
/// Errors:
/// - `ColumnDataType` if `column.datatype` is not a known `ColumnDataType`.
/// - `InvalidColumnProto` if the dense value list runs out before all
///   non-null rows are filled (mask and values disagree).
/// - `CreateVector` if a value cannot be pushed into the builder.
fn column_to_vector(column: &Column, rows: u32) -> Result<VectorRef> {
let wrapper =
ColumnDataTypeWrapper::try_new(column.datatype).context(error::ColumnDataTypeSnafu)?;
let column_datatype = wrapper.datatype();
let rows = rows as usize;
// Pre-size the builder for `rows` to avoid incremental growth.
let mut vector = VectorBuilder::with_capacity(wrapper.into(), rows);
if let Some(values) = &column.values {
// Flatten the typed proto repeated fields into a single ValueRef list.
let values = collect_column_values(column_datatype, values);
let mut values_iter = values.into_iter();
// NOTE(review): bit order of the mask depends on the `BitVec` alias's
// ordering (presumably Lsb0) matching the producer — confirm against
// the encoding side.
let null_mask = BitVec::from_slice(&column.null_mask);
// `.fuse()` makes a short mask yield None forever, i.e. "not null".
let mut nulls_iter = null_mask.iter().by_vals().fuse();
for i in 0..rows {
if let Some(true) = nulls_iter.next() {
vector.push_null();
} else {
// Non-null row: consume the next dense value; absence means the
// proto message is internally inconsistent.
let value_ref = values_iter.next().context(error::InvalidColumnProtoSnafu {
err_msg: format!(
"value not found at position {} of column {}",
i, &column.column_name
),
})?;
vector
.try_push_ref(value_ref)
.context(error::CreateVectorSnafu)?;
}
}
} else {
// No values at all: the whole column is null.
(0..rows).for_each(|_| vector.push_null());
}
Ok(vector.finish())
}
/// Selects the proto repeated field matching `column_datatype` and maps each
/// element to a borrowed `ValueRef`, preserving order.
///
/// The `as i8`/`as i16`/`as u8`/`as u16` narrowing casts exist presumably
/// because protobuf has no sub-32-bit integer scalars, so small ints arrive in
/// wider repeated fields; out-of-range values would silently truncate —
/// TODO(review): confirm against the proto definition. `as i64`/`as u64` on
/// already-64-bit fields look like identity casts.
///
/// Note: only millisecond timestamps (`ts_millis_values`) are handled here.
fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Vec<ValueRef> {
// Local shorthand: map one repeated field through `$mapper` into ValueRefs.
macro_rules! collect_values {
($value: expr, $mapper: expr) => {
$value.iter().map($mapper).collect::<Vec<ValueRef>>()
};
}
match column_datatype {
ColumnDataType::Boolean => collect_values!(values.bool_values, |v| ValueRef::from(*v)),
ColumnDataType::Int8 => collect_values!(values.i8_values, |v| ValueRef::from(*v as i8)),
ColumnDataType::Int16 => {
collect_values!(values.i16_values, |v| ValueRef::from(*v as i16))
}
ColumnDataType::Int32 => {
collect_values!(values.i32_values, |v| ValueRef::from(*v))
}
ColumnDataType::Int64 => {
collect_values!(values.i64_values, |v| ValueRef::from(*v as i64))
}
ColumnDataType::Uint8 => {
collect_values!(values.u8_values, |v| ValueRef::from(*v as u8))
}
ColumnDataType::Uint16 => {
collect_values!(values.u16_values, |v| ValueRef::from(*v as u16))
}
ColumnDataType::Uint32 => {
collect_values!(values.u32_values, |v| ValueRef::from(*v))
}
ColumnDataType::Uint64 => {
collect_values!(values.u64_values, |v| ValueRef::from(*v as u64))
}
ColumnDataType::Float32 => collect_values!(values.f32_values, |v| ValueRef::from(*v)),
ColumnDataType::Float64 => collect_values!(values.f64_values, |v| ValueRef::from(*v)),
ColumnDataType::Binary => {
// Borrow the bytes rather than cloning each Vec<u8>.
collect_values!(values.binary_values, |v| ValueRef::from(v.as_slice()))
}
ColumnDataType::String => {
// Borrow as &str rather than cloning each String.
collect_values!(values.string_values, |v| ValueRef::from(v.as_str()))
}
ColumnDataType::Date => {
collect_values!(values.date_values, |v| ValueRef::Date(Date::new(*v)))
}
ColumnDataType::Datetime => {
collect_values!(values.datetime_values, |v| ValueRef::DateTime(
DateTime::new(*v)
))
}
ColumnDataType::Timestamp => {
collect_values!(values.ts_millis_values, |v| ValueRef::Timestamp(
Timestamp::from_millis(*v)
))
}
}
}
#[cfg(test)]
mod tests {
use api::helper::ColumnDataTypeWrapper;
use api::v1::Column;
use datanode::server::grpc::select::{null_mask, values};
use datatypes::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,

View File

@@ -48,24 +48,12 @@ pub enum Error {
#[snafu(display("Mutate result has failure {}", failure))]
MutateFailure { failure: u32, backtrace: Backtrace },
#[snafu(display("Invalid column proto: {}", err_msg))]
InvalidColumnProto {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
#[snafu(backtrace)]
source: api::error::Error,
},
#[snafu(display("Failed to create vector, source: {}", source))]
CreateVector {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to create RecordBatches, source: {}", source))]
CreateRecordBatches {
#[snafu(backtrace)]
@@ -97,6 +85,12 @@ pub enum Error {
#[snafu(backtrace)]
source: common_grpc::error::Error,
},
#[snafu(display("Failed to convert column to vector, source: {}", source))]
ColumnToVector {
#[snafu(backtrace)]
source: common_insert::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -112,15 +106,13 @@ impl ErrorExt for Error {
| Error::Datanode { .. }
| Error::EncodePhysical { .. }
| Error::MutateFailure { .. }
| Error::InvalidColumnProto { .. }
| Error::ColumnDataType { .. }
| Error::MissingField { .. } => StatusCode::Internal,
Error::ConvertSchema { source } | Error::CreateVector { source } => {
source.status_code()
}
Error::ConvertSchema { source } => source.status_code(),
Error::CreateRecordBatches { source } => source.status_code(),
Error::CreateChannel { source, .. } => source.status_code(),
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
Error::ColumnToVector { source, .. } => source.status_code(),
}
}

View File

@@ -45,6 +45,17 @@ pub enum Error {
#[snafu(display("Missing timestamp column in request"))]
MissingTimestampColumn { backtrace: Backtrace },
#[snafu(display("Invalid column proto: {}", err_msg))]
InvalidColumnProto {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to create vector, source: {}", source))]
CreateVector {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -60,6 +71,8 @@ impl ErrorExt for Error {
Error::CreateSchema { .. }
| Error::DuplicatedTimestampColumn { .. }
| Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments,
Error::InvalidColumnProto { .. } => StatusCode::InvalidArguments,
Error::CreateVector { .. } => StatusCode::InvalidArguments,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {

View File

@@ -5,14 +5,18 @@ use std::{
sync::Arc,
};
use api::helper::ColumnDataTypeWrapper;
use api::v1::{
codec::InsertBatch,
column::{SemanticType, Values},
AddColumns, Column,
AddColumns, Column, ColumnDataType,
};
use api::v1::{AddColumn, ColumnDef, CreateExpr};
use common_base::BitVec;
use common_time::timestamp::Timestamp;
use common_time::Date;
use common_time::DateTime;
use datatypes::prelude::{ValueRef, VectorRef};
use datatypes::schema::SchemaRef;
use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder};
use snafu::{ensure, OptionExt, ResultExt};
@@ -23,10 +27,10 @@ use table::{
};
use crate::error::{
ColumnNotFoundSnafu, DecodeInsertSnafu, DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu,
ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateVectorSnafu, DecodeInsertSnafu,
DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, InvalidColumnProtoSnafu,
MissingTimestampColumnSnafu, Result,
};
const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32;
const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32;
@@ -95,6 +99,94 @@ pub fn build_alter_table_request(
}
}
/// Converts a gRPC `Column` into a `VectorRef` holding exactly `rows` values,
/// honoring the column's null bitmask.
///
/// Values in `column.values` are dense: only non-null entries are present, in
/// row order. A set bit in `column.null_mask` marks the corresponding row as
/// null; rows past the end of the mask iterator (the `.fuse()`d tail) are
/// treated as non-null. When `column.values` is absent, every row is null.
///
/// Errors:
/// - `ColumnDataType` if `column.datatype` is not a known `ColumnDataType`.
/// - `InvalidColumnProto` if the dense value list runs out before all
///   non-null rows are filled (mask and values disagree).
/// - `CreateVector` if a value cannot be pushed into the builder.
pub fn column_to_vector(column: &Column, rows: u32) -> Result<VectorRef> {
let wrapper = ColumnDataTypeWrapper::try_new(column.datatype).context(ColumnDataTypeSnafu)?;
let column_datatype = wrapper.datatype();
let rows = rows as usize;
// Pre-size the builder for `rows` to avoid incremental growth.
let mut vector = VectorBuilder::with_capacity(wrapper.into(), rows);
if let Some(values) = &column.values {
// Flatten the typed proto repeated fields into a single ValueRef list.
let values = collect_column_values(column_datatype, values);
let mut values_iter = values.into_iter();
// NOTE(review): bit order of the mask depends on the `BitVec` alias's
// ordering (presumably Lsb0) matching the producer — confirm against
// the encoding side.
let null_mask = BitVec::from_slice(&column.null_mask);
// `.fuse()` makes a short mask yield None forever, i.e. "not null".
let mut nulls_iter = null_mask.iter().by_vals().fuse();
for i in 0..rows {
if let Some(true) = nulls_iter.next() {
vector.push_null();
} else {
// Non-null row: consume the next dense value; absence means the
// proto message is internally inconsistent.
let value_ref = values_iter.next().context(InvalidColumnProtoSnafu {
err_msg: format!(
"value not found at position {} of column {}",
i, &column.column_name
),
})?;
vector.try_push_ref(value_ref).context(CreateVectorSnafu)?;
}
}
} else {
// No values at all: the whole column is null.
(0..rows).for_each(|_| vector.push_null());
}
Ok(vector.finish())
}
/// Selects the proto repeated field matching `column_datatype` and maps each
/// element to a borrowed `ValueRef`, preserving order.
///
/// The `as i8`/`as i16`/`as u8`/`as u16` narrowing casts exist presumably
/// because protobuf has no sub-32-bit integer scalars, so small ints arrive in
/// wider repeated fields; out-of-range values would silently truncate —
/// TODO(review): confirm against the proto definition. `as i64`/`as u64` on
/// already-64-bit fields look like identity casts.
///
/// Note: only millisecond timestamps (`ts_millis_values`) are handled here.
fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Vec<ValueRef> {
// Local shorthand: map one repeated field through `$mapper` into ValueRefs.
macro_rules! collect_values {
($value: expr, $mapper: expr) => {
$value.iter().map($mapper).collect::<Vec<ValueRef>>()
};
}
match column_datatype {
ColumnDataType::Boolean => collect_values!(values.bool_values, |v| ValueRef::from(*v)),
ColumnDataType::Int8 => collect_values!(values.i8_values, |v| ValueRef::from(*v as i8)),
ColumnDataType::Int16 => {
collect_values!(values.i16_values, |v| ValueRef::from(*v as i16))
}
ColumnDataType::Int32 => {
collect_values!(values.i32_values, |v| ValueRef::from(*v))
}
ColumnDataType::Int64 => {
collect_values!(values.i64_values, |v| ValueRef::from(*v as i64))
}
ColumnDataType::Uint8 => {
collect_values!(values.u8_values, |v| ValueRef::from(*v as u8))
}
ColumnDataType::Uint16 => {
collect_values!(values.u16_values, |v| ValueRef::from(*v as u16))
}
ColumnDataType::Uint32 => {
collect_values!(values.u32_values, |v| ValueRef::from(*v))
}
ColumnDataType::Uint64 => {
collect_values!(values.u64_values, |v| ValueRef::from(*v as u64))
}
ColumnDataType::Float32 => collect_values!(values.f32_values, |v| ValueRef::from(*v)),
ColumnDataType::Float64 => collect_values!(values.f64_values, |v| ValueRef::from(*v)),
ColumnDataType::Binary => {
// Borrow the bytes rather than cloning each Vec<u8>.
collect_values!(values.binary_values, |v| ValueRef::from(v.as_slice()))
}
ColumnDataType::String => {
// Borrow as &str rather than cloning each String.
collect_values!(values.string_values, |v| ValueRef::from(v.as_str()))
}
ColumnDataType::Date => {
collect_values!(values.date_values, |v| ValueRef::Date(Date::new(*v)))
}
ColumnDataType::Datetime => {
collect_values!(values.datetime_values, |v| ValueRef::DateTime(
DateTime::new(*v)
))
}
ColumnDataType::Timestamp => {
collect_values!(values.ts_millis_values, |v| ValueRef::Timestamp(
Timestamp::from_millis(*v)
))
}
}
}
/// Try to build create table request from insert data.
pub fn build_create_expr_from_insertion(
catalog_name: &str,

View File

@@ -1,6 +1,6 @@
pub mod error;
mod insert;
pub use insert::{
build_alter_table_request, build_create_expr_from_insertion, find_new_columns, insert_batches,
insertion_expr_to_request,
build_alter_table_request, build_create_expr_from_insertion, column_to_vector,
find_new_columns, insert_batches, insertion_expr_to_request,
};

View File

@@ -260,6 +260,12 @@ pub enum Error {
source: common_insert::error::Error,
},
#[snafu(display("Failed to deserialize insert batching: {}", source))]
InsertBatchToRequest {
#[snafu(backtrace)]
source: common_insert::error::Error,
},
#[snafu(display("Failed to find catalog by name: {}", catalog_name))]
CatalogNotFound {
catalog_name: String,
@@ -427,6 +433,7 @@ impl ErrorExt for Error {
Error::DeserializeInsertBatch { source, .. } => source.status_code(),
Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
Error::ExecuteSql { source, .. } => source.status_code(),
Error::InsertBatchToRequest { source, .. } => source.status_code(),
}
}

View File

@@ -1,13 +1,19 @@
use std::collections::HashMap;
use api::v1::codec::InsertBatch;
use api::v1::insert_expr::Expr;
use api::v1::InsertExpr;
use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
use common_insert::column_to_vector;
use servers::influxdb::InfluxdbRequest;
use servers::{error as server_error, query_handler::InfluxdbLineProtocolHandler};
use snafu::{OptionExt, ResultExt};
use table::requests::InsertRequest;
use crate::error;
use crate::error::Result;
use crate::error::{DeserializeInsertBatchSnafu, InsertBatchToRequestSnafu, Result};
use crate::frontend::Mode;
use crate::instance::Instance;
@@ -39,33 +45,55 @@ impl InfluxdbLineProtocolHandler for Instance {
}
impl Instance {
pub(crate) async fn dist_insert(&self, inserts: Vec<InsertRequest>) -> Result<usize> {
pub(crate) async fn dist_insert(&self, inserts: Vec<InsertExpr>) -> Result<usize> {
let mut joins = Vec::with_capacity(inserts.len());
let catalog_name = DEFAULT_CATALOG_NAME.to_string();
for insert in inserts {
let self_clone = self.clone();
let insert_batch = crate::table::insert::insert_request_to_insert_batch(&insert)?;
let insert_batches = match &insert.expr.unwrap() {
Expr::Values(values) => common_insert::insert_batches(&values.values)
.context(DeserializeInsertBatchSnafu)?,
Expr::Sql(_) => unreachable!(),
};
self.create_or_alter_table_on_demand(
&insert.catalog_name,
DEFAULT_CATALOG_NAME,
&insert.schema_name,
&insert.table_name,
&[insert_batch],
&insert_batches,
)
.await?;
// TODO(fys): need a separate runtime here
let join = tokio::spawn(async move {
let catalog = self_clone.get_catalog(&insert.catalog_name)?;
let schema = Self::get_schema(catalog, &insert.schema_name)?;
let table = schema
.table(&insert.table_name)
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu {
table_name: &insert.table_name,
})?;
table.insert(insert).await.context(error::TableSnafu)
});
joins.push(join);
let schema_name = insert.schema_name.clone();
let table_name = insert.table_name.clone();
for insert_batch in &insert_batches {
let catalog_name = catalog_name.clone();
let schema_name = schema_name.clone();
let table_name = table_name.clone();
let request = Self::insert_batch_to_request(
DEFAULT_CATALOG_NAME,
&schema_name,
&table_name,
insert_batch,
)?;
// TODO(fys): need a separate runtime here
let self_clone = self_clone.clone();
let join = tokio::spawn(async move {
let catalog = self_clone.get_catalog(&catalog_name)?;
let schema = Self::get_schema(catalog, &schema_name)?;
let table = schema
.table(&table_name)
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu {
table_name: &table_name,
})?;
table.insert(request).await.context(error::TableSnafu)
});
joins.push(join);
}
}
let mut affected = 0;
@@ -76,4 +104,24 @@ impl Instance {
Ok(affected)
}
/// Builds a table `InsertRequest` from one gRPC `InsertBatch` by converting
/// each proto column into a vector, keyed by its column name.
///
/// Errors with `InsertBatchToRequest` if any column fails to convert
/// (e.g. unknown datatype or a value/null-mask mismatch inside
/// `column_to_vector`).
fn insert_batch_to_request(
catalog_name: &str,
schema_name: &str,
table_name: &str,
batches: &InsertBatch,
) -> Result<InsertRequest> {
// One entry per column; sized up front from the batch's column count.
let mut vectors = HashMap::with_capacity(batches.columns.len());
for col in &batches.columns {
// `row_count` applies to every column in the batch.
let vector =
column_to_vector(col, batches.row_count).context(InsertBatchToRequestSnafu)?;
// NOTE(review): a duplicate column name would silently overwrite the
// earlier vector here — presumably upstream guarantees uniqueness.
vectors.insert(col.column_name.clone(), vector);
}
Ok(InsertRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
columns_values: vectors,
})
}
}

View File

@@ -24,7 +24,7 @@ impl OpentsdbProtocolHandler for Instance {
})?;
}
Mode::Distributed(_) => {
self.dist_insert(vec![data_point.as_insert_request()])
self.dist_insert(vec![data_point.as_grpc_insert()])
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {

View File

@@ -108,7 +108,7 @@ impl PrometheusProtocolHandler for Instance {
})?;
}
Mode::Distributed(_) => {
let inserts = prometheus::write_request_to_insert_reqs(database, request)?;
let inserts = prometheus::write_request_to_insert_exprs(database, request)?;
self.dist_insert(inserts)
.await