fix: handle insert default value (#5307)

* fix: handle flow inserts with default values

* test: sqlness

* chore: typo

* chore: newline

* feat(WIP): impure default filler

* feat: fill impure default values

* test: add test for default fill impure

* feat: check for impure

* fix: also handle stmt to region

* refactor: per review

* refactor: per review

* chore: rebase fix

* chore: clippy

* chore: per review
discord9
2025-01-14 17:06:53 +08:00
committed by GitHub
parent 1855dccdf1
commit 95b20592ac
14 changed files with 685 additions and 59 deletions

View File

@@ -123,6 +123,14 @@ impl ColumnSchema {
self.default_constraint.as_ref()
}
/// Check if the default constraint is an impure function.
pub fn is_default_impure(&self) -> bool {
self.default_constraint
.as_ref()
.map(|c| c.is_function())
.unwrap_or(false)
}
#[inline]
pub fn metadata(&self) -> &Metadata {
&self.metadata
@@ -290,6 +298,15 @@ impl ColumnSchema {
}
}
/// Creates an impure default value for this column, but only if it has an impure default constraint.
/// Otherwise, returns `Ok(None)`.
pub fn create_impure_default(&self) -> Result<Option<Value>> {
match &self.default_constraint {
Some(c) => c.create_impure_default(&self.data_type),
None => Ok(None),
}
}
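
Taken together, `is_default_impure` and `create_impure_default` let a caller detect an impure column and materialize its default on demand. A minimal sketch of that call pattern (the column name and constraint are hypothetical, mirroring the tests later in this commit):

```rust
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};

// Hypothetical column: a non-nullable `ts` with an impure now() default.
let column = ColumnSchema::new(
    "ts",
    ConcreteDataType::timestamp_millisecond_datatype(),
    false,
)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
    "now()".to_string(),
)))
.unwrap();

assert!(column.is_default_impure());
// A constant default would return Ok(None); a function default yields a value.
assert!(column.create_impure_default().unwrap().is_some());
```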
/// Retrieves the fulltext options for the column.
pub fn fulltext_options(&self) -> Result<Option<FulltextOptions>> {
match self.metadata.get(FULLTEXT_KEY) {

View File

@@ -178,12 +178,63 @@ impl ColumnDefaultConstraint {
}
}
/// Only create a default vector if the constraint is impure, i.e., a function.
///
/// This helps delay creating constant default values until the mito engine, while keeping impure defaults consistent across all rows of a request.
pub fn create_impure_default_vector(
&self,
data_type: &ConcreteDataType,
num_rows: usize,
) -> Result<Option<VectorRef>> {
assert!(num_rows > 0);
match self {
ColumnDefaultConstraint::Function(expr) => {
// Functions should also ensure their return value is not null when
// is_nullable is true.
match &expr[..] {
// TODO(dennis): we only support current_timestamp right now;
// it's better to use an expression framework in the future.
CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => {
create_current_timestamp_vector(data_type, num_rows).map(Some)
}
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
}
}
ColumnDefaultConstraint::Value(_) => Ok(None),
}
}
/// Only create a default value if the constraint is impure, i.e., a function.
///
/// This helps delay creating constant default values until the mito engine, while keeping impure defaults consistent.
pub fn create_impure_default(&self, data_type: &ConcreteDataType) -> Result<Option<Value>> {
match self {
ColumnDefaultConstraint::Function(expr) => {
// Functions should also ensure their return value is not null when
// is_nullable is true.
match &expr[..] {
CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => {
create_current_timestamp(data_type).map(Some)
}
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
}
}
ColumnDefaultConstraint::Value(_) => Ok(None),
}
}
/// Returns true if this constraint might create NULL.
fn maybe_null(&self) -> bool {
// Once we support more functions, we may return true if given function
// could return null.
matches!(self, ColumnDefaultConstraint::Value(Value::Null))
}
/// Returns true if this constraint is a function.
pub fn is_function(&self) -> bool {
matches!(self, ColumnDefaultConstraint::Function(_))
}
}
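
A sketch of the batch path for the vector variant (same hypothetical `now()` constraint; a `Value` constraint would return `Ok(None)` and be left for the mito engine to fill):

```rust
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;

let constraint = ColumnDefaultConstraint::Function("now()".to_string());
let ty = ConcreteDataType::timestamp_millisecond_datatype();

// The timestamp is evaluated once for the whole batch, so all three rows
// observe the same value.
let vector = constraint
    .create_impure_default_vector(&ty, 3)
    .unwrap()
    .expect("function defaults are impure");
assert_eq!(3, vector.len());
```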
fn create_current_timestamp(data_type: &ConcreteDataType) -> Result<Value> {

View File

@@ -24,6 +24,7 @@ use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::{debug, trace};
use datatypes::value::Value;
use itertools::Itertools;
use snafu::{IntoError, OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -178,14 +179,32 @@ impl Flownode for FlowWorkerManager {
.table_from_id(&table_id)
.await
.map_err(to_meta_err(snafu::location!()))?;
let default_vals = table_schema
.default_values
.iter()
.zip(table_schema.relation_desc.typ().column_types.iter())
.map(|(v, ty)| {
v.as_ref().and_then(|v| {
match v.create_default(ty.scalar_type(), ty.nullable()) {
Ok(v) => Some(v),
Err(err) => {
common_telemetry::error!(err; "Failed to create default value");
None
}
}
})
})
.collect_vec();
let table_types = table_schema
.relation_desc
.typ()
.column_types
.clone()
.into_iter()
.map(|t| t.scalar_type)
.collect_vec();
let table_col_names = table_schema.names;
let table_col_names = table_schema.relation_desc.names;
let table_col_names = table_col_names
.iter().enumerate()
.map(|(idx,name)| match name {
@@ -202,31 +221,35 @@ impl Flownode for FlowWorkerManager {
.enumerate()
.map(|(i, name)| (&name.column_name, i)),
);
let fetch_order: Vec<usize> = table_col_names
let fetch_order: Vec<FetchFromRow> = table_col_names
.iter()
.map(|col_name| {
.zip(default_vals.into_iter())
.map(|(col_name, col_default_val)| {
name_to_col
.get(col_name)
.copied()
.map(FetchFromRow::Idx)
.or_else(|| col_default_val.clone().map(FetchFromRow::Default))
.with_context(|| UnexpectedSnafu {
err_msg: format!("Column not found: {}", col_name),
err_msg: format!(
"Column not found: {}, default_value: {:?}",
col_name, col_default_val
),
})
})
.try_collect()?;
if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
trace!("Reordering columns: {:?}", fetch_order)
}
trace!("Reordering columns: {:?}", fetch_order);
(table_types, fetch_order)
};
// TODO(discord9): use column instead of row
let rows: Vec<DiffRow> = rows_proto
.into_iter()
.map(|r| {
let r = repr::Row::from(r);
let reordered = fetch_order
.iter()
.map(|&i| r.inner[i].clone())
.collect_vec();
let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
repr::Row::new(reordered)
})
.map(|r| (r, now, 1))
@@ -258,3 +281,20 @@ impl Flownode for FlowWorkerManager {
Ok(Default::default())
}
}
/// Simple helper enum for fetching a value from a row, or falling back to a default value
#[derive(Debug, Clone)]
enum FetchFromRow {
Idx(usize),
Default(Value),
}
impl FetchFromRow {
/// Panics if the index is out of bounds
fn fetch(&self, row: &repr::Row) -> Value {
match self {
FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
FetchFromRow::Default(v) => v.clone(),
}
}
}
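
To see how a `fetch_order` is applied per row, a small sketch (the column layout and values here are hypothetical):

```rust
use datatypes::value::Value;

// The source row has two columns; the sink table expects three, with the
// middle one missing from the source and served from its default value.
let row = repr::Row::new(vec![Value::from(42i64), Value::from(100i64)]);
let fetch_order = vec![
    FetchFromRow::Idx(0),
    FetchFromRow::Default(Value::Null),
    FetchFromRow::Idx(1),
];
let reordered: Vec<Value> = fetch_order.iter().map(|f| f.fetch(&row)).collect();
assert_eq!(
    reordered,
    vec![Value::from(42i64), Value::Null, Value::from(100i64)]
);
```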

View File

@@ -353,7 +353,7 @@ impl FlownodeContext {
name: name.join("."),
})?;
let schema = self.table_source.table(name).await?;
Ok((id, schema))
Ok((id, schema.relation_desc))
}
/// Assign a global id to a table, if already assigned, return the existing global id

View File

@@ -17,6 +17,8 @@
use common_error::ext::BoxedError;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use datatypes::schema::ColumnDefaultConstraint;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
@@ -27,6 +29,32 @@ use crate::error::{
};
use crate::repr::RelationDesc;
/// Table description, including the relation desc and default values; this is the minimal table information flow needs
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TableDesc {
pub relation_desc: RelationDesc,
pub default_values: Vec<Option<ColumnDefaultConstraint>>,
}
impl TableDesc {
pub fn new(
relation_desc: RelationDesc,
default_values: Vec<Option<ColumnDefaultConstraint>>,
) -> Self {
Self {
relation_desc,
default_values,
}
}
pub fn new_no_default(relation_desc: RelationDesc) -> Self {
Self {
relation_desc,
default_values: vec![],
}
}
}
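
For example, a sink table with a single `now()`-defaulted timestamp column would be described as follows (a sketch using the `CDT` alias and constructors from the tests below):

```rust
use datatypes::schema::ColumnDefaultConstraint;

let desc = TableDesc::new(
    RelationType::new(vec![ColumnType::new(
        CDT::timestamp_millisecond_datatype(),
        false,
    )])
    .into_named(vec![Some("ts".to_string())]),
    // Positionally aligned with the relation's columns.
    vec![Some(ColumnDefaultConstraint::Function("now()".to_string()))],
);
assert_eq!(
    desc.default_values.len(),
    desc.relation_desc.typ().column_types.len()
);
```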
/// Table source but for flow, provide table schema by table name/id
#[async_trait::async_trait]
pub trait FlowTableSource: Send + Sync + std::fmt::Debug {
@@ -34,11 +62,11 @@ pub trait FlowTableSource: Send + Sync + std::fmt::Debug {
async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error>;
/// Get the table schema by table name
async fn table(&self, name: &TableName) -> Result<RelationDesc, Error> {
async fn table(&self, name: &TableName) -> Result<TableDesc, Error> {
let id = self.table_id_from_name(name).await?;
self.table_from_id(&id).await
}
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error>;
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error>;
}
/// managed table source information, query from table info manager and table name manager
@@ -51,7 +79,7 @@ pub struct ManagedTableSource {
#[async_trait::async_trait]
impl FlowTableSource for ManagedTableSource {
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error> {
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
@@ -175,7 +203,7 @@ impl ManagedTableSource {
pub async fn get_table_name_schema(
&self,
table_id: &TableId,
) -> Result<(TableName, RelationDesc), Error> {
) -> Result<(TableName, TableDesc), Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
@@ -219,7 +247,7 @@ pub(crate) mod test {
use crate::repr::{ColumnType, RelationType};
pub struct FlowDummyTableSource {
pub id_names_to_desc: Vec<(TableId, TableName, RelationDesc)>,
pub id_names_to_desc: Vec<(TableId, TableName, TableDesc)>,
id_to_idx: HashMap<TableId, usize>,
name_to_idx: HashMap<TableName, usize>,
}
@@ -234,8 +262,10 @@ pub(crate) mod test {
"public".to_string(),
"numbers".to_string(),
],
RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
TableDesc::new_no_default(
RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
),
),
(
1025,
@@ -244,11 +274,13 @@ pub(crate) mod test {
"public".to_string(),
"numbers_with_ts".to_string(),
],
RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
])
.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
TableDesc::new_no_default(
RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
])
.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
),
),
];
let id_to_idx = id_names_to_desc
@@ -271,7 +303,7 @@ pub(crate) mod test {
#[async_trait::async_trait]
impl FlowTableSource for FlowDummyTableSource {
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error> {
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't find table desc", table_id),
})?;

View File

@@ -27,6 +27,7 @@ use session::context::QueryContextBuilder;
use snafu::{OptionExt, ResultExt};
use table::table_reference::TableReference;
use crate::adapter::table_source::TableDesc;
use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL};
use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
use crate::repr::{ColumnType, RelationDesc, RelationType};
@@ -126,7 +127,7 @@ impl FlowWorkerManager {
pub fn table_info_value_to_relation_desc(
table_info_value: TableInfoValue,
) -> Result<RelationDesc, Error> {
) -> Result<TableDesc, Error> {
let raw_schema = table_info_value.table_info.meta.schema;
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
.column_schemas
@@ -147,8 +148,7 @@ pub fn table_info_value_to_relation_desc(
let keys = vec![crate::repr::Key::from(key)];
let time_index = raw_schema.timestamp_index;
Ok(RelationDesc {
let relation_desc = RelationDesc {
typ: RelationType {
column_types,
keys,
@@ -157,7 +157,14 @@ pub fn table_info_value_to_relation_desc(
auto_columns: vec![],
},
names: col_names,
})
};
let default_values = raw_schema
.column_schemas
.iter()
.map(|c| c.default_constraint().cloned())
.collect_vec();
Ok(TableDesc::new(relation_desc, default_values))
}
pub fn from_proto_to_data_type(

View File

@@ -925,6 +925,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Unexpected impure default value with region_id: {}, column: {}, default_value: {}",
region_id,
column,
default_value
))]
UnexpectedImpureDefault {
#[snafu(implicit)]
location: Location,
region_id: RegionId,
column: String,
default_value: String,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -964,7 +978,8 @@ impl ErrorExt for Error {
| InvalidParquet { .. }
| OperateAbortedIndex { .. }
| UnexpectedReplay { .. }
| IndexEncodeNull { .. } => StatusCode::Unexpected,
| IndexEncodeNull { .. }
| UnexpectedImpureDefault { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }

View File

@@ -42,7 +42,7 @@ use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::error::{
CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
FlushRegionSnafu, InvalidRequestSnafu, Result,
FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableId;
@@ -333,6 +333,14 @@ impl WriteRequest {
}
OpType::Put => {
// For put requests, we use the default value from column schema.
if column.column_schema.is_default_impure() {
UnexpectedImpureDefaultSnafu {
region_id: self.region_id,
column: &column.column_schema.name,
default_value: format!("{:?}", column.column_schema.default_constraint()),
}
.fail()?
}
column
.column_schema
.create_default()
@@ -1039,6 +1047,57 @@ mod tests {
check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
}
#[test]
fn test_fill_impure_columns_err() {
let rows = Rows {
schema: vec![new_column_schema(
"k0",
ColumnDataType::Int64,
SemanticType::Tag,
)],
rows: vec![Row {
values: vec![i64_value(1)],
}],
};
let metadata = {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
"now()".to_string(),
)))
.unwrap(),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"k0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
})
.primary_key(vec![2]);
builder.build().unwrap()
};
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
assert!(err.is_fill_default());
assert!(request
.fill_missing_columns(&metadata)
.unwrap_err()
.to_string()
.contains("Unexpected impure default value with region_id"));
}
#[test]
fn test_fill_missing_columns() {
let rows = Rows {

View File

@@ -47,7 +47,7 @@ use store_api::metric_engine_consts::{
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfoRef;
use table::metadata::TableInfo;
use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY};
use table::table_reference::TableReference;
use table::TableRef;
@@ -59,7 +59,9 @@ use crate::error::{
use crate::expr_factory::CreateExprFactory;
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::common::preprocess_row_insert_requests;
use crate::req_convert::insert::{ColumnToRow, RowToRegion, StatementToRegion, TableToRegion};
use crate::req_convert::insert::{
fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
};
use crate::statement::StatementExecutor;
pub struct Inserter {
@@ -202,18 +204,26 @@ impl Inserter {
});
validate_column_count_match(&requests)?;
let (tables_info, instant_table_ids) = self
let CreateAlterTableResult {
instant_table_ids,
table_infos,
} = self
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
.await?;
let name_to_info = table_infos
.values()
.map(|info| (info.name.clone(), info.clone()))
.collect::<HashMap<_, _>>();
let inserts = RowToRegion::new(
tables_info,
name_to_info,
instant_table_ids,
self.partition_manager.as_ref(),
)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
self.do_request(inserts, &table_infos, &ctx).await
}
/// Handles row inserts request with metric engine.
@@ -238,7 +248,10 @@ impl Inserter {
.await?;
// check and create logical tables
let (tables_info, instant_table_ids) = self
let CreateAlterTableResult {
instant_table_ids,
table_infos,
} = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
@@ -246,11 +259,15 @@ impl Inserter {
statement_executor,
)
.await?;
let inserts = RowToRegion::new(tables_info, instant_table_ids, &self.partition_manager)
let name_to_info = table_infos
.values()
.map(|info| (info.name.clone(), info.clone()))
.collect::<HashMap<_, _>>();
let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
self.do_request(inserts, &table_infos, &ctx).await
}
pub async fn handle_table_insert(
@@ -271,7 +288,10 @@ impl Inserter {
.convert(request)
.await?;
self.do_request(inserts, &ctx).await
let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
self.do_request(inserts, &table_infos, &ctx).await
}
pub async fn handle_statement_insert(
@@ -279,12 +299,15 @@ impl Inserter {
insert: &Insert,
ctx: &QueryContextRef,
) -> Result<Output> {
let inserts =
let (inserts, table_info) =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert, ctx)
.await?;
self.do_request(inserts, ctx).await
let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
self.do_request(inserts, &table_infos, ctx).await
}
}
@@ -292,8 +315,12 @@ impl Inserter {
async fn do_request(
&self,
requests: InstantAndNormalInsertRequests,
table_infos: &HashMap<TableId, Arc<TableInfo>>,
ctx: &QueryContextRef,
) -> Result<Output> {
// Fill impure default values in the request
let requests = fill_reqs_with_impure_default(table_infos, requests)?;
let write_cost = write_meter!(
ctx.current_catalog(),
ctx.current_schema(),
@@ -497,14 +524,15 @@ impl Inserter {
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
) -> Result<(HashMap<String, TableInfoRef>, HashSet<TableId>)> {
) -> Result<CreateAlterTableResult> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
.start_timer();
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let mut tables_info = HashMap::with_capacity(requests.inserts.len());
let mut table_infos = HashMap::new();
// If `auto_create_table` hint is disabled, skip creating/altering tables.
let auto_create_table_hint = ctx
.extension(AUTO_CREATE_TABLE_KEY)
@@ -533,9 +561,13 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
tables_info.insert(table_info.name.clone(), table_info);
table_infos.insert(table_info.table_id(), table.table_info());
}
return Ok((tables_info, instant_table_ids));
let ret = CreateAlterTableResult {
instant_table_ids,
table_infos,
};
return Ok(ret);
}
let mut create_tables = vec![];
@@ -549,7 +581,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
tables_info.insert(table_info.name.clone(), table_info);
table_infos.insert(table_info.table_id(), table.table_info());
if let Some(alter_expr) =
self.get_alter_table_expr_on_demand(req, &table, ctx)?
{
@@ -577,7 +609,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
tables_info.insert(table_info.name.clone(), table_info);
table_infos.insert(table_info.table_id(), table.table_info());
}
}
if !alter_tables.is_empty() {
@@ -600,7 +632,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
tables_info.insert(table_info.name.clone(), table_info);
table_infos.insert(table_info.table_id(), table.table_info());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
@@ -610,7 +642,10 @@ impl Inserter {
}
}
Ok((tables_info, instant_table_ids))
Ok(CreateAlterTableResult {
instant_table_ids,
table_infos,
})
}
async fn create_physical_table_on_demand(
@@ -872,3 +907,11 @@ fn build_create_table_expr(
) -> Result<CreateTableExpr> {
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None)
}
/// Result of `create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
/// Table ids of `ttl=instant` tables.
instant_table_ids: HashSet<TableId>,
/// Table info of the created tables.
table_infos: HashMap<TableId, Arc<TableInfo>>,
}

View File

@@ -13,12 +13,14 @@
// limitations under the License.
mod column_to_row;
mod fill_impure_default;
mod row_to_region;
mod stmt_to_region;
mod table_to_region;
use api::v1::SemanticType;
pub use column_to_row::ColumnToRow;
pub use fill_impure_default::fill_reqs_with_impure_default;
pub use row_to_region::RowToRegion;
use snafu::{OptionExt, ResultExt};
pub use stmt_to_region::StatementToRegion;

View File

@@ -0,0 +1,242 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Util functions to help fill columns with impure default values in requests
use std::sync::Arc;
use ahash::{HashMap, HashMapExt, HashSet};
use datatypes::schema::ColumnSchema;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use table::metadata::{TableInfo, TableInfoRef};
use crate::error::{ConvertColumnDefaultConstraintSnafu, Result, UnexpectedSnafu};
use crate::expr_factory::column_schemas_to_defs;
use crate::insert::InstantAndNormalInsertRequests;
/// Find all columns that have impure default values
pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec<ColumnSchema> {
let columns = table_info.meta.schema.column_schemas();
columns
.iter()
.filter(|column| column.is_default_impure())
.cloned()
.collect()
}
/// Fill impure default values in the request
pub struct ImpureDefaultFiller {
impure_columns: HashMap<String, (api::v1::ColumnSchema, Option<api::v1::Value>)>,
}
impl ImpureDefaultFiller {
pub fn new(table_info: TableInfoRef) -> Result<Self> {
let impure_column_list = find_all_impure_columns(&table_info);
let pks = &table_info.meta.primary_key_indices;
let pk_names = pks
.iter()
.map(|&i| table_info.meta.schema.column_name_by_index(i).to_string())
.collect::<Vec<_>>();
let mut impure_columns = HashMap::new();
for column in impure_column_list {
let default_value = column
.create_impure_default()
.with_context(|_| ConvertColumnDefaultConstraintSnafu {
column_name: column.name.clone(),
})?
.with_context(|| UnexpectedSnafu {
violated: format!(
"Expect default value to be impure, found {:?}",
column.default_constraint()
),
})?;
let grpc_default_value = api::helper::to_proto_value(default_value);
let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0);
let grpc_column_schema = api::v1::ColumnSchema {
column_name: def.name,
datatype: def.data_type,
semantic_type: def.semantic_type,
datatype_extension: def.datatype_extension,
options: def.options,
};
impure_columns.insert(
grpc_column_schema.column_name.clone(),
(grpc_column_schema, grpc_default_value),
);
}
Ok(Self { impure_columns })
}
/// Fill impure default values in the request
pub fn fill_rows(&self, rows: &mut api::v1::Rows) {
let impure_columns_in_reqs: HashSet<_> = rows
.schema
.iter()
.filter_map(|schema| {
if self.impure_columns.contains_key(&schema.column_name) {
Some(&schema.column_name)
} else {
None
}
})
.collect();
if self.impure_columns.len() == impure_columns_in_reqs.len() {
return;
}
let (schema_append, row_append): (Vec<_>, Vec<_>) = self
.impure_columns
.iter()
.filter_map(|(name, (schema, val))| {
if !impure_columns_in_reqs.contains(name) {
Some((schema.clone(), val.clone().unwrap_or_default()))
} else {
None
}
})
.unzip();
rows.schema.extend(schema_append);
for row in rows.rows.iter_mut() {
row.values.extend_from_slice(row_append.as_slice());
}
}
}
/// Fill impure default values in the request (only for normal insert requests, since instant inserts can be filled in the flownode directly, as a single source of truth)
pub fn fill_reqs_with_impure_default(
table_infos: &HashMap<TableId, Arc<TableInfo>>,
mut inserts: InstantAndNormalInsertRequests,
) -> Result<InstantAndNormalInsertRequests> {
let fillers = table_infos
.iter()
.map(|(table_id, table_info)| {
let table_id = *table_id;
ImpureDefaultFiller::new(table_info.clone()).map(|filler| (table_id, filler))
})
.collect::<Result<HashMap<TableId, ImpureDefaultFiller>>>()?;
let normal_inserts = &mut inserts.normal_requests;
for request in normal_inserts.requests.iter_mut() {
let region_id = RegionId::from(request.region_id);
let table_id = region_id.table_id();
let filler = fillers.get(&table_id).with_context(|| UnexpectedSnafu {
violated: format!("impure default filler for table_id: {} not found", table_id),
})?;
if let Some(rows) = &mut request.rows {
filler.fill_rows(rows);
}
}
Ok(inserts)
}
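
At the call site in `Inserter::do_request`, this module reduces to a single pass over the requests; a sketch of the shape (names as in the inserter changes above):

```rust
// Sketch: `table_infos` maps TableId -> Arc<TableInfo> for every table the
// requests touch; normal requests get missing impure default columns appended,
// instant requests pass through for the flownode to fill.
let requests = fill_reqs_with_impure_default(&table_infos, requests)?;
```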
#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
use datatypes::value::Value;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use super::*;
/// Create a test schema with 3 columns: `[col1 int32, ts timestamp_millisecond DEFAULT now(), col2 int32]`.
fn new_test_schema() -> Schema {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true)
.with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Function(
"now()".to_string(),
)))
.unwrap(),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
Value::from(1i32),
)))
.unwrap(),
];
SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap()
}
pub fn new_table_info() -> TableInfo {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(10)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap()
}
fn column_schema_to_proto(
column_schema: &[ColumnSchema],
pk_names: &[String],
) -> Vec<api::v1::ColumnSchema> {
column_schemas_to_defs(column_schema.to_vec(), pk_names)
.unwrap()
.into_iter()
.map(|def| api::v1::ColumnSchema {
column_name: def.name,
datatype: def.data_type,
semantic_type: def.semantic_type,
datatype_extension: def.datatype_extension,
options: def.options,
})
.collect()
}
#[test]
fn test_impure_append() {
let row = api::v1::Row {
values: vec![api::v1::Value {
value_data: Some(ValueData::I32Value(42)),
}],
};
let schema = new_test_schema().column_schemas()[0].clone();
let col_schemas = column_schema_to_proto(&[schema], &["col1".to_string()]);
let mut rows = api::v1::Rows {
schema: col_schemas,
rows: vec![row],
};
let info = new_table_info();
let filler = ImpureDefaultFiller::new(Arc::new(info)).unwrap();
filler.fill_rows(&mut rows);
assert_eq!(rows.schema[1].column_name, "ts");
assert!(rows.schema.len() == 2 && rows.rows[0].values.len() == 2);
}
}

View File

@@ -25,6 +25,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use sql::statements;
use sql::statements::insert::Insert;
use sqlparser::ast::{ObjectName, Value as SqlValue};
use table::metadata::TableInfoRef;
use table::TableRef;
use crate::error::{
@@ -61,7 +62,7 @@ impl<'a> StatementToRegion<'a> {
&self,
stmt: &Insert,
query_ctx: &QueryContextRef,
) -> Result<InstantAndNormalInsertRequests> {
) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
let (catalog, schema, table_name) = self.get_full_name(stmt.table_name())?;
let table = self.get_table(&catalog, &schema, &table_name).await?;
let table_schema = table.schema();
@@ -137,15 +138,21 @@ impl<'a> StatementToRegion<'a> {
.await?;
let requests = RegionInsertRequests { requests };
if table_info.is_ttl_instant_table() {
Ok(InstantAndNormalInsertRequests {
normal_requests: Default::default(),
instant_requests: requests,
})
Ok((
InstantAndNormalInsertRequests {
normal_requests: Default::default(),
instant_requests: requests,
},
table_info,
))
} else {
Ok(InstantAndNormalInsertRequests {
normal_requests: requests,
instant_requests: Default::default(),
})
Ok((
InstantAndNormalInsertRequests {
normal_requests: requests,
instant_requests: Default::default(),
},
table_info,
))
}
}

View File

@@ -0,0 +1,70 @@
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
Affected Rows: 0
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO
bytes_log (byte)
VALUES
(NULL),
(300);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
-- since ts defaults to now(), omit it when querying
SELECT
rate
FROM
approx_rate;
+------+
| rate |
+------+
| 0.0 |
+------+
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE bytes_log;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0

View File

@@ -0,0 +1,41 @@
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
INSERT INTO
bytes_log (byte)
VALUES
(NULL),
(300);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
-- since ts defaults to now(), omit it when querying
SELECT
rate
FROM
approx_rate;
DROP FLOW find_approx_rate;
DROP TABLE bytes_log;
DROP TABLE approx_rate;