feat: impl insert data from query (#1025)

* feat: refactor insertion in datanode

* feat: supports inserting data by select query

* feat: impl cast operation for vector

* feat: streaming insert from select query results

* chore: minor changes

* fix: remove unwrap

* test: insert_to_requsts

* test: test_execute_insert_by_select

* fix: cast operation for vectors

* fix: test

* fix: typo

* chore: by CR comments

* fix: test_statement_to_request
This commit is contained in:
dennis zhuang
2023-02-17 17:56:12 +08:00
committed by GitHub
parent 7787cfdd42
commit a9c8584c98
16 changed files with 769 additions and 119 deletions

View File

@@ -27,6 +27,7 @@ common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion.workspace = true
datafusion-expr.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }

View File

@@ -15,6 +15,8 @@
use std::any::Any;
use common_error::prelude::*;
use common_recordbatch::error::Error as RecordBatchError;
use datatypes::prelude::ConcreteDataType;
use storage::error::Error as StorageError;
use table::error::Error as TableError;
@@ -101,12 +103,33 @@ pub enum Error {
))]
ColumnValuesNumberMismatch { columns: usize, values: usize },
#[snafu(display(
"Column type mismatch, column: {}, expected type: {:?}, actual: {:?}",
column,
expected,
actual,
))]
ColumnTypeMismatch {
column: String,
expected: ConcreteDataType,
actual: ConcreteDataType,
},
#[snafu(display("Failed to collect record batch, source: {}", source))]
CollectRecords {
#[snafu(backtrace)]
source: RecordBatchError,
},
#[snafu(display("Failed to parse sql value, source: {}", source))]
ParseSqlValue {
#[snafu(backtrace)]
source: sql::error::Error,
},
#[snafu(display("Missing insert body"))]
MissingInsertBody { backtrace: Backtrace },
#[snafu(display("Failed to insert value to table: {}, source: {}", table_name, source))]
Insert {
table_name: String,
@@ -338,73 +361,71 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
Error::ExecuteSql { source } | Error::DescribeStatement { source } => {
ExecuteSql { source } | DescribeStatement { source } => source.status_code(),
DecodeLogicalPlan { source } => source.status_code(),
NewCatalog { source } | RegisterSchema { source } => source.status_code(),
FindTable { source, .. } => source.status_code(),
CreateTable { source, .. } | GetTable { source, .. } | AlterTable { source, .. } => {
source.status_code()
}
Error::DecodeLogicalPlan { source } => source.status_code(),
Error::NewCatalog { source } | Error::RegisterSchema { source } => source.status_code(),
Error::FindTable { source, .. } => source.status_code(),
Error::CreateTable { source, .. }
| Error::GetTable { source, .. }
| Error::AlterTable { source, .. } => source.status_code(),
Error::DropTable { source, .. } => source.status_code(),
DropTable { source, .. } => source.status_code(),
Error::Insert { source, .. } => source.status_code(),
Error::Delete { source, .. } => source.status_code(),
Insert { source, .. } => source.status_code(),
Delete { source, .. } => source.status_code(),
CollectRecords { source, .. } => source.status_code(),
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
TableNotFound { .. } => StatusCode::TableNotFound,
ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::ParseSqlValue { source, .. } | Error::ParseSql { source, .. } => {
source.status_code()
}
ParseSqlValue { source, .. } | ParseSql { source, .. } => source.status_code(),
Error::AlterExprToRequest { source, .. }
| Error::CreateExprToRequest { source }
| Error::InsertData { source } => source.status_code(),
AlterExprToRequest { source, .. }
| CreateExprToRequest { source }
| InsertData { source } => source.status_code(),
Error::ConvertSchema { source, .. } | Error::VectorComputation { source } => {
source.status_code()
}
ConvertSchema { source, .. } | VectorComputation { source } => source.status_code(),
Error::ColumnValuesNumberMismatch { .. }
| Error::InvalidSql { .. }
| Error::NotSupportSql { .. }
| Error::KeyColumnNotFound { .. }
| Error::IllegalPrimaryKeysDef { .. }
| Error::MissingTimestampColumn { .. }
| Error::CatalogNotFound { .. }
| Error::SchemaNotFound { .. }
| Error::ConstraintNotSupported { .. }
| Error::SchemaExists { .. }
| Error::ParseTimestamp { .. }
| Error::DatabaseNotFound { .. } => StatusCode::InvalidArguments,
ColumnValuesNumberMismatch { .. }
| ColumnTypeMismatch { .. }
| InvalidSql { .. }
| NotSupportSql { .. }
| KeyColumnNotFound { .. }
| IllegalPrimaryKeysDef { .. }
| MissingTimestampColumn { .. }
| CatalogNotFound { .. }
| SchemaNotFound { .. }
| ConstraintNotSupported { .. }
| SchemaExists { .. }
| ParseTimestamp { .. }
| MissingInsertBody { .. }
| DatabaseNotFound { .. }
| MissingNodeId { .. }
| MissingMetasrvOpts { .. }
| ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
// TODO(yingwen): Further categorize http error.
Error::StartServer { .. }
| Error::ParseAddr { .. }
| Error::TcpBind { .. }
| Error::StartGrpc { .. }
| Error::CreateDir { .. }
| Error::InsertSystemCatalog { .. }
| Error::RenameTable { .. }
| Error::Catalog { .. }
| Error::MissingRequiredField { .. }
| Error::IncorrectInternalState { .. } => StatusCode::Internal,
StartServer { .. }
| ParseAddr { .. }
| TcpBind { .. }
| StartGrpc { .. }
| CreateDir { .. }
| InsertSystemCatalog { .. }
| RenameTable { .. }
| Catalog { .. }
| MissingRequiredField { .. }
| IncorrectInternalState { .. } => StatusCode::Internal,
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),
Error::StartScriptManager { source } => source.status_code(),
Error::OpenStorageEngine { source } => source.status_code(),
Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
Error::MetaClientInit { source, .. } => source.status_code(),
Error::TableIdProviderNotFound { .. } => StatusCode::Unsupported,
Error::BumpTableId { source, .. } => source.status_code(),
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
InitBackend { .. } => StatusCode::StorageUnavailable,
OpenLogStore { source } => source.status_code(),
StartScriptManager { source } => source.status_code(),
OpenStorageEngine { source } => source.status_code(),
RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
MetaClientInit { source, .. } => source.status_code(),
TableIdProviderNotFound { .. } => StatusCode::Unsupported,
BumpTableId { source, .. } => source.status_code(),
ColumnDefaultValue { source, .. } => source.status_code(),
}
}

View File

@@ -21,6 +21,7 @@ use common_recordbatch::RecordBatches;
use common_telemetry::logging::info;
use common_telemetry::timer;
use datatypes::schema::Schema;
use futures::StreamExt;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use servers::error as server_error;
use servers::promql::PromqlHandler;
@@ -35,6 +36,7 @@ use table::requests::{CreateDatabaseRequest, DropTableRequest};
use crate::error::{self, BumpTableIdSnafu, ExecuteSqlSnafu, Result, TableIdProviderNotFoundSnafu};
use crate::instance::Instance;
use crate::metric;
use crate::sql::insert::InsertRequests;
use crate::sql::SqlRequest;
impl Instance {
@@ -56,15 +58,33 @@ impl Instance {
.context(ExecuteSqlSnafu)
}
QueryStatement::Sql(Statement::Insert(i)) => {
let (catalog, schema, table) =
table_idents_to_full_name(i.table_name(), query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let request = self.sql_handler.insert_to_request(
self.catalog_manager.clone(),
*i,
table_ref,
)?;
self.sql_handler.execute(request, query_ctx).await
let requests = self
.sql_handler
.insert_to_requests(self.catalog_manager.clone(), *i, query_ctx.clone())
.await?;
match requests {
InsertRequests::Request(request) => {
self.sql_handler.execute(request, query_ctx.clone()).await
}
InsertRequests::Stream(mut s) => {
let mut rows = 0;
while let Some(request) = s.next().await {
match self
.sql_handler
.execute(request?, query_ctx.clone())
.await?
{
Output::AffectedRows(n) => {
rows += n;
}
_ => unreachable!(),
}
}
Ok(Output::AffectedRows(rows))
}
}
}
QueryStatement::Sql(Statement::Delete(d)) => {
let request = SqlRequest::Delete(*d);

View File

@@ -34,7 +34,7 @@ mod alter;
mod create;
mod delete;
mod drop_table;
mod insert;
pub(crate) mod insert;
#[derive(Debug)]
pub enum SqlRequest {
@@ -142,6 +142,7 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use futures::StreamExt;
use log_store::NoopLogStore;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
@@ -149,17 +150,19 @@ mod tests {
use object_store::ObjectStore;
use query::parser::{QueryLanguageParser, QueryStatement};
use query::QueryEngineFactory;
use session::context::QueryContext;
use sql::statements::statement::Statement;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::TableReference;
use table::error::Result as TableResult;
use table::metadata::TableInfoRef;
use table::Table;
use tempdir::TempDir;
use super::*;
use crate::error::Error;
use crate::sql::insert::InsertRequests;
struct DemoTable;
@@ -255,11 +258,12 @@ mod tests {
}
};
let request = sql_handler
.insert_to_request(catalog_list.clone(), *stmt, TableReference::bare("demo"))
.insert_to_requests(catalog_list.clone(), *stmt, QueryContext::arc())
.await
.unwrap();
match request {
SqlRequest::Insert(req) => {
InsertRequests::Request(SqlRequest::Insert(req)) => {
assert_eq!(req.table_name, "demo");
let columns_values = req.columns_values;
assert_eq!(4, columns_values.len());
@@ -294,5 +298,63 @@ mod tests {
panic!("Not supposed to reach here")
}
}
// test inert into select
// type mismatch
let sql = "insert into demo(ts) select number from numbers limit 3";
let stmt = match QueryLanguageParser::parse_sql(sql).unwrap() {
QueryStatement::Sql(Statement::Insert(i)) => i,
_ => {
unreachable!()
}
};
let request = sql_handler
.insert_to_requests(catalog_list.clone(), *stmt, QueryContext::arc())
.await
.unwrap();
match request {
InsertRequests::Stream(mut stream) => {
assert!(matches!(
stream.next().await.unwrap().unwrap_err(),
Error::ColumnTypeMismatch { .. }
));
}
_ => unreachable!(),
}
let sql = "insert into demo(cpu) select cast(number as double) from numbers limit 3";
let stmt = match QueryLanguageParser::parse_sql(sql).unwrap() {
QueryStatement::Sql(Statement::Insert(i)) => i,
_ => {
unreachable!()
}
};
let request = sql_handler
.insert_to_requests(catalog_list.clone(), *stmt, QueryContext::arc())
.await
.unwrap();
match request {
InsertRequests::Stream(mut stream) => {
let mut times = 0;
while let Some(Ok(SqlRequest::Insert(req))) = stream.next().await {
times += 1;
assert_eq!(req.table_name, "demo");
let columns_values = req.columns_values;
assert_eq!(1, columns_values.len());
let memories = &columns_values["cpu"];
assert_eq!(3, memories.len());
assert_eq!(Value::from(0.0f64), memories.get(0));
assert_eq!(Value::from(1.0f64), memories.get(1));
assert_eq!(Value::from(2.0f64), memories.get(2));
}
assert_eq!(1, times);
}
_ => unreachable!(),
}
}
}

View File

@@ -11,28 +11,48 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::pin::Pin;
use catalog::CatalogManagerRef;
use common_query::Output;
use common_recordbatch::RecordBatch;
use datafusion_expr::type_coercion::binary::coerce_types;
use datafusion_expr::Operator;
use datatypes::data_type::DataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::MutableVector;
use futures::stream::{self, StreamExt};
use futures::Stream;
use query::parser::QueryStatement;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::Value as SqlValue;
use sql::statements::insert::Insert;
use sql::statements::statement::Statement;
use sql::statements::{self};
use table::engine::TableReference;
use table::requests::*;
use table::TableRef;
use crate::error::{
CatalogSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, ColumnNotFoundSnafu,
ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlSnafu, ParseSqlValueSnafu, Result,
TableNotFoundSnafu,
CatalogSnafu, CollectRecordsSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, ColumnValuesNumberMismatchSnafu, Error,
ExecuteSqlSnafu, InsertSnafu, MissingInsertBodySnafu, ParseSqlSnafu, ParseSqlValueSnafu,
Result, TableNotFoundSnafu,
};
use crate::sql::{SqlHandler, SqlRequest};
use crate::sql::{table_idents_to_full_name, SqlHandler, SqlRequest};
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
type InsertRequestStream = Pin<Box<dyn Stream<Item = Result<SqlRequest>> + Send>>;
pub(crate) enum InsertRequests {
// Single request
Request(SqlRequest),
// Streaming requests
Stream(InsertRequestStream),
}
impl SqlHandler {
pub(crate) async fn insert(&self, req: InsertRequest) -> Result<Output> {
// FIXME(dennis): table_ref is used in InsertSnafu and the req is consumed
@@ -52,21 +72,16 @@ impl SqlHandler {
Ok(Output::AffectedRows(affected_rows))
}
pub(crate) fn insert_to_request(
&self,
catalog_manager: CatalogManagerRef,
stmt: Insert,
fn build_request_from_values(
table_ref: TableReference,
table: &TableRef,
stmt: Insert,
) -> Result<SqlRequest> {
let values = stmt
.values_body()
.context(ParseSqlValueSnafu)?
.context(MissingInsertBodySnafu)?;
let columns = stmt.columns();
let values = stmt.values().context(ParseSqlValueSnafu)?;
let table = catalog_manager
.table(table_ref.catalog, table_ref.schema, table_ref.table)
.context(CatalogSnafu)?
.context(TableNotFoundSnafu {
table_name: table_ref.table,
})?;
let schema = table.schema();
let columns_num = if columns.is_empty() {
schema.column_schemas().len()
@@ -78,6 +93,7 @@ impl SqlHandler {
let mut columns_builders: Vec<(&ColumnSchema, Box<dyn MutableVector>)> =
Vec::with_capacity(columns_num);
// Initialize vectors
if columns.is_empty() {
for column_schema in schema.column_schemas() {
let data_type = &column_schema.data_type;
@@ -123,6 +139,167 @@ impl SqlHandler {
region_number: 0,
}))
}
fn build_request_from_batch(
stmt: Insert,
table: TableRef,
batch: RecordBatch,
query_ctx: QueryContextRef,
) -> Result<SqlRequest> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(stmt.table_name(), query_ctx)?;
let schema = table.schema();
let columns: Vec<_> = if stmt.columns().is_empty() {
schema
.column_schemas()
.iter()
.map(|c| c.name.to_string())
.collect()
} else {
stmt.columns().iter().map(|c| (*c).clone()).collect()
};
let columns_num = columns.len();
ensure!(
batch.num_columns() == columns_num,
ColumnValuesNumberMismatchSnafu {
columns: columns_num,
values: batch.num_columns(),
}
);
let batch_schema = &batch.schema;
let batch_columns = batch_schema.column_schemas();
assert_eq!(batch_columns.len(), columns_num);
let mut columns_values = HashMap::with_capacity(columns_num);
for (i, column_name) in columns.into_iter().enumerate() {
let column_schema = schema
.column_schema_by_name(&column_name)
.with_context(|| ColumnNotFoundSnafu {
table_name: &table_name,
column_name: &column_name,
})?;
let expect_datatype = column_schema.data_type.as_arrow_type();
// It's safe to retrieve the column schema by index, we already
// check columns number is the same above.
let batch_datatype = batch_columns[i].data_type.as_arrow_type();
let coerced_type = coerce_types(&expect_datatype, &Operator::Eq, &batch_datatype)
.map_err(|_| Error::ColumnTypeMismatch {
column: column_name.clone(),
expected: column_schema.data_type.clone(),
actual: batch_columns[i].data_type.clone(),
})?;
ensure!(
expect_datatype == coerced_type,
ColumnTypeMismatchSnafu {
column: column_name,
expected: column_schema.data_type.clone(),
actual: batch_columns[i].data_type.clone(),
}
);
let vector = batch
.column(i)
.cast(&column_schema.data_type)
.map_err(|_| Error::ColumnTypeMismatch {
column: column_name.clone(),
expected: column_schema.data_type.clone(),
actual: batch_columns[i].data_type.clone(),
})?;
columns_values.insert(column_name, vector);
}
Ok(SqlRequest::Insert(InsertRequest {
catalog_name,
schema_name,
table_name,
columns_values,
region_number: 0,
}))
}
// FIXME(dennis): move it to frontend when refactor is done.
async fn build_stream_from_query(
&self,
table: TableRef,
stmt: Insert,
query_ctx: QueryContextRef,
) -> Result<InsertRequestStream> {
let query = stmt
.query_body()
.context(ParseSqlValueSnafu)?
.context(MissingInsertBodySnafu)?;
let logical_plan = self
.query_engine
.statement_to_plan(
QueryStatement::Sql(Statement::Query(Box::new(query))),
query_ctx.clone(),
)
.context(ExecuteSqlSnafu)?;
let output = self
.query_engine
.execute(&logical_plan)
.await
.context(ExecuteSqlSnafu)?;
let stream: InsertRequestStream = match output {
Output::RecordBatches(batches) => {
Box::pin(stream::iter(batches.take()).map(move |batch| {
Self::build_request_from_batch(
stmt.clone(),
table.clone(),
batch,
query_ctx.clone(),
)
}))
}
Output::Stream(stream) => Box::pin(stream.map(move |batch| {
Self::build_request_from_batch(
stmt.clone(),
table.clone(),
batch.context(CollectRecordsSnafu)?,
query_ctx.clone(),
)
})),
_ => unreachable!(),
};
Ok(stream)
}
pub(crate) async fn insert_to_requests(
&self,
catalog_manager: CatalogManagerRef,
stmt: Insert,
query_ctx: QueryContextRef,
) -> Result<InsertRequests> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(stmt.table_name(), query_ctx.clone())?;
let table = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.clone(),
})?;
if stmt.is_insert_select() {
Ok(InsertRequests::Stream(
self.build_stream_from_query(table, stmt, query_ctx).await?,
))
} else {
let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
Ok(InsertRequests::Request(Self::build_request_from_values(
table_ref, &table, stmt,
)?))
}
}
}
fn add_row_to_vector(

View File

@@ -21,6 +21,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef};
use session::context::QueryContext;
use crate::error::Error;
use crate::tests::test_util::{self, check_output_stream, setup_test_instance, MockInstance};
#[tokio::test(flavor = "multi_thread")]
@@ -181,6 +182,67 @@ async fn test_execute_insert() {
assert!(matches!(output, Output::AffectedRows(2)));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert_by_select() {
let instance = setup_test_instance("test_execute_insert_by_select").await;
// create table
execute_sql(
&instance,
"create table demo1(host string, cpu double, memory double, ts timestamp time index);",
)
.await;
execute_sql(
&instance,
"create table demo2(host string, cpu double, memory double, ts timestamp time index);",
)
.await;
let output = execute_sql(
&instance,
r#"insert into demo1(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
assert!(matches!(
try_execute_sql(&instance, "insert into demo2(host) select * from demo1")
.await
.unwrap_err(),
Error::ColumnValuesNumberMismatch { .. }
));
assert!(matches!(
try_execute_sql(&instance, "insert into demo2 select cpu,memory from demo1")
.await
.unwrap_err(),
Error::ColumnValuesNumberMismatch { .. }
));
assert!(matches!(
try_execute_sql(&instance, "insert into demo2(ts) select memory from demo1")
.await
.unwrap_err(),
Error::ColumnTypeMismatch { .. }
));
let output = execute_sql(&instance, "insert into demo2 select * from demo1").await;
assert!(matches!(output, Output::AffectedRows(2)));
let output = execute_sql(&instance, "select * from demo2 order by ts").await;
let expected = "\
+-------+------+--------+---------------------+
| host | cpu | memory | ts |
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+"
.to_string();
check_output_stream(output, expected).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert_query_with_i64_timestamp() {
let instance = MockInstance::new("insert_query_i64_timestamp").await;
@@ -707,6 +769,13 @@ async fn execute_sql(instance: &MockInstance, sql: &str) -> Output {
execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
}
async fn try_execute_sql(
instance: &MockInstance,
sql: &str,
) -> Result<Output, crate::error::Error> {
try_execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
}
async fn try_execute_sql_in_db(
instance: &MockInstance,
sql: &str,

View File

@@ -155,7 +155,7 @@ pub async fn check_output_stream(output: Output, expected: String) {
_ => unreachable!(),
};
let pretty_print = recordbatches.pretty_print().unwrap();
assert_eq!(pretty_print, expected);
assert_eq!(pretty_print, expected, "{}", pretty_print);
}
pub async fn check_unordered_output_stream(output: Output, expected: String) {