feat: move time index metadata from schema into field (#444)

* feat: move time index metadata from schema into field

* chore: remove useless code

* test: test select with column alias

* fix: conflicts with develop branch

* test: add test

* test: order by timestamp to ensure query results order

* fix: comment
This commit is contained in:
dennis zhuang
2022-11-11 15:36:27 +08:00
committed by GitHub
parent e7b4d24df5
commit 74ea529d1a
30 changed files with 249 additions and 165 deletions

View File

@@ -152,7 +152,8 @@ fn build_system_catalog_schema() -> Schema {
"timestamp".to_string(),
ConcreteDataType::timestamp_millis_datatype(),
false,
),
)
.with_time_index(true),
ColumnSchema::new(
"value".to_string(),
ConcreteDataType::binary_datatype(),
@@ -171,11 +172,7 @@ fn build_system_catalog_schema() -> Schema {
];
// The schema of this table must be valid.
SchemaBuilder::try_from(cols)
.unwrap()
.timestamp_index(Some(2))
.build()
.unwrap()
SchemaBuilder::try_from(cols).unwrap().build().unwrap()
}
pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest {

View File

@@ -124,6 +124,7 @@ pub fn build_create_table_request(
{
if !new_columns.contains(column_name) {
let mut is_nullable = true;
let mut is_time_index = false;
match *semantic_type {
TAG_SEMANTIC_TYPE => primary_key_indices.push(column_schemas.len()),
TIMESTAMP_SEMANTIC_TYPE => {
@@ -135,13 +136,15 @@ pub fn build_create_table_request(
}
);
timestamp_index = column_schemas.len();
is_time_index = true;
// Timestamp column must not be null.
is_nullable = false;
}
_ => {}
}
let column_schema = build_column_schema(column_name, *datatype, is_nullable)?;
let column_schema = build_column_schema(column_name, *datatype, is_nullable)?
.with_time_index(is_time_index);
column_schemas.push(column_schema);
new_columns.insert(column_name.to_string());
}
@@ -152,7 +155,6 @@ pub fn build_create_table_request(
let schema = Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(timestamp_index))
.build()
.context(CreateSchemaSnafu)?,
);
@@ -428,17 +430,13 @@ mod tests {
fn test_find_new_columns() {
let mut columns = Vec::with_capacity(1);
let cpu_column = build_column_schema("cpu", 10, true).unwrap();
let ts_column = build_column_schema("ts", 15, false).unwrap();
let ts_column = build_column_schema("ts", 15, false)
.unwrap()
.with_time_index(true);
columns.push(cpu_column);
columns.push(ts_column);
let schema = Arc::new(
SchemaBuilder::try_from(columns)
.unwrap()
.timestamp_index(Some(1))
.build()
.unwrap(),
);
let schema = Arc::new(SchemaBuilder::try_from(columns).unwrap().build().unwrap());
assert!(find_new_columns(&schema, &[]).unwrap().is_none());
@@ -539,13 +537,13 @@ mod tests {
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true)
.with_time_index(true),
];
Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(3))
.build()
.unwrap(),
)

View File

@@ -72,6 +72,9 @@ pub enum Error {
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField { field: String, backtrace: Backtrace },
#[snafu(display("Missing timestamp column in request"))]
MissingTimestampColumn { backtrace: Backtrace },
#[snafu(display(
"Columns and values number mismatch, columns: {}, values: {}",
columns,
@@ -315,6 +318,7 @@ impl ErrorExt for Error {
| Error::KeyColumnNotFound { .. }
| Error::InvalidPrimaryKey { .. }
| Error::MissingField { .. }
| Error::MissingTimestampColumn { .. }
| Error::CatalogNotFound { .. }
| Error::SchemaNotFound { .. }
| Error::ConstraintNotSupported { .. }

View File

@@ -142,23 +142,30 @@ fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
.iter()
.map(create_column_schema)
.collect::<Result<Vec<ColumnSchema>>>()?;
let ts_index = column_schemas
.iter()
.enumerate()
.find_map(|(i, column)| {
if column.name == expr.time_index {
Some(i)
ensure!(
column_schemas
.iter()
.any(|column| column.name == expr.time_index),
error::KeyColumnNotFoundSnafu {
name: &expr.time_index,
}
);
let column_schemas = column_schemas
.into_iter()
.map(|column_schema| {
if column_schema.name == expr.time_index {
column_schema.with_time_index(true)
} else {
None
column_schema
}
})
.context(error::KeyColumnNotFoundSnafu {
name: &expr.time_index,
})?;
.collect::<Vec<_>>();
Ok(Arc::new(
SchemaBuilder::try_from(column_schemas)
.context(error::CreateSchemaSnafu)?
.timestamp_index(Some(ts_index))
.build()
.context(error::CreateSchemaSnafu)?,
))
@@ -324,14 +331,14 @@ mod tests {
fn expected_table_schema() -> SchemaRef {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false)
.with_time_index(true),
ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
];
Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(1))
.build()
.unwrap(),
)

View File

@@ -129,13 +129,13 @@ mod tests {
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true)
.with_time_index(true),
];
Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(3))
.build()
.unwrap(),
)

View File

@@ -36,7 +36,7 @@ impl SqlHandler {
}
AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: column_def_to_schema(column_def)
column_schema: column_def_to_schema(column_def, false)
.context(error::ParseSqlSnafu)?,
// FIXME(dennis): supports adding key column
is_key: false,

View File

@@ -143,6 +143,8 @@ impl SqlHandler {
}
);
ensure!(ts_index != usize::MAX, error::MissingTimestampColumnSnafu);
if primary_keys.is_empty() {
info!(
"Creating table: {:?}.{:?}.{} but primary key not set, use time index column: {}",
@@ -154,13 +156,15 @@ impl SqlHandler {
let columns_schemas: Vec<_> = stmt
.columns
.iter()
.map(|column| column_def_to_schema(column).context(error::ParseSqlSnafu))
.enumerate()
.map(|(index, column)| {
column_def_to_schema(column, index == ts_index).context(error::ParseSqlSnafu)
})
.collect::<Result<Vec<_>>>()?;
let schema = Arc::new(
SchemaBuilder::try_from(columns_schemas)
.context(CreateSchemaSnafu)?
.timestamp_index(Some(ts_index))
.build()
.context(CreateSchemaSnafu)?,
);
@@ -239,7 +243,7 @@ mod tests {
PRIMARY KEY(host)) engine=mito with(regions=1);"#,
);
let error = handler.create_to_request(42, parsed_stmt).unwrap_err();
assert_matches!(error, Error::CreateSchema { .. });
assert_matches!(error, Error::MissingTimestampColumn { .. });
}
/// If primary key is not specified, time index should be used as primary key.

View File

@@ -81,6 +81,19 @@ async fn test_sql_api() {
body,
r#"{"code":0,"output":{"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}}"#
);
// select with column alias
let res = client
.get("/v1/sql?sql=select cpu as c, ts as time from demo limit 10")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert_eq!(
body,
r#"{"code":0,"output":{"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}}"#
);
}
#[tokio::test(flavor = "multi_thread")]

View File

@@ -112,7 +112,28 @@ async fn test_execute_insert_query_with_i64_timestamp() {
.unwrap();
assert!(matches!(output, Output::AffectedRows(2)));
let query_output = instance.execute_sql("select ts from demo").await.unwrap();
let query_output = instance
.execute_sql("select ts from demo order by ts")
.await
.unwrap();
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
let columns = batches[0].df_recordbatch.columns();
assert_eq!(1, columns.len());
assert_eq!(
&Int64Array::from_slice(&[1655276557000, 1655276558000]),
columns[0].as_any().downcast_ref::<Int64Array>().unwrap()
);
}
_ => unreachable!(),
}
let query_output = instance
.execute_sql("select ts as time from demo order by ts")
.await
.unwrap();
match query_output {
Output::Stream(s) => {

View File

@@ -48,7 +48,7 @@ pub async fn create_test_table(instance: &Instance, ts_type: ConcreteDataType) -
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ts_type, true),
ColumnSchema::new("ts", ts_type, true).with_time_index(true),
];
let table_name = "demo";
@@ -65,7 +65,6 @@ pub async fn create_test_table(instance: &Instance, ts_type: ConcreteDataType) -
schema: Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(3))
.build()
.expect("ts is expected to be timestamp column"),
),

View File

@@ -55,6 +55,13 @@ pub enum Error {
#[snafu(display("Invalid timestamp index: {}", index))]
InvalidTimestampIndex { index: usize, backtrace: Backtrace },
#[snafu(display("Duplicate timestamp index, exists: {}, new: {}", exists, new))]
DuplicateTimestampIndex {
exists: usize,
new: usize,
backtrace: Backtrace,
},
#[snafu(display("{}", msg))]
CastType { msg: String, backtrace: Backtrace },

View File

@@ -7,7 +7,7 @@ use std::sync::Arc;
pub use arrow::datatypes::Metadata;
use arrow::datatypes::{Field, Schema as ArrowSchema};
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, DeserializeSnafu, Error, Result, SerializeSnafu};
@@ -15,13 +15,8 @@ pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;
use crate::vectors::VectorRef;
/// Key used to store column name of the timestamp column in metadata.
///
/// Instead of storing the column index, we store the column name as the
/// query engine may modify the column order of the arrow schema, then
/// we would fail to recover the correct timestamp column when converting
/// the arrow schema back to our schema.
const TIMESTAMP_COLUMN_KEY: &str = "greptime:timestamp_column";
/// Key used to store whether the column is time index in arrow field's metadata.
const TIME_INDEX_KEY: &str = "greptime:time_index";
/// Key used to store version number of the schema in metadata.
const VERSION_KEY: &str = "greptime:version";
/// Key used to store default constraint in arrow field's metadata.
@@ -33,6 +28,7 @@ pub struct ColumnSchema {
pub name: String,
pub data_type: ConcreteDataType,
is_nullable: bool,
is_time_index: bool,
default_constraint: Option<ColumnDefaultConstraint>,
metadata: Metadata,
}
@@ -47,11 +43,17 @@ impl ColumnSchema {
name: name.into(),
data_type,
is_nullable,
is_time_index: false,
default_constraint: None,
metadata: Metadata::new(),
}
}
/// Returns whether this column is flagged as the time index column.
///
/// The flag is `false` for columns created through `ColumnSchema::new` and is
/// changed with `with_time_index`.
#[inline]
pub fn is_time_index(&self) -> bool {
self.is_time_index
}
#[inline]
pub fn is_nullable(&self) -> bool {
self.is_nullable
@@ -67,6 +69,17 @@ impl ColumnSchema {
&self.metadata
}
/// Marks or unmarks this column as the time index and returns the updated schema.
///
/// The flag is mirrored into the column metadata under `TIME_INDEX_KEY`: the key is
/// inserted with the value `"true"` when `is_time_index` is set, and removed otherwise.
pub fn with_time_index(mut self, is_time_index: bool) -> Self {
    self.is_time_index = is_time_index;

    // Clearing the flag must also drop the metadata entry so both stay in sync.
    if !is_time_index {
        self.metadata.remove(TIME_INDEX_KEY);
        return self;
    }

    let key = TIME_INDEX_KEY.to_string();
    let value = "true".to_string();
    self.metadata.insert(key, value);
    self
}
pub fn with_default_constraint(
mut self,
default_constraint: Option<ColumnDefaultConstraint>,
@@ -229,24 +242,21 @@ impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
impl SchemaBuilder {
pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
let (fields, name_to_index) = collect_fields(&column_schemas)?;
let FieldsAndIndices {
fields,
name_to_index,
timestamp_index,
} = collect_fields(&column_schemas)?;
Ok(Self {
column_schemas,
name_to_index,
fields,
timestamp_index,
..Default::default()
})
}
/// Set timestamp index.
///
/// The validation of timestamp column is done in `build()`.
pub fn timestamp_index(mut self, timestamp_index: Option<usize>) -> Self {
self.timestamp_index = timestamp_index;
self
}
pub fn version(mut self, version: u32) -> Self {
self.version = version;
self
@@ -263,10 +273,8 @@ impl SchemaBuilder {
pub fn build(mut self) -> Result<Schema> {
if let Some(timestamp_index) = self.timestamp_index {
validate_timestamp_index(&self.column_schemas, timestamp_index)?;
let timestamp_name = self.column_schemas[timestamp_index].name.clone();
self.metadata
.insert(TIMESTAMP_COLUMN_KEY.to_string(), timestamp_name);
}
self.metadata
.insert(VERSION_KEY.to_string(), self.version.to_string());
@@ -282,16 +290,37 @@ impl SchemaBuilder {
}
}
fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<(Vec<Field>, HashMap<String, usize>)> {
/// Output of `collect_fields`: the arrow fields built from a list of column
/// schemas together with the indices derived while scanning that list.
struct FieldsAndIndices {
/// Arrow field converted from each column schema, in column order.
fields: Vec<Field>,
/// Maps a column name to its position in the column schema list.
name_to_index: HashMap<String, usize>,
/// Position of the column flagged as time index, or `None` if no column is flagged.
timestamp_index: Option<usize>,
}
fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
let mut fields = Vec::with_capacity(column_schemas.len());
let mut name_to_index = HashMap::with_capacity(column_schemas.len());
let mut timestamp_index = None;
for (index, column_schema) in column_schemas.iter().enumerate() {
if column_schema.is_time_index() {
ensure!(
timestamp_index.is_none(),
error::DuplicateTimestampIndexSnafu {
exists: timestamp_index.unwrap(),
new: index,
}
);
timestamp_index = Some(index);
}
let field = Field::try_from(column_schema)?;
fields.push(field);
name_to_index.insert(column_schema.name.clone(), index);
}
Ok((fields, name_to_index))
Ok(FieldsAndIndices {
fields,
name_to_index,
timestamp_index,
})
}
fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
@@ -309,6 +338,12 @@ fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: us
index: timestamp_index,
}
);
ensure!(
column_schema.is_time_index(),
error::InvalidTimestampIndexSnafu {
index: timestamp_index,
}
);
Ok(())
}
@@ -325,11 +360,13 @@ impl TryFrom<&Field> for ColumnSchema {
Some(json) => Some(serde_json::from_str(&json).context(DeserializeSnafu { json })?),
None => None,
};
let is_time_index = metadata.contains_key(TIME_INDEX_KEY);
Ok(ColumnSchema {
name: field.name.clone(),
data_type,
is_nullable: field.is_nullable,
is_time_index,
default_constraint,
metadata,
})
@@ -377,15 +414,21 @@ impl TryFrom<Arc<ArrowSchema>> for Schema {
column_schemas.push(column_schema);
}
let timestamp_name = arrow_schema.metadata.get(TIMESTAMP_COLUMN_KEY);
let mut timestamp_index = None;
if let Some(name) = timestamp_name {
let index = name_to_index
.get(name)
.context(error::TimestampNotFoundSnafu { name })?;
validate_timestamp_index(&column_schemas, *index)?;
timestamp_index = Some(*index);
for (index, column_schema) in column_schemas.iter().enumerate() {
if column_schema.is_time_index() {
validate_timestamp_index(&column_schemas, index)?;
ensure!(
timestamp_index.is_none(),
error::DuplicateTimestampIndexSnafu {
exists: timestamp_index.unwrap(),
new: index,
}
);
timestamp_index = Some(index);
}
}
let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
Ok(Self {
@@ -549,11 +592,6 @@ mod tests {
let schema = SchemaBuilder::default().build().unwrap();
assert_eq!(0, schema.num_columns());
assert!(schema.is_empty());
assert!(SchemaBuilder::default()
.timestamp_index(Some(0))
.build()
.is_err());
}
#[test]
@@ -602,11 +640,11 @@ mod tests {
fn test_schema_with_timestamp() {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false)
.with_time_index(true),
];
let schema = SchemaBuilder::try_from(column_schemas.clone())
.unwrap()
.timestamp_index(Some(1))
.version(123)
.build()
.unwrap();
@@ -623,22 +661,23 @@ mod tests {
#[test]
fn test_schema_wrong_timestamp() {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
];
assert!(SchemaBuilder::try_from(column_schemas.clone())
.unwrap()
.timestamp_index(Some(0))
.build()
.is_err());
assert!(SchemaBuilder::try_from(column_schemas.clone())
.unwrap()
.timestamp_index(Some(1))
.build()
.is_err());
assert!(SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(1))
.build()
.is_err());
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
.with_time_index(true),
];
assert!(SchemaBuilder::try_from(column_schemas)
.unwrap()
.build()
.is_err());
}

View File

@@ -18,7 +18,6 @@ impl TryFrom<RawSchema> for Schema {
fn try_from(raw: RawSchema) -> Result<Schema> {
SchemaBuilder::try_from(raw.column_schemas)?
.timestamp_index(raw.timestamp_index)
.version(raw.version)
.build()
}
@@ -43,11 +42,11 @@ mod tests {
fn test_raw_convert() {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false)
.with_time_index(true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(1))
.version(123)
.build()
.unwrap();

View File

@@ -160,12 +160,13 @@ fn create_to_expr(create: CreateTable) -> Result<CreateExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?;
let time_index = find_time_index(&create.constraints)?;
let expr = CreateExpr {
catalog_name: Some(catalog_name),
schema_name: Some(schema_name),
table_name,
column_defs: columns_to_expr(&create.columns)?,
time_index: find_time_index(&create.constraints)?,
column_defs: columns_to_expr(&create.columns, &time_index)?,
time_index,
primary_keys: find_primary_keys(&create.constraints)?,
create_if_not_exists: create.if_not_exists,
// TODO(LFC): Fill in other table options.
@@ -222,10 +223,12 @@ fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
Ok(time_index.first().unwrap().to_string())
}
fn columns_to_expr(column_defs: &[ColumnDef]) -> Result<Vec<GrpcColumnDef>> {
fn columns_to_expr(column_defs: &[ColumnDef], time_index: &str) -> Result<Vec<GrpcColumnDef>> {
let column_schemas = column_defs
.iter()
.map(|c| column_def_to_schema(c).context(error::ParseSqlSnafu))
.map(|c| {
column_def_to_schema(c, c.name.to_string() == time_index).context(error::ParseSqlSnafu)
})
.collect::<Result<Vec<ColumnSchema>>>()?;
let column_datatypes = column_schemas
@@ -423,8 +426,7 @@ mod tests {
ts_millis_values: vec![1000, 2000, 3000, 4000],
..Default::default()
}),
// FIXME(dennis): looks like the read schema in table scan doesn't have timestamp index, we have to investigate it.
semantic_type: SemanticType::Field as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::Timestamp as i32,
..Default::default()
};

View File

@@ -204,7 +204,8 @@ fn build_scripts_schema() -> Schema {
"timestamp".to_string(),
ConcreteDataType::timestamp_millis_datatype(),
false,
),
)
.with_time_index(true),
ColumnSchema::new(
"gmt_created".to_string(),
ConcreteDataType::timestamp_millis_datatype(),
@@ -218,9 +219,5 @@ fn build_scripts_schema() -> Schema {
];
// Schema is always valid here
SchemaBuilder::try_from(cols)
.unwrap()
.timestamp_index(Some(3))
.build()
.unwrap()
SchemaBuilder::try_from(cols).unwrap().build().unwrap()
}

View File

@@ -220,7 +220,7 @@ fn parse_column_default_constraint(
// TODO(yingwen): Make column nullable by default, and checks invalid case like
// a column is not nullable but has a default value null.
/// Create a `ColumnSchema` from `ColumnDef`.
pub fn column_def_to_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
pub fn column_def_to_schema(column_def: &ColumnDef, is_time_index: bool) -> Result<ColumnSchema> {
let is_nullable = column_def
.options
.iter()
@@ -232,6 +232,7 @@ pub fn column_def_to_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
parse_column_default_constraint(&name, &data_type, &column_def.options)?;
ColumnSchema::new(name, data_type, is_nullable)
.with_time_index(is_time_index)
.with_default_constraint(default_constraint)
.context(error::InvalidDefaultSnafu {
column: &column_def.name.value,

View File

@@ -9,15 +9,20 @@ pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool);
pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> Schema {
let column_schemas: Vec<_> = column_defs
.iter()
.map(|column_def| {
.enumerate()
.map(|(index, column_def)| {
let datatype = column_def.1.data_type();
ColumnSchema::new(column_def.0, datatype, column_def.2)
if let Some(timestamp_index) = timestamp_index {
ColumnSchema::new(column_def.0, datatype, column_def.2)
.with_time_index(index == timestamp_index)
} else {
ColumnSchema::new(column_def.0, datatype, column_def.2)
}
})
.collect();
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(timestamp_index)
.build()
.unwrap()
}

View File

@@ -20,6 +20,7 @@ message ColumnSchema {
string name = 1;
DataType data_type = 2;
bool is_nullable = 3;
bool is_time_index = 4;
}
message Mutation {

View File

@@ -383,8 +383,10 @@ impl ColumnMetadata {
/// would store additional metadatas to the ColumnSchema.
pub fn to_column_schema(&self) -> Result<ColumnSchema> {
    let desc = &self.desc;

    // NOTE(review): `with_time_index` also writes the time-index key into the column
    // metadata, so it is applied after `with_metadata` — presumably `with_metadata`
    // replaces the whole map; confirm before reordering these calls.
    let column_schema = ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable())
        .with_metadata(self.to_metadata())
        .with_time_index(desc.is_time_index());

    // Attaching the default constraint is the only fallible step.
    column_schema
        .with_default_constraint(desc.default_constraint().cloned())
        .context(ToColumnSchemaSnafu)
}
@@ -405,6 +407,7 @@ impl ColumnMetadata {
column_schema.data_type.clone(),
)
.is_nullable(column_schema.is_nullable())
.is_time_index(column_schema.is_time_index())
.default_constraint(column_schema.default_constraint().cloned())
.comment(comment)
.build()
@@ -1002,6 +1005,7 @@ mod tests {
let timestamp = ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::int64_datatype())
.is_nullable(false)
.is_time_index(true)
.build()
.unwrap();
let row_key = RowKeyDescriptorBuilder::new(timestamp)
@@ -1021,6 +1025,7 @@ mod tests {
let timestamp =
ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::timestamp_millis_datatype())
.is_nullable(false)
.is_time_index(true)
.build()
.unwrap();
let row_key = RowKeyDescriptorBuilder::new(timestamp)

View File

@@ -78,11 +78,9 @@ impl TryFrom<Schema> for schema::SchemaRef {
.map(schema::ColumnSchema::try_from)
.collect::<Result<Vec<_>>>()?;
let timestamp_index = schema.timestamp_index.map(|index| index.value as usize);
let schema = Arc::new(
schema::SchemaBuilder::try_from(column_schemas)
.context(ConvertSchemaSnafu)?
.timestamp_index(timestamp_index)
.build()
.context(ConvertSchemaSnafu)?,
);
@@ -97,6 +95,7 @@ impl From<&schema::ColumnSchema> for ColumnSchema {
name: cs.name.clone(),
data_type: DataType::from(&cs.data_type).into(),
is_nullable: cs.is_nullable(),
is_time_index: cs.is_time_index(),
}
}
}
@@ -110,7 +109,8 @@ impl TryFrom<&ColumnSchema> for schema::ColumnSchema {
column_schema.name.clone(),
data_type.into(),
column_schema.is_nullable,
))
)
.with_time_index(column_schema.is_time_index))
} else {
InvalidDataTypeSnafu {
data_type: column_schema.data_type,

View File

@@ -200,7 +200,6 @@ impl ProjectedSchema {
let store_schema = StoreSchema::new(
columns,
region_schema.version(),
region_schema.timestamp_key_index(),
region_schema.row_key_end(),
projection.num_user_columns,
)?;
@@ -212,18 +211,6 @@ impl ProjectedSchema {
region_schema: &RegionSchema,
projection: &Projection,
) -> Result<SchemaRef> {
let timestamp_index =
projection
.projected_columns
.iter()
.enumerate()
.find_map(|(idx, col_idx)| {
if *col_idx == region_schema.timestamp_key_index() {
Some(idx)
} else {
None
}
});
let column_schemas: Vec<_> = projection
.projected_columns
.iter()
@@ -237,7 +224,6 @@ impl ProjectedSchema {
let schema = SchemaBuilder::try_from(column_schemas)
.context(metadata::ConvertSchemaSnafu)?
.timestamp_index(timestamp_index)
.version(region_schema.version())
.build()
.context(metadata::InvalidSchemaSnafu)?;

View File

@@ -112,11 +112,6 @@ impl RegionSchema {
self.columns.column_metadata(idx)
}
#[inline]
pub(crate) fn timestamp_key_index(&self) -> usize {
self.columns.timestamp_key_index()
}
#[cfg(test)]
pub(crate) fn columns(&self) -> &[ColumnMetadata] {
self.columns.columns()
@@ -134,7 +129,6 @@ fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result<Schema>
SchemaBuilder::try_from(column_schemas)
.context(metadata::ConvertSchemaSnafu)?
.timestamp_index(Some(columns.timestamp_key_index()))
.version(version)
.build()
.context(metadata::InvalidSchemaSnafu)

View File

@@ -74,7 +74,6 @@ impl StoreSchema {
StoreSchema::new(
columns.columns().to_vec(),
version,
columns.timestamp_key_index(),
columns.row_key_end(),
columns.user_column_end(),
)
@@ -83,7 +82,6 @@ impl StoreSchema {
pub(crate) fn new(
columns: Vec<ColumnMetadata>,
version: u32,
timestamp_key_index: usize,
row_key_end: usize,
user_column_end: usize,
) -> Result<StoreSchema> {
@@ -94,7 +92,6 @@ impl StoreSchema {
let schema = SchemaBuilder::try_from(column_schemas)
.context(metadata::ConvertSchemaSnafu)?
.timestamp_index(Some(timestamp_key_index))
.version(version)
.add_metadata(ROW_KEY_END_KEY, row_key_end.to_string())
.add_metadata(USER_COLUMN_END_KEY, user_column_end.to_string())
@@ -252,7 +249,6 @@ mod tests {
let expect_schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.timestamp_index(Some(1))
.build()
.unwrap();
// Only compare column schemas since SchemaRef in StoreSchema also contains other metadata that only used

View File

@@ -25,6 +25,7 @@ impl RegionDescBuilder {
ConcreteDataType::timestamp_millis_datatype(),
)
.is_nullable(false)
.is_time_index(true)
.build()
.unwrap(),
);
@@ -44,7 +45,7 @@ impl RegionDescBuilder {
}
pub fn timestamp(mut self, column_def: ColumnDef) -> Self {
let column = self.new_column(column_def);
let column = self.new_ts_column(column_def);
self.key_builder = self.key_builder.timestamp(column);
self
}
@@ -90,6 +91,15 @@ impl RegionDescBuilder {
self.last_column_id
}
/// Builds the descriptor for a timestamp column from `column_def`, allocating a new
/// column id and flagging the column as time index.
///
/// `column_def` is read positionally: `.0` is the name, `.1` the logical type and
/// `.2` the nullability.
fn new_ts_column(&mut self, column_def: ColumnDef) -> ColumnDescriptor {
    let data_type = column_def.1.data_type();
    let column_id = self.alloc_column_id();

    ColumnDescriptorBuilder::new(column_id, column_def.0, data_type)
        .is_time_index(true)
        .is_nullable(column_def.2)
        .build()
        .unwrap()
}
fn new_column(&mut self, column_def: ColumnDef) -> ColumnDescriptor {
let datatype = column_def.1.data_type();
ColumnDescriptorBuilder::new(self.alloc_column_id(), column_def.0, datatype)

View File

@@ -17,15 +17,20 @@ pub fn new_schema_with_version(
) -> Schema {
let column_schemas: Vec<_> = column_defs
.iter()
.map(|column_def| {
.enumerate()
.map(|(index, column_def)| {
let datatype = column_def.1.data_type();
ColumnSchema::new(column_def.0, datatype, column_def.2)
if let Some(timestamp_index) = timestamp_index {
ColumnSchema::new(column_def.0, datatype, column_def.2)
.with_time_index(index == timestamp_index)
} else {
ColumnSchema::new(column_def.0, datatype, column_def.2)
}
})
.collect();
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(timestamp_index)
.version(version)
.build()
.unwrap()

View File

@@ -96,7 +96,8 @@ mod tests {
) -> SchemaBuilder {
let mut column_schemas = vec![
ColumnSchema::new("k0", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false)
.with_time_index(true),
];
if let Some(v0_constraint) = v0_constraint {
@@ -107,9 +108,7 @@ mod tests {
);
}
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(1))
SchemaBuilder::try_from(column_schemas).unwrap()
}
fn new_test_schema(v0_constraint: Option<Option<ColumnDefaultConstraint>>) -> SchemaRef {

View File

@@ -23,6 +23,9 @@ pub struct ColumnDescriptor {
/// Is column nullable, default is true.
#[builder(default = "true")]
is_nullable: bool,
/// Is time index column, default is false.
#[builder(default = "false")]
is_time_index: bool,
/// Default constraint of column, default is None, which means no default constraint
/// for this column, and user must provide a value for a not-null column.
#[builder(default)]
@@ -36,6 +39,10 @@ impl ColumnDescriptor {
pub fn is_nullable(&self) -> bool {
self.is_nullable
}
/// Returns whether this column descriptor is flagged as the time index column.
#[inline]
pub fn is_time_index(&self) -> bool {
self.is_time_index
}
#[inline]
pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> {
@@ -46,6 +53,7 @@ impl ColumnDescriptor {
/// be stored as metadata.
pub fn to_column_schema(&self) -> ColumnSchema {
    // Carry the name, type, nullability and time-index flag over first.
    let column_schema = ColumnSchema::new(&self.name, self.data_type.clone(), self.is_nullable)
        .with_time_index(self.is_time_index);

    // Panics with the message below if the default constraint is rejected.
    column_schema
        .with_default_constraint(self.default_constraint.clone())
        .expect("ColumnDescriptor should validate default constraint")
}
@@ -226,6 +234,7 @@ mod tests {
/// Test helper: builds a descriptor with column id 5, named "timestamp", of int64
/// data type, flagged as the time index column. Panics if the builder fails.
fn new_timestamp_desc() -> ColumnDescriptor {
ColumnDescriptorBuilder::new(5, "timestamp", ConcreteDataType::int64_datatype())
.is_time_index(true)
.build()
.unwrap()
}

View File

@@ -146,6 +146,7 @@ fn build_row_key_desc(
)
.default_constraint(ts_column_schema.default_constraint().cloned())
.is_nullable(ts_column_schema.is_nullable())
.is_time_index(true)
.build()
.context(BuildColumnDescriptorSnafu {
column_name: &ts_column_schema.name,
@@ -461,13 +462,13 @@ mod tests {
"ts",
ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond),
true,
),
)
.with_time_index(true),
];
let schema = Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(2))
.build()
.expect("ts must be timestamp column"),
);

View File

@@ -48,12 +48,12 @@ pub fn schema_for_test() -> Schema {
"ts",
ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond),
true,
),
)
.with_time_index(true),
];
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(3))
.build()
.expect("ts must be timestamp column")
}

View File

@@ -201,7 +201,6 @@ impl TableMeta {
table_name
),
})?
.timestamp_index(table_schema.timestamp_index())
// Also bump the schema version.
.version(table_schema.version() + 1);
for (k, v) in table_schema.metadata().iter() {
@@ -271,18 +270,6 @@ impl TableMeta {
.filter(|column_schema| !column_names.contains(&column_schema.name))
.cloned()
.collect();
// Find the index of the timestamp column.
let timestamp_column = table_schema.timestamp_column();
let timestamp_index = columns.iter().enumerate().find_map(|(idx, column_schema)| {
let is_timestamp = timestamp_column
.map(|c| c.name == column_schema.name)
.unwrap_or(false);
if is_timestamp {
Some(idx)
} else {
None
}
});
let mut builder = SchemaBuilder::try_from_columns(columns)
.with_context(|_| error::SchemaBuildSnafu {
@@ -291,8 +278,6 @@ impl TableMeta {
table_name
),
})?
// Need to use the newly computed timestamp index.
.timestamp_index(timestamp_index)
// Also bump the schema version.
.version(table_schema.version() + 1);
for (k, v) in table_schema.metadata().iter() {
@@ -483,12 +468,12 @@ mod tests {
fn new_test_schema() -> Schema {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(1))
.version(123)
.build()
.unwrap()
@@ -607,12 +592,12 @@ mod tests {
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false)
.with_time_index(true),
];
let schema = Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(Some(3))
.version(123)
.build()
.unwrap(),