diff --git a/Cargo.lock b/Cargo.lock index 8181207532..3a8f62aeb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11228,6 +11228,7 @@ dependencies = [ "greptime-proto", "humantime", "humantime-serde", + "lazy_static", "parquet", "paste", "serde", @@ -11534,6 +11535,7 @@ dependencies = [ "common-runtime", "common-telemetry", "common-test-util", + "common-time", "common-wal", "datafusion", "datafusion-expr", diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index b33f4757c6..3a0c379983 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -33,6 +33,7 @@ futures.workspace = true greptime-proto.workspace = true humantime.workspace = true humantime-serde.workspace = true +lazy_static.workspace = true paste = "1.0" serde.workspace = true snafu.workspace = true diff --git a/src/table/src/table.rs b/src/table/src/table.rs index d0c8ede323..885d9aacf2 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -12,12 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use common_recordbatch::SendableRecordBatchStream; +use datafusion::execution::FunctionRegistry; +use datafusion::logical_expr::expr::ScalarFunction; +use datafusion::logical_expr::Cast; +use datafusion::prelude::SessionContext; use datafusion_expr::expr::Expr; -use datatypes::schema::{ColumnSchema, SchemaRef}; +use datatypes::data_type::DataType; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN, NOW_FN}; +use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaRef}; +use lazy_static::lazy_static; use snafu::ResultExt; use store_api::data_source::DataSourceRef; use store_api::storage::ScanRequest; @@ -30,6 +38,20 @@ mod metrics; pub mod numbers; pub mod scan; +lazy_static! { + /// The [`Expr`] to call UDF function `now()`. + static ref NOW_EXPR: Expr = { + let ctx = SessionContext::new(); + + let now_udf = ctx.udf("now").expect("now UDF not found"); + + Expr::ScalarFunction(ScalarFunction { + func: now_udf, + args: vec![], + }) + }; +} + pub type TableRef = Arc; /// Table handle. @@ -37,6 +59,8 @@ pub struct Table { table_info: TableInfoRef, filter_pushdown: FilterPushDownType, data_source: DataSourceRef, + /// Columns default [`Expr`] + column_defaults: HashMap, } impl Table { @@ -46,12 +70,18 @@ impl Table { data_source: DataSourceRef, ) -> Self { Self { + column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()), table_info, filter_pushdown, data_source, } } + /// Get column default [`Expr`], if available. + pub fn get_column_default(&self, column: &str) -> Option<&Expr> { + self.column_defaults.get(column) + } + pub fn data_source(&self) -> DataSourceRef { self.data_source.clone() } @@ -113,3 +143,84 @@ impl Table { .map(|(_, c)| c.clone()) } } + +/// Collects column default [`Expr`] from column schemas. +fn collect_column_defaults(column_schemas: &[ColumnSchema]) -> HashMap { + column_schemas + .iter() + .filter_map(|column_schema| { + default_constraint_to_expr( + column_schema.default_constraint()?, + &column_schema.data_type, + ) + .map(|expr| (column_schema.name.to_string(), expr)) + }) + .collect() +} + +/// Try to cast the [`ColumnDefaultConstraint`] to [`Expr`] by the target data type. +fn default_constraint_to_expr( + default_constraint: &ColumnDefaultConstraint, + target_type: &ConcreteDataType, +) -> Option { + match default_constraint { + ColumnDefaultConstraint::Value(v) => { + v.try_to_scalar_value(target_type).ok().map(Expr::Literal) + } + + ColumnDefaultConstraint::Function(name) + if matches!( + name.as_str(), + CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN + ) => + { + Some(Expr::Cast(Cast { + expr: Box::new(NOW_EXPR.clone()), + data_type: target_type.as_arrow_type(), + })) + } + + ColumnDefaultConstraint::Function(_) => None, + } +} + +#[cfg(test)] +mod tests { + use datafusion_common::ScalarValue; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnDefaultConstraint; + + use super::*; + + #[test] + fn test_collect_columns_defaults() { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true) + .with_default_constraint(Some(ColumnDefaultConstraint::Value("test".into()))) + .unwrap(), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true) + .with_default_constraint(Some(ColumnDefaultConstraint::Function( + "current_timestamp".to_string(), + ))) + .unwrap(), + ]; + let column_defaults = collect_column_defaults(&column_schemas[..]); + + assert!(!column_defaults.contains_key("col1")); + assert!(matches!(column_defaults.get("col2").unwrap(), + Expr::Literal(ScalarValue::Utf8(Some(s))) if s == "test")); + assert!(matches!( + column_defaults.get("ts").unwrap(), + Expr::Cast(Cast { + expr, + data_type + }) if **expr == *NOW_EXPR && *data_type == ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type() + )); + } +} diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 8df1fe9d8b..13c10fabe4 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -77,6 +77,10 @@ impl TableProvider for DfTableProviderAdapter { } } + fn get_column_default(&self, column: &str) -> Option<&Expr> { + self.table.get_column_default(column) + } + async fn scan( &self, _ctx: &SessionState, diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 905b03bdd1..3dbe3f79f2 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -35,6 +35,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-test-util.workspace = true +common-time.workspace = true common-wal.workspace = true datanode = { workspace = true } datatypes.workspace = true diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs index 672f2891c5..ba3d847b64 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -144,7 +144,7 @@ pub async fn test_mysql_crud(store_type: StorageType) { .unwrap(); sqlx::query( - "create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob)", + "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null)", ) .execute(&pool) .await @@ -234,6 +234,26 @@ pub async fn test_mysql_crud(store_type: StorageType) { .unwrap(); assert_eq!(rows.len(), 0); + // test prepare with default columns + sqlx::query("insert into demo(i) values(?)") + .bind(99) + .execute(&pool) + .await + .unwrap(); + let rows = sqlx::query("select * from demo") + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(rows.len(), 1); + + for row in rows { + let i: i64 = row.get("i"); + let ts: DateTime = row.get("ts"); + let now = common_time::util::current_time_millis(); + assert!(now - ts.timestamp_millis() < 1000); + assert_eq!(i, 99); + } + let _ = fe_mysql_server.shutdown().await; guard.remove_all().await; }