feat(frontend): unify column inserter and row inserter (#2293)

* refactor: unify inserter

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(frontend): unify column inserter and row inserter

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove redundant clone

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: move empty check ahead

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add more logs

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: leading license

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* adjust indent

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi
2023-08-31 21:53:03 +08:00
committed by Ruihang Xia
parent 365e557e7a
commit b42d343ae6
4 changed files with 571 additions and 415 deletions
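
For orientation, a minimal sketch of the unified flow (illustrative only, not part of the diff; the type and method names come from the changes below, while the call-site variables are hypothetical): instead of a long-lived Arc<RowInserter> stored on Instance, a borrow-based Inserter is now constructed per request, and column-format inserts are pivoted into row format so both paths share one implementation.

let inserter = Inserter::new(
    &self.catalog_manager,
    &self.create_expr_factory,
    &self.grpc_query_handler,
);
// Row-format inserts go through directly; column-format inserts are first
// converted by requests_column_to_row() and then take the same path.
inserter.handle_row_inserts(row_requests, ctx.clone()).await?;
inserter.handle_column_inserts(column_requests, ctx).await?;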

View File

@@ -0,0 +1,554 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::value::ValueData;
use api::v1::{
AlterExpr, Column, ColumnDataType, ColumnSchema, DdlRequest, InsertRequest, InsertRequests,
Row, RowInsertRequest, RowInsertRequests, Rows, Value,
};
use catalog::CatalogManagerRef;
use common_base::BitVec;
use common_catalog::consts::default_engine;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_query::Output;
use common_telemetry::info;
use datatypes::schema::Schema;
use servers::query_handler::grpc::GrpcQueryHandlerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use table::engine::TableReference;
use table::TableRef;
use crate::error::{
CatalogSnafu, ColumnDataTypeSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu,
InvalidInsertRequestSnafu, Result,
};
use crate::expr_factory::CreateExprFactory;
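
/// Request-scoped inserter unifying the former column-insert path and the
/// standalone `RowInserter`: it creates or alters target tables on demand,
/// then forwards the requests to the gRPC query handler.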
pub(crate) struct Inserter<'a> {
catalog_manager: &'a CatalogManagerRef,
create_expr_factory: &'a CreateExprFactory,
grpc_query_handler: &'a GrpcQueryHandlerRef<Error>,
}

impl<'a> Inserter<'a> {
pub fn new(
catalog_manager: &'a CatalogManagerRef,
create_expr_factory: &'a CreateExprFactory,
grpc_query_handler: &'a GrpcQueryHandlerRef<Error>,
) -> Self {
Self {
catalog_manager,
create_expr_factory,
grpc_query_handler,
}
}
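
/// Converts column-format inserts to row format and delegates to
/// `handle_row_inserts`.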
pub async fn handle_column_inserts(
&self,
requests: InsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let row_inserts = requests_column_to_row(requests)?;
self.handle_row_inserts(row_inserts, ctx).await
}
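
/// Handles row-format inserts: rejects requests that carry no rows, creates
/// or alters the target tables on demand, then forwards the requests to the
/// gRPC query handler.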
pub async fn handle_row_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
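// Fail fast on empty payloads, before any DDL side effects (the empty check
// was deliberately moved ahead; see the commit message above).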
requests.inserts.iter().try_for_each(|req| {
let non_empty = req.rows.as_ref().map(|r| !r.rows.is_empty());
let non_empty = non_empty.unwrap_or_default();
non_empty.then_some(()).with_context(|| EmptyDataSnafu {
msg: format!("insert to table: {:?}", &req.table_name),
})
})?;
self.create_or_alter_tables_on_demand(&requests, ctx.clone())
.await?;
let query = Request::RowInserts(requests);
self.grpc_query_handler.do_query(query, ctx).await
}
}

impl<'a> Inserter<'a> {
// Check whether the target tables already exist:
// - if a table does not exist, create it by the inferred `CreateExpr`
// - if a table exists, check whether the schema matches; if any new columns
//   are found, alter the table by the inferred `AlterExpr`
async fn create_or_alter_tables_on_demand(
&self,
requests: &RowInsertRequests,
ctx: QueryContextRef,
) -> Result<()> {
// TODO(jeremy): create and alter in batch?
for req in &requests.inserts {
match self.get_table(req, &ctx).await? {
Some(table) => {
validate_request_with_table(req, &table)?;
self.alter_table_on_demand(req, table, &ctx).await?
}
None => self.create_table(req, &ctx).await?,
}
}
Ok(())
}

async fn get_table(
&self,
req: &RowInsertRequest,
ctx: &QueryContextRef,
) -> Result<Option<TableRef>> {
self.catalog_manager
.table(ctx.current_catalog(), ctx.current_schema(), &req.table_name)
.await
.context(CatalogSnafu)
}

async fn alter_table_on_demand(
&self,
req: &RowInsertRequest,
table: TableRef,
ctx: &QueryContextRef,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let column_exprs = ColumnExpr::from_column_schemas(request_schema);
let add_columns = extract_new_columns(&table.schema(), column_exprs)
.context(FindNewColumnsOnInsertionSnafu)?;
let Some(add_columns) = add_columns else {
return Ok(());
};
let table_name = table.table_info().name.clone();
info!(
"Adding new columns: {:?} to table: {}.{}.{}",
add_columns, catalog_name, schema_name, table_name
);
let alter_table_expr = AlterExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
kind: Some(Kind::AddColumns(add_columns)),
..Default::default()
};
let req = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(alter_table_expr)),
});
self.grpc_query_handler.do_query(req, ctx.clone()).await?;
info!(
"Successfully added new columns to table: {}.{}.{}",
catalog_name, schema_name, table_name
);
Ok(())
}

async fn create_table(&self, req: &RowInsertRequest, ctx: &QueryContextRef) -> Result<()> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
info!(
"Table {}.{}.{} does not exist, try to create it",
table_ref.catalog, table_ref.schema, table_ref.table,
);
let create_table_expr = self
.create_expr_factory
.create_table_expr_by_column_schemas(&table_ref, request_schema, default_engine())?;
let req = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(create_table_expr)),
});
self.grpc_query_handler.do_query(req, ctx.clone()).await?;
info!(
"Successfully created table on insertion: {}.{}.{}",
table_ref.catalog, table_ref.schema, table_ref.table,
);
Ok(())
}
}
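
// Converts a batch of column-format inserts into row-format inserts,
// failing the whole batch on the first malformed request.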
fn requests_column_to_row(requests: InsertRequests) -> Result<RowInsertRequests> {
requests
.inserts
.into_iter()
.map(request_column_to_row)
.collect::<Result<Vec<_>>>()
.map(|inserts| RowInsertRequests { inserts })
}
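
// Pivots one column-oriented request into a row-oriented one: each input
// Column contributes one ColumnSchema plus one Value per row. Illustratively,
// columns {a: [1, 2], b: ["x", "y"]} with row_count = 2 become rows
// [[1, "x"], [2, "y"]].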
fn request_column_to_row(request: InsertRequest) -> Result<RowInsertRequest> {
let row_count = request.row_count as usize;
let column_count = request.columns.len();
let mut schema = Vec::with_capacity(column_count);
let mut rows = vec![
Row {
values: Vec::with_capacity(column_count)
};
row_count
];
for column in request.columns {
let column_schema = ColumnSchema {
column_name: column.column_name.clone(),
datatype: column.datatype,
semantic_type: column.semantic_type,
};
schema.push(column_schema);
push_column_to_rows(column, &mut rows)?;
}
Ok(RowInsertRequest {
table_name: request.table_name,
rows: Some(Rows { schema, rows }),
region_number: request.region_number,
})
}
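
// Distributes one column's values across the row buffers. `null_mask` is a
// bitmap over the rows (Lsb0 order, as the tests below construct it): a set
// bit marks that row's value as NULL and consumes nothing from the densely
// packed values vector.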
fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> {
let null_mask = BitVec::from_vec(column.null_mask);
let column_type = ColumnDataTypeWrapper::try_new(column.datatype)
.context(ColumnDataTypeSnafu)?
.datatype();
let column_values = column.values.unwrap_or_default();
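
// One match arm per supported ColumnDataType: first ensure that nulls plus
// dense values account for every row, then walk the rows, pushing either a
// NULL or the next value.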
macro_rules! push_column_values_match_types {
($( ($arm:tt, $value_data_variant:tt, $field_name:tt), )*) => { match column_type { $(
ColumnDataType::$arm => {
let row_count = rows.len();
let actual_row_count = null_mask.count_ones() + column_values.$field_name.len();
ensure!(
actual_row_count == row_count,
InvalidInsertRequestSnafu {
reason: format!(
"Expecting {} rows of data for column '{}', but got {}.",
row_count, column.column_name, actual_row_count
),
}
);
let mut null_mask_iter = null_mask.into_iter();
let mut values_iter = column_values.$field_name.into_iter();
for row in rows {
let value_is_null = null_mask_iter.next();
if value_is_null == Some(true) {
row.values.push(Value { value_data: None });
} else {
// previous check ensures that there is a value for each row
let value = values_iter.next().unwrap();
row.values.push(Value {
value_data: Some(ValueData::$value_data_variant(value)),
});
}
}
}
)* }}
}

push_column_values_match_types!(
(Boolean, BoolValue, bool_values),
(Int8, I8Value, i8_values),
(Int16, I16Value, i16_values),
(Int32, I32Value, i32_values),
(Int64, I64Value, i64_values),
(Uint8, U8Value, u8_values),
(Uint16, U16Value, u16_values),
(Uint32, U32Value, u32_values),
(Uint64, U64Value, u64_values),
(Float32, F32Value, f32_values),
(Float64, F64Value, f64_values),
(Binary, BinaryValue, binary_values),
(String, StringValue, string_values),
(Date, DateValue, date_values),
(Datetime, DatetimeValue, datetime_values),
(TimestampSecond, TsSecondValue, ts_second_values),
(
TimestampMillisecond,
TsMillisecondValue,
ts_millisecond_values
),
(
TimestampMicrosecond,
TsMicrosecondValue,
ts_microsecond_values
),
(TimestampNanosecond, TsNanosecondValue, ts_nanosecond_values),
(TimeSecond, TimeSecondValue, time_second_values),
(
TimeMillisecond,
TimeMillisecondValue,
time_millisecond_values
),
(
TimeMicrosecond,
TimeMicrosecondValue,
time_microsecond_values
),
(TimeNanosecond, TimeNanosecondValue, time_nanosecond_values),
(
IntervalYearMonth,
IntervalYearMonthValues,
interval_year_month_values
),
(
IntervalDayTime,
IntervalDayTimeValues,
interval_day_time_values
),
(
IntervalMonthDayNano,
IntervalMonthDayNanoValues,
interval_month_day_nano_values
),
);
Ok(())
}

fn validate_request_with_table(req: &RowInsertRequest, table: &TableRef) -> Result<()> {
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let table_schema = table.schema();
validate_required_columns(request_schema, &table_schema)?;
Ok(())
}
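
// A column must be present in the request if it is neither nullable nor
// backed by a default constraint; otherwise there is nothing to fill its
// rows with.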
fn validate_required_columns(request_schema: &[ColumnSchema], table_schema: &Schema) -> Result<()> {
for column_schema in table_schema.column_schemas() {
if column_schema.is_nullable() || column_schema.default_constraint().is_some() {
continue;
}
if !request_schema
.iter()
.any(|c| c.column_name == column_schema.name)
{
return InvalidInsertRequestSnafu {
reason: format!(
"Missing data for column '{}', which is not nullable and has no default value.",
&column_schema.name
)
}.fail();
}
}
Ok(())
}

#[cfg(test)]
mod tests {
use api::v1::column::Values;
use api::v1::SemanticType;
use common_base::bit_vec::prelude::*;
use datatypes::prelude::{ConcreteDataType, Value as DtValue};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema};

use super::*;

#[test]
fn test_validate_required_columns() {
let schema = Schema::new(vec![
DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(None)
.unwrap(),
DtColumnSchema::new("b", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(DtValue::Int32(100))))
.unwrap(),
]);
let request_schema = &[ColumnSchema {
column_name: "c".to_string(),
..Default::default()
}];
// If nullable is true, it doesn't matter whether the insert request has the column.
validate_required_columns(request_schema, &schema).unwrap();

let schema = Schema::new(vec![
DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), false)
.with_default_constraint(None)
.unwrap(),
DtColumnSchema::new("b", ConcreteDataType::int32_datatype(), false)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(DtValue::Int32(-100))))
.unwrap(),
]);
let request_schema = &[ColumnSchema {
column_name: "a".to_string(),
..Default::default()
}];
// If nullable is false, but the column is defined with default value,
// it also doesn't matter whether the insert request has the column.
validate_required_columns(request_schema, &schema).unwrap();

let request_schema = &[ColumnSchema {
column_name: "b".to_string(),
..Default::default()
}];
// Neither of the above cases.
assert!(validate_required_columns(request_schema, &schema).is_err());
}

#[test]
fn test_request_column_to_row() {
let insert_request = InsertRequest {
table_name: String::from("test_table"),
row_count: 3,
region_number: 1,
columns: vec![
Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
values: Some(Values {
i32_values: vec![42],
..Default::default()
}),
},
Column {
column_name: String::from("col2"),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
null_mask: vec![],
values: Some(Values {
string_values: vec![
String::from("value1"),
String::from("value2"),
String::from("value3"),
],
..Default::default()
}),
},
],
};
let result = request_column_to_row(insert_request);
let row_insert_request = result.unwrap();
assert_eq!(row_insert_request.table_name, "test_table");
assert_eq!(row_insert_request.region_number, 1);
let rows = row_insert_request.rows.unwrap();
assert_eq!(
rows.schema,
vec![
ColumnSchema {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
},
ColumnSchema {
column_name: String::from("col2"),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
},
]
);
assert_eq!(
rows.rows,
vec![
Row {
values: vec![
Value { value_data: None },
Value {
value_data: Some(ValueData::StringValue(String::from("value1"))),
},
],
},
Row {
values: vec![
Value {
value_data: Some(ValueData::I32Value(42)),
},
Value {
value_data: Some(ValueData::StringValue(String::from("value2"))),
},
],
},
Row {
values: vec![
Value { value_data: None },
Value {
value_data: Some(ValueData::StringValue(String::from("value3"))),
},
],
},
]
);

let invalid_request_with_wrong_type = InsertRequest {
table_name: String::from("test_table"),
row_count: 3,
region_number: 1,
columns: vec![Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
values: Some(Values {
i8_values: vec![42],
..Default::default()
}),
}],
};
assert!(request_column_to_row(invalid_request_with_wrong_type).is_err());

let invalid_request_with_wrong_row_count = InsertRequest {
table_name: String::from("test_table"),
row_count: 3,
region_number: 1,
columns: vec![Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: bitvec![u8, Lsb0; 0, 0, 1].into_vec(),
values: Some(Values {
i32_values: vec![42],
..Default::default()
}),
}],
};
assert!(request_column_to_row(invalid_request_with_wrong_row_count).is_err());

let invalid_request_with_wrong_row_count = InsertRequest {
table_name: String::from("test_table"),
row_count: 3,
region_number: 1,
columns: vec![Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: vec![],
values: Some(Values {
i32_values: vec![42],
..Default::default()
}),
}],
};
assert!(request_column_to_row(invalid_request_with_wrong_row_count).is_err());
}
}

View File

@@ -25,31 +25,24 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::meta::Role;
use api::v1::{
AddColumns, AlterExpr, Column, DdlRequest, InsertRequest, InsertRequests, RowInsertRequests,
};
use api::v1::{InsertRequests, RowInsertRequests};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::remote::CachedMetaKvBackend;
use catalog::CatalogManagerRef;
use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_catalog::consts::default_engine;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::TableMetadataManager;
use common_query::Output;
use common_telemetry::logging::{debug, info};
use common_telemetry::logging::info;
use common_telemetry::{error, timer};
use datanode::instance::sql::table_idents_to_full_name;
use datanode::instance::InstanceRef as DnInstanceRef;
use datatypes::schema::Schema;
use distributed::DistInstance;
use meta_client::client::{MetaClient, MetaClientBuilder};
use partition::manager::PartitionRuleManager;
@@ -78,21 +71,19 @@ use sql::parser::ParserContext;
use sql::statements::copy::CopyTable;
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
use table::engine::TableReference;

use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu,
InvalidInsertRequestSnafu, MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu,
PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu,
ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
};
use crate::expr_factory::CreateExprFactory;
use crate::frontend::FrontendOptions;
use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use crate::heartbeat::HeartbeatTask;
use crate::inserter::Inserter;
use crate::instance::standalone::StandaloneGrpcQueryHandler;
use crate::metrics;
use crate::row_inserter::RowInserter;
use crate::script::ScriptExecutor;
use crate::server::{start_server, ServerHandlers, Services};
use crate::statement::StatementExecutor;
@@ -130,7 +121,6 @@ pub struct Instance {
plugins: Arc<Plugins>,
servers: Arc<ServerHandlers>,
heartbeat_task: Option<HeartbeatTask>,
row_inserter: Arc<RowInserter>,
}

impl Instance {
@@ -209,12 +199,6 @@ impl Instance {
let create_expr_factory = CreateExprFactory;
let row_inserter = Arc::new(RowInserter::new(
catalog_manager.clone(),
create_expr_factory,
dist_instance.clone(),
));
Ok(Instance {
catalog_manager,
script_executor,
@@ -225,7 +209,6 @@ impl Instance {
plugins: plugins.clone(),
servers: Arc::new(HashMap::new()),
heartbeat_task,
row_inserter,
})
}
@@ -281,12 +264,6 @@ impl Instance {
let create_expr_factory = CreateExprFactory;
let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone());
let row_inserter = Arc::new(RowInserter::new(
catalog_manager.clone(),
create_expr_factory,
grpc_query_handler.clone(),
));
Ok(Instance {
catalog_manager: catalog_manager.clone(),
script_executor,
@@ -297,7 +274,6 @@ impl Instance {
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
heartbeat_task: None,
row_inserter,
})
}
@@ -318,7 +294,12 @@ impl Instance {
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
self.row_inserter.handle_inserts(requests, ctx).await
let inserter = Inserter::new(
&self.catalog_manager,
&self.create_expr_factory,
&self.grpc_query_handler,
);
inserter.handle_row_inserts(requests, ctx).await
}

/// Handle batch inserts
@@ -327,130 +308,12 @@ impl Instance {
requests: InsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
for req in requests.inserts.iter() {
self.create_or_alter_table_on_demand(ctx.clone(), req)
.await?;
}
let query = Request::Inserts(requests);
GrpcQueryHandler::do_query(&*self.grpc_query_handler, query, ctx).await
}

// check if table already exist:
// - if table does not exist, create table by inferred CreateExpr
// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
async fn create_or_alter_table_on_demand(
&self,
ctx: QueryContextRef,
request: &InsertRequest,
) -> Result<()> {
let catalog_name = &ctx.current_catalog().to_owned();
let schema_name = &ctx.current_schema().to_owned();
let table_name = &request.table_name;
let columns = &request.columns;
let table = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(error::CatalogSnafu)?;
match table {
None => {
info!(
"Table {}.{}.{} does not exist, try create table",
catalog_name, schema_name, table_name,
);
let _ = self
.create_table_by_columns(ctx, table_name, columns, default_engine())
.await?;
info!(
"Successfully created table on insertion: {}.{}.{}",
catalog_name, schema_name, table_name
);
}
Some(table) => {
let schema = table.schema();
validate_insert_request(schema.as_ref(), request)?;
if let Some(add_columns) = common_grpc_expr::find_new_columns(&schema, columns)
.context(error::FindNewColumnsOnInsertionSnafu)?
{
info!(
"Find new columns {:?} on insertion, try to alter table: {}.{}.{}",
add_columns, catalog_name, schema_name, table_name
);
let _ = self
.add_new_columns_to_table(ctx, table_name, add_columns)
.await?;
info!(
"Successfully altered table on insertion: {}.{}.{}",
catalog_name, schema_name, table_name
);
}
}
};
Ok(())
}

/// Infer create table expr from inserting data
async fn create_table_by_columns(
&self,
ctx: QueryContextRef,
table_name: &str,
columns: &[Column],
engine: &str,
) -> Result<Output> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
// Create table automatically, build schema from data.
let table_name = TableReference::full(catalog_name, schema_name, table_name);
let create_expr =
self.create_expr_factory
.create_table_expr_by_columns(&table_name, columns, engine)?;
info!(
"Try to create table: {} automatically with request: {:?}",
table_name, create_expr,
);
let inserter = Inserter::new(
&self.catalog_manager,
&self.create_expr_factory,
&self.grpc_query_handler,
);
self.grpc_query_handler
.do_query(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(create_expr)),
}),
ctx,
)
.await
}

async fn add_new_columns_to_table(
&self,
ctx: QueryContextRef,
table_name: &str,
add_columns: AddColumns,
) -> Result<Output> {
debug!(
"Adding new columns: {:?} to table: {}",
add_columns, table_name
);
let expr = AlterExpr {
catalog_name: ctx.current_catalog().to_owned(),
schema_name: ctx.current_schema().to_owned(),
table_name: table_name.to_string(),
kind: Some(Kind::AddColumns(add_columns)),
..Default::default()
};
self.grpc_query_handler
.do_query(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
}),
ctx,
)
.await
inserter.handle_column_inserts(requests, ctx).await
}

pub fn set_plugins(&mut self, map: Arc<Plugins>) {
@@ -737,36 +600,10 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()>
.context(SqlExecInterceptedSnafu)
}

fn validate_insert_request(schema: &Schema, request: &InsertRequest) -> Result<()> {
for column_schema in schema.column_schemas() {
if column_schema.is_nullable() || column_schema.default_constraint().is_some() {
continue;
}
let not_null = request
.columns
.iter()
.find(|x| x.column_name == column_schema.name)
.map(|column| column.null_mask.is_empty() || column.null_mask.iter().all(|x| *x == 0));
ensure!(
not_null == Some(true),
InvalidInsertRequestSnafu {
reason: format!(
"Expecting insert data to be presented on a not null or no default value column '{}'.",
&column_schema.name
)
}
);
}
Ok(())
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use api::v1::column::Values;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use query::query_engine::options::QueryOptions;
use session::context::QueryContext;
use sql::dialect::GreptimeDbDialect;
@@ -774,71 +611,6 @@ mod tests {
use super::*;

#[test]
fn test_validate_insert_request() {
let schema = Schema::new(vec![
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(None)
.unwrap(),
ColumnSchema::new("b", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(100))))
.unwrap(),
]);
let request = InsertRequest {
columns: vec![Column {
column_name: "c".to_string(),
values: Some(Values {
i32_values: vec![1],
..Default::default()
}),
null_mask: vec![0],
..Default::default()
}],
..Default::default()
};
// If nullable is true, it doesn't matter whether the insert request has the column.
validate_insert_request(&schema, &request).unwrap();
let schema = Schema::new(vec![
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false)
.with_default_constraint(None)
.unwrap(),
ColumnSchema::new("b", ConcreteDataType::int32_datatype(), false)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(-100))))
.unwrap(),
]);
let request = InsertRequest {
columns: vec![Column {
column_name: "a".to_string(),
values: Some(Values {
i32_values: vec![1],
..Default::default()
}),
null_mask: vec![0],
..Default::default()
}],
..Default::default()
};
// If nullable is false, but the column is defined with default value,
// it also doesn't matter whether the insert request has the column.
validate_insert_request(&schema, &request).unwrap();
let request = InsertRequest {
columns: vec![Column {
column_name: "b".to_string(),
values: Some(Values {
i32_values: vec![1],
..Default::default()
}),
null_mask: vec![0],
..Default::default()
}],
..Default::default()
};
// Neither of the above cases.
assert!(validate_insert_request(&schema, &request).is_err());
}

#[test]
fn test_exec_validation() {
let query_ctx = QueryContext::arc();

View File

@@ -20,9 +20,9 @@ pub mod error;
pub mod expr_factory;
pub mod frontend;
pub mod heartbeat;
pub(crate) mod inserter;
pub mod instance;
pub(crate) mod metrics;
mod row_inserter;
mod script;
mod server;
pub mod service_config;

View File

@@ -1,170 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr;
use api::v1::greptime_request::Request;
use api::v1::{AlterExpr, ColumnSchema, DdlRequest, Row, RowInsertRequest, RowInsertRequests};
use catalog::CatalogManagerRef;
use common_catalog::consts::default_engine;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_query::Output;
use common_telemetry::info;
use servers::query_handler::grpc::GrpcQueryHandlerRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::TableReference;
use table::TableRef;

use crate::error::{CatalogSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu, Result};
use crate::expr_factory::CreateExprFactory;

pub struct RowInserter {
catalog_manager: CatalogManagerRef,
create_expr_factory: CreateExprFactory,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
}

impl RowInserter {
pub fn new(
catalog_manager: CatalogManagerRef,
create_expr_factory: CreateExprFactory,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
) -> Self {
Self {
catalog_manager,
create_expr_factory,
grpc_query_handler,
}
}

pub async fn handle_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
self.create_or_alter_tables_on_demand(&requests, ctx.clone())
.await?;
let query = Request::RowInserts(requests);
self.grpc_query_handler.do_query(query, ctx).await
}

// check if tables already exist:
// - if table does not exist, create table by inferred CreateExpr
// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
async fn create_or_alter_tables_on_demand(
&self,
requests: &RowInsertRequests,
ctx: QueryContextRef,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
// TODO(jeremy): create and alter in batch?
for req in &requests.inserts {
let table_name = &req.table_name;
let table = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(CatalogSnafu)?;
match table {
Some(table) => {
self.alter_table_on_demand(catalog_name, schema_name, table, req, ctx.clone())
.await?
}
None => {
let table_name = TableReference::full(catalog_name, schema_name, table_name);
self.create_table(&table_name, req, ctx.clone()).await?
}
}
}
Ok(())
}

async fn create_table(
&self,
table_name: &TableReference<'_>,
req: &RowInsertRequest,
ctx: QueryContextRef,
) -> Result<()> {
let (column_schemas, _) = extract_schema_and_rows(req)?;
let create_table_expr = self
.create_expr_factory
.create_table_expr_by_column_schemas(table_name, column_schemas, default_engine())?;
let req = Request::Ddl(DdlRequest {
expr: Some(Expr::CreateTable(create_table_expr)),
});
self.grpc_query_handler.do_query(req, ctx).await?;
Ok(())
}

async fn alter_table_on_demand(
&self,
catalog_name: &str,
schema_name: &str,
table: TableRef,
req: &RowInsertRequest,
ctx: QueryContextRef,
) -> Result<()> {
let (column_schemas, _) = extract_schema_and_rows(req)?;
let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
let add_columns = extract_new_columns(&table.schema(), column_exprs)
.context(FindNewColumnsOnInsertionSnafu)?;
let Some(add_columns) = add_columns else {
return Ok(());
};
let table_name = table.table_info().name.clone();
info!(
"Adding new columns: {:?} to table: {}.{}.{}",
add_columns, catalog_name, schema_name, table_name
);
let alter_table_expr = AlterExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name,
kind: Some(Kind::AddColumns(add_columns)),
..Default::default()
};
let req = Request::Ddl(DdlRequest {
expr: Some(Expr::Alter(alter_table_expr)),
});
self.grpc_query_handler.do_query(req, ctx).await?;
Ok(())
}
}

fn extract_schema_and_rows(req: &RowInsertRequest) -> Result<(&[ColumnSchema], &[Row])> {
let rows = req.rows.as_ref().with_context(|| EmptyDataSnafu {
msg: format!("insert to table: {:?}", &req.table_name),
})?;
let schema = &rows.schema;
let rows = &rows.rows;
ensure!(
!rows.is_empty(),
EmptyDataSnafu {
msg: format!("insert to table: {:?}", &req.table_name),
}
);
Ok((schema, rows))
}