diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index 61ffc8014d..ceaa0c168d 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -20,7 +20,7 @@ use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, Semantic use common_error::ext::BoxedError; use common_meta::key::table_info::TableInfoValue; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::ColumnSchema; +use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use itertools::Itertools; use operator::expr_helper; use session::context::QueryContextBuilder; @@ -174,7 +174,15 @@ pub fn table_info_value_to_relation_desc( let default_values = raw_schema .column_schemas .iter() - .map(|c| c.default_constraint().cloned()) + .map(|c| { + c.default_constraint().cloned().or_else(|| { + if c.is_nullable() { + Some(ColumnDefaultConstraint::null_value()) + } else { + None + } + }) + }) .collect_vec(); Ok(TableDesc::new(relation_desc, default_values)) diff --git a/tests/cases/standalone/common/flow/flow_null.result b/tests/cases/standalone/common/flow/flow_null.result index 9ec66eb619..aaa151c51e 100644 --- a/tests/cases/standalone/common/flow/flow_null.result +++ b/tests/cases/standalone/common/flow/flow_null.result @@ -229,3 +229,106 @@ DROP TABLE ngx_country; Affected Rows: 0 +-- test nullable pk with no default value +CREATE TABLE nullable_pk ( + pid INT NULL, + client STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY (pid) +) WITH ( + append_mode = 'true' +); + +Affected Rows: 0 + +CREATE TABLE out_nullable_pk ( + pid INT NULL, + client STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY (pid, client) +); + +Affected Rows: 0 + +CREATE FLOW calc_nullable_pk SINK TO out_nullable_pk AS +SELECT + pid, + client, + ts +FROM + nullable_pk; + +Affected Rows: 0 + +INSERT INTO + nullable_pk +VALUES + (1, "name1", "2024-10-18 19:00:00"), + (2, "name2", "2024-10-18 19:00:00"), + (3, "name3", "2024-10-18 19:00:00"); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_nullable_pk'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_nullable_pk') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +SELECT * FROM out_nullable_pk; + ++-----+--------+---------------------+ +| pid | client | ts | ++-----+--------+---------------------+ +| 1 | name1 | 2024-10-18T19:00:00 | +| 2 | name2 | 2024-10-18T19:00:00 | +| 3 | name3 | 2024-10-18T19:00:00 | ++-----+--------+---------------------+ + +-- pk is nullable +INSERT INTO + nullable_pk (client, ts) +VALUES + ("name1", "2024-10-18 19:00:00"), + ("name2", "2024-10-18 19:00:00"), + ("name3", "2024-10-18 19:00:00"); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_nullable_pk'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_nullable_pk') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +SELECT * FROM out_nullable_pk; + ++-----+--------+---------------------+ +| pid | client | ts | ++-----+--------+---------------------+ +| | name1 | 2024-10-18T19:00:00 | +| | name2 | 2024-10-18T19:00:00 | +| | name3 | 2024-10-18T19:00:00 | +| 1 | name1 | 2024-10-18T19:00:00 | +| 2 | name2 | 2024-10-18T19:00:00 | +| 3 | name3 | 2024-10-18T19:00:00 | ++-----+--------+---------------------+ + +DROP FLOW calc_nullable_pk; + +Affected Rows: 0 + +DROP TABLE nullable_pk; + +Affected Rows: 0 + +DROP TABLE out_nullable_pk; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_null.sql b/tests/cases/standalone/common/flow/flow_null.sql index 89ccd366cb..b2bdfd74df 100644 --- a/tests/cases/standalone/common/flow/flow_null.sql +++ b/tests/cases/standalone/common/flow/flow_null.sql @@ -133,3 +133,59 @@ DROP FLOW calc_ngx_country; DROP TABLE ngx_access_log; DROP TABLE ngx_country; + +-- test nullable pk with no default value +CREATE TABLE nullable_pk ( + pid INT NULL, + client STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY (pid) +) WITH ( + append_mode = 'true' +); + +CREATE TABLE out_nullable_pk ( + pid INT NULL, + client STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY (pid, client) +); + +CREATE FLOW calc_nullable_pk SINK TO out_nullable_pk AS +SELECT + pid, + client, + ts +FROM + nullable_pk; + +INSERT INTO + nullable_pk +VALUES + (1, "name1", "2024-10-18 19:00:00"), + (2, "name2", "2024-10-18 19:00:00"), + (3, "name3", "2024-10-18 19:00:00"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_nullable_pk'); + +SELECT * FROM out_nullable_pk; + +-- pk is nullable +INSERT INTO + nullable_pk (client, ts) +VALUES + ("name1", "2024-10-18 19:00:00"), + ("name2", "2024-10-18 19:00:00"), + ("name3", "2024-10-18 19:00:00"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_nullable_pk'); + +SELECT * FROM out_nullable_pk; + +DROP FLOW calc_nullable_pk; + +DROP TABLE nullable_pk; + +DROP TABLE out_nullable_pk;