fix: prepared insert with column defaults does not work, #4244 (#4272)

* fix: prepared insert with column defaults does not work, #4244

* fix: build column_defaults each time an adapter is created

* feat: cache the column_defaults in table

* test: assert ts column

* fix: unit

* chore: style

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: typo

* chore: style

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
dennis zhuang
2024-07-04 23:50:12 -07:00
committed by GitHub
parent d9efa564ee
commit d2f6daf7b7
6 changed files with 142 additions and 3 deletions
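To illustrate what the fix enables, the sketch below mirrors the prepared-statement pattern exercised by the integration test added in this commit: a prepared INSERT that omits columns carrying defaults (here `ts` with a `current_timestamp` default). It is a minimal client-side sketch only; the connection URL, database, and table names are assumptions, not part of the commit.

// Illustrative sketch only: assumes a GreptimeDB frontend serving the MySQL
// protocol; the URL, database, and table names are placeholders.
use sqlx::mysql::MySqlPoolOptions;
use sqlx::Row;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = MySqlPoolOptions::new()
        .connect("mysql://127.0.0.1:4002/public")
        .await?;

    // `ts` carries a default, so the prepared INSERT below may omit it.
    sqlx::query("create table demo(i bigint, ts timestamp time index default current_timestamp)")
        .execute(&pool)
        .await?;

    // Before this fix, the server did not fill `ts` from its default
    // expression when the statement was prepared; with the fix it does.
    sqlx::query("insert into demo(i) values(?)")
        .bind(99_i64)
        .execute(&pool)
        .await?;

    let row = sqlx::query("select i, ts from demo").fetch_one(&pool).await?;
    let i: i64 = row.get("i");
    println!("i = {i}, ts was filled from its default");
    Ok(())
}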

Cargo.lock (generated)

@@ -11228,6 +11228,7 @@ dependencies = [
"greptime-proto",
"humantime",
"humantime-serde",
"lazy_static",
"parquet",
"paste",
"serde",
@@ -11534,6 +11535,7 @@ dependencies = [
"common-runtime",
"common-telemetry",
"common-test-util",
"common-time",
"common-wal",
"datafusion",
"datafusion-expr",


@@ -33,6 +33,7 @@ futures.workspace = true
greptime-proto.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
paste = "1.0"
serde.workspace = true
snafu.workspace = true


@@ -12,12 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::Cast;
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN, NOW_FN};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaRef};
use lazy_static::lazy_static;
use snafu::ResultExt;
use store_api::data_source::DataSourceRef;
use store_api::storage::ScanRequest;
@@ -30,6 +38,20 @@ mod metrics;
pub mod numbers;
pub mod scan;
lazy_static! {
    /// The [`Expr`] to call UDF function `now()`.
    static ref NOW_EXPR: Expr = {
        let ctx = SessionContext::new();
        let now_udf = ctx.udf("now").expect("now UDF not found");
        Expr::ScalarFunction(ScalarFunction {
            func: now_udf,
            args: vec![],
        })
    };
}
pub type TableRef = Arc<Table>;
/// Table handle.
@@ -37,6 +59,8 @@ pub struct Table {
    table_info: TableInfoRef,
    filter_pushdown: FilterPushDownType,
    data_source: DataSourceRef,
    /// Columns default [`Expr`]
    column_defaults: HashMap<String, Expr>,
}

impl Table {
@@ -46,12 +70,18 @@ impl Table {
        data_source: DataSourceRef,
    ) -> Self {
        Self {
            column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
            table_info,
            filter_pushdown,
            data_source,
        }
    }

    /// Get column default [`Expr`], if available.
    pub fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }

    pub fn data_source(&self) -> DataSourceRef {
        self.data_source.clone()
    }
@@ -113,3 +143,84 @@ impl Table {
            .map(|(_, c)| c.clone())
    }
}

/// Collects column default [`Expr`] from column schemas.
fn collect_column_defaults(column_schemas: &[ColumnSchema]) -> HashMap<String, Expr> {
    column_schemas
        .iter()
        .filter_map(|column_schema| {
            default_constraint_to_expr(
                column_schema.default_constraint()?,
                &column_schema.data_type,
            )
            .map(|expr| (column_schema.name.to_string(), expr))
        })
        .collect()
}

/// Try to cast the [`ColumnDefaultConstraint`] to [`Expr`] by the target data type.
fn default_constraint_to_expr(
    default_constraint: &ColumnDefaultConstraint,
    target_type: &ConcreteDataType,
) -> Option<Expr> {
    match default_constraint {
        ColumnDefaultConstraint::Value(v) => {
            v.try_to_scalar_value(target_type).ok().map(Expr::Literal)
        }
        ColumnDefaultConstraint::Function(name)
            if matches!(
                name.as_str(),
                CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN
            ) =>
        {
            Some(Expr::Cast(Cast {
                expr: Box::new(NOW_EXPR.clone()),
                data_type: target_type.as_arrow_type(),
            }))
        }
        ColumnDefaultConstraint::Function(_) => None,
    }
}
#[cfg(test)]
mod tests {
    use datafusion_common::ScalarValue;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnDefaultConstraint;

    use super::*;

    #[test]
    fn test_collect_columns_defaults() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true)
                .with_default_constraint(Some(ColumnDefaultConstraint::Value("test".into())))
                .unwrap(),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true)
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(
                "current_timestamp".to_string(),
            )))
            .unwrap(),
        ];

        let column_defaults = collect_column_defaults(&column_schemas[..]);

        assert!(!column_defaults.contains_key("col1"));
        assert!(matches!(column_defaults.get("col2").unwrap(),
                         Expr::Literal(ScalarValue::Utf8(Some(s))) if s == "test"));
        assert!(matches!(
            column_defaults.get("ts").unwrap(),
            Expr::Cast(Cast {
                expr,
                data_type
            }) if **expr == *NOW_EXPR && *data_type == ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type()
        ));
    }
}

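For readers less familiar with the DataFusion types used above, the fragment below reproduces the now()-plus-Cast construction in isolation. It is a minimal sketch assuming only the datafusion crate; the now_cast_expr helper name is made up for illustration, whereas the commit itself keeps a single NOW_EXPR in a lazy_static and casts it to each column's Arrow type.

// Minimal sketch of the now()-wrapped-in-Cast expression used for
// current_timestamp column defaults; `now_cast_expr` is a hypothetical helper.
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{Cast, Expr};
use datafusion::prelude::SessionContext;

fn now_cast_expr(target: DataType) -> Expr {
    // A default SessionContext registers the built-in `now` UDF.
    let ctx = SessionContext::new();
    let now_udf = ctx.udf("now").expect("now UDF not found");
    Expr::Cast(Cast {
        expr: Box::new(Expr::ScalarFunction(ScalarFunction {
            func: now_udf,
            args: vec![],
        })),
        data_type: target,
    })
}

fn main() {
    let expr = now_cast_expr(DataType::Timestamp(TimeUnit::Millisecond, None));
    // Prints something like: CAST(now() AS Timestamp(Millisecond, None))
    println!("{expr}");
}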

@@ -77,6 +77,10 @@ impl TableProvider for DfTableProviderAdapter {
        }
    }

    fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.table.get_column_default(column)
    }

    async fn scan(
        &self,
        _ctx: &SessionState,


@@ -35,6 +35,7 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-test-util.workspace = true
common-time.workspace = true
common-wal.workspace = true
datanode = { workspace = true }
datatypes.workspace = true


@@ -144,7 +144,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
        .unwrap();

    sqlx::query(
        "create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob)",
        "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null)",
    )
    .execute(&pool)
    .await
@@ -234,6 +234,26 @@ pub async fn test_mysql_crud(store_type: StorageType) {
        .unwrap();
    assert_eq!(rows.len(), 0);

    // test prepare with default columns
    sqlx::query("insert into demo(i) values(?)")
        .bind(99)
        .execute(&pool)
        .await
        .unwrap();

    let rows = sqlx::query("select * from demo")
        .fetch_all(&pool)
        .await
        .unwrap();
    assert_eq!(rows.len(), 1);

    for row in rows {
        let i: i64 = row.get("i");
        let ts: DateTime<Utc> = row.get("ts");
        let now = common_time::util::current_time_millis();
        assert!(now - ts.timestamp_millis() < 1000);
        assert_eq!(i, 99);
    }

    let _ = fe_mysql_server.shutdown().await;
    guard.remove_all().await;
}