refactor: move insert-to-create logic to a separate crate (#447)

This commit is contained in:
Lei, Huang
2022-11-10 17:16:40 +08:00
committed by GitHub
parent 23f0320ffb
commit 952e646e1d
10 changed files with 154 additions and 48 deletions

17
Cargo.lock generated
View File

@@ -1121,6 +1121,22 @@ dependencies = [
"tower",
]
[[package]]
name = "common-insert"
version = "0.1.0"
dependencies = [
"api",
"async-trait",
"common-base",
"common-error",
"common-query",
"common-telemetry",
"common-time",
"datatypes",
"snafu",
"table",
]
[[package]]
name = "common-query"
version = "0.1.0"
@@ -1635,6 +1651,7 @@ dependencies = [
"common-catalog",
"common-error",
"common-grpc",
"common-insert",
"common-query",
"common-recordbatch",
"common-runtime",

View File

@@ -15,6 +15,7 @@ members = [
"src/common/recordbatch",
"src/common/runtime",
"src/common/substrait",
"src/common/insert",
"src/common/telemetry",
"src/common/time",
"src/datanode",

View File

@@ -0,0 +1,18 @@
[package]
name = "common-insert"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
api = { path = "../../api" }
async-trait = "0.1"
common-base = { path = "../base" }
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
common-time = { path = "../time" }
common-query = { path = "../query" }
datatypes = { path = "../../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../../table" }

View File

@@ -0,0 +1,72 @@
use std::any::Any;
use api::DecodeError;
use common_error::ext::ErrorExt;
use common_error::prelude::{Snafu, StatusCode};
use snafu::{Backtrace, ErrorCompat};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Column {} not found in table {}", column_name, table_name))]
ColumnNotFound {
column_name: String,
table_name: String,
},
#[snafu(display("Failed to convert bytes to insert batch, source: {}", source))]
DecodeInsert { source: DecodeError },
#[snafu(display("Illegal insert data"))]
IllegalInsertData,
#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
#[snafu(backtrace)]
source: api::error::Error,
},
#[snafu(display("Failed to create schema when creating table, source: {}", source))]
CreateSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display(
"Duplicated timestamp column in gRPC requests, exists {}, duplicated: {}",
exists,
duplicated
))]
DuplicatedTimestampColumn {
exists: String,
duplicated: String,
backtrace: Backtrace,
},
#[snafu(display("Missing timestamp column in request"))]
MissingTimestampColumn { backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::DecodeInsert { .. } | Error::IllegalInsertData { .. } => {
StatusCode::InvalidArguments
}
Error::ColumnDataType { .. } => StatusCode::Internal,
Error::CreateSchema { .. }
| Error::DuplicatedTimestampColumn { .. }
| Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -24,15 +24,17 @@ use table::{
Table,
};
use crate::error::{self, ColumnNotFoundSnafu, DecodeInsertSnafu, IllegalInsertDataSnafu, Result};
use crate::error::{
ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateSchemaSnafu, DecodeInsertSnafu,
DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, MissingTimestampColumnSnafu, Result,
};
const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32;
const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32;
#[inline]
fn build_column_schema(column_name: &str, datatype: i32, nullable: bool) -> Result<ColumnSchema> {
let datatype_wrapper =
ColumnDataTypeWrapper::try_new(datatype).context(error::ColumnDataTypeSnafu)?;
let datatype_wrapper = ColumnDataTypeWrapper::try_new(datatype).context(ColumnDataTypeSnafu)?;
Ok(ColumnSchema::new(
column_name,
@@ -127,7 +129,7 @@ pub fn build_create_table_request(
TIMESTAMP_SEMANTIC_TYPE => {
ensure!(
timestamp_index == usize::MAX,
error::DuplicatedTimestampColumnSnafu {
DuplicatedTimestampColumnSnafu {
exists: &columns[timestamp_index].column_name,
duplicated: column_name,
}
@@ -145,17 +147,14 @@ pub fn build_create_table_request(
}
}
ensure!(
timestamp_index != usize::MAX,
error::MissingTimestampColumnSnafu
);
ensure!(timestamp_index != usize::MAX, MissingTimestampColumnSnafu);
let schema = Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(timestamp_index))
.build()
.context(error::CreateSchemaSnafu)?,
.context(CreateSchemaSnafu)?,
);
return Ok(CreateTableRequest {
@@ -172,7 +171,7 @@ pub fn build_create_table_request(
});
}
error::IllegalInsertDataSnafu.fail()
IllegalInsertDataSnafu.fail()
}
pub fn insertion_expr_to_request(
@@ -357,7 +356,8 @@ fn is_null(null_mask: &BitVec, idx: usize) -> Option<bool> {
#[cfg(test)]
mod tests {
use std::{any::Any, sync::Arc};
use std::any::Any;
use std::sync::Arc;
use api::v1::{
codec::InsertBatch,

View File

@@ -0,0 +1,6 @@
pub mod error;
mod insert;
pub use insert::{
build_alter_table_request, build_create_table_request, find_new_columns, insert_batches,
insertion_expr_to_request,
};

View File

@@ -22,6 +22,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
common-insert = { path = "../common/insert" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }

View File

@@ -1,6 +1,5 @@
use std::any::Any;
use api::serde::DecodeError;
use common_error::prelude::*;
use datatypes::arrow::error::ArrowError;
use storage::error::Error as StorageError;
@@ -73,9 +72,6 @@ pub enum Error {
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField { field: String, backtrace: Backtrace },
#[snafu(display("Missing timestamp column in request"))]
MissingTimestampColumn { backtrace: Backtrace },
#[snafu(display(
"Columns and values number mismatch, columns: {}, values: {}",
columns,
@@ -96,12 +92,6 @@ pub enum Error {
source: TableError,
},
#[snafu(display("Illegal insert data"))]
IllegalInsertData,
#[snafu(display("Failed to convert bytes to insert batch, source: {}", source))]
DecodeInsert { source: DecodeError },
#[snafu(display("Failed to start server, source: {}", source))]
StartServer {
#[snafu(backtrace)]
@@ -258,17 +248,6 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display(
"Duplicated timestamp column in gRPC requests, exists {}, duplicated: {}",
exists,
duplicated
))]
DuplicatedTimestampColumn {
exists: String,
duplicated: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to access catalog, source: {}", source))]
Catalog {
#[snafu(backtrace)]
@@ -286,6 +265,15 @@ pub enum Error {
#[snafu(backtrace)]
source: meta_client::error::Error,
},
#[snafu(display("Failed to insert data, source: {}", source))]
InsertData {
#[snafu(backtrace)]
source: common_insert::error::Error,
},
#[snafu(display("Insert batch is empty"))]
EmptyInsertBatch,
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -317,18 +305,14 @@ impl ErrorExt for Error {
| Error::ConvertSchema { source, .. } => source.status_code(),
Error::ColumnValuesNumberMismatch { .. }
| Error::IllegalInsertData { .. }
| Error::DecodeInsert { .. }
| Error::InvalidSql { .. }
| Error::KeyColumnNotFound { .. }
| Error::InvalidPrimaryKey { .. }
| Error::MissingField { .. }
| Error::MissingTimestampColumn { .. }
| Error::CatalogNotFound { .. }
| Error::SchemaNotFound { .. }
| Error::ConstraintNotSupported { .. }
| Error::ParseTimestamp { .. }
| Error::DuplicatedTimestampColumn { .. } => StatusCode::InvalidArguments,
| Error::ParseTimestamp { .. } => StatusCode::InvalidArguments,
// TODO(yingwen): Further categorize http error.
Error::StartServer { .. }
@@ -354,6 +338,8 @@ impl ErrorExt for Error {
Error::ArrowComputation { .. } => StatusCode::Unexpected,
Error::MetaClientInit { source, .. } => source.status_code(),
Error::InsertData { source, .. } => source.status_code(),
Error::EmptyInsertBatch => StatusCode::InvalidArguments,
}
}

View File

@@ -17,12 +17,11 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::requests::AddColumnRequest;
use crate::error::{
self, CatalogSnafu, DecodeLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, Result,
TableNotFoundSnafu, UnsupportedExprSnafu,
CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu, ExecuteSqlSnafu, InsertDataSnafu,
InsertSnafu, Result, TableNotFoundSnafu, UnsupportedExprSnafu,
};
use crate::instance::Instance;
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
use crate::server::grpc::insert::{self, insertion_expr_to_request};
use crate::server::grpc::plan::PhysicalPlanner;
use crate::server::grpc::select::to_object_result;
use crate::sql::SqlRequest;
@@ -38,7 +37,7 @@ impl Instance {
.map(|req| req.column_schema.name.clone())
.collect::<Vec<_>>();
let alter_request = insert::build_alter_table_request(table_name, add_columns);
let alter_request = common_insert::build_alter_table_request(table_name, add_columns);
debug!(
"Adding new columns: {:?} to table: {}",
@@ -70,13 +69,14 @@ impl Instance {
.next_table_id()
.await
.context(CatalogSnafu)?;
let create_table_request = insert::build_create_table_request(
let create_table_request = common_insert::build_create_table_request(
catalog_name,
schema_name,
table_id,
table_name,
insert_batches,
)?;
)
.context(InsertDataSnafu)?;
info!(
"Try to create table: {} automatically with request: {:?}",
@@ -111,12 +111,16 @@ impl Instance {
.expect("default schema must exist")
.unwrap();
let insert_batches = insert::insert_batches(values.values)?;
ensure!(!insert_batches.is_empty(), error::IllegalInsertDataSnafu);
let insert_batches =
common_insert::insert_batches(values.values).context(InsertDataSnafu)?;
ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu);
let table = if let Some(table) = schema_provider.table(table_name).context(CatalogSnafu)? {
let schema = table.schema();
if let Some(add_columns) = insert::find_new_columns(&schema, &insert_batches)? {
if let Some(add_columns) = common_insert::find_new_columns(&schema, &insert_batches)
.context(InsertDataSnafu)?
{
self.add_new_columns_to_table(table_name, add_columns)
.await?;
}
@@ -137,7 +141,9 @@ impl Instance {
.context(TableNotFoundSnafu { table_name })?
};
let insert = insertion_expr_to_request(table_name, insert_batches, table.clone())?;
let insert =
common_insert::insertion_expr_to_request(table_name, insert_batches, table.clone())
.context(InsertDataSnafu)?;
let affected_rows = table
.insert(insert)

View File

@@ -1,5 +1,4 @@
mod ddl;
pub(crate) mod handler;
pub(crate) mod insert;
pub(crate) mod plan;
pub mod select;