feat: impl default constraint for column (#273)

* feat: impl default value for column in schema

* test: add test for column's default value

* refactor: rename ColumnDefaultValue to ColumnDefaultConstraint

* fix: timestamp column may be a constant vector

* fix: test_shutdown_pg_server

* fix: typo

Co-authored-by: LFC <bayinamine@gmail.com>

* fix: typo

Co-authored-by: LFC <bayinamine@gmail.com>

* fix: typo

Co-authored-by: LFC <bayinamine@gmail.com>

* chore: use table_info directly

Co-authored-by: LFC <bayinamine@gmail.com>

* refactor: address CR comments

Co-authored-by: LFC <bayinamine@gmail.com>
dennis zhuang
2022-09-22 10:43:21 +08:00
committed by GitHub
parent a954ba862a
commit 5f322ba16e
35 changed files with 938 additions and 349 deletions

Cargo.lock

@@ -4831,6 +4831,7 @@ name = "sql"
version = "0.1.0"
dependencies = [
"common-error",
"common-time",
"datatypes",
"snafu",
"sqlparser",


@@ -56,6 +56,7 @@ message ColumnDef {
string name = 1;
ColumnDataType datatype = 2;
bool is_nullable = 3;
optional bytes default_constraint = 4;
}
enum ColumnDataType {
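The new `default_constraint` bytes field carries a JSON-serialized `ColumnDefaultConstraint` (added to the `datatypes` crate below). A minimal client-side sketch of filling it, assuming the `TryInto<Vec<u8>>` impl this commit introduces:

use datatypes::schema::ColumnDefaultConstraint;
use datatypes::value::Value;

// The wire format is serde_json, e.g. {"Value":{"Int32":99}}.
let constraint = ColumnDefaultConstraint::Value(Value::from(99));
let bytes: Vec<u8> = constraint.try_into().expect("serializable constraint");
// ...then set `default_constraint: Some(bytes)` on the ColumnDef message.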


@@ -151,7 +151,8 @@ fn build_system_catalog_schema() -> Schema {
];
// The schema of this table must be valid.
SchemaBuilder::from(cols)
SchemaBuilder::try_from(cols)
.unwrap()
.timestamp_index(2)
.build()
.unwrap()


@@ -23,8 +23,11 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::error;
use crate::{
error::DatanodeSnafu, error::DecodeSelectSnafu, error::EncodePhysicalSnafu,
error::MissingFieldSnafu, Client, Result,
error::{
ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu,
MissingFieldSnafu,
},
Client, Result,
};
pub const PROTOCOL_VERSION: u32 = 1;
@@ -194,7 +197,7 @@ impl TryFrom<ObjectResult> for Output {
})
.collect::<Vec<ColumnSchema>>();
let schema = Arc::new(Schema::new(column_schemas));
let schema = Arc::new(Schema::try_new(column_schemas).context(ConvertSchemaSnafu)?);
let recordbatches = if vectors.is_empty() {
RecordBatches::try_new(schema, vec![])
} else {


@@ -79,6 +79,12 @@ pub enum Error {
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField { field: String, backtrace: Backtrace },
#[snafu(display("Failed to convert schema, source: {}", source))]
ConvertSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -97,7 +103,9 @@ impl ErrorExt for Error {
| Error::InvalidColumnProto { .. }
| Error::ColumnDataType { .. }
| Error::MissingField { .. } => StatusCode::Internal,
Error::CreateVector { source } => source.status_code(),
Error::ConvertSchema { source } | Error::CreateVector { source } => {
source.status_code()
}
Error::CreateRecordBatches { source } => source.status_code(),
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
}


@@ -2,7 +2,6 @@ use std::any::Any;
use api::serde::DecodeError;
use common_error::prelude::*;
use datatypes::prelude::ConcreteDataType;
use storage::error::Error as StorageError;
use table::error::Error as TableError;
@@ -68,19 +67,10 @@ pub enum Error {
))]
ColumnValuesNumberMismatch { columns: usize, values: usize },
#[snafu(display("Failed to parse value: {}, {}", msg, backtrace))]
ParseSqlValue { msg: String, backtrace: Backtrace },
#[snafu(display(
"Column {} expect type: {:?}, actual: {:?}",
column_name,
expect,
actual
))]
ColumnTypeMismatch {
column_name: String,
expect: ConcreteDataType,
actual: ConcreteDataType,
#[snafu(display("Failed to parse sql value, source: {}", source))]
ParseSqlValue {
#[snafu(backtrace)]
source: sql::error::Error,
},
#[snafu(display("Failed to insert value to table: {}, source: {}", table_name, source))]
@@ -189,6 +179,12 @@ pub enum Error {
source: api::error::Error,
},
#[snafu(display("Column default constraint error, source: {}", source))]
ColumnDefaultConstraint {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to parse SQL, source: {}", source))]
ParseSql {
#[snafu(backtrace)]
@@ -216,23 +212,32 @@ impl ErrorExt for Error {
Error::ExecuteSql { source } => source.status_code(),
Error::ExecutePhysicalPlan { source } => source.status_code(),
Error::NewCatalog { source } => source.status_code(),
Error::CreateTable { source, .. }
| Error::GetTable { source, .. }
| Error::AlterTable { source, .. } => source.status_code(),
Error::Insert { source, .. } => source.status_code(),
Error::ConvertSchema { source, .. } => source.status_code(),
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::ParseSqlValue { source, .. } | Error::ParseSql { source, .. } => {
source.status_code()
}
Error::ColumnDefaultConstraint { source, .. }
| Error::CreateSchema { source, .. }
| Error::ConvertSchema { source, .. } => source.status_code(),
Error::ColumnValuesNumberMismatch { .. }
| Error::ParseSqlValue { .. }
| Error::ColumnTypeMismatch { .. }
| Error::IllegalInsertData { .. }
| Error::DecodeInsert { .. }
| Error::InvalidSql { .. }
| Error::CreateSchema { .. }
| Error::KeyColumnNotFound { .. }
| Error::MissingField { .. }
| Error::ConstraintNotSupported { .. } => StatusCode::InvalidArguments,
// TODO(yingwen): Further categorize http error.
Error::StartServer { .. }
| Error::ParseAddr { .. }
@@ -244,7 +249,7 @@ impl ErrorExt for Error {
| Error::IntoPhysicalPlan { .. }
| Error::UnsupportedExpr { .. }
| Error::ColumnDataType { .. } => StatusCode::Internal,
Error::ParseSql { source } => source.status_code(),
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),
Error::StartScriptManager { source } => source.status_code(),


@@ -4,12 +4,13 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDef, CreateExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use common_query::Output;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use futures::TryFutureExt;
use snafu::prelude::*;
use table::requests::{AlterKind, AlterTableRequest, CreateTableRequest};
use crate::error::{self, MissingFieldSnafu, Result};
use crate::error::{self, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result};
use crate::instance::Instance;
use crate::server::grpc::handler::AdminResultBuilder;
use crate::sql::SqlRequest;
@@ -131,7 +132,8 @@ fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
name: &expr.time_index,
})?;
Ok(Arc::new(
SchemaBuilder::from(column_schemas)
SchemaBuilder::try_from(column_schemas)
.context(error::CreateSchemaSnafu)?
.timestamp_index(ts_index)
.build()
.context(error::CreateSchemaSnafu)?,
@@ -145,6 +147,12 @@ fn create_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
name: column_def.name.clone(),
data_type: data_type.into(),
is_nullable: column_def.is_nullable,
default_constraint: match &column_def.default_constraint {
None => None,
Some(v) => Some(
ColumnDefaultConstraint::try_from(&v[..]).context(ColumnDefaultConstraintSnafu)?,
),
},
})
}
@@ -154,6 +162,7 @@ mod tests {
use catalog::MIN_USER_TABLE_ID;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use super::*;
use crate::tests::test_util;
@@ -206,6 +215,7 @@ mod tests {
name: "a".to_string(),
datatype: 1024,
is_nullable: true,
default_constraint: None,
};
let result = create_column_schema(&column_def);
assert!(result.is_err());
@@ -218,11 +228,28 @@ mod tests {
name: "a".to_string(),
datatype: 12, // string
is_nullable: true,
default_constraint: None,
};
let column_schema = create_column_schema(&column_def).unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable);
let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value"));
let column_def = ColumnDef {
name: "a".to_string(),
datatype: 12, // string
is_nullable: true,
default_constraint: Some(default_constraint.clone().try_into().unwrap()),
};
let column_schema = create_column_schema(&column_def).unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable);
assert_eq!(
default_constraint,
column_schema.default_constraint.unwrap()
);
}
fn testing_create_expr() -> CreateExpr {
@@ -231,21 +258,25 @@ mod tests {
name: "host".to_string(),
datatype: 12, // string
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "ts".to_string(),
datatype: 15, // timestamp
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "cpu".to_string(),
datatype: 9, // float32
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "memory".to_string(),
datatype: 10, // float64
is_nullable: true,
default_constraint: None,
},
];
CreateExpr {
@@ -267,25 +298,30 @@ mod tests {
name: "host".to_string(),
data_type: ConcreteDataType::string_datatype(),
is_nullable: false,
default_constraint: None,
},
ColumnSchema {
name: "ts".to_string(),
data_type: ConcreteDataType::timestamp_millis_datatype(),
is_nullable: false,
default_constraint: None,
},
ColumnSchema {
name: "cpu".to_string(),
data_type: ConcreteDataType::float32_datatype(),
is_nullable: true,
default_constraint: None,
},
ColumnSchema {
name: "memory".to_string(),
data_type: ConcreteDataType::float64_datatype(),
is_nullable: true,
default_constraint: None,
},
];
Arc::new(
SchemaBuilder::from(column_schemas)
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(1)
.build()
.unwrap(),


@@ -297,7 +297,8 @@ mod tests {
];
Arc::new(
SchemaBuilder::from(column_schemas)
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(3)
.build()
.unwrap(),


@@ -97,7 +97,8 @@ mod tests {
];
Arc::new(
SchemaBuilder::from(column_schemas)
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(3)
.build()
.unwrap(),


@@ -135,7 +135,8 @@ impl SqlHandler {
.collect::<Result<Vec<_>>>()?;
let schema = Arc::new(
SchemaBuilder::from(columns_schemas)
SchemaBuilder::try_from(columns_schemas)
.context(CreateSchemaSnafu)?
.timestamp_index(ts_index)
.build()
.context(CreateSchemaSnafu)?,


@@ -1,20 +1,17 @@
use std::str::FromStr;
use catalog::SchemaProviderRef;
use common_query::Output;
use datatypes::prelude::ConcreteDataType;
use datatypes::prelude::VectorBuilder;
use datatypes::value::Value;
use snafu::ensure;
use snafu::OptionExt;
use snafu::ResultExt;
use sql::ast::Value as SqlValue;
use sql::statements::insert::Insert;
use sql::statements::{self, insert::Insert};
use table::requests::*;
use crate::error::{
ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu,
ParseSqlValueSnafu, Result, TableNotFoundSnafu,
ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlValueSnafu, Result,
TableNotFoundSnafu,
};
use crate::sql::{SqlHandler, SqlRequest};
@@ -118,217 +115,9 @@ fn add_row_to_vector(
sql_val: &SqlValue,
builder: &mut VectorBuilder,
) -> Result<()> {
let value = parse_sql_value(column_name, data_type, sql_val)?;
let value = statements::sql_value_to_value(column_name, data_type, sql_val)
.context(ParseSqlValueSnafu)?;
builder.push(&value);
Ok(())
}
fn parse_sql_value(
column_name: &str,
data_type: &ConcreteDataType,
sql_val: &SqlValue,
) -> Result<Value> {
Ok(match sql_val {
SqlValue::Number(n, _) => sql_number_to_value(data_type, n)?,
SqlValue::Null => Value::Null,
SqlValue::Boolean(b) => {
ensure!(
data_type.is_boolean(),
ColumnTypeMismatchSnafu {
column_name,
expect: data_type.clone(),
actual: ConcreteDataType::boolean_datatype(),
}
);
(*b).into()
}
SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => {
ensure!(
data_type.is_string(),
ColumnTypeMismatchSnafu {
column_name,
expect: data_type.clone(),
actual: ConcreteDataType::string_datatype(),
}
);
parse_string_to_value(s.to_owned(), data_type)?
}
_ => todo!("Other sql value"),
})
}
fn parse_string_to_value(s: String, data_type: &ConcreteDataType) -> Result<Value> {
match data_type {
ConcreteDataType::String(_) => Ok(Value::String(s.into())),
ConcreteDataType::Date(_) => {
if let Ok(date) = common_time::date::Date::from_str(&s) {
Ok(Value::Date(date))
} else {
ParseSqlValueSnafu {
msg: format!("Failed to parse {} to Date value", s),
}
.fail()
}
}
ConcreteDataType::DateTime(_) => {
if let Ok(datetime) = common_time::datetime::DateTime::from_str(&s) {
Ok(Value::DateTime(datetime))
} else {
ParseSqlValueSnafu {
msg: format!("Failed to parse {} to DateTime value", s),
}
.fail()
}
}
_ => {
unreachable!()
}
}
}
macro_rules! parse_number_to_value {
($data_type: expr, $n: ident, $(($Type: ident, $PrimitiveType: ident)), +) => {
match $data_type {
$(
ConcreteDataType::$Type(_) => {
let n = parse_sql_number::<$PrimitiveType>($n)?;
Ok(Value::from(n))
},
)+
_ => ParseSqlValueSnafu {
msg: format!("Fail to parse number {}, invalid column type: {:?}",
$n, $data_type
)}.fail(),
}
}
}
fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Result<Value> {
parse_number_to_value!(
data_type,
n,
(UInt8, u8),
(UInt16, u16),
(UInt32, u32),
(UInt64, u64),
(Int8, i8),
(Int16, i16),
(Int32, i32),
(Int64, i64),
(Float64, f64),
(Float32, f32),
(Timestamp, i64)
)
// TODO(hl): also Date/DateTime
}
fn parse_sql_number<R: FromStr + std::fmt::Debug>(n: &str) -> Result<R>
where
<R as FromStr>::Err: std::fmt::Debug,
{
match n.parse::<R>() {
Ok(n) => Ok(n),
Err(e) => ParseSqlValueSnafu {
msg: format!("Fail to parse number {}, {:?}", n, e),
}
.fail(),
}
}
#[cfg(test)]
mod tests {
use datatypes::value::OrderedFloat;
use super::*;
#[test]
fn test_sql_number_to_value() {
let v = sql_number_to_value(&ConcreteDataType::float64_datatype(), "3.0").unwrap();
assert_eq!(Value::Float64(OrderedFloat(3.0)), v);
let v = sql_number_to_value(&ConcreteDataType::int32_datatype(), "999").unwrap();
assert_eq!(Value::Int32(999), v);
let v = sql_number_to_value(&ConcreteDataType::string_datatype(), "999");
assert!(v.is_err(), "parse value error is: {:?}", v);
}
#[test]
fn test_parse_sql_value() {
let sql_val = SqlValue::Null;
assert_eq!(
Value::Null,
parse_sql_value("a", &ConcreteDataType::float64_datatype(), &sql_val).unwrap()
);
let sql_val = SqlValue::Boolean(true);
assert_eq!(
Value::Boolean(true),
parse_sql_value("a", &ConcreteDataType::boolean_datatype(), &sql_val).unwrap()
);
let sql_val = SqlValue::Number("3.0".to_string(), false);
assert_eq!(
Value::Float64(OrderedFloat(3.0)),
parse_sql_value("a", &ConcreteDataType::float64_datatype(), &sql_val).unwrap()
);
let sql_val = SqlValue::Number("3.0".to_string(), false);
let v = parse_sql_value("a", &ConcreteDataType::boolean_datatype(), &sql_val);
assert!(v.is_err());
assert!(format!("{:?}", v)
.contains("Fail to parse number 3.0, invalid column type: Boolean(BooleanType)"));
let sql_val = SqlValue::Boolean(true);
let v = parse_sql_value("a", &ConcreteDataType::float64_datatype(), &sql_val);
assert!(v.is_err());
assert!(format!("{:?}", v).contains(
"column_name: \"a\", expect: Float64(Float64), actual: Boolean(BooleanType)"
));
}
#[test]
pub fn test_parse_date_literal() {
let value = parse_sql_value(
"date",
&ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string()),
)
.unwrap();
assert_eq!(ConcreteDataType::date_datatype(), value.data_type());
if let Value::Date(d) = value {
assert_eq!("2022-02-22", d.to_string());
} else {
unreachable!()
}
}
#[test]
pub fn test_parse_datetime_literal() {
let value = parse_sql_value(
"datetime_col",
&ConcreteDataType::datetime_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22 00:01:03".to_string()),
)
.unwrap();
assert_eq!(ConcreteDataType::datetime_datatype(), value.data_type());
if let Value::DateTime(d) = value {
assert_eq!("2022-02-22 00:01:03", d.to_string());
} else {
unreachable!()
}
}
#[test]
pub fn test_parse_illegal_datetime_literal() {
assert!(parse_sql_value(
"datetime_col",
&ConcreteDataType::datetime_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22 00:01:61".to_string()),
)
.is_err());
}
}


@@ -97,6 +97,7 @@ async fn test_insert_and_select() {
name: "test_column".to_string(),
datatype: ColumnDataType::Int64.into(),
is_nullable: true,
default_constraint: None,
};
let kind = Kind::AddColumn(AddColumn {
column_def: Some(add_column),
@@ -162,21 +163,25 @@ fn testing_create_expr() -> CreateExpr {
name: "host".to_string(),
datatype: 12, // string
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "cpu".to_string(),
datatype: 10, // float64
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "memory".to_string(),
datatype: 10, // float64
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "ts".to_string(),
datatype: 15, // timestamp
is_nullable: true,
default_constraint: None,
},
];
CreateExpr {


@@ -63,7 +63,8 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> {
table_name: table_name.to_string(),
desc: Some("a test table".to_string()),
schema: Arc::new(
SchemaBuilder::from(column_schemas)
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(3)
.build()
.expect("ts is expected to be timestamp column"),


@@ -12,6 +12,13 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to deserialize data, source: {}, json: {}", source, json))]
Deserialize {
source: serde_json::Error,
backtrace: Backtrace,
json: String,
},
#[snafu(display("Failed to convert datafusion type: {}", from))]
Conversion { from: String, backtrace: Backtrace },


@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
pub use arrow::datatypes::Metadata;
@@ -7,7 +7,36 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Error, Result};
use crate::error::{self, DeserializeSnafu, Error, Result, SerializeSnafu};
use crate::value::Value;
/// Column's default constraint.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ColumnDefaultConstraint {
// A function invocation
// TODO(dennis): we save the function expression here; maybe use a struct in the future.
Function(String),
// A value
Value(Value),
}
impl TryFrom<&[u8]> for ColumnDefaultConstraint {
type Error = error::Error;
fn try_from(bytes: &[u8]) -> Result<Self> {
let json = String::from_utf8_lossy(bytes);
serde_json::from_str(&json).context(DeserializeSnafu { json })
}
}
impl TryInto<Vec<u8>> for ColumnDefaultConstraint {
type Error = error::Error;
fn try_into(self) -> Result<Vec<u8>> {
let s = serde_json::to_string(&self).context(SerializeSnafu)?;
Ok(s.into_bytes())
}
}
/// Key used to store column name of the timestamp column in metadata.
///
@@ -18,12 +47,15 @@ use crate::error::{self, Error, Result};
const TIMESTAMP_COLUMN_KEY: &str = "greptime:timestamp_column";
/// Key used to store version number of the schema in metadata.
const VERSION_KEY: &str = "greptime:version";
/// Key used to store default constraint in arrow field's metadata.
const ARROW_FIELD_DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnSchema {
pub name: String,
pub data_type: ConcreteDataType,
pub is_nullable: bool,
pub default_constraint: Option<ColumnDefaultConstraint>,
}
impl ColumnSchema {
@@ -36,8 +68,17 @@ impl ColumnSchema {
name: name.into(),
data_type,
is_nullable,
default_constraint: None,
}
}
pub fn with_default_constraint(
mut self,
default_constraint: Option<ColumnDefaultConstraint>,
) -> Self {
self.default_constraint = default_constraint;
self
}
}
/// A common schema, should be immutable.
@@ -61,9 +102,20 @@ impl Schema {
/// Initial version of the schema.
pub const INITIAL_VERSION: u32 = 0;
/// Create a schema from a vector of [ColumnSchema].
/// # Panics
/// Panics when ColumnSchema's `default_constraint` can't be serialized into JSON.
pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
// Builder won't fail
SchemaBuilder::from(column_schemas).build().unwrap()
SchemaBuilder::try_from(column_schemas)
.unwrap()
.build()
.unwrap()
}
pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
// Builder won't fail
Ok(SchemaBuilder::try_from(column_schemas)?.build().unwrap())
}
#[inline]
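To summarize the new construction paths: `Schema::new` keeps its infallible signature but now panics if a column's default constraint cannot be JSON-encoded, while `try_new` surfaces the same failure as a `Result`. A hedged usage sketch:

let cols = vec![ColumnSchema::new("c", ConcreteDataType::int32_datatype(), true)];
// Panics only if a default constraint fails to serialize:
let _s = Schema::new(cols.clone());
// Fallible variant that propagates the serialization error instead:
let _s = Schema::try_new(cols).expect("valid column schemas");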
@@ -137,22 +189,24 @@ pub struct SchemaBuilder {
metadata: Metadata,
}
impl From<Vec<ColumnSchema>> for SchemaBuilder {
fn from(column_schemas: Vec<ColumnSchema>) -> SchemaBuilder {
SchemaBuilder::from_columns(column_schemas)
impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
type Error = Error;
fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
SchemaBuilder::try_from_columns(column_schemas)
}
}
impl SchemaBuilder {
pub fn from_columns(column_schemas: Vec<ColumnSchema>) -> Self {
let (fields, name_to_index) = collect_fields(&column_schemas);
pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
let (fields, name_to_index) = collect_fields(&column_schemas)?;
Self {
Ok(Self {
column_schemas,
name_to_index,
fields,
..Default::default()
}
})
}
/// Set timestamp index.
@@ -198,16 +252,16 @@ impl SchemaBuilder {
}
}
fn collect_fields(column_schemas: &[ColumnSchema]) -> (Vec<Field>, HashMap<String, usize>) {
fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<(Vec<Field>, HashMap<String, usize>)> {
let mut fields = Vec::with_capacity(column_schemas.len());
let mut name_to_index = HashMap::with_capacity(column_schemas.len());
for (index, column_schema) in column_schemas.iter().enumerate() {
let field = Field::from(column_schema);
let field = Field::try_from(column_schema)?;
fields.push(field);
name_to_index.insert(column_schema.name.clone(), index);
}
(fields, name_to_index)
Ok((fields, name_to_index))
}
fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
@@ -236,22 +290,41 @@ impl TryFrom<&Field> for ColumnSchema {
fn try_from(field: &Field) -> Result<ColumnSchema> {
let data_type = ConcreteDataType::try_from(&field.data_type)?;
let default_constraint = match field.metadata.get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY) {
Some(json) => Some(serde_json::from_str(json).context(DeserializeSnafu { json })?),
None => None,
};
Ok(ColumnSchema {
name: field.name.clone(),
data_type,
is_nullable: field.is_nullable,
default_constraint,
})
}
}
impl From<&ColumnSchema> for Field {
fn from(column_schema: &ColumnSchema) -> Field {
Field::new(
impl TryFrom<&ColumnSchema> for Field {
type Error = Error;
fn try_from(column_schema: &ColumnSchema) -> Result<Field> {
let metadata = if let Some(value) = &column_schema.default_constraint {
let mut m = BTreeMap::new();
m.insert(
ARROW_FIELD_DEFAULT_CONSTRAINT_KEY.to_string(),
serde_json::to_string(&value).context(SerializeSnafu)?,
);
m
} else {
BTreeMap::default()
};
Ok(Field::new(
column_schema.name.clone(),
column_schema.data_type.as_arrow_type(),
column_schema.is_nullable,
)
.with_metadata(metadata))
}
}
@@ -319,7 +392,7 @@ mod tests {
#[test]
fn test_column_schema() {
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
let field = Field::from(&column_schema);
let field = Field::try_from(&column_schema).unwrap();
assert_eq!("test", field.name);
assert_eq!(ArrowDataType::Int32, field.data_type);
assert!(field.is_nullable);
@@ -328,6 +401,36 @@ mod tests {
assert_eq!(column_schema, new_column_schema);
}
#[test]
fn test_column_schema_with_default_constraint() {
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99))));
let field = Field::try_from(&column_schema).unwrap();
assert_eq!("test", field.name);
assert_eq!(ArrowDataType::Int32, field.data_type);
assert!(field.is_nullable);
assert_eq!(
"{\"Value\":{\"Int32\":99}}",
field
.metadata
.get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY)
.unwrap()
);
let new_column_schema = ColumnSchema::try_from(&field).unwrap();
assert_eq!(column_schema, new_column_schema);
}
#[test]
fn test_column_default_constraint_try_into_from() {
let default_constraint = ColumnDefaultConstraint::Value(Value::from(42i64));
let bytes: Vec<u8> = default_constraint.clone().try_into().unwrap();
let from_value = ColumnDefaultConstraint::try_from(&bytes[..]).unwrap();
assert_eq!(default_constraint, from_value);
}
#[test]
fn test_build_empty_schema() {
let schema = SchemaBuilder::default().build().unwrap();
@@ -370,7 +473,8 @@ mod tests {
ConcreteDataType::int32_datatype(),
false,
)];
let schema = SchemaBuilder::from(column_schemas)
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.add_metadata("k1", "v1")
.build()
.unwrap();
@@ -384,7 +488,8 @@ mod tests {
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
];
let schema = SchemaBuilder::from(column_schemas.clone())
let schema = SchemaBuilder::try_from(column_schemas.clone())
.unwrap()
.timestamp_index(1)
.version(123)
.build()
@@ -405,15 +510,18 @@ mod tests {
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
];
assert!(SchemaBuilder::from(column_schemas.clone())
assert!(SchemaBuilder::try_from(column_schemas.clone())
.unwrap()
.timestamp_index(0)
.build()
.is_err());
assert!(SchemaBuilder::from(column_schemas.clone())
assert!(SchemaBuilder::try_from(column_schemas.clone())
.unwrap()
.timestamp_index(1)
.build()
.is_err());
assert!(SchemaBuilder::from(column_schemas)
assert!(SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(1)
.build()
.is_err());


@@ -42,6 +42,17 @@ pub enum Error {
source: api::error::Error,
},
#[snafu(display(
"Failed to convert column default constraint, column: {}, source: {}",
column_name,
source
))]
ConvertColumnDefaultConstraint {
column_name: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Invalid SQL, error: {}", err_msg))]
InvalidSql {
err_msg: String,
@@ -66,6 +77,7 @@ impl ErrorExt for Error {
Error::RuntimeResource { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
Error::ParseSql { source } => source.status_code(),
Error::ConvertColumnDefaultConstraint { source, .. } => source.status_code(),
Error::ColumnDataType { .. } => StatusCode::Internal,
Error::IllegalFrontendState { .. } => StatusCode::Unexpected,
}


@@ -21,7 +21,7 @@ use sql::statements::statement::Statement;
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
use sql::{dialect::GenericDialect, parser::ParserContext};
use crate::error::{self, Result};
use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result};
use crate::frontend::FrontendOptions;
pub(crate) type InstanceRef = Arc<Instance>;
@@ -206,15 +206,25 @@ fn columns_to_expr(column_defs: &[ColumnDef]) -> Result<Vec<GrpcColumnDef>> {
})
.collect::<Result<Vec<ColumnDataType>>>()?;
Ok(column_schemas
column_schemas
.iter()
.zip(column_datatypes.into_iter())
.map(|(schema, datatype)| GrpcColumnDef {
name: schema.name.clone(),
datatype: datatype as i32,
is_nullable: schema.is_nullable,
.map(|(schema, datatype)| {
Ok(GrpcColumnDef {
name: schema.name.clone(),
datatype: datatype as i32,
is_nullable: schema.is_nullable,
default_constraint: match &schema.default_constraint {
None => None,
Some(v) => Some(v.clone().try_into().context(
ConvertColumnDefaultConstraintSnafu {
column_name: &schema.name,
},
)?),
},
})
})
.collect::<Vec<GrpcColumnDef>>())
.collect()
}
#[async_trait]
@@ -257,6 +267,8 @@ mod tests {
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::instance::Instance as DatanodeInstance;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::value::Value;
use servers::grpc::GrpcServer;
use tempdir::TempDir;
use tonic::transport::{Endpoint, Server};
@@ -276,6 +288,7 @@ mod tests {
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(ts, host)
) engine=mito with(regions=1);"#;
@@ -314,13 +327,13 @@ mod tests {
let pretty_print = arrow_print::write(&recordbatches);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
let expected = vec![
"+----------------+---------------------+-----+--------+",
"| host | ts | cpu | memory |",
"+----------------+---------------------+-----+--------+",
"| frontend.host1 | 1970-01-01 00:00:01 | 1.1 | 100 |",
"| frontend.host2 | 1970-01-01 00:00:02 | | |",
"| frontend.host3 | 1970-01-01 00:00:03 | 3.3 | 300 |",
"+----------------+---------------------+-----+--------+",
"+----------------+---------------------+-----+--------+-----------+",
"| host | ts | cpu | memory | disk_util |",
"+----------------+---------------------+-----+--------+-----------+",
"| frontend.host1 | 1970-01-01 00:00:01 | 1.1 | 100 | 9.9 |",
"| frontend.host2 | 1970-01-01 00:00:02 | | | 9.9 |",
"| frontend.host3 | 1970-01-01 00:00:03 | 3.3 | 300 | 9.9 |",
"+----------------+---------------------+-----+--------+-----------+",
];
assert_eq!(pretty_print, expected);
}
@@ -341,12 +354,12 @@ mod tests {
let pretty_print = arrow_print::write(&recordbatches);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
let expected = vec![
"+----------------+---------------------+-----+--------+",
"| host | ts | cpu | memory |",
"+----------------+---------------------+-----+--------+",
"| frontend.host2 | 1970-01-01 00:00:02 | | |",
"| frontend.host3 | 1970-01-01 00:00:03 | 3.3 | 300 |",
"+----------------+---------------------+-----+--------+",
"+----------------+---------------------+-----+--------+-----------+",
"| host | ts | cpu | memory | disk_util |",
"+----------------+---------------------+-----+--------+-----------+",
"| frontend.host2 | 1970-01-01 00:00:02 | | | 9.9 |",
"| frontend.host3 | 1970-01-01 00:00:03 | 3.3 | 300 | 9.9 |",
"+----------------+---------------------+-----+--------+-----------+",
];
assert_eq!(pretty_print, expected);
}
@@ -394,6 +407,15 @@ mod tests {
datatype: Some(10), // float64
..Default::default()
};
let expected_disk_col = Column {
column_name: "disk_util".to_string(),
values: Some(column::Values {
f64_values: vec![9.9, 9.9, 9.9, 9.9],
..Default::default()
}),
datatype: Some(10), // float64
..Default::default()
};
let expected_ts_col = Column {
column_name: "ts".to_string(),
values: Some(column::Values {
@@ -467,13 +489,14 @@ mod tests {
assert_eq!(4, select_result.row_count);
let actual_columns = select_result.columns;
assert_eq!(4, actual_columns.len());
assert_eq!(5, actual_columns.len());
// Respect the order in create table schema
let expected_columns = vec![
expected_host_col,
expected_cpu_col,
expected_mem_col,
expected_disk_col,
expected_ts_col,
];
expected_columns
@@ -548,21 +571,35 @@ mod tests {
name: "host".to_string(),
datatype: 12, // string
is_nullable: false,
default_constraint: None,
},
GrpcColumnDef {
name: "cpu".to_string(),
datatype: 10, // float64
is_nullable: true,
default_constraint: None,
},
GrpcColumnDef {
name: "memory".to_string(),
datatype: 10, // float64
is_nullable: true,
default_constraint: None,
},
GrpcColumnDef {
name: "disk_util".to_string(),
datatype: 10, // float64
is_nullable: true,
default_constraint: Some(
ColumnDefaultConstraint::Value(Value::from(9.9f64))
.try_into()
.unwrap(),
),
},
GrpcColumnDef {
name: "ts".to_string(),
datatype: 15, // timestamp
is_nullable: true,
default_constraint: None,
},
];
CreateExpr {


@@ -214,7 +214,9 @@ fn build_scripts_schema() -> Schema {
),
];
SchemaBuilder::from(cols)
// Schema is always valid here
SchemaBuilder::try_from(cols)
.unwrap()
.timestamp_index(3)
.build()
.unwrap()


@@ -65,14 +65,20 @@ async fn test_shutdown_pg_server() -> Result<()> {
for _ in 0..1000 {
match create_connection(server_port).await {
Ok(connection) => {
let rows = connection
match connection
.simple_query("SELECT uint32s FROM numbers LIMIT 1")
.await
.unwrap();
let result_text = unwrap_results(&rows)[0];
let result: i32 = result_text.parse().unwrap();
assert_eq!(result, 0);
tokio::time::sleep(Duration::from_millis(10)).await;
{
Ok(rows) => {
let result_text = unwrap_results(&rows)[0];
let result: i32 = result_text.parse().unwrap();
assert_eq!(result, 0);
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(e) => {
return Err(e);
}
}
}
Err(e) => {
return Err(e);
@@ -91,7 +97,11 @@ async fn test_shutdown_pg_server() -> Result<()> {
let result = handle.await.unwrap();
assert!(result.is_err());
let error = result.unwrap_err().to_string();
assert!(error.contains("Connection refused") || error.contains("Connection reset by peer"));
assert!(
error.contains("Connection refused")
|| error.contains("Connection reset by peer")
|| error.contains("close")
);
}
Ok(())


@@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = "0.15.0"


@@ -1,4 +1,4 @@
pub use sqlparser::ast::{
ColumnDef, ColumnOption, ColumnOptionDef, DataType, Expr, Ident, ObjectName, SqlOption,
TableConstraint, Value,
ColumnDef, ColumnOption, ColumnOptionDef, DataType, Expr, Function, FunctionArg,
FunctionArgExpr, Ident, ObjectName, SqlOption, TableConstraint, Value,
};


@@ -1,9 +1,12 @@
use std::any::Any;
use common_error::prelude::*;
use datatypes::prelude::ConcreteDataType;
use sqlparser::parser::ParserError;
use sqlparser::tokenizer::TokenizerError;
use crate::ast::Expr;
pub type Result<T> = std::result::Result<T, Error>;
/// SQL parser errors.
@@ -29,6 +32,17 @@ pub enum Error {
source: ParserError,
},
#[snafu(display(
"Unsupported expr in default constraint: {} for column: {}",
expr,
column_name
))]
UnsupportedDefaultValue {
column_name: String,
expr: Expr,
backtrace: Backtrace,
},
// Syntax error from sql parser.
#[snafu(display("Syntax error, sql: {}, source: {}", sql, source))]
Syntax { sql: String, source: ParserError },
@@ -50,6 +64,21 @@ pub enum Error {
t: crate::ast::DataType,
backtrace: Backtrace,
},
#[snafu(display("Failed to parse value: {}, {}", msg, backtrace))]
ParseSqlValue { msg: String, backtrace: Backtrace },
#[snafu(display(
"Column {} expect type: {:?}, actual: {:?}",
column_name,
expect,
actual
))]
ColumnTypeMismatch {
column_name: String,
expect: ConcreteDataType,
actual: ConcreteDataType,
},
}
impl ErrorExt for Error {
@@ -57,13 +86,16 @@ impl ErrorExt for Error {
use Error::*;
match self {
Unsupported { .. } => StatusCode::Unsupported,
UnsupportedDefaultValue { .. } | Unsupported { .. } => StatusCode::Unsupported,
Unexpected { .. }
| Syntax { .. }
| InvalidTimeIndex { .. }
| Tokenizer { .. }
| InvalidSql { .. }
| ParseSqlValue { .. }
| SqlTypeNotSupported { .. } => StatusCode::InvalidSyntax,
ColumnTypeMismatch { .. } => StatusCode::InvalidArguments,
}
}


@@ -6,15 +6,24 @@ pub mod show_database;
pub mod show_kind;
pub mod statement;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::types::DateTimeType;
use std::str::FromStr;
use crate::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName};
use crate::error::{self, Result};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::DateTimeType;
use datatypes::value::Value;
use snafu::ensure;
use crate::ast::{
ColumnDef, ColumnOption, ColumnOptionDef, DataType as SqlDataType, Expr, ObjectName,
Value as SqlValue,
};
use crate::error::{
self, ColumnTypeMismatchSnafu, ParseSqlValueSnafu, Result, UnsupportedDefaultValueSnafu,
};
/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>` or `<table>` when
/// catalog and schema are default) to tuple.
pub fn table_idents_to_full_name(
obj_name: &ObjectName,
) -> Result<(Option<String>, Option<String>, String)> {
@@ -35,15 +44,175 @@ pub fn table_idents_to_full_name(
}
}
fn parse_string_to_value(
column_name: &str,
s: String,
data_type: &ConcreteDataType,
) -> Result<Value> {
ensure!(
data_type.is_string(),
ColumnTypeMismatchSnafu {
column_name,
expect: data_type.clone(),
actual: ConcreteDataType::string_datatype(),
}
);
match data_type {
ConcreteDataType::String(_) => Ok(Value::String(s.into())),
ConcreteDataType::Date(_) => {
if let Ok(date) = common_time::date::Date::from_str(&s) {
Ok(Value::Date(date))
} else {
ParseSqlValueSnafu {
msg: format!("Failed to parse {} to Date value", s),
}
.fail()
}
}
ConcreteDataType::DateTime(_) => {
if let Ok(datetime) = common_time::datetime::DateTime::from_str(&s) {
Ok(Value::DateTime(datetime))
} else {
ParseSqlValueSnafu {
msg: format!("Failed to parse {} to DateTime value", s),
}
.fail()
}
}
_ => {
unreachable!()
}
}
}
macro_rules! parse_number_to_value {
($data_type: expr, $n: ident, $(($Type: ident, $PrimitiveType: ident)), +) => {
match $data_type {
$(
ConcreteDataType::$Type(_) => {
let n = parse_sql_number::<$PrimitiveType>($n)?;
Ok(Value::from(n))
},
)+
_ => ParseSqlValueSnafu {
msg: format!("Fail to parse number {}, invalid column type: {:?}",
$n, $data_type
)}.fail(),
}
}
}
/// Convert a sql value into datatype's value
pub fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Result<Value> {
parse_number_to_value!(
data_type,
n,
(UInt8, u8),
(UInt16, u16),
(UInt32, u32),
(UInt64, u64),
(Int8, i8),
(Int16, i16),
(Int32, i32),
(Int64, i64),
(Float64, f64),
(Float32, f32),
(Timestamp, i64)
)
// TODO(hl): also Date/DateTime
}
fn parse_sql_number<R: FromStr + std::fmt::Debug>(n: &str) -> Result<R>
where
<R as FromStr>::Err: std::fmt::Debug,
{
match n.parse::<R>() {
Ok(n) => Ok(n),
Err(e) => ParseSqlValueSnafu {
msg: format!("Fail to parse number {}, {:?}", n, e),
}
.fail(),
}
}
pub fn sql_value_to_value(
column_name: &str,
data_type: &ConcreteDataType,
sql_val: &SqlValue,
) -> Result<Value> {
Ok(match sql_val {
SqlValue::Number(n, _) => sql_number_to_value(data_type, n)?,
SqlValue::Null => Value::Null,
SqlValue::Boolean(b) => {
ensure!(
data_type.is_boolean(),
ColumnTypeMismatchSnafu {
column_name,
expect: data_type.clone(),
actual: ConcreteDataType::boolean_datatype(),
}
);
(*b).into()
}
SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => {
parse_string_to_value(column_name, s.to_owned(), data_type)?
}
_ => todo!("Other sql value"),
})
}
fn parse_column_default_constraint(
column_name: &str,
data_type: &ConcreteDataType,
opts: &[ColumnOptionDef],
) -> Result<Option<ColumnDefaultConstraint>> {
if let Some(opt) = opts
.iter()
.find(|o| matches!(o.option, ColumnOption::Default(_)))
{
let default_constraint = match &opt.option {
ColumnOption::Default(Expr::Value(v)) => {
ColumnDefaultConstraint::Value(sql_value_to_value(column_name, data_type, v)?)
}
ColumnOption::Default(Expr::Function(func)) => {
// Always use lowercase for function expression
ColumnDefaultConstraint::Function(format!("{}", func).to_lowercase())
}
ColumnOption::Default(expr) => {
return UnsupportedDefaultValueSnafu {
column_name,
expr: expr.clone(),
}
.fail();
}
_ => unreachable!(),
};
Ok(Some(default_constraint))
} else {
Ok(None)
}
}
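A sketch of the mapping this function implements (hypothetical direct invocation; in practice the options come from sqlparser's parsed CREATE TABLE statement):

// `DEFAULT 9.9` on a float64 column becomes a Value constraint:
let opts = vec![ColumnOptionDef {
    name: None,
    option: ColumnOption::Default(Expr::Value(SqlValue::Number("9.9".to_string(), false))),
}];
let parsed =
    parse_column_default_constraint("disk_util", &ConcreteDataType::float64_datatype(), &opts)
        .unwrap();
assert!(matches!(parsed, Some(ColumnDefaultConstraint::Value(_))));
// `DEFAULT current_timestamp()` instead yields
// ColumnDefaultConstraint::Function("current_timestamp()".to_string()).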
/// Create a `ColumnSchema` from `ColumnDef`.
pub fn column_def_to_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
let is_nullable = column_def
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::Null));
let name = column_def.name.value.clone();
let data_type = sql_data_type_to_concrete_data_type(&column_def.data_type)?;
let default_constraint =
parse_column_default_constraint(&name, &data_type, &column_def.options)?;
Ok(ColumnSchema {
name: column_def.name.value.clone(),
data_type: sql_data_type_to_concrete_data_type(&column_def.data_type)?,
name,
data_type,
is_nullable,
default_constraint,
})
}
@@ -86,6 +255,8 @@ fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<Concre
#[cfg(test)]
mod tests {
use datatypes::value::OrderedFloat;
use super::*;
use crate::ast::Ident;
@@ -130,4 +301,92 @@ mod tests {
ConcreteDataType::timestamp_millis_datatype(),
);
}
#[test]
fn test_sql_number_to_value() {
let v = sql_number_to_value(&ConcreteDataType::float64_datatype(), "3.0").unwrap();
assert_eq!(Value::Float64(OrderedFloat(3.0)), v);
let v = sql_number_to_value(&ConcreteDataType::int32_datatype(), "999").unwrap();
assert_eq!(Value::Int32(999), v);
let v = sql_number_to_value(&ConcreteDataType::string_datatype(), "999");
assert!(v.is_err(), "parse value error is: {:?}", v);
}
#[test]
fn test_sql_value_to_value() {
let sql_val = SqlValue::Null;
assert_eq!(
Value::Null,
sql_value_to_value("a", &ConcreteDataType::float64_datatype(), &sql_val).unwrap()
);
let sql_val = SqlValue::Boolean(true);
assert_eq!(
Value::Boolean(true),
sql_value_to_value("a", &ConcreteDataType::boolean_datatype(), &sql_val).unwrap()
);
let sql_val = SqlValue::Number("3.0".to_string(), false);
assert_eq!(
Value::Float64(OrderedFloat(3.0)),
sql_value_to_value("a", &ConcreteDataType::float64_datatype(), &sql_val).unwrap()
);
let sql_val = SqlValue::Number("3.0".to_string(), false);
let v = sql_value_to_value("a", &ConcreteDataType::boolean_datatype(), &sql_val);
assert!(v.is_err());
assert!(format!("{:?}", v)
.contains("Fail to parse number 3.0, invalid column type: Boolean(BooleanType)"));
let sql_val = SqlValue::Boolean(true);
let v = sql_value_to_value("a", &ConcreteDataType::float64_datatype(), &sql_val);
assert!(v.is_err());
assert!(format!("{:?}", v).contains(
"column_name: \"a\", expect: Float64(Float64), actual: Boolean(BooleanType)"
));
}
#[test]
pub fn test_parse_date_literal() {
let value = sql_value_to_value(
"date",
&ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string()),
)
.unwrap();
assert_eq!(ConcreteDataType::date_datatype(), value.data_type());
if let Value::Date(d) = value {
assert_eq!("2022-02-22", d.to_string());
} else {
unreachable!()
}
}
#[test]
pub fn test_parse_datetime_literal() {
let value = sql_value_to_value(
"datetime_col",
&ConcreteDataType::datetime_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22 00:01:03".to_string()),
)
.unwrap();
assert_eq!(ConcreteDataType::datetime_datatype(), value.data_type());
if let Value::DateTime(d) = value {
assert_eq!("2022-02-22 00:01:03", d.to_string());
} else {
unreachable!()
}
}
#[test]
pub fn test_parse_illegal_datetime_literal() {
assert!(sql_value_to_value(
"datetime_col",
&ConcreteDataType::datetime_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22 00:01:61".to_string()),
)
.is_err());
}
}


@@ -16,7 +16,8 @@ pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option<usize>) ->
.collect();
if let Some(index) = timestamp_index {
SchemaBuilder::from(column_schemas)
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(index)
.build()
.unwrap()


@@ -80,12 +80,13 @@ impl TryFrom<Schema> for schema::SchemaRef {
let schema: schema::SchemaRef = match schema.timestamp_index {
Some(index) => Arc::new(
schema::SchemaBuilder::from(column_schemas)
schema::SchemaBuilder::try_from(column_schemas)
.context(ConvertSchemaSnafu)?
.timestamp_index(index.value as usize)
.build()
.context(ConvertSchemaSnafu)?,
),
None => Arc::new(schema::Schema::new(column_schemas)),
None => Arc::new(schema::Schema::try_new(column_schemas).context(ConvertSchemaSnafu)?),
};
Ok(schema)


@@ -68,6 +68,12 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display("Failed to convert schema, source: {}", source))]
ConvertSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Invalid projection, {}", msg))]
InvalidProjection { msg: String, backtrace: Backtrace },
}
@@ -255,7 +261,8 @@ impl StoreSchema {
row_key_end: usize,
user_column_end: usize,
) -> Result<StoreSchema> {
let schema = SchemaBuilder::from(column_schemas)
let schema = SchemaBuilder::try_from(column_schemas)
.context(ConvertSchemaSnafu)?
.timestamp_index(timestamp_key_index)
.version(version)
.add_metadata(ROW_KEY_END_KEY, row_key_end.to_string())
@@ -575,7 +582,9 @@ impl ProjectedSchema {
.map(|col_idx| ColumnSchema::from(&region_schema.column_metadata(*col_idx).desc))
.collect();
let mut builder = SchemaBuilder::from(column_schemas).version(region_schema.version());
let mut builder = SchemaBuilder::try_from(column_schemas)
.context(ConvertSchemaSnafu)?
.version(region_schema.version());
if let Some(timestamp_index) = timestamp_index {
builder = builder.timestamp_index(timestamp_index);
}
@@ -685,7 +694,8 @@ fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result<Schema>
.map(|col| ColumnSchema::from(&col.desc))
.collect();
SchemaBuilder::from(column_schemas)
SchemaBuilder::try_from(column_schemas)
.context(ConvertSchemaSnafu)?
.timestamp_index(columns.timestamp_key_index())
.version(version)
.build()


@@ -23,7 +23,9 @@ pub fn new_schema_with_version(
})
.collect();
let mut builder = SchemaBuilder::from(column_schemas).version(version);
let mut builder = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(version);
if let Some(index) = timestamp_index {
builder = builder.timestamp_index(index);
}


@@ -9,7 +9,7 @@ use common_error::prelude::*;
use common_time::{RangeMillis, TimestampMillis};
use datatypes::vectors::TimestampVector;
use datatypes::{
arrow::error::ArrowError, data_type::ConcreteDataType, prelude::ScalarVector,
arrow::error::ArrowError, data_type::ConcreteDataType, prelude::ScalarVector, prelude::Value,
schema::SchemaRef, vectors::VectorRef,
};
use prost::{DecodeError, EncodeError};
@@ -202,11 +202,22 @@ impl WriteRequest for WriteBatch {
let column = put_data
.column_by_name(ts_col_name)
.unwrap_or_else(|| panic!("Cannot find column by name: {}", ts_col_name));
let ts_vector = column.as_any().downcast_ref::<TimestampVector>().unwrap(); // not expected to fail
for ts in ts_vector.iter_data().flatten() {
if column.is_const() {
let ts = match column.get(0) {
Value::Timestamp(ts) => ts,
_ => unreachable!(),
};
let aligned = align_timestamp(ts.value(), durations_millis)
.context(TimestampOverflowSnafu { ts: ts.value() })?;
aligned_timestamps.insert(aligned);
} else {
let ts_vector = column.as_any().downcast_ref::<TimestampVector>().unwrap(); // not expected to fail
for ts in ts_vector.iter_data().flatten() {
let aligned = align_timestamp(ts.value(), durations_millis)
.context(TimestampOverflowSnafu { ts: ts.value() })?;
aligned_timestamps.insert(aligned);
}
}
}
}
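The `is_const()` branch exists because a constant timestamp column is a `ConstantVector`, not a `TimestampVector`, so the old downcast-and-unwrap would panic. A small sketch of the assumed failure mode:

let tsv: VectorRef = Arc::new(ConstantVector::new(
    Arc::new(TimestampVector::from_vec(vec![20])),
    6,
));
// The old code's downcast returns None for the constant wrapper:
assert!(tsv.as_any().downcast_ref::<TimestampVector>().is_none());
assert!(tsv.is_const()); // hence the explicit branch above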
@@ -260,7 +271,7 @@ pub enum Mutation {
Put(PutData),
}
#[derive(Default)]
#[derive(Default, Debug)]
pub struct PutData {
columns: HashMap<String, VectorRef>,
}
@@ -806,7 +817,9 @@ mod tests {
use std::sync::Arc;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{BooleanVector, Int32Vector, Int64Vector, UInt64Vector};
use datatypes::vectors::{
BooleanVector, ConstantVector, Int32Vector, Int64Vector, UInt64Vector,
};
use super::*;
use crate::codec::{Decoder, Encoder};
@@ -1033,6 +1046,36 @@ mod tests {
)
}
#[test]
pub fn test_write_batch_time_range_const_vector() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3, 4, 5, 6]));
let tsv = Arc::new(ConstantVector::new(
Arc::new(TimestampVector::from_vec(vec![20])),
6,
));
let boolv = Arc::new(BooleanVector::from(vec![
true, false, true, false, false, false,
]));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
let mut batch = new_test_batch();
batch.put(put_data).unwrap();
let duration_millis = 20i64;
let ranges = batch
.time_ranges(Duration::from_millis(duration_millis as u64))
.unwrap();
assert_eq!(
[20].map(|v| RangeMillis::new(v, v + duration_millis).unwrap()),
ranges.as_slice()
)
}
fn gen_new_batch_and_extras() -> (WriteBatch, Vec<MutationExtra>) {
let mut batch = new_test_batch();
for i in 0..10 {


@@ -12,7 +12,9 @@ mod snapshot;
mod types;
pub use datatypes::data_type::ConcreteDataType;
pub use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
pub use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, Schema, SchemaBuilder, SchemaRef,
};
pub use self::chunk::{Chunk, ChunkReader};
pub use self::descriptors::*;


@@ -1,8 +1,7 @@
use datatypes::value::Value;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use crate::storage::{consts, ColumnSchema, ConcreteDataType};
use crate::storage::{consts, ColumnDefaultConstraint, ColumnSchema, ConcreteDataType};
/// Id of column, unique in each region.
pub type ColumnId = u32;
@@ -23,10 +22,10 @@ pub struct ColumnDescriptor {
/// Is column nullable, default is true.
#[builder(default = "true")]
pub is_nullable: bool,
/// Default value of column, default is None, which means no default value
/// for this column, and user must provide value for a not-null column.
/// Default constraint of column, default is None, which means no default constraint
/// for this column, and user must provide a value for a not-null column.
#[builder(default)]
pub default_value: Option<Value>,
pub default_constraint: Option<ColumnDefaultConstraint>,
#[builder(default, setter(into))]
pub comment: String,
}
@@ -45,6 +44,7 @@ impl ColumnDescriptorBuilder {
impl From<&ColumnDescriptor> for ColumnSchema {
fn from(desc: &ColumnDescriptor) -> ColumnSchema {
ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable)
.with_default_constraint(desc.default_constraint.clone())
}
}
@@ -116,6 +116,8 @@ impl ColumnFamilyDescriptorBuilder {
#[cfg(test)]
mod tests {
use datatypes::value::Value;
use super::*;
#[inline]
@@ -130,7 +132,7 @@ mod tests {
assert_eq!("test", desc.name);
assert_eq!(ConcreteDataType::int32_datatype(), desc.data_type);
assert!(desc.is_nullable);
assert!(desc.default_value.is_none());
assert!(desc.default_constraint.is_none());
assert!(desc.comment.is_empty());
let desc = new_column_desc_builder()
@@ -140,16 +142,22 @@ mod tests {
assert!(!desc.is_nullable);
let desc = new_column_desc_builder()
.default_value(Some(Value::Null))
.default_constraint(Some(ColumnDefaultConstraint::Value(Value::Null)))
.build()
.unwrap();
assert_eq!(Value::Null, desc.default_value.unwrap());
assert_eq!(
ColumnDefaultConstraint::Value(Value::Null),
desc.default_constraint.unwrap()
);
let desc = new_column_desc_builder()
.default_value(Some(Value::Int32(123)))
.default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(123))))
.build()
.unwrap();
assert_eq!(Value::Int32(123), desc.default_value.unwrap());
assert_eq!(
ColumnDefaultConstraint::Value(Value::Int32(123)),
desc.default_constraint.unwrap()
);
let desc = new_column_desc_builder()
.comment("A test column")


@@ -25,7 +25,7 @@ pub trait WriteRequest: Send {
}
/// Put multiple rows.
pub trait PutOperation: Send {
pub trait PutOperation: Send + std::fmt::Debug {
type Error: ErrorExt + Send + Sync;
fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;


@@ -143,6 +143,7 @@ pub(crate) fn build_row_key_desc(
ts_column_schema.name.clone(),
ts_column_schema.data_type.clone(),
)
.default_constraint(ts_column_schema.default_constraint.clone())
.is_nullable(ts_column_schema.is_nullable)
.build()
.context(BuildColumnDescriptorSnafu {
@@ -168,6 +169,7 @@ pub(crate) fn build_row_key_desc(
column_schema.name.clone(),
column_schema.data_type.clone(),
)
.default_constraint(column_schema.default_constraint.clone())
.is_nullable(column_schema.is_nullable)
.build()
.context(BuildColumnDescriptorSnafu {
@@ -210,6 +212,7 @@ pub(crate) fn build_column_family(
column_schema.name.clone(),
column_schema.data_type.clone(),
)
.default_constraint(column_schema.default_constraint.clone())
.is_nullable(column_schema.is_nullable)
.build()
.context(BuildColumnDescriptorSnafu {
@@ -421,15 +424,141 @@ mod tests {
use datafusion_common::field_util::SchemaExt;
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::schema::ColumnSchema;
use datatypes::schema::{ColumnDefaultConstraint, SchemaBuilder};
use datatypes::value::Value;
use datatypes::vectors::*;
use log_store::fs::noop::NoopLogStore;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use store_api::manifest::Manifest;
use store_api::storage::ReadContext;
use table::requests::{AlterKind, InsertRequest};
use tempdir::TempDir;
use super::*;
use crate::table::test_util;
use crate::table::test_util::{MockRegion, TABLE_NAME};
async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) {
let table_name = "test_default_constraint";
let column_schemas = vec![
ColumnSchema::new("name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("n", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(42i32)))),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond),
true,
),
];
let schema = Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(2)
.build()
.expect("ts must be timestamp column"),
);
let (dir, object_store) =
test_util::new_test_object_store("test_insert_with_column_default_constraint").await;
let table_engine = MitoEngine::new(
EngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
),
object_store,
);
let table = table_engine
.create_table(
&EngineContext::default(),
CreateTableRequest {
id: 1,
catalog_name: None,
schema_name: None,
table_name: table_name.to_string(),
desc: Some("a test table".to_string()),
schema: schema.clone(),
create_if_not_exists: true,
primary_key_indices: Vec::default(),
table_options: HashMap::new(),
},
)
.await
.unwrap();
(dir, table_name.to_string(), table)
}
#[tokio::test]
async fn test_column_default_constraint() {
let (_dir, table_name, table) = setup_table_with_column_default_constraint().await;
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
let names = StringVector::from(vec!["first", "second"]);
let tss = TimestampVector::from_vec(vec![1, 2]);
columns_values.insert("name".to_string(), Arc::new(names.clone()));
columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
let insert_req = InsertRequest {
table_name: table_name.to_string(),
columns_values,
};
assert_eq!(2, table.insert(insert_req).await.unwrap());
let stream = table.scan(&None, &[], None).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
let record = &batches[0].df_recordbatch;
assert_eq!(record.num_columns(), 3);
let columns = record.columns();
assert_eq!(3, columns.len());
assert_eq!(names.to_arrow_array(), columns[0]);
assert_eq!(
Int32Vector::from_vec(vec![42, 42]).to_arrow_array(),
columns[1]
);
assert_eq!(tss.to_arrow_array(), columns[2]);
}
#[tokio::test]
async fn test_insert_with_column_default_constraint() {
let (_dir, table_name, table) = setup_table_with_column_default_constraint().await;
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
let names = StringVector::from(vec!["first", "second"]);
let nums = Int32Vector::from(vec![None, Some(66)]);
let tss = TimestampVector::from_vec(vec![1, 2]);
columns_values.insert("name".to_string(), Arc::new(names.clone()));
columns_values.insert("n".to_string(), Arc::new(nums.clone()));
columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
let insert_req = InsertRequest {
table_name: table_name.to_string(),
columns_values,
};
assert_eq!(2, table.insert(insert_req).await.unwrap());
let stream = table.scan(&None, &[], None).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
let record = &batches[0].df_recordbatch;
assert_eq!(record.num_columns(), 3);
let columns = record.columns();
assert_eq!(3, columns.len());
assert_eq!(names.to_arrow_array(), columns[0]);
assert_eq!(nums.to_arrow_array(), columns[1]);
assert_eq!(tss.to_arrow_array(), columns[2]);
}
#[test]
fn test_region_name() {
assert_eq!("1_0000000000", region_name(1, 0));


@@ -165,6 +165,9 @@ pub enum Error {
backtrace: Backtrace,
column_qualified_name: String,
},
#[snafu(display("Unsupported column default constraint: {}", expr))]
UnsupportedDefaultConstraint { expr: String, backtrace: Backtrace },
}
impl From<Error> for table::error::Error {
@@ -196,6 +199,7 @@ impl ErrorExt for Error {
| ColumnExists { .. }
| ProjectedColumnNotFound { .. }
| MissingTimestampIndex { .. }
| UnsupportedDefaultConstraint { .. }
| TableNotFound { .. } => StatusCode::InvalidArguments,
TableInfoNotFound { .. } => StatusCode::Unexpected,


@@ -11,7 +11,12 @@ use common_query::logical_plan::Expr;
use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_telemetry::logging;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use common_time::util;
use common_time::Timestamp;
use datatypes::data_type::DataType;
use datatypes::prelude::ScalarVector;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder};
use datatypes::vectors::{ConstantVector, TimestampVector, VectorRef};
use futures::task::{Context, Poll};
use futures::Stream;
use object_store::ObjectStore;
@@ -34,8 +39,8 @@ use tokio::sync::Mutex;
use crate::engine::{build_column_family, build_row_key_desc, INIT_COLUMN_ID};
use crate::error::{
self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu,
UpdateTableManifestSnafu,
self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, SchemaBuildSnafu,
TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu, UpdateTableManifestSnafu,
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
@@ -76,30 +81,43 @@ impl<R: Region> Table for MitoTable<R> {
let mut columns_values = request.columns_values;
let table_info = self.table_info();
let schema = self.schema();
let key_columns = table_info.meta.row_key_column_names();
let value_columns = table_info.meta.value_column_names();
// columns_values is not empty, it's safe to unwrap
let rows_num = columns_values.values().next().unwrap().len();
// Add row key columns
for name in key_columns {
let vector = columns_values
.remove(name)
.or_else(|| {
Self::try_get_column_default_constraint_vector(&schema, name, rows_num).ok()?
})
.context(MissingColumnSnafu { name })
.map_err(TableError::from)?;
put_op
.add_key_column(
name,
columns_values
.get(name)
.context(MissingColumnSnafu { name })?
.clone(),
)
.add_key_column(name, vector)
.map_err(TableError::new)?;
}
// Add value columns
let mut rows_num = 0;
for name in value_columns {
if let Some(v) = columns_values.remove(name) {
rows_num = v.len();
let vector = columns_values.remove(name).or_else(|| {
Self::try_get_column_default_constraint_vector(&schema, name, rows_num).ok()?
});
if let Some(v) = vector {
put_op.add_value_column(name, v).map_err(TableError::new)?;
}
}
logging::debug!(
"Insert into table {} with put_op: {:?}",
table_info.name,
put_op
);
write_request.put(put_op).map_err(TableError::new)?;
let _resp = self
@@ -272,7 +290,11 @@ fn build_table_schema_with_new_column(
// Right now we don't support adding a column
// before or after a specific column, so just clone a new schema like this.
// TODO(LFC): support adding a column before or after a specific column
let mut builder = SchemaBuilder::from_columns(columns).version(table_schema.version() + 1);
let mut builder = SchemaBuilder::try_from_columns(columns)
.context(SchemaBuildSnafu {
msg: "Failed to convert column schemas into table schema",
})?
.version(table_schema.version() + 1);
if let Some(index) = table_schema.timestamp_index() {
builder = builder.timestamp_index(index);
@@ -398,6 +420,50 @@ impl<R: Region> MitoTable<R> {
Ok(MitoTable::new(table_info, region, manifest))
}
fn try_get_column_default_constraint_vector(
schema: &SchemaRef,
name: &str,
rows_num: usize,
) -> TableResult<Option<VectorRef>> {
// TODO(dennis): when we support altering schema, we should check the schema differences between table and region
let column_schema = schema
.column_schema_by_name(name)
.expect("column schema not found");
if let Some(v) = &column_schema.default_constraint {
assert!(rows_num > 0);
match v {
ColumnDefaultConstraint::Value(v) => {
let mut mutable_vector = column_schema.data_type.create_mutable_vector(1);
mutable_vector
.push_value_ref(v.as_value_ref())
.map_err(TableError::new)?;
let vector =
Arc::new(ConstantVector::new(mutable_vector.to_vector(), rows_num));
Ok(Some(vector))
}
ColumnDefaultConstraint::Function(expr) => {
match &expr[..] {
// TODO(dennis): we only support current_timestamp right now;
// it's better to use an expression framework in the future.
"current_timestamp()" => {
let vector =
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
util::current_time_millis(),
)]));
Ok(Some(Arc::new(ConstantVector::new(vector, rows_num))))
}
_ => UnsupportedDefaultConstraintSnafu { expr }
.fail()
.map_err(TableError::new),
}
}
}
} else {
Ok(None)
}
}
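For value defaults the filler is a `ConstantVector`: one materialized value logically repeated `rows_num` times, which is how the default 42 populates the `n` column in the engine tests earlier in this diff. A brief sketch, assuming the vector API used elsewhere in this change:

// A default of 42 for two inserted rows becomes a length-2 constant vector:
let inner = Arc::new(Int32Vector::from_vec(vec![42]));
let filled = ConstantVector::new(inner, 2);
assert_eq!(2, filled.len());
assert_eq!(Value::Int32(42), filled.get(0));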
pub async fn open(
table_name: &str,
region: R,
@@ -487,6 +553,7 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use super::*;
use crate::table::test_util;
#[test]
fn test_table_manifest_dir() {


@@ -36,7 +36,8 @@ pub fn schema_for_test() -> Schema {
),
];
SchemaBuilder::from(column_schemas)
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(3)
.build()
.expect("ts must be timestamp column")