From 033b650d0ddea3796051fa4f7437941346e2678a Mon Sep 17 00:00:00 2001
From: JeremyHi
Date: Sat, 19 Aug 2023 21:08:44 +0800
Subject: [PATCH] feat: row write protocol (#2189)

* feat: datanode's row inserter

* refactor: ExprFactory

* feat: row inserter in standalone mode

* chore: minor refactor

* feat: influxdb line protocol's row protocol

* chore: minor refactor

* improve: avoid using too many strings

* no longer async

Co-authored-by: Ruihang Xia

* chore: do not check empty data

* chore: by review comment

* chore: by comment

* chore: by review comment

* chore: by review comment

---------

Co-authored-by: Ruihang Xia
---
 src/common/grpc-expr/src/insert.rs       |   6 +-
 src/common/grpc-expr/src/lib.rs          |   2 +-
 src/common/grpc-expr/src/util.rs         |  56 +--
 src/datanode/src/error.rs                |  30 +-
 src/datanode/src/instance.rs             |  29 +-
 src/datanode/src/instance/grpc.rs        |  19 +-
 src/datanode/src/lib.rs                  |   1 +
 src/datanode/src/row_inserter.rs         | 143 +++++++
 src/frontend/src/error.rs                |   6 +-
 src/frontend/src/expr_factory.rs         |  85 ++--
 src/frontend/src/instance.rs             |  59 ++-
 src/frontend/src/instance/distributed.rs |   2 +-
 src/frontend/src/instance/grpc.rs        |   5 +-
 src/frontend/src/instance/influxdb.rs    |   2 +-
 src/frontend/src/lib.rs                  |   1 +
 src/frontend/src/row_inserter.rs         | 172 +++++++++
 src/meta-srv/src/service/store/etcd.rs   |  16 +-
 src/servers/src/error.rs                 |  18 +-
 src/servers/src/http/influxdb.rs         |   2 +-
 src/servers/src/influxdb.rs              | 471 ++++++++++++++++++++++-
 src/servers/src/query_handler.rs         |   2 +-
 src/servers/tests/http/influxdb_test.rs  |   2 +-
 tests-integration/src/influxdb.rs        |   4 +-
 23 files changed, 988 insertions(+), 145 deletions(-)
 create mode 100644 src/datanode/src/row_inserter.rs
 create mode 100644 src/frontend/src/row_inserter.rs

diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs
index fa8fe002fe..a465a4114e 100644
--- a/src/common/grpc-expr/src/insert.rs
+++ b/src/common/grpc-expr/src/insert.rs
@@ -23,6 +23,7 @@ use datatypes::data_type::{ConcreteDataType, DataType};
 use datatypes::prelude::VectorRef;
 use datatypes::schema::SchemaRef;
 use snafu::{ensure, ResultExt};
+use table::engine::TableReference;
 use table::metadata::TableId;
 use table::requests::InsertRequest;
 
@@ -47,12 +48,11 @@ pub fn build_create_expr_from_insertion(
     columns: &[Column],
     engine: &str,
 ) -> Result {
+    let table_name = TableReference::full(catalog_name, schema_name, table_name);
     let column_exprs = ColumnExpr::from_columns(columns);
     util::build_create_table_expr(
-        catalog_name,
-        schema_name,
         table_id,
-        table_name,
+        &table_name,
         column_exprs,
         engine,
         "Created on insertion",
diff --git a/src/common/grpc-expr/src/lib.rs b/src/common/grpc-expr/src/lib.rs
index 61da4fe195..cd3d9540b3 100644
--- a/src/common/grpc-expr/src/lib.rs
+++ b/src/common/grpc-expr/src/lib.rs
@@ -16,7 +16,7 @@ mod alter;
 pub mod delete;
 pub mod error;
 pub mod insert;
-mod util;
+pub mod util;
 
 pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema};
 pub use insert::{build_create_expr_from_insertion, find_new_columns};
diff --git a/src/common/grpc-expr/src/util.rs b/src/common/grpc-expr/src/util.rs
index 89e6b2dd27..806f848173 100644
--- a/src/common/grpc-expr/src/util.rs
+++ b/src/common/grpc-expr/src/util.rs
@@ -19,39 +19,45 @@ use api::v1::{
 };
 use datatypes::schema::Schema;
 use snafu::{ensure, OptionExt};
+use table::engine::TableReference;
 use table::metadata::TableId;
 
 use crate::error::{
     DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu, MissingTimestampColumnSnafu,
     Result,
 };
 
-pub struct ColumnExpr {
-
pub column_name: String, +pub struct ColumnExpr<'a> { + pub column_name: &'a str, pub datatype: i32, pub semantic_type: i32, } -impl ColumnExpr { +impl<'a> ColumnExpr<'a> { #[inline] - pub fn from_columns(columns: &[Column]) -> Vec { + pub fn from_columns(columns: &'a [Column]) -> Vec { columns.iter().map(Self::from).collect() } + + #[inline] + pub fn from_column_schemas(schemas: &'a [ColumnSchema]) -> Vec { + schemas.iter().map(Self::from).collect() + } } -impl From<&Column> for ColumnExpr { - fn from(column: &Column) -> Self { +impl<'a> From<&'a Column> for ColumnExpr<'a> { + fn from(column: &'a Column) -> Self { Self { - column_name: column.column_name.clone(), + column_name: &column.column_name, datatype: column.datatype, semantic_type: column.semantic_type, } } } -impl From<&ColumnSchema> for ColumnExpr { - fn from(schema: &ColumnSchema) -> Self { +impl<'a> From<&'a ColumnSchema> for ColumnExpr<'a> { + fn from(schema: &'a ColumnSchema) -> Self { Self { - column_name: schema.column_name.clone(), + column_name: &schema.column_name, datatype: schema.datatype, semantic_type: schema.semantic_type, } @@ -59,10 +65,8 @@ impl From<&ColumnSchema> for ColumnExpr { } pub fn build_create_table_expr( - catalog_name: &str, - schema_name: &str, table_id: Option, - table_name: &str, + table_name: &TableReference<'_>, column_exprs: Vec, engine: &str, desc: &str, @@ -77,8 +81,8 @@ pub fn build_create_table_expr( let mut distinct_names = HashSet::with_capacity(column_exprs.len()); for ColumnExpr { column_name, .. } in &column_exprs { ensure!( - distinct_names.insert(column_name), - DuplicatedColumnNameSnafu { name: column_name } + distinct_names.insert(*column_name), + DuplicatedColumnNameSnafu { name: *column_name } ); } @@ -94,16 +98,16 @@ pub fn build_create_table_expr( { let mut is_nullable = true; match semantic_type { - v if v == SemanticType::Tag as i32 => primary_keys.push(column_name.clone()), + v if v == SemanticType::Tag as i32 => primary_keys.push(column_name.to_string()), v if v == SemanticType::Timestamp as i32 => { ensure!( time_index.is_none(), DuplicatedTimestampColumnSnafu { exists: time_index.unwrap(), - duplicated: &column_name, + duplicated: column_name, } ); - time_index = Some(column_name.clone()); + time_index = Some(column_name.to_string()); // Timestamp column must not be null. 
is_nullable = false; } @@ -111,7 +115,7 @@ pub fn build_create_table_expr( } let column_def = ColumnDef { - name: column_name, + name: column_name.to_string(), datatype, is_nullable, default_constraint: vec![], @@ -120,13 +124,13 @@ pub fn build_create_table_expr( } let time_index = time_index.context(MissingTimestampColumnSnafu { - msg: format!("table is {}", table_name), + msg: format!("table is {}", table_name.table), })?; let expr = CreateTableExpr { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), + catalog_name: table_name.catalog.to_string(), + schema_name: table_name.schema.to_string(), + table_name: table_name.table.to_string(), desc: desc.to_string(), column_defs, time_index, @@ -148,11 +152,11 @@ pub fn extract_new_columns( ) -> Result> { let columns_to_add = column_exprs .into_iter() - .filter(|expr| schema.column_schema_by_name(&expr.column_name).is_none()) + .filter(|expr| schema.column_schema_by_name(expr.column_name).is_none()) .map(|expr| { let is_key = expr.semantic_type == SemanticType::Tag as i32; let column_def = Some(ColumnDef { - name: expr.column_name, + name: expr.column_name.to_string(), datatype: expr.datatype, is_nullable: true, default_constraint: vec![], @@ -170,7 +174,7 @@ pub fn extract_new_columns( } else { let mut distinct_names = HashSet::with_capacity(columns_to_add.len()); for add_column in &columns_to_add { - let name = &add_column.column_def.as_ref().unwrap().name; + let name = add_column.column_def.as_ref().unwrap().name.as_str(); ensure!( distinct_names.insert(name), DuplicatedColumnNameSnafu { name } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 7cd3a6eca9..0b4186a3e9 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -478,6 +478,31 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Invalid insert row len, table: {}, expected: {}, actual: {}", + table_name, + expected, + actual + ))] + InvalidInsertRowLen { + table_name: String, + expected: usize, + actual: usize, + location: Location, + }, + + #[snafu(display("Column datatype error, source: {}", source))] + ColumnDataType { + location: Location, + source: api::error::Error, + }, + + #[snafu(display("Failed to create vector, source: {}", source))] + CreateVector { + location: Location, + source: datatypes::error::Error, + }, + #[snafu(display("Unexpected, violated: {}", violated))] Unexpected { violated: String, @@ -557,6 +582,7 @@ impl ErrorExt for Error { TableEngineNotFound { source, .. } | EngineProcedureNotFound { source, .. } => { source.status_code() } + CreateVector { source, .. } => source.status_code(), TableNotFound { .. } => StatusCode::TableNotFound, ColumnNotFound { .. } => StatusCode::TableColumnNotFound, @@ -583,7 +609,9 @@ impl ErrorExt for Error { | MissingMetasrvOpts { .. } | ColumnNoneDefaultValue { .. } | MissingWalDirConfig { .. } - | PrepareImmutableTable { .. } => StatusCode::InvalidArguments, + | PrepareImmutableTable { .. } + | InvalidInsertRowLen { .. } + | ColumnDataType { .. } => StatusCode::InvalidArguments, EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. 
} => { StatusCode::Unexpected diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index e7d4cbbf61..84daece242 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -67,6 +67,7 @@ use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::handler::close_region::CloseRegionHandler; use crate::heartbeat::handler::open_region::OpenRegionHandler; use crate::heartbeat::HeartbeatTask; +use crate::row_inserter::RowInserter; use crate::sql::{SqlHandler, SqlRequest}; use crate::store; @@ -81,6 +82,7 @@ pub struct Instance { pub(crate) sql_handler: SqlHandler, pub(crate) catalog_manager: CatalogManagerRef, pub(crate) table_id_provider: Option, + row_inserter: RowInserter, procedure_manager: ProcedureManagerRef, greptimedb_telemetry_task: Arc, } @@ -279,10 +281,14 @@ impl Instance { plugins, ); let query_engine = factory.query_engine(); - let procedure_manager = create_procedure_manager(opts.node_id.unwrap_or(0), &opts.procedure, object_store) .await?; + let sql_handler = SqlHandler::new( + engine_manager.clone(), + catalog_manager.clone(), + procedure_manager.clone(), + ); // Register all procedures. // Register procedures of the mito engine. mito_engine.register_procedure_loaders(&*procedure_manager); @@ -295,23 +301,22 @@ impl Instance { mito_engine.clone(), &*procedure_manager, ); + let row_inserter = RowInserter::new(catalog_manager.clone()); + let greptimedb_telemetry_task = get_greptimedb_telemetry_task( + Some(opts.storage.data_home.clone()), + &opts.mode, + opts.enable_telemetry, + ) + .await; let instance = Arc::new(Self { query_engine: query_engine.clone(), - sql_handler: SqlHandler::new( - engine_manager.clone(), - catalog_manager.clone(), - procedure_manager.clone(), - ), + sql_handler, catalog_manager: catalog_manager.clone(), table_id_provider, + row_inserter, procedure_manager, - greptimedb_telemetry_task: get_greptimedb_telemetry_task( - Some(opts.storage.data_home.clone()), - &opts.mode, - opts.enable_telemetry, - ) - .await, + greptimedb_telemetry_task, }); let heartbeat_task = Instance::build_heartbeat_task( diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index f857e13f3f..2d9e527f69 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::query_request::Query; -use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequests, InsertRequests}; +use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequests, InsertRequests, RowInsertRequests}; use async_trait::async_trait; use catalog::CatalogManagerRef; use common_grpc_expr::insert::to_table_insert_request; @@ -129,7 +129,7 @@ impl Instance { pub async fn handle_inserts( &self, requests: InsertRequests, - ctx: &QueryContextRef, + ctx: QueryContextRef, ) -> Result { let results = future::try_join_all(requests.inserts.into_iter().map(|insert| { let catalog_manager = self.catalog_manager.clone(); @@ -164,6 +164,14 @@ impl Instance { Ok(Output::AffectedRows(affected_rows)) } + pub async fn handle_row_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + ) -> Result { + self.row_inserter.handle_inserts(requests, ctx).await + } + async fn handle_deletes( &self, request: DeleteRequests, @@ -221,7 +229,7 @@ impl GrpcQueryHandler for Instance { async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { match request { - 
Request::Inserts(requests) => self.handle_inserts(requests, &ctx).await, + Request::Inserts(requests) => self.handle_inserts(requests, ctx).await, Request::Deletes(request) => self.handle_deletes(request, ctx).await, Request::Query(query_request) => { let query = query_request @@ -232,8 +240,9 @@ impl GrpcQueryHandler for Instance { self.handle_query(query, ctx).await } Request::Ddl(request) => self.handle_ddl(request, ctx).await, - Request::RowInserts(_) | Request::RowDeletes(_) => UnsupportedGrpcRequestSnafu { - kind: "row insert/delete", + Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx).await, + Request::RowDeletes(_) => UnsupportedGrpcRequestSnafu { + kind: "row deletes", } .fail(), } diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 9097d680c1..f5b313188b 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -24,6 +24,7 @@ pub mod metrics; #[cfg(any(test, feature = "testing"))] mod mock; pub mod region_server; +mod row_inserter; pub mod server; pub mod sql; mod store; diff --git a/src/datanode/src/row_inserter.rs b/src/datanode/src/row_inserter.rs new file mode 100644 index 0000000000..8b2fde825d --- /dev/null +++ b/src/datanode/src/row_inserter.rs @@ -0,0 +1,143 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::helper; +use api::helper::ColumnDataTypeWrapper; +use api::v1::{RowInsertRequest, RowInsertRequests}; +use catalog::CatalogManagerRef; +use common_query::Output; +use datatypes::data_type::{ConcreteDataType, DataType}; +use futures_util::future; +use session::context::QueryContextRef; +use snafu::{ensure, OptionExt, ResultExt}; +use table::requests::InsertRequest; + +use crate::error::{ + CatalogSnafu, ColumnDataTypeSnafu, CreateVectorSnafu, InsertSnafu, InvalidInsertRowLenSnafu, + JoinTaskSnafu, Result, TableNotFoundSnafu, +}; + +pub struct RowInserter { + catalog_manager: CatalogManagerRef, +} + +impl RowInserter { + pub fn new(catalog_manager: CatalogManagerRef) -> Self { + Self { catalog_manager } + } + + pub async fn handle_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + ) -> Result { + let insert_tasks = requests.inserts.into_iter().map(|insert| { + let catalog_manager = self.catalog_manager.clone(); + let catalog_name = ctx.current_catalog().to_owned(); + let schema_name = ctx.current_schema().to_owned(); + let table_name = insert.table_name.clone(); + + let insert_task = async move { + let Some(request) = + convert_to_table_insert_request(&catalog_name, &schema_name, insert)? + else { + // empty data + return Ok(0usize); + }; + + let table = catalog_manager + .table(&catalog_name, &schema_name, &table_name) + .await + .context(CatalogSnafu)? 
+ .with_context(|| TableNotFoundSnafu { + table_name: format!("{catalog_name}.{schema_name}.{table_name}"), + })?; + + table.insert(request).await.with_context(|_| InsertSnafu { + table_name: format!("{catalog_name}.{schema_name}.{table_name}"), + }) + }; + + common_runtime::spawn_write(insert_task) + }); + + let results = future::try_join_all(insert_tasks) + .await + .context(JoinTaskSnafu)?; + let affected_rows = results.into_iter().sum::>()?; + + Ok(Output::AffectedRows(affected_rows)) + } +} + +fn convert_to_table_insert_request( + catalog_name: &str, + schema_name: &str, + request: RowInsertRequest, +) -> Result> { + let table_name = request.table_name; + let region_number = request.region_number; + let Some(rows) = request.rows else { + return Ok(None); + }; + let schema = rows.schema; + let rows = rows.rows; + let num_columns = schema.len(); + let num_rows = rows.len(); + + if num_rows == 0 || num_columns == 0 { + return Ok(None); + } + + let mut columns_values = Vec::with_capacity(num_columns); + for column_schema in schema { + let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(column_schema.datatype) + .context(ColumnDataTypeSnafu)? + .into(); + let mutable_vector = datatype.create_mutable_vector(num_rows); + columns_values.push((column_schema.column_name, mutable_vector)); + } + + for row in rows { + ensure!( + row.values.len() == num_columns, + InvalidInsertRowLenSnafu { + table_name: format!("{catalog_name}.{schema_name}.{table_name}"), + expected: num_columns, + actual: row.values.len(), + } + ); + + for ((_, mutable_vector), value) in columns_values.iter_mut().zip(row.values.iter()) { + mutable_vector + .try_push_value_ref(helper::pb_value_to_value_ref(value)) + .context(CreateVectorSnafu)?; + } + } + + let columns_values = columns_values + .into_iter() + .map(|(k, mut v)| (k, v.to_vector())) + .collect(); + + let insert_request = InsertRequest { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name, + columns_values, + region_number, + }; + + Ok(Some(insert_request)) +} diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index efc3ae912a..72a7928914 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -597,6 +597,9 @@ pub enum Error { source: auth::error::Error, location: Location, }, + + #[snafu(display("Empty data: {}", msg))] + EmptyData { msg: String, location: Location }, } pub type Result = std::result::Result; @@ -618,7 +621,8 @@ impl ErrorExt for Error { | Error::PrepareImmutableTable { .. } | Error::BuildCsvConfig { .. } | Error::ProjectSchema { .. } - | Error::UnsupportedFormat { .. } => StatusCode::InvalidArguments, + | Error::UnsupportedFormat { .. } + | Error::EmptyData { .. } => StatusCode::InvalidArguments, Error::NotSupported { .. } => StatusCode::Unsupported, diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index 23c0869876..164550d4f1 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -13,7 +13,6 @@ // limitations under the License. 
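// ---------------------------------------------------------------------------
// Editor's note: an illustrative sketch, not part of the patch. The refactor
// in this file replaces the old async `CreateExprFactory` trait object
// (`Arc<dyn CreateExprFactory>`) with a plain `Copy` unit struct whose methods
// are synchronous. Assuming crate-internal paths, driving it looks roughly
// like this; the function name, table name, and engine are hypothetical:

use api::v1::{Column, CreateTableExpr};
use table::engine::TableReference;

use crate::error::Result;
use crate::expr_factory::CreateExprFactory;

fn infer_create_table(columns: &[Column]) -> Result<CreateTableExpr> {
    let factory = CreateExprFactory; // unit struct: trivially `Copy`, no Arc needed
    let table = TableReference::full("greptime", "public", "monitor");
    // No `.await` anymore: building the expr is pure computation.
    factory.create_table_expr_by_columns(&table, columns, "mito")
}

// The hunks below carry out that refactor:
// ---------------------------------------------------------------------------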
use std::collections::HashMap; -use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; @@ -22,6 +21,7 @@ use api::v1::{ DropColumns, RenameTable, }; use common_error::ext::BoxedError; +use common_grpc_expr::util::ColumnExpr; use datanode::instance::sql::table_idents_to_full_name; use datatypes::schema::ColumnSchema; use file_table_engine::table::immutable::ImmutableFileTableOptions; @@ -33,49 +33,51 @@ use sql::statements::alter::{AlterTable, AlterTableOperation}; use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX}; use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def}; use sql::util::to_lowercase_options_map; +use table::engine::TableReference; use table::requests::{TableOptions, IMMUTABLE_TABLE_META_KEY}; use crate::error::{ - self, BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, - ConvertColumnDefaultConstraintSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, - InvalidSqlSnafu, ParseSqlSnafu, Result, + BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu, + EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, InvalidSqlSnafu, NotSupportedSnafu, + ParseSqlSnafu, PrepareImmutableTableSnafu, Result, UnrecognizedTableOptionSnafu, }; -pub type CreateExprFactoryRef = Arc; +#[derive(Debug, Copy, Clone)] +pub struct CreateExprFactory; -#[async_trait::async_trait] -pub trait CreateExprFactory { - async fn create_expr_by_columns( +impl CreateExprFactory { + pub fn create_table_expr_by_columns( &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - columns: &[Column], - engine: &str, - ) -> crate::error::Result; -} - -#[derive(Debug)] -pub struct DefaultCreateExprFactory; - -#[async_trait::async_trait] -impl CreateExprFactory for DefaultCreateExprFactory { - async fn create_expr_by_columns( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, + table_name: &TableReference<'_>, columns: &[Column], engine: &str, ) -> Result { - let table_id = None; - let create_expr = common_grpc_expr::build_create_expr_from_insertion( - catalog_name, - schema_name, - table_id, + let column_exprs = ColumnExpr::from_columns(columns); + let create_expr = common_grpc_expr::util::build_create_table_expr( + None, table_name, - columns, + column_exprs, engine, + "Created on insertion", + ) + .context(BuildCreateExprOnInsertionSnafu)?; + + Ok(create_expr) + } + + pub fn create_table_expr_by_column_schemas( + &self, + table_name: &TableReference<'_>, + column_schemas: &[api::v1::ColumnSchema], + engine: &str, + ) -> Result { + let column_exprs = ColumnExpr::from_column_schemas(column_schemas); + let create_expr = common_grpc_expr::util::build_create_table_expr( + None, + table_name, + column_exprs, + engine, + "Created on insertion", ) .context(BuildCreateExprOnInsertionSnafu)?; @@ -90,18 +92,18 @@ pub(crate) async fn create_external_expr( let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&create.name, query_ctx) .map_err(BoxedError::new) - .context(error::ExternalSnafu)?; + .context(ExternalSnafu)?; let mut options = create.options; let (files, schema) = prepare_immutable_file_table_files_and_schema(&options, &create.columns) .await - .context(error::PrepareImmutableTableSnafu)?; + .context(PrepareImmutableTableSnafu)?; let meta = ImmutableFileTableOptions { files }; let _ = options.insert( IMMUTABLE_TABLE_META_KEY.to_string(), - serde_json::to_string(&meta).context(error::EncodeJsonSnafu)?, + 
serde_json::to_string(&meta).context(EncodeJsonSnafu)?, ); let expr = CreateTableExpr { @@ -126,12 +128,12 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&create.name, query_ctx) .map_err(BoxedError::new) - .context(error::ExternalSnafu)?; + .context(ExternalSnafu)?; let time_index = find_time_index(&create.constraints)?; let table_options = HashMap::from( &TableOptions::try_from(&to_lowercase_options_map(&create.options)) - .context(error::UnrecognizedTableOptionSnafu)?, + .context(UnrecognizedTableOptionSnafu)?, ); let expr = CreateTableExpr { catalog_name, @@ -201,7 +203,7 @@ fn find_primary_keys( Ok(primary_keys) } -pub fn find_time_index(constraints: &[TableConstraint]) -> crate::error::Result { +pub fn find_time_index(constraints: &[TableConstraint]) -> Result { let time_index = constraints .iter() .filter_map(|constraint| match constraint { @@ -229,10 +231,7 @@ pub fn find_time_index(constraints: &[TableConstraint]) -> crate::error::Result< Ok(time_index.first().unwrap().to_string()) } -fn columns_to_expr( - column_defs: &[ColumnDef], - time_index: &str, -) -> crate::error::Result> { +fn columns_to_expr(column_defs: &[ColumnDef], time_index: &str) -> Result> { let column_schemas = column_defs .iter() .map(|c| column_def_to_schema(c, c.name.to_string() == time_index).context(ParseSqlSnafu)) @@ -286,7 +285,7 @@ pub(crate) fn to_alter_expr( let kind = match alter_table.alter_operation() { AlterTableOperation::AddConstraint(_) => { - return error::NotSupportedSnafu { + return NotSupportedSnafu { feat: "ADD CONSTRAINT", } .fail(); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 00cbe4e0be..44cbf6e98b 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -29,7 +29,9 @@ use api::v1::alter_expr::Kind; use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::meta::Role; -use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, InsertRequest, InsertRequests}; +use api::v1::{ + AddColumns, AlterExpr, Column, DdlRequest, InsertRequest, InsertRequests, RowInsertRequests, +}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use catalog::remote::CachedMetaKvBackend; @@ -76,6 +78,7 @@ use sql::parser::ParserContext; use sql::statements::copy::CopyTable; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; +use table::engine::TableReference; use crate::catalog::FrontendCatalogManager; use crate::error::{ @@ -83,12 +86,13 @@ use crate::error::{ InvalidInsertRequestSnafu, MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, }; -use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; +use crate::expr_factory::CreateExprFactory; use crate::frontend::FrontendOptions; use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use crate::heartbeat::HeartbeatTask; use crate::instance::standalone::StandaloneGrpcQueryHandler; use crate::metrics; +use crate::row_inserter::RowInserter; use crate::script::ScriptExecutor; use crate::server::{start_server, ServerHandlers, Services}; use crate::statement::StatementExecutor; @@ -120,16 +124,13 @@ pub struct Instance { statement_executor: Arc, query_engine: QueryEngineRef, grpc_query_handler: GrpcQueryHandlerRef, - - create_expr_factory: CreateExprFactoryRef, - + create_expr_factory: 
CreateExprFactory, /// plugins: this map holds extensions to customize query or auth /// behaviours. plugins: Arc, - servers: Arc, - heartbeat_task: Option, + row_inserter: Arc, } impl Instance { @@ -209,16 +210,26 @@ impl Instance { common_telemetry::init_node_id(opts.node_id.clone()); + let create_expr_factory = CreateExprFactory; + + let row_inserter = Arc::new(RowInserter::new( + MITO_ENGINE.to_string(), + catalog_manager.clone(), + create_expr_factory, + dist_instance.clone(), + )); + Ok(Instance { catalog_manager, script_executor, - create_expr_factory: Arc::new(DefaultCreateExprFactory), + create_expr_factory, statement_executor, query_engine, grpc_query_handler: dist_instance, plugins: plugins.clone(), servers: Arc::new(HashMap::new()), heartbeat_task, + row_inserter, }) } @@ -271,16 +282,27 @@ impl Instance { dn_instance.clone(), )); + let create_expr_factory = CreateExprFactory; + let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone()); + + let row_inserter = Arc::new(RowInserter::new( + MITO_ENGINE.to_string(), + catalog_manager.clone(), + create_expr_factory, + grpc_query_handler.clone(), + )); + Ok(Instance { catalog_manager: catalog_manager.clone(), script_executor, - create_expr_factory: Arc::new(DefaultCreateExprFactory), + create_expr_factory, statement_executor, query_engine, - grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()), + grpc_query_handler, plugins: Default::default(), servers: Arc::new(HashMap::new()), heartbeat_task: None, + row_inserter, }) } @@ -295,6 +317,15 @@ impl Instance { &self.catalog_manager } + // Handle batch inserts with row-format + pub async fn handle_row_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + ) -> Result { + self.row_inserter.handle_inserts(requests, ctx).await + } + /// Handle batch inserts pub async fn handle_inserts( &self, @@ -379,10 +410,10 @@ impl Instance { let schema_name = ctx.current_schema(); // Create table automatically, build schema from data. 
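// ---------------------------------------------------------------------------
// Editor's note: an illustrative aside, not part of the patch. The new
// `handle_row_inserts` above forwards `RowInsertRequests`, the row-oriented
// counterpart of the column-oriented `InsertRequests`: every `Row` carries
// exactly one `Value` per entry of the shared `schema`, so sparse data is
// padded with null values instead of per-column null masks. A hand-built
// request (hypothetical table and column names) looks like this:

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
    Value,
};

fn sample_row_inserts() -> RowInsertRequests {
    let schema = vec![
        ColumnSchema {
            column_name: "host".to_string(),
            datatype: ColumnDataType::String as i32,
            semantic_type: SemanticType::Tag as i32,
        },
        ColumnSchema {
            column_name: "cpu".to_string(),
            datatype: ColumnDataType::Float64 as i32,
            semantic_type: SemanticType::Field as i32,
        },
        ColumnSchema {
            column_name: "ts".to_string(),
            datatype: ColumnDataType::TimestampMillisecond as i32,
            semantic_type: SemanticType::Timestamp as i32,
        },
    ];
    // One value per schema column, in schema order.
    let row = Row {
        values: vec![
            Value { value_data: Some(ValueData::StringValue("host1".to_string())) },
            Value { value_data: Some(ValueData::F64Value(66.6)) },
            Value { value_data: Some(ValueData::TsMillisecondValue(1_663_840_496_100)) },
        ],
    };
    RowInsertRequests {
        inserts: vec![RowInsertRequest {
            table_name: "monitor".to_string(),
            rows: Some(Rows { schema, rows: vec![row] }),
            ..Default::default()
        }],
    }
}

// The diff resumes below with the table auto-creation path, now driven by the
// synchronous `CreateExprFactory`:
// ---------------------------------------------------------------------------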
- let create_expr = self - .create_expr_factory - .create_expr_by_columns(catalog_name, schema_name, table_name, columns, engine) - .await?; + let table_name = TableReference::full(catalog_name, schema_name, table_name); + let create_expr = + self.create_expr_factory + .create_table_expr_by_columns(&table_name, columns, engine)?; info!( "Try to create table: {} automatically with request: {:?}", diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 4f8d39d0b8..bb59dec2ae 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -666,7 +666,7 @@ impl GrpcQueryHandler for DistInstance { match request { Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await, Request::RowInserts(_) | Request::RowDeletes(_) => NotSupportedSnafu { - feat: "row insert/delete", + feat: "row inserts/deletes", } .fail(), Request::Deletes(requests) => self.handle_dist_delete(requests, ctx).await, diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 9a3b1c06b1..e888cff1db 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -44,9 +44,10 @@ impl GrpcQueryHandler for Instance { let output = match request { Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?, - Request::RowInserts(_) | Request::RowDeletes(_) => { + Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?, + Request::RowDeletes(_) => { return NotSupportedSnafu { - feat: "row insert/delete", + feat: "row deletes", } .fail(); } diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index d85b6a0f5e..ed2088ae96 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -27,7 +27,7 @@ use crate::instance::Instance; impl InfluxdbLineProtocolHandler for Instance { async fn exec( &self, - request: &InfluxdbRequest, + request: InfluxdbRequest, ctx: QueryContextRef, ) -> servers::error::Result<()> { self.plugins diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 1c09176e5c..ab76e8d3a1 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -22,6 +22,7 @@ pub mod frontend; pub mod heartbeat; pub mod instance; pub(crate) mod metrics; +mod row_inserter; mod script; mod server; pub mod service_config; diff --git a/src/frontend/src/row_inserter.rs b/src/frontend/src/row_inserter.rs new file mode 100644 index 0000000000..d83af0bd65 --- /dev/null +++ b/src/frontend/src/row_inserter.rs @@ -0,0 +1,172 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
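// ---------------------------------------------------------------------------
// Editor's note: a minimal sketch, not part of the patch, of the schema-diff
// step that `alter_table_on_demand` in this new file builds on:
// `extract_new_columns` compares the columns carried by a row request against
// the table's current schema and yields only the missing ones.
// `columns_to_add` is a hypothetical helper name.

use api::v1::{AddColumns, ColumnSchema as PbColumnSchema};
use common_grpc_expr::error::Result;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use datatypes::schema::Schema;

fn columns_to_add(
    table_schema: &Schema,
    request_schema: &[PbColumnSchema],
) -> Result<Option<AddColumns>> {
    let exprs = ColumnExpr::from_column_schemas(request_schema);
    // `Ok(None)` means the table already covers every requested column;
    // `Ok(Some(_))` feeds an ALTER TABLE ... ADD COLUMNS expression.
    extract_new_columns(table_schema, exprs)
}

// The `RowInserter` implementation follows:
// ---------------------------------------------------------------------------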
+ +use api::v1::alter_expr::Kind; +use api::v1::ddl_request::Expr; +use api::v1::greptime_request::Request; +use api::v1::{AlterExpr, ColumnSchema, DdlRequest, Row, RowInsertRequest, RowInsertRequests}; +use catalog::CatalogManagerRef; +use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; +use common_query::Output; +use common_telemetry::info; +use servers::query_handler::grpc::GrpcQueryHandlerRef; +use session::context::QueryContextRef; +use snafu::{ensure, OptionExt, ResultExt}; +use table::engine::TableReference; +use table::TableRef; + +use crate::error::{CatalogSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu, Result}; +use crate::expr_factory::CreateExprFactory; + +pub struct RowInserter { + engine_name: String, + catalog_manager: CatalogManagerRef, + create_expr_factory: CreateExprFactory, + grpc_query_handler: GrpcQueryHandlerRef, +} + +impl RowInserter { + pub fn new( + engine_name: String, + catalog_manager: CatalogManagerRef, + create_expr_factory: CreateExprFactory, + grpc_query_handler: GrpcQueryHandlerRef, + ) -> Self { + Self { + engine_name, + catalog_manager, + create_expr_factory, + grpc_query_handler, + } + } + + pub async fn handle_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + ) -> Result { + self.create_or_alter_tables_on_demand(&requests, ctx.clone()) + .await?; + let query = Request::RowInserts(requests); + self.grpc_query_handler.do_query(query, ctx).await + } + + // check if tables already exist: + // - if table does not exist, create table by inferred CreateExpr + // - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr` + async fn create_or_alter_tables_on_demand( + &self, + requests: &RowInsertRequests, + ctx: QueryContextRef, + ) -> Result<()> { + let catalog_name = ctx.current_catalog(); + let schema_name = ctx.current_schema(); + + // TODO(jeremy): create and alter in batch? + for req in &requests.inserts { + let table_name = &req.table_name; + let table = self + .catalog_manager + .table(catalog_name, schema_name, table_name) + .await + .context(CatalogSnafu)?; + match table { + Some(table) => { + self.alter_table_on_demand(catalog_name, schema_name, table, req, ctx.clone()) + .await? + } + None => { + let table_name = TableReference::full(catalog_name, schema_name, table_name); + self.create_table(&table_name, req, ctx.clone()).await? 
+ } + } + } + + Ok(()) + } + + async fn create_table( + &self, + table_name: &TableReference<'_>, + req: &RowInsertRequest, + ctx: QueryContextRef, + ) -> Result<()> { + let (column_schemas, _) = extract_schema_and_rows(req)?; + let create_table_expr = self + .create_expr_factory + .create_table_expr_by_column_schemas(table_name, column_schemas, &self.engine_name)?; + + let req = Request::Ddl(DdlRequest { + expr: Some(Expr::CreateTable(create_table_expr)), + }); + self.grpc_query_handler.do_query(req, ctx).await?; + + Ok(()) + } + + async fn alter_table_on_demand( + &self, + catalog_name: &str, + schema_name: &str, + table: TableRef, + req: &RowInsertRequest, + ctx: QueryContextRef, + ) -> Result<()> { + let (column_schemas, _) = extract_schema_and_rows(req)?; + let column_exprs = ColumnExpr::from_column_schemas(column_schemas); + let add_columns = extract_new_columns(&table.schema(), column_exprs) + .context(FindNewColumnsOnInsertionSnafu)?; + let Some(add_columns) = add_columns else { + return Ok(()); + }; + let table_name = table.table_info().name.clone(); + + info!( + "Adding new columns: {:?} to table: {}.{}.{}", + add_columns, catalog_name, schema_name, table_name + ); + + let alter_table_expr = AlterExpr { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name, + kind: Some(Kind::AddColumns(add_columns)), + ..Default::default() + }; + + let req = Request::Ddl(DdlRequest { + expr: Some(Expr::Alter(alter_table_expr)), + }); + self.grpc_query_handler.do_query(req, ctx).await?; + + Ok(()) + } +} + +fn extract_schema_and_rows(req: &RowInsertRequest) -> Result<(&[ColumnSchema], &[Row])> { + let rows = req.rows.as_ref().with_context(|| EmptyDataSnafu { + msg: format!("insert to table: {:?}", &req.table_name), + })?; + let schema = &rows.schema; + let rows = &rows.rows; + + ensure!( + !rows.is_empty(), + EmptyDataSnafu { + msg: format!("insert to table: {:?}", &req.table_name), + } + ); + + Ok((schema, rows)) +} diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index 21aa4cf702..a9d8f70713 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -391,7 +391,7 @@ struct Get { } impl TryFrom for Get { - type Error = error::Error; + type Error = Error; fn try_from(req: RangeRequest) -> Result { let RangeRequest { @@ -428,7 +428,7 @@ struct Put { } impl TryFrom for Put { - type Error = error::Error; + type Error = Error; fn try_from(req: PutRequest) -> Result { let PutRequest { @@ -456,7 +456,7 @@ struct BatchGet { } impl TryFrom for BatchGet { - type Error = error::Error; + type Error = Error; fn try_from(req: BatchGetRequest) -> Result { let BatchGetRequest { keys } = req; @@ -476,7 +476,7 @@ struct BatchPut { } impl TryFrom for BatchPut { - type Error = error::Error; + type Error = Error; fn try_from(req: BatchPutRequest) -> Result { let BatchPutRequest { kvs, prev_kv } = req; @@ -499,7 +499,7 @@ struct BatchDelete { } impl TryFrom for BatchDelete { - type Error = error::Error; + type Error = Error; fn try_from(req: BatchDeleteRequest) -> Result { let BatchDeleteRequest { keys, prev_kv } = req; @@ -524,7 +524,7 @@ struct CompareAndPut { } impl TryFrom for CompareAndPut { - type Error = error::Error; + type Error = Error; fn try_from(req: CompareAndPutRequest) -> Result { let CompareAndPutRequest { key, expect, value } = req; @@ -544,7 +544,7 @@ struct Delete { } impl TryFrom for Delete { - type Error = error::Error; + type Error = Error; fn try_from(req: 
DeleteRangeRequest) -> Result { let DeleteRangeRequest { @@ -577,7 +577,7 @@ struct MoveValue { } impl TryFrom for MoveValue { - type Error = error::Error; + type Error = Error; fn try_from(req: MoveValueRequest) -> Result { let MoveValueRequest { from_key, to_key } = req; diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 52c3842e6d..e1bb625453 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -328,6 +328,21 @@ pub enum Error { actual: opensrv_mysql::ColumnType, location: Location, }, + + #[snafu(display( + "Column: {}, {} incompatible, expected: {}, actual: {}", + column_name, + datatype, + expected, + actual + ))] + IncompatibleSchema { + column_name: String, + datatype: String, + expected: i32, + actual: i32, + location: Location, + }, } pub type Result = std::result::Result; @@ -371,7 +386,8 @@ impl ErrorExt for Error { | InvalidPrepareStatement { .. } | DataFrame { .. } | PreparedStmtTypeMismatch { .. } - | TimePrecision { .. } => StatusCode::InvalidArguments, + | TimePrecision { .. } + | IncompatibleSchema { .. } => StatusCode::InvalidArguments, InfluxdbLinesWrite { source, .. } | PromSeriesWrite { source, .. } diff --git a/src/servers/src/http/influxdb.rs b/src/servers/src/http/influxdb.rs index 6879ca42d5..9e3bec8558 100644 --- a/src/servers/src/http/influxdb.rs +++ b/src/servers/src/http/influxdb.rs @@ -97,7 +97,7 @@ pub async fn influxdb_write( let request = InfluxdbRequest { precision, lines }; - handler.exec(&request, ctx).await?; + handler.exec(request, ctx).await?; Ok((StatusCode::NO_CONTENT, ())) } diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 4b302c1610..82dd832393 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -14,14 +14,22 @@ use std::collections::HashMap; -use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests}; +use api::v1::value::ValueData; +use api::v1::{ + ColumnDataType, ColumnSchema, InsertRequest as GrpcInsertRequest, InsertRequests, Row, + RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value, +}; +use common_grpc::writer; use common_grpc::writer::{LinesWriter, Precision}; use common_time::timestamp::TimeUnit; use common_time::Timestamp; -use influxdb_line_protocol::{parse_lines, FieldValue}; -use snafu::{OptionExt, ResultExt}; +use influxdb_line_protocol::{parse_lines, FieldSet, FieldValue, TagSet}; +use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu, TimePrecisionSnafu}; +use crate::error::{ + Error, IncompatibleSchemaSnafu, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu, + TimePrecisionSnafu, +}; pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts"; pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond; @@ -34,10 +42,10 @@ pub struct InfluxdbRequest { type TableName = String; -impl TryFrom<&InfluxdbRequest> for InsertRequests { +impl TryFrom for InsertRequests { type Error = Error; - fn try_from(value: &InfluxdbRequest) -> Result { + fn try_from(value: InfluxdbRequest) -> Result { let mut writers: HashMap = HashMap::new(); let lines = parse_lines(&value.lines) .collect::>>() @@ -92,24 +100,14 @@ impl TryFrom<&InfluxdbRequest> for InsertRequests { } if let Some(timestamp) = line.timestamp { - let precision = unwarp_or_default_precision(value.precision); + let precision = unwrap_or_default_precision(value.precision); writer .write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision)) .context(InfluxdbLinesWriteSnafu)?; } else { - let precision = 
unwarp_or_default_precision(value.precision); + let precision = unwrap_or_default_precision(value.precision); let timestamp = Timestamp::current_millis(); - let unit = match precision { - Precision::Second => TimeUnit::Second, - Precision::Millisecond => TimeUnit::Millisecond, - Precision::Microsecond => TimeUnit::Microsecond, - Precision::Nanosecond => TimeUnit::Nanosecond, - _ => { - return Err(Error::NotSupported { - feat: format!("convert {precision} into TimeUnit"), - }) - } - }; + let unit = get_time_unit(precision)?; let timestamp = timestamp .convert_to(unit) .with_context(|| TimePrecisionSnafu { @@ -141,7 +139,228 @@ impl TryFrom<&InfluxdbRequest> for InsertRequests { } } -fn unwarp_or_default_precision(precision: Option) -> Precision { +impl TryFrom for RowInsertRequests { + type Error = Error; + + fn try_from(value: InfluxdbRequest) -> Result { + let lines = parse_lines(&value.lines) + .collect::>>() + .context(InfluxdbLineProtocolSnafu)?; + + struct TableData<'a> { + schema: Vec, + rows: Vec, + column_indexes: HashMap<&'a str, usize>, + } + + let mut table_data_map = HashMap::new(); + + for line in &lines { + let table_name = line.series.measurement.as_str(); + let tags = &line.series.tag_set; + let fields = &line.field_set; + let ts = line.timestamp; + // tags.len + fields.len + timestamp(+1) + let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1; + + let TableData { + schema, + rows, + column_indexes, + } = table_data_map + .entry(table_name) + .or_insert_with(|| TableData { + schema: Vec::with_capacity(num_columns), + rows: Vec::new(), + column_indexes: HashMap::with_capacity(num_columns), + }); + + let mut one_row = vec![Value { value_data: None }; schema.len()]; + + // tags + parse_tags(tags, column_indexes, schema, &mut one_row)?; + // fields + parse_fields(fields, column_indexes, schema, &mut one_row)?; + // timestamp + parse_ts(ts, value.precision, column_indexes, schema, &mut one_row)?; + + rows.push(Row { values: one_row }); + } + + let inserts = table_data_map + .into_iter() + .map( + |( + table_name, + TableData { + schema, mut rows, .. 
+ }, + )| { + let num_columns = schema.len(); + for row in rows.iter_mut() { + if num_columns > row.values.len() { + row.values.resize(num_columns, Value { value_data: None }); + } + } + + RowInsertRequest { + table_name: table_name.to_string(), + rows: Some(Rows { schema, rows }), + ..Default::default() + } + }, + ) + .collect::>(); + + Ok(RowInsertRequests { inserts }) + } +} + +fn parse_tags<'a>( + tags: &'a Option, + column_indexes: &mut HashMap<&'a str, usize>, + schema: &mut Vec, + one_row: &mut Vec, +) -> Result<(), Error> { + let Some(tags) = tags else { + return Ok(()); + }; + + for (k, v) in tags { + let index = column_indexes.entry(k.as_str()).or_insert(schema.len()); + if *index == schema.len() { + schema.push(ColumnSchema { + column_name: k.to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + }); + one_row.push(to_value(ValueData::StringValue(v.to_string()))); + } else { + check_schema(ColumnDataType::String, SemanticType::Tag, &schema[*index])?; + one_row[*index].value_data = Some(ValueData::StringValue(v.to_string())); + } + } + + Ok(()) +} + +fn parse_fields<'a>( + fields: &'a FieldSet, + column_indexes: &mut HashMap<&'a str, usize>, + schema: &mut Vec, + one_row: &mut Vec, +) -> Result<(), Error> { + for (k, v) in fields { + let index = column_indexes.entry(k.as_str()).or_insert(schema.len()); + let (datatype, value) = match v { + FieldValue::I64(v) => (ColumnDataType::Int64, ValueData::I64Value(*v)), + FieldValue::U64(v) => (ColumnDataType::Uint64, ValueData::U64Value(*v)), + FieldValue::F64(v) => (ColumnDataType::Float64, ValueData::F64Value(*v)), + FieldValue::String(v) => ( + ColumnDataType::String, + ValueData::StringValue(v.to_string()), + ), + FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)), + }; + + if *index == schema.len() { + schema.push(ColumnSchema { + column_name: k.to_string(), + datatype: datatype as i32, + semantic_type: SemanticType::Field as i32, + }); + one_row.push(to_value(value)); + } else { + check_schema(datatype, SemanticType::Field, &schema[*index])?; + one_row[*index].value_data = Some(value); + } + } + + Ok(()) +} + +fn parse_ts( + ts: Option, + precision: Option, + column_indexes: &mut HashMap<&str, usize>, + schema: &mut Vec, + one_row: &mut Vec, +) -> Result<(), Error> { + let precision = unwrap_or_default_precision(precision); + let ts = match ts { + Some(timestamp) => writer::to_ms_ts(precision, timestamp), + None => { + let timestamp = Timestamp::current_millis(); + let unit = get_time_unit(precision)?; + let timestamp = timestamp + .convert_to(unit) + .with_context(|| TimePrecisionSnafu { + name: precision.to_string(), + })?; + writer::to_ms_ts(precision, timestamp.into()) + } + }; + + let column_name = INFLUXDB_TIMESTAMP_COLUMN_NAME; + let index = column_indexes.entry(column_name).or_insert(schema.len()); + if *index == schema.len() { + schema.push(ColumnSchema { + column_name: column_name.to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + }); + one_row.push(to_value(ValueData::TsMillisecondValue(ts))) + } else { + check_schema( + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + &schema[*index], + )?; + one_row[*index].value_data = Some(ValueData::TsMillisecondValue(ts)); + } + + Ok(()) +} + +#[inline] +fn check_schema( + datatype: ColumnDataType, + semantic_type: SemanticType, + schema: &ColumnSchema, +) -> Result<(), Error> { + ensure!( + schema.datatype == datatype as 
i32, + IncompatibleSchemaSnafu { + column_name: &schema.column_name, + datatype: "datatype", + expected: schema.datatype, + actual: datatype as i32, + } + ); + + ensure!( + schema.semantic_type == semantic_type as i32, + IncompatibleSchemaSnafu { + column_name: &schema.column_name, + datatype: "semantic_type", + expected: schema.semantic_type, + actual: semantic_type as i32, + } + ); + + Ok(()) +} + +// TODO(jeremy): impl From for Value +#[inline] +fn to_value(value: ValueData) -> Value { + Value { + value_data: Some(value), + } +} + +#[inline] +fn unwrap_or_default_precision(precision: Option) -> Precision { if let Some(val) = precision { val } else { @@ -149,6 +368,21 @@ fn unwarp_or_default_precision(precision: Option) -> Precision { } } +#[inline] +fn get_time_unit(precision: Precision) -> Result { + Ok(match precision { + Precision::Second => TimeUnit::Second, + Precision::Millisecond => TimeUnit::Millisecond, + Precision::Microsecond => TimeUnit::Microsecond, + Precision::Nanosecond => TimeUnit::Nanosecond, + _ => { + return Err(Error::NotSupported { + feat: format!("convert {precision} into TimeUnit"), + }) + } + }) +} + #[cfg(test)] mod tests { use api::v1::column::Values; @@ -166,7 +400,7 @@ monitor1,host=host2 memory=1027 1663840496400340001 monitor2,host=host3 cpu=66.5 1663840496100023102 monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; - let influxdb_req = &InfluxdbRequest { + let influxdb_req = InfluxdbRequest { precision: None, lines: lines.to_string(), }; @@ -306,4 +540,199 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; assert_eq!(b, bitvec.get(idx).unwrap()) } } + #[test] + fn test_convert_influxdb_lines_to_rows() { + let lines = r" +monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100 +monitor1,host=host2 memory=1027 1663840496400340001 +monitor2,host=host3 cpu=66.5 1663840496100023102 +monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; + + let influxdb_req = InfluxdbRequest { + precision: None, + lines: lines.to_string(), + }; + + let requests: RowInsertRequests = influxdb_req.try_into().unwrap(); + assert_eq!(2, requests.inserts.len()); + + for request in requests.inserts { + match &request.table_name[..] { + "monitor1" => assert_monitor1_rows(&request.rows), + "monitor2" => assert_monitor2_rows(&request.rows), + _ => panic!(), + } + } + } + + fn assert_monitor1_rows(rows: &Option) { + let rows = rows.as_ref().unwrap(); + let schema = &rows.schema; + let rows = &rows.rows; + assert_eq!(4, schema.len()); + assert_eq!(2, rows.len()); + + for (i, column_schema) in schema.iter().enumerate() { + match &column_schema.column_name[..] 
{ + "host" => { + assert_eq!(ColumnDataType::String as i32, column_schema.datatype); + assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type); + + for (j, row) in rows.iter().enumerate() { + let v = row.values[i].value_data.as_ref().unwrap(); + match j { + 0 => assert_eq!("host1", extract_string_value(v)), + 1 => assert_eq!("host2", extract_string_value(v)), + _ => panic!(), + } + } + } + "cpu" => { + assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype); + assert_eq!(SemanticType::Field as i32, column_schema.semantic_type); + + for (j, row) in rows.iter().enumerate() { + let v = row.values[i].value_data.as_ref(); + match j { + 0 => assert_eq!(66.6f64, extract_f64_value(v.as_ref().unwrap())), + 1 => assert_eq!(None, v), + _ => panic!(), + } + } + } + "memory" => { + assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype); + assert_eq!(SemanticType::Field as i32, column_schema.semantic_type); + + for (j, row) in rows.iter().enumerate() { + let v = row.values[i].value_data.as_ref(); + match j { + 0 => assert_eq!(1024f64, extract_f64_value(v.as_ref().unwrap())), + 1 => assert_eq!(1027f64, extract_f64_value(v.as_ref().unwrap())), + _ => panic!(), + } + } + } + "ts" => { + assert_eq!( + ColumnDataType::TimestampMillisecond as i32, + column_schema.datatype + ); + assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type); + + for (j, row) in rows.iter().enumerate() { + let v = row.values[i].value_data.as_ref(); + match j { + 0 => assert_eq!( + 1663840496100023100 / 1_000_000, + extract_ts_millis_value(v.as_ref().unwrap()) + ), + 1 => assert_eq!( + 1663840496400340001 / 1_000_000, + extract_ts_millis_value(v.as_ref().unwrap()) + ), + _ => panic!(), + } + } + } + _ => panic!(), + } + } + } + + fn assert_monitor2_rows(rows: &Option) { + let rows = rows.as_ref().unwrap(); + let schema = &rows.schema; + let rows = &rows.rows; + assert_eq!(4, schema.len()); + assert_eq!(2, rows.len()); + + for (i, column_schema) in schema.iter().enumerate() { + match &column_schema.column_name[..] 
{ + "host" => { + assert_eq!(ColumnDataType::String as i32, column_schema.datatype); + assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type); + + for (j, row) in rows.iter().enumerate() { + let v = row.values[i].value_data.as_ref().unwrap(); + match j { + 0 => assert_eq!("host3", extract_string_value(v)), + 1 => assert_eq!("host4", extract_string_value(v)), + _ => panic!(), + } + } + } + "cpu" => { + assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype); + assert_eq!(SemanticType::Field as i32, column_schema.semantic_type); + + for (j, row) in rows.iter().enumerate() { + let v = row.values[i].value_data.as_ref(); + match j { + 0 => assert_eq!(66.5f64, extract_f64_value(v.as_ref().unwrap())), + 1 => assert_eq!(66.3f64, extract_f64_value(v.as_ref().unwrap())), + _ => panic!(), + } + } + } + "memory" => { + assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype); + assert_eq!(SemanticType::Field as i32, column_schema.semantic_type); + + for (j, row) in rows.iter().enumerate() { + let v = row.values[i].value_data.as_ref(); + match j { + 0 => assert_eq!(None, v), + 1 => assert_eq!(1029f64, extract_f64_value(v.as_ref().unwrap())), + _ => panic!(), + } + } + } + "ts" => { + assert_eq!( + ColumnDataType::TimestampMillisecond as i32, + column_schema.datatype + ); + assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type); + + for (j, row) in rows.iter().enumerate() { + let v = row.values[i].value_data.as_ref(); + match j { + 0 => assert_eq!( + 1663840496100023102 / 1_000_000, + extract_ts_millis_value(v.as_ref().unwrap()) + ), + 1 => assert_eq!( + 1663840496400340003 / 1_000_000, + extract_ts_millis_value(v.as_ref().unwrap()) + ), + _ => panic!(), + } + } + } + _ => panic!(), + } + } + } + + fn extract_string_value(value: &ValueData) -> &str { + match value { + ValueData::StringValue(v) => v, + _ => panic!(), + } + } + + fn extract_f64_value(value: &ValueData) -> f64 { + match value { + ValueData::F64Value(v) => *v, + _ => panic!(), + } + } + + fn extract_ts_millis_value(value: &ValueData) -> i64 { + match value { + ValueData::TsMillisecondValue(v) => *v, + _ => panic!(), + } + } } diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 14172bce0b..796303a888 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -62,7 +62,7 @@ pub trait ScriptHandler { pub trait InfluxdbLineProtocolHandler { /// A successful request will not return a response. /// Only on error will the socket return a line of data. 
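// ---------------------------------------------------------------------------
// Editor's note: a short sketch, not part of the patch. With the by-value
// `TryFrom<InfluxdbRequest>` impl added in influxdb.rs above, turning a
// line-protocol payload into row-format requests is a single consuming
// conversion; the payload and function name below are hypothetical.

use api::v1::RowInsertRequests;
use servers::error::Result;
use servers::influxdb::InfluxdbRequest;

fn lines_to_rows() -> Result<RowInsertRequests> {
    let request = InfluxdbRequest {
        precision: None, // `None` defaults to nanosecond precision
        lines: "monitor,host=host1 cpu=66.6,memory=1024 1663840496100023100".to_string(),
    };
    // Consumes the request: tags become Tag columns, fields become Field
    // columns, and the timestamp lands in the millisecond "ts" column.
    request.try_into()
}

// Matching that consuming conversion, the trait change below makes the
// handler take `InfluxdbRequest` by value:
// ---------------------------------------------------------------------------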
- async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()>; + async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<()>; } #[async_trait] diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index ca26bbe8d9..3e2db63118 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -53,7 +53,7 @@ impl GrpcQueryHandler for DummyInstance { #[async_trait] impl InfluxdbLineProtocolHandler for DummyInstance { - async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()> { + async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<()> { let requests: InsertRequests = request.try_into()?; for expr in requests.inserts { let _ = self diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs index 59d81457d7..aa3f319e66 100644 --- a/tests-integration/src/influxdb.rs +++ b/tests-integration/src/influxdb.rs @@ -64,7 +64,7 @@ monitor1,host=host2 memory=1027"; precision: None, lines: lines.to_string(), }; - assert!(instance.exec(&request, QueryContext::arc()).await.is_ok()); + assert!(instance.exec(request, QueryContext::arc()).await.is_ok()); let mut output = instance .do_query( @@ -93,7 +93,7 @@ monitor1,host=host2 memory=1027 1663840496400340001"; precision: None, lines: lines.to_string(), }; - instance.exec(&request, QueryContext::arc()).await.unwrap(); + instance.exec(request, QueryContext::arc()).await.unwrap(); let mut output = instance .do_query(