From 74ea529d1a409551394ad457ce6d571e238c774a Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Fri, 11 Nov 2022 15:36:27 +0800 Subject: [PATCH] feat: move time index metadata from schema into field (#444) * feat: move time index metadata from schema into field * chore: remove useless code * test: test select with column alias * fix: conflicts with develop branch * test: add test * test: order by timestamp to ensure query results order * fix: comment --- src/catalog/src/system.rs | 9 +- src/common/insert/src/insert.rs | 22 ++- src/datanode/src/error.rs | 4 + src/datanode/src/server/grpc/ddl.rs | 33 +++-- src/datanode/src/sql.rs | 4 +- src/datanode/src/sql/alter.rs | 2 +- src/datanode/src/sql/create.rs | 10 +- src/datanode/src/tests/http_test.rs | 13 ++ src/datanode/src/tests/instance_test.rs | 23 ++- src/datanode/src/tests/test_util.rs | 3 +- src/datatypes/src/error.rs | 7 + src/datatypes/src/schema.rs | 135 +++++++++++------- src/datatypes/src/schema/raw.rs | 5 +- src/frontend/src/instance.rs | 14 +- src/script/src/table.rs | 9 +- src/sql/src/statements.rs | 3 +- .../benches/memtable/util/schema_util.rs | 11 +- src/storage/proto/write_batch.proto | 1 + src/storage/src/metadata.rs | 5 + src/storage/src/proto/write_batch.rs | 6 +- src/storage/src/schema/projected.rs | 14 -- src/storage/src/schema/region.rs | 6 - src/storage/src/schema/store.rs | 4 - src/storage/src/test_util/descriptor_util.rs | 12 +- src/storage/src/test_util/schema_util.rs | 11 +- src/storage/src/write_batch/compat.rs | 7 +- src/store-api/src/storage/descriptors.rs | 9 ++ src/table-engine/src/engine.rs | 5 +- src/table-engine/src/table/test_util.rs | 4 +- src/table/src/metadata.rs | 23 +-- 30 files changed, 249 insertions(+), 165 deletions(-) diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 812f41a269..dff4c644c2 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -152,7 +152,8 @@ fn build_system_catalog_schema() -> Schema { "timestamp".to_string(), 
ConcreteDataType::timestamp_millis_datatype(), false, - ), + ) + .with_time_index(true), ColumnSchema::new( "value".to_string(), ConcreteDataType::binary_datatype(), @@ -171,11 +172,7 @@ fn build_system_catalog_schema() -> Schema { ]; // The schema of this table must be valid. - SchemaBuilder::try_from(cols) - .unwrap() - .timestamp_index(Some(2)) - .build() - .unwrap() + SchemaBuilder::try_from(cols).unwrap().build().unwrap() } pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest { diff --git a/src/common/insert/src/insert.rs b/src/common/insert/src/insert.rs index aa7f294a96..132d95a000 100644 --- a/src/common/insert/src/insert.rs +++ b/src/common/insert/src/insert.rs @@ -124,6 +124,7 @@ pub fn build_create_table_request( { if !new_columns.contains(column_name) { let mut is_nullable = true; + let mut is_time_index = false; match *semantic_type { TAG_SEMANTIC_TYPE => primary_key_indices.push(column_schemas.len()), TIMESTAMP_SEMANTIC_TYPE => { @@ -135,13 +136,15 @@ pub fn build_create_table_request( } ); timestamp_index = column_schemas.len(); + is_time_index = true; // Timestamp column must not be null. is_nullable = false; } _ => {} } - let column_schema = build_column_schema(column_name, *datatype, is_nullable)?; + let column_schema = build_column_schema(column_name, *datatype, is_nullable)? 
+ .with_time_index(is_time_index); column_schemas.push(column_schema); new_columns.insert(column_name.to_string()); } @@ -152,7 +155,6 @@ pub fn build_create_table_request( let schema = Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(timestamp_index)) .build() .context(CreateSchemaSnafu)?, ); @@ -428,17 +430,13 @@ mod tests { fn test_find_new_columns() { let mut columns = Vec::with_capacity(1); let cpu_column = build_column_schema("cpu", 10, true).unwrap(); - let ts_column = build_column_schema("ts", 15, false).unwrap(); + let ts_column = build_column_schema("ts", 15, false) + .unwrap() + .with_time_index(true); columns.push(cpu_column); columns.push(ts_column); - let schema = Arc::new( - SchemaBuilder::try_from(columns) - .unwrap() - .timestamp_index(Some(1)) - .build() - .unwrap(), - ); + let schema = Arc::new(SchemaBuilder::try_from(columns).unwrap().build().unwrap()); assert!(find_new_columns(&schema, &[]).unwrap().is_none()); @@ -539,13 +537,13 @@ mod tests { ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true) + .with_time_index(true), ]; Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(3)) .build() .unwrap(), ) diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 6473271691..1bd0553a21 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -72,6 +72,9 @@ pub enum Error { #[snafu(display("Missing required field in protobuf, field: {}", field))] MissingField { field: String, backtrace: Backtrace }, + #[snafu(display("Missing timestamp column in request"))] + MissingTimestampColumn { backtrace: Backtrace }, + #[snafu(display( "Columns and 
values number mismatch, columns: {}, values: {}", columns, @@ -315,6 +318,7 @@ impl ErrorExt for Error { | Error::KeyColumnNotFound { .. } | Error::InvalidPrimaryKey { .. } | Error::MissingField { .. } + | Error::MissingTimestampColumn { .. } | Error::CatalogNotFound { .. } | Error::SchemaNotFound { .. } | Error::ConstraintNotSupported { .. } diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index ac60c80ce1..b675424989 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -142,23 +142,30 @@ fn create_table_schema(expr: &CreateExpr) -> Result { .iter() .map(create_column_schema) .collect::>>()?; - let ts_index = column_schemas - .iter() - .enumerate() - .find_map(|(i, column)| { - if column.name == expr.time_index { - Some(i) + + ensure!( + column_schemas + .iter() + .any(|column| column.name == expr.time_index), + error::KeyColumnNotFoundSnafu { + name: &expr.time_index, + } + ); + + let column_schemas = column_schemas + .into_iter() + .map(|column_schema| { + if column_schema.name == expr.time_index { + column_schema.with_time_index(true) } else { - None + column_schema } }) - .context(error::KeyColumnNotFoundSnafu { - name: &expr.time_index, - })?; + .collect::>(); + Ok(Arc::new( SchemaBuilder::try_from(column_schemas) .context(error::CreateSchemaSnafu)? 
- .timestamp_index(Some(ts_index)) .build() .context(error::CreateSchemaSnafu)?, )) @@ -324,14 +331,14 @@ mod tests { fn expected_table_schema() -> SchemaRef { let column_schemas = vec![ ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false) + .with_time_index(true), ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ]; Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(1)) .build() .unwrap(), ) diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index c3da100f7e..0b2d7a11f8 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -129,13 +129,13 @@ mod tests { ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true) + .with_time_index(true), ]; Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(3)) .build() .unwrap(), ) diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index 7f92c1f2e8..da4df1e40e 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -36,7 +36,7 @@ impl SqlHandler { } AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumns { columns: vec![AddColumnRequest { - column_schema: column_def_to_schema(column_def) + column_schema: column_def_to_schema(column_def, false) .context(error::ParseSqlSnafu)?, // FIXME(dennis): supports adding key column is_key: false, diff --git a/src/datanode/src/sql/create.rs 
b/src/datanode/src/sql/create.rs index 634973ca2b..99cb619514 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -143,6 +143,8 @@ impl SqlHandler { } ); + ensure!(ts_index != usize::MAX, error::MissingTimestampColumnSnafu); + if primary_keys.is_empty() { info!( "Creating table: {:?}.{:?}.{} but primary key not set, use time index column: {}", @@ -154,13 +156,15 @@ impl SqlHandler { let columns_schemas: Vec<_> = stmt .columns .iter() - .map(|column| column_def_to_schema(column).context(error::ParseSqlSnafu)) + .enumerate() + .map(|(index, column)| { + column_def_to_schema(column, index == ts_index).context(error::ParseSqlSnafu) + }) .collect::>>()?; let schema = Arc::new( SchemaBuilder::try_from(columns_schemas) .context(CreateSchemaSnafu)? - .timestamp_index(Some(ts_index)) .build() .context(CreateSchemaSnafu)?, ); @@ -239,7 +243,7 @@ mod tests { PRIMARY KEY(host)) engine=mito with(regions=1);"#, ); let error = handler.create_to_request(42, parsed_stmt).unwrap_err(); - assert_matches!(error, Error::CreateSchema { .. }); + assert_matches!(error, Error::MissingTimestampColumn { .. }); } /// If primary key is not specified, time index should be used as primary key. 
diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/src/tests/http_test.rs index b84ec01197..a517d12d41 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/src/tests/http_test.rs @@ -81,6 +81,19 @@ async fn test_sql_api() { body, r#"{"code":0,"output":{"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}}"# ); + + // select with column alias + let res = client + .get("/v1/sql?sql=select cpu as c, ts as time from demo limit 10") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + let body = res.text().await; + assert_eq!( + body, + r#"{"code":0,"output":{"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}}"# + ); } #[tokio::test(flavor = "multi_thread")] diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 8edd6fd500..7e5c761082 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -112,7 +112,28 @@ async fn test_execute_insert_query_with_i64_timestamp() { .unwrap(); assert!(matches!(output, Output::AffectedRows(2))); - let query_output = instance.execute_sql("select ts from demo").await.unwrap(); + let query_output = instance + .execute_sql("select ts from demo order by ts") + .await + .unwrap(); + + match query_output { + Output::Stream(s) => { + let batches = util::collect(s).await.unwrap(); + let columns = batches[0].df_recordbatch.columns(); + assert_eq!(1, columns.len()); + assert_eq!( + &Int64Array::from_slice(&[1655276557000, 1655276558000]), + columns[0].as_any().downcast_ref::().unwrap() + ); + } + _ => unreachable!(), + } + + let query_output = instance + .execute_sql("select ts as time from demo order by ts") + .await + .unwrap(); match query_output { Output::Stream(s) => { diff --git a/src/datanode/src/tests/test_util.rs 
b/src/datanode/src/tests/test_util.rs index d54f15f8c1..268a686de4 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -48,7 +48,7 @@ pub async fn create_test_table(instance: &Instance, ts_type: ConcreteDataType) - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ts_type, true), + ColumnSchema::new("ts", ts_type, true).with_time_index(true), ]; let table_name = "demo"; @@ -65,7 +65,6 @@ pub async fn create_test_table(instance: &Instance, ts_type: ConcreteDataType) - schema: Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(3)) .build() .expect("ts is expected to be timestamp column"), ), diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index a3053a5293..52cefb3026 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -55,6 +55,13 @@ pub enum Error { #[snafu(display("Invalid timestamp index: {}", index))] InvalidTimestampIndex { index: usize, backtrace: Backtrace }, + #[snafu(display("Duplicate timestamp index, exists: {}, new: {}", exists, new))] + DuplicateTimestampIndex { + exists: usize, + new: usize, + backtrace: Backtrace, + }, + #[snafu(display("{}", msg))] CastType { msg: String, backtrace: Backtrace }, diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 82fd3c874b..41e4b28496 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -7,7 +7,7 @@ use std::sync::Arc; pub use arrow::datatypes::Metadata; use arrow::datatypes::{Field, Schema as ArrowSchema}; use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, DeserializeSnafu, Error, Result, SerializeSnafu}; @@ -15,13 +15,8 @@ 
pub use crate::schema::constraint::ColumnDefaultConstraint; pub use crate::schema::raw::RawSchema; use crate::vectors::VectorRef; -/// Key used to store column name of the timestamp column in metadata. -/// -/// Instead of storing the column index, we store the column name as the -/// query engine may modify the column order of the arrow schema, then -/// we would fail to recover the correct timestamp column when converting -/// the arrow schema back to our schema. -const TIMESTAMP_COLUMN_KEY: &str = "greptime:timestamp_column"; +/// Key used to store whether the column is time index in arrow field's metadata. +const TIME_INDEX_KEY: &str = "greptime:time_index"; /// Key used to store version number of the schema in metadata. const VERSION_KEY: &str = "greptime:version"; /// Key used to store default constraint in arrow field's metadata. @@ -33,6 +28,7 @@ pub struct ColumnSchema { pub name: String, pub data_type: ConcreteDataType, is_nullable: bool, + is_time_index: bool, default_constraint: Option, metadata: Metadata, } @@ -47,11 +43,17 @@ impl ColumnSchema { name: name.into(), data_type, is_nullable, + is_time_index: false, default_constraint: None, metadata: Metadata::new(), } } + #[inline] + pub fn is_time_index(&self) -> bool { + self.is_time_index + } + #[inline] pub fn is_nullable(&self) -> bool { self.is_nullable @@ -67,6 +69,17 @@ impl ColumnSchema { &self.metadata } + pub fn with_time_index(mut self, is_time_index: bool) -> Self { + self.is_time_index = is_time_index; + if is_time_index { + self.metadata + .insert(TIME_INDEX_KEY.to_string(), "true".to_string()); + } else { + self.metadata.remove(TIME_INDEX_KEY); + } + self + } + pub fn with_default_constraint( mut self, default_constraint: Option, @@ -229,24 +242,21 @@ impl TryFrom> for SchemaBuilder { impl SchemaBuilder { pub fn try_from_columns(column_schemas: Vec) -> Result { - let (fields, name_to_index) = collect_fields(&column_schemas)?; + let FieldsAndIndices { + fields, + name_to_index, + 
timestamp_index, + } = collect_fields(&column_schemas)?; Ok(Self { column_schemas, name_to_index, fields, + timestamp_index, ..Default::default() }) } - /// Set timestamp index. - /// - /// The validation of timestamp column is done in `build()`. - pub fn timestamp_index(mut self, timestamp_index: Option) -> Self { - self.timestamp_index = timestamp_index; - self - } - pub fn version(mut self, version: u32) -> Self { self.version = version; self @@ -263,10 +273,8 @@ impl SchemaBuilder { pub fn build(mut self) -> Result { if let Some(timestamp_index) = self.timestamp_index { validate_timestamp_index(&self.column_schemas, timestamp_index)?; - let timestamp_name = self.column_schemas[timestamp_index].name.clone(); - self.metadata - .insert(TIMESTAMP_COLUMN_KEY.to_string(), timestamp_name); } + self.metadata .insert(VERSION_KEY.to_string(), self.version.to_string()); @@ -282,16 +290,37 @@ impl SchemaBuilder { } } -fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<(Vec, HashMap)> { +struct FieldsAndIndices { + fields: Vec, + name_to_index: HashMap, + timestamp_index: Option, +} + +fn collect_fields(column_schemas: &[ColumnSchema]) -> Result { let mut fields = Vec::with_capacity(column_schemas.len()); let mut name_to_index = HashMap::with_capacity(column_schemas.len()); + let mut timestamp_index = None; for (index, column_schema) in column_schemas.iter().enumerate() { + if column_schema.is_time_index() { + ensure!( + timestamp_index.is_none(), + error::DuplicateTimestampIndexSnafu { + exists: timestamp_index.unwrap(), + new: index, + } + ); + timestamp_index = Some(index); + } let field = Field::try_from(column_schema)?; fields.push(field); name_to_index.insert(column_schema.name.clone(), index); } - Ok((fields, name_to_index)) + Ok(FieldsAndIndices { + fields, + name_to_index, + timestamp_index, + }) } fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> { @@ -309,6 +338,12 @@ fn 
validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: us index: timestamp_index, } ); + ensure!( + column_schema.is_time_index(), + error::InvalidTimestampIndexSnafu { + index: timestamp_index, + } + ); Ok(()) } @@ -325,11 +360,13 @@ impl TryFrom<&Field> for ColumnSchema { Some(json) => Some(serde_json::from_str(&json).context(DeserializeSnafu { json })?), None => None, }; + let is_time_index = metadata.contains_key(TIME_INDEX_KEY); Ok(ColumnSchema { name: field.name.clone(), data_type, is_nullable: field.is_nullable, + is_time_index, default_constraint, metadata, }) @@ -377,15 +414,21 @@ impl TryFrom> for Schema { column_schemas.push(column_schema); } - let timestamp_name = arrow_schema.metadata.get(TIMESTAMP_COLUMN_KEY); let mut timestamp_index = None; - if let Some(name) = timestamp_name { - let index = name_to_index - .get(name) - .context(error::TimestampNotFoundSnafu { name })?; - validate_timestamp_index(&column_schemas, *index)?; - timestamp_index = Some(*index); + for (index, column_schema) in column_schemas.iter().enumerate() { + if column_schema.is_time_index() { + validate_timestamp_index(&column_schemas, index)?; + ensure!( + timestamp_index.is_none(), + error::DuplicateTimestampIndexSnafu { + exists: timestamp_index.unwrap(), + new: index, + } + ); + timestamp_index = Some(index); + } } + let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?; Ok(Self { @@ -549,11 +592,6 @@ mod tests { let schema = SchemaBuilder::default().build().unwrap(); assert_eq!(0, schema.num_columns()); assert!(schema.is_empty()); - - assert!(SchemaBuilder::default() - .timestamp_index(Some(0)) - .build() - .is_err()); } #[test] @@ -602,11 +640,11 @@ mod tests { fn test_schema_with_timestamp() { let column_schemas = vec![ ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), 
false) + .with_time_index(true), ]; let schema = SchemaBuilder::try_from(column_schemas.clone()) .unwrap() - .timestamp_index(Some(1)) .version(123) .build() .unwrap(); @@ -623,22 +661,23 @@ mod tests { #[test] fn test_schema_wrong_timestamp() { let column_schemas = vec![ - ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true) + .with_time_index(true), ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false), ]; - assert!(SchemaBuilder::try_from(column_schemas.clone()) - .unwrap() - .timestamp_index(Some(0)) - .build() - .is_err()); - assert!(SchemaBuilder::try_from(column_schemas.clone()) - .unwrap() - .timestamp_index(Some(1)) - .build() - .is_err()); assert!(SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(1)) + .build() + .is_err()); + + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false) + .with_time_index(true), + ]; + + assert!(SchemaBuilder::try_from(column_schemas) + .unwrap() .build() .is_err()); } diff --git a/src/datatypes/src/schema/raw.rs b/src/datatypes/src/schema/raw.rs index 1845f8b3ee..8e70d9d0ea 100644 --- a/src/datatypes/src/schema/raw.rs +++ b/src/datatypes/src/schema/raw.rs @@ -18,7 +18,6 @@ impl TryFrom for Schema { fn try_from(raw: RawSchema) -> Result { SchemaBuilder::try_from(raw.column_schemas)? 
- .timestamp_index(raw.timestamp_index) .version(raw.version) .build() } @@ -43,11 +42,11 @@ mod tests { fn test_raw_convert() { let column_schemas = vec![ ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false) + .with_time_index(true), ]; let schema = SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(1)) .version(123) .build() .unwrap(); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index efd6544e19..efeaa10f07 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -160,12 +160,13 @@ fn create_to_expr(create: CreateTable) -> Result { let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?; + let time_index = find_time_index(&create.constraints)?; let expr = CreateExpr { catalog_name: Some(catalog_name), schema_name: Some(schema_name), table_name, - column_defs: columns_to_expr(&create.columns)?, - time_index: find_time_index(&create.constraints)?, + column_defs: columns_to_expr(&create.columns, &time_index)?, + time_index, primary_keys: find_primary_keys(&create.constraints)?, create_if_not_exists: create.if_not_exists, // TODO(LFC): Fill in other table options. 
@@ -222,10 +223,12 @@ fn find_time_index(constraints: &[TableConstraint]) -> Result { Ok(time_index.first().unwrap().to_string()) } -fn columns_to_expr(column_defs: &[ColumnDef]) -> Result> { +fn columns_to_expr(column_defs: &[ColumnDef], time_index: &str) -> Result> { let column_schemas = column_defs .iter() - .map(|c| column_def_to_schema(c).context(error::ParseSqlSnafu)) + .map(|c| { + column_def_to_schema(c, c.name.to_string() == time_index).context(error::ParseSqlSnafu) + }) .collect::>>()?; let column_datatypes = column_schemas @@ -423,8 +426,7 @@ mod tests { ts_millis_values: vec![1000, 2000, 3000, 4000], ..Default::default() }), - // FIXME(dennis): looks like the read schema in table scan doesn't have timestamp index, we have to investigate it. - semantic_type: SemanticType::Field as i32, + semantic_type: SemanticType::Timestamp as i32, datatype: ColumnDataType::Timestamp as i32, ..Default::default() }; diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 39cb261740..bd45875bb1 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -204,7 +204,8 @@ fn build_scripts_schema() -> Schema { "timestamp".to_string(), ConcreteDataType::timestamp_millis_datatype(), false, - ), + ) + .with_time_index(true), ColumnSchema::new( "gmt_created".to_string(), ConcreteDataType::timestamp_millis_datatype(), @@ -218,9 +219,5 @@ fn build_scripts_schema() -> Schema { ]; // Schema is always valid here - SchemaBuilder::try_from(cols) - .unwrap() - .timestamp_index(Some(3)) - .build() - .unwrap() + SchemaBuilder::try_from(cols).unwrap().build().unwrap() } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index db9b5bdb07..0f60898ab8 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -220,7 +220,7 @@ fn parse_column_default_constraint( // TODO(yingwen): Make column nullable by default, and checks invalid case like // a column is not nullable but has a default value null. 
/// Create a `ColumnSchema` from `ColumnDef`. -pub fn column_def_to_schema(column_def: &ColumnDef) -> Result { +pub fn column_def_to_schema(column_def: &ColumnDef, is_time_index: bool) -> Result { let is_nullable = column_def .options .iter() @@ -232,6 +232,7 @@ pub fn column_def_to_schema(column_def: &ColumnDef) -> Result { parse_column_default_constraint(&name, &data_type, &column_def.options)?; ColumnSchema::new(name, data_type, is_nullable) + .with_time_index(is_time_index) .with_default_constraint(default_constraint) .context(error::InvalidDefaultSnafu { column: &column_def.name.value, diff --git a/src/storage/benches/memtable/util/schema_util.rs b/src/storage/benches/memtable/util/schema_util.rs index 073675365e..04e15a1591 100644 --- a/src/storage/benches/memtable/util/schema_util.rs +++ b/src/storage/benches/memtable/util/schema_util.rs @@ -9,15 +9,20 @@ pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool); pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option) -> Schema { let column_schemas: Vec<_> = column_defs .iter() - .map(|column_def| { + .enumerate() + .map(|(index, column_def)| { let datatype = column_def.1.data_type(); - ColumnSchema::new(column_def.0, datatype, column_def.2) + if let Some(timestamp_index) = timestamp_index { + ColumnSchema::new(column_def.0, datatype, column_def.2) + .with_time_index(index == timestamp_index) + } else { + ColumnSchema::new(column_def.0, datatype, column_def.2) + } }) .collect(); SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(timestamp_index) .build() .unwrap() } diff --git a/src/storage/proto/write_batch.proto b/src/storage/proto/write_batch.proto index c0730bad23..6f0ec4a388 100644 --- a/src/storage/proto/write_batch.proto +++ b/src/storage/proto/write_batch.proto @@ -20,6 +20,7 @@ message ColumnSchema { string name = 1; DataType data_type = 2; bool is_nullable = 3; + bool is_time_index = 4; } message Mutation { diff --git a/src/storage/src/metadata.rs 
b/src/storage/src/metadata.rs index 0a733ec79a..eca8314fc6 100644 --- a/src/storage/src/metadata.rs +++ b/src/storage/src/metadata.rs @@ -383,8 +383,10 @@ impl ColumnMetadata { /// would store additional metadatas to the ColumnSchema. pub fn to_column_schema(&self) -> Result { let desc = &self.desc; + ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable()) .with_metadata(self.to_metadata()) + .with_time_index(self.desc.is_time_index()) .with_default_constraint(desc.default_constraint().cloned()) .context(ToColumnSchemaSnafu) } @@ -405,6 +407,7 @@ impl ColumnMetadata { column_schema.data_type.clone(), ) .is_nullable(column_schema.is_nullable()) + .is_time_index(column_schema.is_time_index()) .default_constraint(column_schema.default_constraint().cloned()) .comment(comment) .build() @@ -1002,6 +1005,7 @@ mod tests { let timestamp = ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::int64_datatype()) .is_nullable(false) + .is_time_index(true) .build() .unwrap(); let row_key = RowKeyDescriptorBuilder::new(timestamp) @@ -1021,6 +1025,7 @@ mod tests { let timestamp = ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::timestamp_millis_datatype()) .is_nullable(false) + .is_time_index(true) .build() .unwrap(); let row_key = RowKeyDescriptorBuilder::new(timestamp) diff --git a/src/storage/src/proto/write_batch.rs b/src/storage/src/proto/write_batch.rs index 6aefe757a5..7289750cf7 100644 --- a/src/storage/src/proto/write_batch.rs +++ b/src/storage/src/proto/write_batch.rs @@ -78,11 +78,9 @@ impl TryFrom for schema::SchemaRef { .map(schema::ColumnSchema::try_from) .collect::>>()?; - let timestamp_index = schema.timestamp_index.map(|index| index.value as usize); let schema = Arc::new( schema::SchemaBuilder::try_from(column_schemas) .context(ConvertSchemaSnafu)? 
- .timestamp_index(timestamp_index) .build() .context(ConvertSchemaSnafu)?, ); @@ -97,6 +95,7 @@ impl From<&schema::ColumnSchema> for ColumnSchema { name: cs.name.clone(), data_type: DataType::from(&cs.data_type).into(), is_nullable: cs.is_nullable(), + is_time_index: cs.is_time_index(), } } } @@ -110,7 +109,8 @@ impl TryFrom<&ColumnSchema> for schema::ColumnSchema { column_schema.name.clone(), data_type.into(), column_schema.is_nullable, - )) + ) + .with_time_index(column_schema.is_time_index)) } else { InvalidDataTypeSnafu { data_type: column_schema.data_type, diff --git a/src/storage/src/schema/projected.rs b/src/storage/src/schema/projected.rs index 075b949bf8..4af9cde7d9 100644 --- a/src/storage/src/schema/projected.rs +++ b/src/storage/src/schema/projected.rs @@ -200,7 +200,6 @@ impl ProjectedSchema { let store_schema = StoreSchema::new( columns, region_schema.version(), - region_schema.timestamp_key_index(), region_schema.row_key_end(), projection.num_user_columns, )?; @@ -212,18 +211,6 @@ impl ProjectedSchema { region_schema: &RegionSchema, projection: &Projection, ) -> Result { - let timestamp_index = - projection - .projected_columns - .iter() - .enumerate() - .find_map(|(idx, col_idx)| { - if *col_idx == region_schema.timestamp_key_index() { - Some(idx) - } else { - None - } - }); let column_schemas: Vec<_> = projection .projected_columns .iter() @@ -237,7 +224,6 @@ impl ProjectedSchema { let schema = SchemaBuilder::try_from(column_schemas) .context(metadata::ConvertSchemaSnafu)? 
- .timestamp_index(timestamp_index) .version(region_schema.version()) .build() .context(metadata::InvalidSchemaSnafu)?; diff --git a/src/storage/src/schema/region.rs b/src/storage/src/schema/region.rs index e5cb36cb08..dcc9ec4656 100644 --- a/src/storage/src/schema/region.rs +++ b/src/storage/src/schema/region.rs @@ -112,11 +112,6 @@ impl RegionSchema { self.columns.column_metadata(idx) } - #[inline] - pub(crate) fn timestamp_key_index(&self) -> usize { - self.columns.timestamp_key_index() - } - #[cfg(test)] pub(crate) fn columns(&self) -> &[ColumnMetadata] { self.columns.columns() @@ -134,7 +129,6 @@ fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result SchemaBuilder::try_from(column_schemas) .context(metadata::ConvertSchemaSnafu)? - .timestamp_index(Some(columns.timestamp_key_index())) .version(version) .build() .context(metadata::InvalidSchemaSnafu) diff --git a/src/storage/src/schema/store.rs b/src/storage/src/schema/store.rs index 2ac5856059..a91e1360b2 100644 --- a/src/storage/src/schema/store.rs +++ b/src/storage/src/schema/store.rs @@ -74,7 +74,6 @@ impl StoreSchema { StoreSchema::new( columns.columns().to_vec(), version, - columns.timestamp_key_index(), columns.row_key_end(), columns.user_column_end(), ) @@ -83,7 +82,6 @@ impl StoreSchema { pub(crate) fn new( columns: Vec, version: u32, - timestamp_key_index: usize, row_key_end: usize, user_column_end: usize, ) -> Result { @@ -94,7 +92,6 @@ impl StoreSchema { let schema = SchemaBuilder::try_from(column_schemas) .context(metadata::ConvertSchemaSnafu)? 
- .timestamp_index(Some(timestamp_key_index)) .version(version) .add_metadata(ROW_KEY_END_KEY, row_key_end.to_string()) .add_metadata(USER_COLUMN_END_KEY, user_column_end.to_string()) @@ -252,7 +249,6 @@ mod tests { let expect_schema = SchemaBuilder::try_from(column_schemas) .unwrap() .version(123) - .timestamp_index(Some(1)) .build() .unwrap(); // Only compare column schemas since SchemaRef in StoreSchema also contains other metadata that only used diff --git a/src/storage/src/test_util/descriptor_util.rs b/src/storage/src/test_util/descriptor_util.rs index 9d51a0b8b8..a02712bd3d 100644 --- a/src/storage/src/test_util/descriptor_util.rs +++ b/src/storage/src/test_util/descriptor_util.rs @@ -25,6 +25,7 @@ impl RegionDescBuilder { ConcreteDataType::timestamp_millis_datatype(), ) .is_nullable(false) + .is_time_index(true) .build() .unwrap(), ); @@ -44,7 +45,7 @@ impl RegionDescBuilder { } pub fn timestamp(mut self, column_def: ColumnDef) -> Self { - let column = self.new_column(column_def); + let column = self.new_ts_column(column_def); self.key_builder = self.key_builder.timestamp(column); self } @@ -90,6 +91,15 @@ impl RegionDescBuilder { self.last_column_id } + fn new_ts_column(&mut self, column_def: ColumnDef) -> ColumnDescriptor { + let datatype = column_def.1.data_type(); + ColumnDescriptorBuilder::new(self.alloc_column_id(), column_def.0, datatype) + .is_nullable(column_def.2) + .is_time_index(true) + .build() + .unwrap() + } + fn new_column(&mut self, column_def: ColumnDef) -> ColumnDescriptor { let datatype = column_def.1.data_type(); ColumnDescriptorBuilder::new(self.alloc_column_id(), column_def.0, datatype) diff --git a/src/storage/src/test_util/schema_util.rs b/src/storage/src/test_util/schema_util.rs index 1a2698b24d..af5e0fd056 100644 --- a/src/storage/src/test_util/schema_util.rs +++ b/src/storage/src/test_util/schema_util.rs @@ -17,15 +17,20 @@ pub fn new_schema_with_version( ) -> Schema { let column_schemas: Vec<_> = column_defs .iter() - 
.map(|column_def| { + .enumerate() + .map(|(index, column_def)| { let datatype = column_def.1.data_type(); - ColumnSchema::new(column_def.0, datatype, column_def.2) + if let Some(timestamp_index) = timestamp_index { + ColumnSchema::new(column_def.0, datatype, column_def.2) + .with_time_index(index == timestamp_index) + } else { + ColumnSchema::new(column_def.0, datatype, column_def.2) + } }) .collect(); SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(timestamp_index) .version(version) .build() .unwrap() diff --git a/src/storage/src/write_batch/compat.rs b/src/storage/src/write_batch/compat.rs index 18268b370c..e8044a7d64 100644 --- a/src/storage/src/write_batch/compat.rs +++ b/src/storage/src/write_batch/compat.rs @@ -96,7 +96,8 @@ mod tests { ) -> SchemaBuilder { let mut column_schemas = vec![ ColumnSchema::new("k0", ConcreteDataType::int32_datatype(), false), - ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false) + .with_time_index(true), ]; if let Some(v0_constraint) = v0_constraint { @@ -107,9 +108,7 @@ mod tests { ); } - SchemaBuilder::try_from(column_schemas) - .unwrap() - .timestamp_index(Some(1)) + SchemaBuilder::try_from(column_schemas).unwrap() } fn new_test_schema(v0_constraint: Option>) -> SchemaRef { diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index cb5dd5075d..7fa50f6e1d 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -23,6 +23,9 @@ pub struct ColumnDescriptor { /// Is column nullable, default is true. #[builder(default = "true")] is_nullable: bool, + /// Is time index column, default is false. + #[builder(default = "false")] + is_time_index: bool, /// Default constraint of column, default is None, which means no default constraint /// for this column, and user must provide a value for a not-null column. 
#[builder(default)] @@ -36,6 +39,10 @@ impl ColumnDescriptor { pub fn is_nullable(&self) -> bool { self.is_nullable } + #[inline] + pub fn is_time_index(&self) -> bool { + self.is_time_index + } #[inline] pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> { @@ -46,6 +53,7 @@ impl ColumnDescriptor { /// be stored as metadata. pub fn to_column_schema(&self) -> ColumnSchema { ColumnSchema::new(&self.name, self.data_type.clone(), self.is_nullable) + .with_time_index(self.is_time_index) .with_default_constraint(self.default_constraint.clone()) .expect("ColumnDescriptor should validate default constraint") } @@ -226,6 +234,7 @@ mod tests { fn new_timestamp_desc() -> ColumnDescriptor { ColumnDescriptorBuilder::new(5, "timestamp", ConcreteDataType::int64_datatype()) + .is_time_index(true) .build() .unwrap() } diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index f85bbd9cb6..da0fe797f2 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -146,6 +146,7 @@ fn build_row_key_desc( ) .default_constraint(ts_column_schema.default_constraint().cloned()) .is_nullable(ts_column_schema.is_nullable()) + .is_time_index(true) .build() .context(BuildColumnDescriptorSnafu { column_name: &ts_column_schema.name, @@ -461,13 +462,13 @@ mod tests { "ts", ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond), true, - ), + ) + .with_time_index(true), ]; let schema = Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(2)) .build() .expect("ts must be timestamp column"), ); diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index 228a910242..fc95b1bad6 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -48,12 +48,12 @@ pub fn schema_for_test() -> Schema { "ts", ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond), true, - ), + ) + 
.with_time_index(true), ]; SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(3)) .build() .expect("ts must be timestamp column") } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index f349771d2b..460576027b 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -201,7 +201,6 @@ impl TableMeta { table_name ), })? - .timestamp_index(table_schema.timestamp_index()) // Also bump the schema version. .version(table_schema.version() + 1); for (k, v) in table_schema.metadata().iter() { @@ -271,18 +270,6 @@ impl TableMeta { .filter(|column_schema| !column_names.contains(&column_schema.name)) .cloned() .collect(); - // Find the index of the timestamp column. - let timestamp_column = table_schema.timestamp_column(); - let timestamp_index = columns.iter().enumerate().find_map(|(idx, column_schema)| { - let is_timestamp = timestamp_column - .map(|c| c.name == column_schema.name) - .unwrap_or(false); - if is_timestamp { - Some(idx) - } else { - None - } - }); let mut builder = SchemaBuilder::try_from_columns(columns) .with_context(|_| error::SchemaBuildSnafu { @@ -291,8 +278,6 @@ impl TableMeta { table_name ), })? - // Need to use the newly computed timestamp index. - .timestamp_index(timestamp_index) // Also bump the schema version. 
.version(table_schema.version() + 1); for (k, v) in table_schema.metadata().iter() { @@ -483,12 +468,12 @@ mod tests { fn new_test_schema() -> Schema { let column_schemas = vec![ ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false) + .with_time_index(true), ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), ]; SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(1)) .version(123) .build() .unwrap() @@ -607,12 +592,12 @@ mod tests { ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false) + .with_time_index(true), ]; let schema = Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(Some(3)) .version(123) .build() .unwrap(),