diff --git a/Cargo.lock b/Cargo.lock index b5ce49cf0c..8857459182 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1121,6 +1121,22 @@ dependencies = [ "tower", ] +[[package]] +name = "common-insert" +version = "0.1.0" +dependencies = [ + "api", + "async-trait", + "common-base", + "common-error", + "common-query", + "common-telemetry", + "common-time", + "datatypes", + "snafu", + "table", +] + [[package]] name = "common-query" version = "0.1.0" @@ -1635,6 +1651,7 @@ dependencies = [ "common-catalog", "common-error", "common-grpc", + "common-insert", "common-query", "common-recordbatch", "common-runtime", diff --git a/Cargo.toml b/Cargo.toml index c84de7f8b2..ad51afb707 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ members = [ "src/common/recordbatch", "src/common/runtime", "src/common/substrait", + "src/common/insert", "src/common/telemetry", "src/common/time", "src/datanode", diff --git a/src/common/insert/Cargo.toml b/src/common/insert/Cargo.toml new file mode 100644 index 0000000000..b6d28c3680 --- /dev/null +++ b/src/common/insert/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "common-insert" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +api = { path = "../../api" } +async-trait = "0.1" +common-base = { path = "../base" } +common-error = { path = "../error" } +common-telemetry = { path = "../telemetry" } +common-time = { path = "../time" } +common-query = { path = "../query" } +datatypes = { path = "../../datatypes" } +snafu = { version = "0.7", features = ["backtraces"] } +table = { path = "../../table" } diff --git a/src/common/insert/src/error.rs b/src/common/insert/src/error.rs new file mode 100644 index 0000000000..1702be8539 --- /dev/null +++ b/src/common/insert/src/error.rs @@ -0,0 +1,72 @@ +use std::any::Any; + +use api::DecodeError; +use common_error::ext::ErrorExt; +use common_error::prelude::{Snafu, StatusCode}; +use snafu::{Backtrace, ErrorCompat}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Column {} not found in table {}", column_name, table_name))] + ColumnNotFound { + column_name: String, + table_name: String, + }, + + #[snafu(display("Failed to convert bytes to insert batch, source: {}", source))] + DecodeInsert { source: DecodeError }, + + #[snafu(display("Illegal insert data"))] + IllegalInsertData, + + #[snafu(display("Column datatype error, source: {}", source))] + ColumnDataType { + #[snafu(backtrace)] + source: api::error::Error, + }, + + #[snafu(display("Failed to create schema when creating table, source: {}", source))] + CreateSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display( + "Duplicated timestamp column in gRPC requests, exists {}, duplicated: {}", + exists, + duplicated + ))] + DuplicatedTimestampColumn { + exists: String, + duplicated: String, + backtrace: Backtrace, + }, + + #[snafu(display("Missing timestamp column in request"))] + MissingTimestampColumn { backtrace: Backtrace }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, + Error::DecodeInsert { .. } | Error::IllegalInsertData { .. } => { + StatusCode::InvalidArguments + } + Error::ColumnDataType { .. } => StatusCode::Internal, + Error::CreateSchema { .. } + | Error::DuplicatedTimestampColumn { .. } + | Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments, + } + } + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/datanode/src/server/grpc/insert.rs b/src/common/insert/src/insert.rs similarity index 97% rename from src/datanode/src/server/grpc/insert.rs rename to src/common/insert/src/insert.rs index 817f55ea44..cc7c9eea91 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/common/insert/src/insert.rs @@ -24,15 +24,17 @@ use table::{ Table, }; -use crate::error::{self, ColumnNotFoundSnafu, DecodeInsertSnafu, IllegalInsertDataSnafu, Result}; +use crate::error::{ + ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateSchemaSnafu, DecodeInsertSnafu, + DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, MissingTimestampColumnSnafu, Result, +}; const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32; const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32; #[inline] fn build_column_schema(column_name: &str, datatype: i32, nullable: bool) -> Result { - let datatype_wrapper = - ColumnDataTypeWrapper::try_new(datatype).context(error::ColumnDataTypeSnafu)?; + let datatype_wrapper = ColumnDataTypeWrapper::try_new(datatype).context(ColumnDataTypeSnafu)?; Ok(ColumnSchema::new( column_name, @@ -127,7 +129,7 @@ pub fn build_create_table_request( TIMESTAMP_SEMANTIC_TYPE => { ensure!( timestamp_index == usize::MAX, - error::DuplicatedTimestampColumnSnafu { + DuplicatedTimestampColumnSnafu { exists: &columns[timestamp_index].column_name, duplicated: column_name, } @@ -145,17 +147,14 @@ pub fn build_create_table_request( } } - ensure!( - timestamp_index != usize::MAX, - error::MissingTimestampColumnSnafu - ); + ensure!(timestamp_index != usize::MAX, MissingTimestampColumnSnafu); let schema = Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() .timestamp_index(Some(timestamp_index)) .build() - .context(error::CreateSchemaSnafu)?, + .context(CreateSchemaSnafu)?, ); return Ok(CreateTableRequest { @@ -172,7 +171,7 @@ pub fn build_create_table_request( }); } - error::IllegalInsertDataSnafu.fail() + IllegalInsertDataSnafu.fail() } pub fn insertion_expr_to_request( @@ -357,7 +356,8 @@ fn is_null(null_mask: &BitVec, idx: usize) -> Option { #[cfg(test)] mod tests { - use std::{any::Any, sync::Arc}; + use std::any::Any; + use std::sync::Arc; use api::v1::{ codec::InsertBatch, diff --git a/src/common/insert/src/lib.rs b/src/common/insert/src/lib.rs new file mode 100644 index 0000000000..2d36dfa33c --- /dev/null +++ b/src/common/insert/src/lib.rs @@ -0,0 +1,6 @@ +pub mod error; +mod insert; +pub use insert::{ + build_alter_table_request, build_create_table_request, find_new_columns, insert_batches, + insertion_expr_to_request, +}; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 0f6b1ae5d6..0d322df754 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -22,6 +22,7 @@ common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } +common-insert = { path = "../common/insert" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ "simd", ] } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index b8ec95b9be..a4b6a8ffaa 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,6 +1,5 @@ use std::any::Any; -use api::serde::DecodeError; use common_error::prelude::*; use datatypes::arrow::error::ArrowError; use storage::error::Error as StorageError; @@ -73,9 +72,6 @@ pub enum Error { #[snafu(display("Missing required field in protobuf, field: {}", field))] MissingField { field: String, backtrace: Backtrace }, - #[snafu(display("Missing timestamp column in request"))] - MissingTimestampColumn { backtrace: Backtrace }, - #[snafu(display( "Columns and values number mismatch, columns: {}, values: {}", columns, @@ -96,12 +92,6 @@ pub enum Error { source: TableError, }, - #[snafu(display("Illegal insert data"))] - IllegalInsertData, - - #[snafu(display("Failed to convert bytes to insert batch, source: {}", source))] - DecodeInsert { source: DecodeError }, - #[snafu(display("Failed to start server, source: {}", source))] StartServer { #[snafu(backtrace)] @@ -258,17 +248,6 @@ pub enum Error { source: datatypes::error::Error, }, - #[snafu(display( - "Duplicated timestamp column in gRPC requests, exists {}, duplicated: {}", - exists, - duplicated - ))] - DuplicatedTimestampColumn { - exists: String, - duplicated: String, - backtrace: Backtrace, - }, - #[snafu(display("Failed to access catalog, source: {}", source))] Catalog { #[snafu(backtrace)] @@ -286,6 +265,15 @@ pub enum Error { #[snafu(backtrace)] source: meta_client::error::Error, }, + + #[snafu(display("Failed to insert data, source: {}", source))] + InsertData { + #[snafu(backtrace)] + source: common_insert::error::Error, + }, + + #[snafu(display("Insert batch is empty"))] + EmptyInsertBatch, } pub type Result = std::result::Result; @@ -317,18 +305,14 @@ impl ErrorExt for Error { | Error::ConvertSchema { source, .. } => source.status_code(), Error::ColumnValuesNumberMismatch { .. } - | Error::IllegalInsertData { .. } - | Error::DecodeInsert { .. } | Error::InvalidSql { .. } | Error::KeyColumnNotFound { .. } | Error::InvalidPrimaryKey { .. } | Error::MissingField { .. } - | Error::MissingTimestampColumn { .. } | Error::CatalogNotFound { .. } | Error::SchemaNotFound { .. } | Error::ConstraintNotSupported { .. } - | Error::ParseTimestamp { .. } - | Error::DuplicatedTimestampColumn { .. } => StatusCode::InvalidArguments, + | Error::ParseTimestamp { .. } => StatusCode::InvalidArguments, // TODO(yingwen): Further categorize http error. Error::StartServer { .. } @@ -354,6 +338,8 @@ impl ErrorExt for Error { Error::ArrowComputation { .. } => StatusCode::Unexpected, Error::MetaClientInit { source, .. } => source.status_code(), + Error::InsertData { source, .. } => source.status_code(), + Error::EmptyInsertBatch => StatusCode::InvalidArguments, } } diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 9c37e4eceb..d21d4b7b4f 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -17,12 +17,11 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::requests::AddColumnRequest; use crate::error::{ - self, CatalogSnafu, DecodeLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, Result, - TableNotFoundSnafu, UnsupportedExprSnafu, + CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu, ExecuteSqlSnafu, InsertDataSnafu, + InsertSnafu, Result, TableNotFoundSnafu, UnsupportedExprSnafu, }; use crate::instance::Instance; use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; -use crate::server::grpc::insert::{self, insertion_expr_to_request}; use crate::server::grpc::plan::PhysicalPlanner; use crate::server::grpc::select::to_object_result; use crate::sql::SqlRequest; @@ -38,7 +37,7 @@ impl Instance { .map(|req| req.column_schema.name.clone()) .collect::>(); - let alter_request = insert::build_alter_table_request(table_name, add_columns); + let alter_request = common_insert::build_alter_table_request(table_name, add_columns); debug!( "Adding new columns: {:?} to table: {}", @@ -70,13 +69,14 @@ impl Instance { .next_table_id() .await .context(CatalogSnafu)?; - let create_table_request = insert::build_create_table_request( + let create_table_request = common_insert::build_create_table_request( catalog_name, schema_name, table_id, table_name, insert_batches, - )?; + ) + .context(InsertDataSnafu)?; info!( "Try to create table: {} automatically with request: {:?}", @@ -111,12 +111,16 @@ impl Instance { .expect("default schema must exist") .unwrap(); - let insert_batches = insert::insert_batches(values.values)?; - ensure!(!insert_batches.is_empty(), error::IllegalInsertDataSnafu); + let insert_batches = + common_insert::insert_batches(values.values).context(InsertDataSnafu)?; + + ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu); let table = if let Some(table) = schema_provider.table(table_name).context(CatalogSnafu)? { let schema = table.schema(); - if let Some(add_columns) = insert::find_new_columns(&schema, &insert_batches)? { + if let Some(add_columns) = common_insert::find_new_columns(&schema, &insert_batches) + .context(InsertDataSnafu)? + { self.add_new_columns_to_table(table_name, add_columns) .await?; } @@ -137,7 +141,9 @@ impl Instance { .context(TableNotFoundSnafu { table_name })? }; - let insert = insertion_expr_to_request(table_name, insert_batches, table.clone())?; + let insert = + common_insert::insertion_expr_to_request(table_name, insert_batches, table.clone()) + .context(InsertDataSnafu)?; let affected_rows = table .insert(insert) diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 051cd4e1e4..7805b0093f 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -1,5 +1,4 @@ mod ddl; pub(crate) mod handler; -pub(crate) mod insert; pub(crate) mod plan; pub mod select;