feat: execute "delete" in query engine (in the form of "LogicalPlan") (#1222)

fix: execute "delete" in query engine (in the form of "LogicalPlan")
This commit is contained in:
LFC
2023-03-24 12:11:58 +08:00
committed by GitHub
parent f1139fba59
commit 92963b9614
16 changed files with 179 additions and 336 deletions

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::BoxedError;
use common_error::prelude::*;
use datatypes::prelude::ConcreteDataType;
pub type Result<T> = std::result::Result<T, Error>;
@@ -70,6 +71,26 @@ pub enum Error {
source: datafusion_common::DataFusionError,
backtrace: Backtrace,
},
#[snafu(display("Column {} not exists in table {}", column_name, table_name))]
ColumnNotExists {
column_name: String,
table_name: String,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to cast vector of type '{:?}' to type '{:?}', source: {}",
from_type,
to_type,
source
))]
CastVector {
from_type: ConcreteDataType,
to_type: ConcreteDataType,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
impl ErrorExt for Error {
@@ -81,11 +102,14 @@ impl ErrorExt for Error {
| Error::CreateRecordBatches { .. }
| Error::PollStream { .. }
| Error::Format { .. }
| Error::InitRecordbatchStream { .. } => StatusCode::Internal,
| Error::InitRecordbatchStream { .. }
| Error::ColumnNotExists { .. } => StatusCode::Internal,
Error::External { source } => source.status_code(),
Error::SchemaConversion { source, .. } => source.status_code(),
Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => {
source.status_code()
}
}
}

View File

@@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use datatypes::schema::SchemaRef;
use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
use serde::ser::{Error, SerializeStruct};
use serde::{Serialize, Serializer};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::error::{self, CastVectorSnafu, ColumnNotExistsSnafu, Result};
use crate::DfRecordBatch;
/// A two-dimensional batch of column-oriented data with a defined schema.
@@ -108,6 +110,41 @@ impl RecordBatch {
pub fn rows(&self) -> RecordBatchRowIterator<'_> {
RecordBatchRowIterator::new(self)
}
pub fn column_vectors(
&self,
table_name: &str,
table_schema: SchemaRef,
) -> Result<HashMap<String, VectorRef>> {
let mut vectors = HashMap::with_capacity(self.num_columns());
// column schemas in recordbatch must match its vectors, otherwise it's corrupted
for (vector_schema, vector) in self.schema.column_schemas().iter().zip(self.columns.iter())
{
let column_name = &vector_schema.name;
let column_schema =
table_schema
.column_schema_by_name(column_name)
.context(ColumnNotExistsSnafu {
table_name,
column_name,
})?;
let vector = if vector_schema.data_type != column_schema.data_type {
vector
.cast(&column_schema.data_type)
.with_context(|_| CastVectorSnafu {
from_type: vector.data_type(),
to_type: column_schema.data_type.clone(),
})?
} else {
vector.clone()
};
vectors.insert(column_name.clone(), vector);
}
Ok(vectors)
}
}
impl Serialize for RecordBatch {

View File

@@ -53,10 +53,6 @@ impl Instance {
.await?;
self.sql_handler.insert(request).await
}
QueryStatement::Sql(Statement::Delete(delete)) => {
let request = SqlRequest::Delete(*delete);
self.sql_handler.execute(request, query_ctx).await
}
QueryStatement::Sql(Statement::CreateDatabase(create_database)) => {
let request = CreateDatabaseRequest {
db_name: create_database.name.to_string(),
@@ -185,6 +181,7 @@ impl Instance {
| QueryStatement::Sql(Statement::Explain(_))
| QueryStatement::Sql(Statement::Use(_))
| QueryStatement::Sql(Statement::Tql(_))
| QueryStatement::Sql(Statement::Delete(_))
| QueryStatement::Promql(_) => unreachable!(),
}
}

View File

@@ -20,7 +20,6 @@ use common_telemetry::error;
use query::sql::{describe_table, show_databases, show_tables};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::delete::Delete;
use sql::statements::describe::DescribeTable;
use sql::statements::show::{ShowDatabases, ShowTables};
use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference};
@@ -36,7 +35,6 @@ mod alter;
mod copy_table_from;
mod copy_table_to;
mod create;
mod delete;
mod drop_table;
mod flush_table;
pub(crate) mod insert;
@@ -51,7 +49,6 @@ pub enum SqlRequest {
ShowDatabases(ShowDatabases),
ShowTables(ShowTables),
DescribeTable(DescribeTable),
Delete(Delete),
CopyTable(CopyTableRequest),
}
@@ -89,7 +86,6 @@ impl SqlHandler {
SqlRequest::CreateDatabase(req) => self.create_database(req, query_ctx.clone()).await,
SqlRequest::Alter(req) => self.alter(req).await,
SqlRequest::DropTable(req) => self.drop_table(req).await,
SqlRequest::Delete(req) => self.delete(query_ctx.clone(), req).await,
SqlRequest::CopyTable(req) => match req.direction {
CopyDirection::Export => self.copy_table_to(req).await,
CopyDirection::Import => self.copy_table_from(req).await,

View File

@@ -1,142 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_query::Output;
use datatypes::data_type::DataType;
use datatypes::prelude::VectorRef;
use datatypes::vectors::StringVector;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::ast::{BinaryOperator, Expr, Value};
use sql::statements::delete::Delete;
use sql::statements::sql_value_to_value;
use table::engine::TableReference;
use table::requests::DeleteRequest;
use table::TableRef;
use crate::error::{ColumnNotFoundSnafu, DeleteSnafu, InvalidSqlSnafu, NotSupportSqlSnafu, Result};
use crate::instance::sql::table_idents_to_full_name;
use crate::sql::SqlHandler;
impl SqlHandler {
pub(crate) async fn delete(&self, query_ctx: QueryContextRef, stmt: Delete) -> Result<Output> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(stmt.table_name(), query_ctx)?;
let table_ref = TableReference {
catalog: &catalog_name.to_string(),
schema: &schema_name.to_string(),
table: &table_name.to_string(),
};
let table = self.get_table(&table_ref)?;
let req = DeleteRequest {
key_column_values: parse_selection(stmt.selection(), &table)?,
};
let affected_rows = table.delete(req).await.with_context(|_| DeleteSnafu {
table_name: table_ref.to_string(),
})?;
Ok(Output::AffectedRows(affected_rows))
}
}
/// parse selection, currently supported format is `tagkey1 = 'tagvalue1' and 'ts' = 'value'`.
/// (only uses =, and in the where clause and provides all columns needed by the key.)
fn parse_selection(
selection: &Option<Expr>,
table: &TableRef,
) -> Result<HashMap<String, VectorRef>> {
let mut key_column_values = HashMap::new();
if let Some(expr) = selection {
parse_expr(expr, &mut key_column_values, table)?;
}
Ok(key_column_values)
}
fn parse_expr(
expr: &Expr,
key_column_values: &mut HashMap<String, VectorRef>,
table: &TableRef,
) -> Result<()> {
// match BinaryOp
if let Expr::BinaryOp { left, op, right } = expr {
match (&**left, op, &**right) {
// match And operator
(Expr::BinaryOp { .. }, BinaryOperator::And, Expr::BinaryOp { .. }) => {
parse_expr(left, key_column_values, table)?;
parse_expr(right, key_column_values, table)?;
return Ok(());
}
// match Eq operator
(Expr::Identifier(column_name), BinaryOperator::Eq, Expr::Value(value)) => {
key_column_values.insert(
column_name.to_string(),
value_to_vector(&column_name.to_string(), value, table)?,
);
return Ok(());
}
(Expr::Identifier(column_name), BinaryOperator::Eq, Expr::Identifier(value)) => {
key_column_values.insert(
column_name.to_string(),
Arc::new(StringVector::from(vec![value.to_string()])),
);
return Ok(());
}
_ => {}
}
}
NotSupportSqlSnafu {
msg: format!(
"Not support sql expr:{expr},correct format is tagkey1 = tagvalue1 and ts = value"
),
}
.fail()
}
/// parse value to vector
fn value_to_vector(column_name: &String, sql_value: &Value, table: &TableRef) -> Result<VectorRef> {
let schema = table.schema();
let column_schema =
schema
.column_schema_by_name(column_name)
.with_context(|| ColumnNotFoundSnafu {
table_name: table.table_info().name.clone(),
column_name: column_name.to_string(),
})?;
let data_type = &column_schema.data_type;
let value = sql_value_to_value(column_name, data_type, sql_value);
match value {
Ok(value) => {
let mut vec = data_type.create_mutable_vector(1);
if vec.try_push_value_ref(value.as_value_ref()).is_err() {
return InvalidSqlSnafu {
msg: format!(
"invalid sql, column name is {column_name}, value is {sql_value}",
),
}
.fail();
}
Ok(vec.to_vector())
}
_ => InvalidSqlSnafu {
msg: format!("invalid sql, column name is {column_name}, value is {sql_value}",),
}
.fail(),
}
}

View File

@@ -751,7 +751,7 @@ async fn test_delete() {
let output = execute_sql(
&instance,
"delete from test_table where host = host1 and ts = 1655276557000 ",
"delete from test_table where host = 'host1' and ts = 1655276557000 ",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
@@ -981,7 +981,9 @@ async fn try_execute_sql_in_db(
let stmt = QueryLanguageParser::parse_sql(sql).unwrap();
match stmt {
QueryStatement::Sql(Statement::Query(_)) => plan_exec(instance, stmt, query_ctx).await,
QueryStatement::Sql(Statement::Query(_)) | QueryStatement::Sql(Statement::Delete(_)) => {
plan_exec(instance, stmt, query_ctx).await
}
QueryStatement::Sql(Statement::Insert(ref insert)) if insert.is_insert_select() => {
plan_exec(instance, stmt, query_ctx).await
}

View File

@@ -468,7 +468,9 @@ impl Instance {
check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
match stmt {
Statement::Query(_) | Statement::Explain(_) => self.plan_exec(stmt, query_ctx).await,
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
self.plan_exec(stmt, query_ctx).await
}
// For performance consideration, only "insert with select" is executed by query engine.
// Plain insert ("insert with values") is still executed directly in statement.
@@ -483,7 +485,6 @@ impl Instance {
| Statement::ShowTables(_)
| Statement::DescribeTable(_)
| Statement::Insert(_)
| Statement::Delete(_)
| Statement::Alter(_)
| Statement::DropTable(_)
| Statement::Copy(_) => self
@@ -657,8 +658,8 @@ pub fn check_permission(
}
match stmt {
// query,explain and tql will be checked in QueryEngineState
Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) => {}
// These are executed by query engine, and will be checked there.
Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) | Statement::Delete(_) => {}
// database ops won't be checked
Statement::CreateDatabase(_) | Statement::ShowDatabases(_) | Statement::Use(_) => {}
// show create table and alter are not supported yet
@@ -683,9 +684,6 @@ pub fn check_permission(
Statement::DescribeTable(stmt) => {
validate_param(stmt.name(), query_ctx)?;
}
Statement::Delete(delete) => {
validate_param(delete.table_name(), query_ctx)?;
}
Statement::Copy(stmd) => match stmd {
CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
CopyTable::From(copy_table_from) => {

View File

@@ -18,6 +18,7 @@ mod catalog_adapter;
mod error;
mod planner;
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
@@ -35,18 +36,20 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
use datatypes::prelude::VectorRef;
use datatypes::schema::Schema;
use futures_util::StreamExt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::InsertRequest;
use snafu::{ensure, OptionExt, ResultExt};
use table::requests::{DeleteRequest, InsertRequest};
use table::TableRef;
pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::error::{
CatalogNotFoundSnafu, CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu,
QueryExecutionSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu, UnsupportedExprSnafu,
MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, SchemaNotFoundSnafu,
TableNotFoundSnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
@@ -76,11 +79,18 @@ impl DatafusionQueryEngine {
Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?))
}
async fn exec_insert_plan(
async fn exec_dml_statement(
&self,
dml: &DmlStatement,
dml: DmlStatement,
query_ctx: QueryContextRef,
) -> Result<Output> {
ensure!(
matches!(dml.op, WriteOp::Insert | WriteOp::Delete),
UnsupportedExprSnafu {
name: format!("DML op {}", dml.op),
}
);
let default_catalog = query_ctx.current_catalog();
let default_schema = query_ctx.current_schema();
let table_name = dml
@@ -101,20 +111,75 @@ impl DatafusionQueryEngine {
let mut affected_rows = 0;
while let Some(batch) = stream.next().await {
let batch = batch.context(CreateRecordBatchSnafu)?;
let request = InsertRequest::try_from_recordbatch(&table_name, table.schema(), batch)
let column_vectors = batch
.column_vectors(&table_name.to_string(), table.schema())
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let rows = table
.insert(request)
.await
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let rows = match dml.op {
WriteOp::Insert => Self::insert(&table_name, &table, column_vectors).await?,
WriteOp::Delete => Self::delete(&table_name, &table, column_vectors).await?,
_ => unreachable!("guarded by the 'ensure!' at the beginning"),
};
affected_rows += rows;
}
Ok(Output::AffectedRows(affected_rows))
}
async fn delete<'a>(
table_name: &ResolvedTableReference<'a>,
table: &TableRef,
column_vectors: HashMap<String, VectorRef>,
) -> Result<usize> {
let table_schema = table.schema();
let ts_column = table_schema
.timestamp_column()
.map(|x| &x.name)
.with_context(|| MissingTimestampColumnSnafu {
table_name: table_name.to_string(),
})?;
let table_info = table.table_info();
let rowkey_columns = table_info
.meta
.row_key_column_names()
.collect::<Vec<&String>>();
let column_vectors = column_vectors
.into_iter()
.filter(|x| &x.0 == ts_column || rowkey_columns.contains(&&x.0))
.collect::<HashMap<_, _>>();
let request = DeleteRequest {
key_column_values: column_vectors,
};
table
.delete(request)
.await
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)
}
async fn insert<'a>(
table_name: &ResolvedTableReference<'a>,
table: &TableRef,
column_vectors: HashMap<String, VectorRef>,
) -> Result<usize> {
let request = InsertRequest {
catalog_name: table_name.catalog.to_string(),
schema_name: table_name.schema.to_string(),
table_name: table_name.table.to_string(),
columns_values: column_vectors,
region_number: 0,
};
table
.insert(request)
.await
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)
}
async fn find_table(&self, table_name: &ResolvedTableReference<'_>) -> Result<TableRef> {
let catalog_name = table_name.catalog.as_ref();
let schema_name = table_name.schema.as_ref();
@@ -163,13 +228,9 @@ impl QueryEngine for DatafusionQueryEngine {
async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
match plan {
LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => match dml.op {
WriteOp::Insert => self.exec_insert_plan(&dml, query_ctx).await,
_ => UnsupportedExprSnafu {
name: format!("DML op {}", dml.op),
}
.fail(),
},
LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => {
self.exec_dml_statement(dml, query_ctx).await
}
_ => self.exec_query_plan(plan).await,
}
}

View File

@@ -110,6 +110,12 @@ pub enum Error {
source: DataFusionError,
backtrace: Backtrace,
},
#[snafu(display("Timestamp column for table '{table_name}' is missing!"))]
MissingTimestampColumn {
table_name: String,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {
@@ -131,7 +137,7 @@ impl ErrorExt for Error {
}
CreateRecordBatch { source } => source.status_code(),
QueryExecution { source } | QueryPlan { source } => source.status_code(),
DataFusion { .. } => StatusCode::Internal,
DataFusion { .. } | MissingTimestampColumn { .. } => StatusCode::Internal,
Sql { source } => source.status_code(),
PlanSql { .. } => StatusCode::PlanQuery,
}

View File

@@ -31,7 +31,7 @@ impl<'a> ParserContext<'a> {
match spstatement {
SpStatement::Delete { .. } => {
Ok(Statement::Delete(Box::new(Delete::try_from(spstatement)?)))
Ok(Statement::Delete(Box::new(Delete { inner: spstatement })))
}
unexp => error::UnsupportedSnafu {
sql: self.sql.to_string(),

View File

@@ -12,58 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use sqlparser::ast::{Expr, ObjectName, Statement, TableFactor};
use crate::error::{Error, InvalidSqlSnafu, Result};
use sqlparser::ast::Statement;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Delete {
table_name: ObjectName,
selection: Option<Expr>,
}
impl Delete {
pub fn table_name(&self) -> &ObjectName {
&self.table_name
}
pub fn selection(&self) -> &Option<Expr> {
&self.selection
}
}
impl TryFrom<Statement> for Delete {
type Error = Error;
fn try_from(stmt: Statement) -> Result<Self> {
match stmt {
Statement::Delete {
table_name,
using,
selection,
returning,
} => {
if using.is_some() || returning.is_some() {
return InvalidSqlSnafu {
msg: "delete sql isn't support using and returning.".to_string(),
}
.fail();
}
match table_name {
TableFactor::Table { name, .. } => Ok(Delete {
table_name: name,
selection,
}),
_ => InvalidSqlSnafu {
msg: "can't find table name, tableFactor is not Table type".to_string(),
}
.fail(),
}
}
unexp => InvalidSqlSnafu {
msg: format!("Not expected to be {unexp}"),
}
.fail(),
}
}
pub inner: Statement,
}

View File

@@ -80,6 +80,7 @@ impl TryFrom<&Statement> for DfStatement {
Statement::Query(query) => SpStatement::Query(Box::new(query.inner.clone())),
Statement::Explain(explain) => explain.inner.clone(),
Statement::Insert(insert) => insert.inner.clone(),
Statement::Delete(delete) => delete.inner.clone(),
_ => {
return ConvertToDfStatementSnafu {
statement: format!("{s:?}"),

View File

@@ -18,7 +18,6 @@ use common_error::prelude::*;
use common_recordbatch::error::Error as RecordBatchError;
use datafusion::error::DataFusionError;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
pub type Result<T> = std::result::Result<T, Error>;
@@ -115,19 +114,6 @@ pub enum Error {
value: String,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to cast vector of type '{:?}' to type '{:?}', source: {}",
from_type,
to_type,
source
))]
CastVector {
from_type: ConcreteDataType,
to_type: ConcreteDataType,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
impl ErrorExt for Error {
@@ -142,9 +128,7 @@ impl ErrorExt for Error {
}
Error::TablesRecordBatch { .. } => StatusCode::Unexpected,
Error::ColumnExists { .. } => StatusCode::TableColumnExists,
Error::SchemaBuild { source, .. } | Error::CastVector { source, .. } => {
source.status_code()
}
Error::SchemaBuild { source, .. } => source.status_code(),
Error::TableOperation { source } => source.status_code(),
Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
Error::RegionSchemaMismatch { .. } => StatusCode::StorageUnavailable,

View File

@@ -13,7 +13,6 @@
// limitations under the License.
//! Table and TableEngine requests
mod insert;
use std::collections::HashMap;
use std::str::FromStr;
@@ -22,7 +21,6 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, RawSchema};
pub use insert::InsertRequest;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionNumber;
@@ -174,6 +172,15 @@ pub struct DropTableRequest {
pub table_name: String,
}
#[derive(Debug)]
pub struct InsertRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub columns_values: HashMap<String, VectorRef>,
pub region_number: RegionNumber,
}
/// Delete (by primary key) request
#[derive(Debug)]
pub struct DeleteRequest {

View File

@@ -1,79 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_recordbatch::RecordBatch;
use datafusion_common::ResolvedTableReference;
use datatypes::prelude::VectorRef;
use datatypes::schema::SchemaRef;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use crate::error::{CastVectorSnafu, ColumnNotExistsSnafu, Result};
#[derive(Debug)]
pub struct InsertRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub columns_values: HashMap<String, VectorRef>,
pub region_number: RegionNumber,
}
impl InsertRequest {
pub fn try_from_recordbatch(
table_name: &ResolvedTableReference,
table_schema: SchemaRef,
recordbatch: RecordBatch,
) -> Result<Self> {
let mut columns_values = HashMap::with_capacity(recordbatch.num_columns());
// column schemas in recordbatch must match its vectors, otherwise it's corrupted
for (vector_schema, vector) in recordbatch
.schema
.column_schemas()
.iter()
.zip(recordbatch.columns().iter())
{
let column_name = &vector_schema.name;
let column_schema = table_schema
.column_schema_by_name(column_name)
.with_context(|| ColumnNotExistsSnafu {
table_name: table_name.table.to_string(),
column_name,
})?;
let vector = if vector_schema.data_type != column_schema.data_type {
vector
.cast(&column_schema.data_type)
.with_context(|_| CastVectorSnafu {
from_type: vector.data_type(),
to_type: column_schema.data_type.clone(),
})?
} else {
vector.clone()
};
columns_values.insert(column_name.clone(), vector);
}
Ok(InsertRequest {
catalog_name: table_name.catalog.to_string(),
schema_name: table_name.schema.to_string(),
table_name: table_name.table.to_string(),
columns_values,
region_number: 0,
})
}
}

View File

@@ -8,19 +8,19 @@ Affected Rows: 3
delete from monitor where cpu = 66.6 and ts = 1655276557000;
Error: 1004(InvalidArguments), Missing column host in write batch
Affected Rows: 1
delete from monitor where host = 'host1' or ts = 1655276557000;
Error: 1004(InvalidArguments), Not support SQL, error: Not support sql expr:host = 'host1' OR ts = 1655276557000,correct format is tagkey1 = tagvalue1 and ts = value
Affected Rows: 0
delete from monitor where host = 'host1' or ts != 1655276557000;
Error: 1004(InvalidArguments), Not support SQL, error: Not support sql expr:host = 'host1' OR ts <> 1655276557000,correct format is tagkey1 = tagvalue1 and ts = value
Affected Rows: 2
delete from monitor where ts != 1655276557000;
Error: 1004(InvalidArguments), Not support SQL, error: Not support sql expr:ts <> 1655276557000,correct format is tagkey1 = tagvalue1 and ts = value
Affected Rows: 0
drop table monitor;