diff --git a/README.md b/README.md index 0280daada4..dead8eac22 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ cargo run -- --log-dir=logs --log-level=debug frontend start -c ./config/fronten cpu DOUBLE DEFAULT 0, memory DOUBLE, TIME INDEX (ts), - PRIMARY KEY(ts,host)) ENGINE=mito WITH(regions=1); + PRIMARY KEY(host)) ENGINE=mito WITH(regions=1); ``` 3. Insert data: diff --git a/src/api/greptime/v1/column.proto b/src/api/greptime/v1/column.proto index 59338bbd68..ec6993abe9 100644 --- a/src/api/greptime/v1/column.proto +++ b/src/api/greptime/v1/column.proto @@ -49,7 +49,7 @@ message Column { bytes null_mask = 4; // Helpful in creating vector from column. - optional ColumnDataType datatype = 5; + ColumnDataType datatype = 5; } message ColumnDef { diff --git a/src/client/src/database.rs b/src/client/src/database.rs index a6e1b5e5be..6a26131da9 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -23,10 +23,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::error; use crate::{ - error::{ - ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu, - MissingFieldSnafu, - }, + error::{ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu}, Client, Result, }; @@ -240,12 +237,8 @@ impl TryFrom for Output { } fn column_to_vector(column: &Column, rows: u32) -> Result { - let wrapper = ColumnDataTypeWrapper::try_new( - column - .datatype - .context(MissingFieldSnafu { field: "datatype" })?, - ) - .context(error::ColumnDataTypeSnafu)?; + let wrapper = + ColumnDataTypeWrapper::try_new(column.datatype).context(error::ColumnDataTypeSnafu)?; let column_datatype = wrapper.datatype(); let rows = rows as usize; @@ -348,7 +341,7 @@ mod tests { #[test] fn test_column_to_vector() { let mut column = create_test_column(Arc::new(BooleanVector::from(vec![true]))); - column.datatype = Some(-100); + column.datatype = -100; let result = column_to_vector(&column, 1); assert!(result.is_err()); assert_eq!( @@ -426,7 +419,7 @@ mod tests { semantic_type: 1, values: Some(values(&[array.clone()]).unwrap()), null_mask: null_mask(&vec![array], vector.len()), - datatype: Some(wrapper.datatype() as i32), + datatype: wrapper.datatype() as i32, } } } diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs index 2404b74cc1..2f77cbe85d 100644 --- a/src/common/grpc/src/writer.rs +++ b/src/common/grpc/src/writer.rs @@ -35,7 +35,7 @@ impl LinesWriter { SemanticType::Timestamp, ); ensure!( - column.datatype == Some(ColumnDataType::Timestamp.into()), + column.datatype == ColumnDataType::Timestamp as i32, TypeMismatchSnafu { column_name, expected: "timestamp", @@ -52,7 +52,7 @@ impl LinesWriter { pub fn write_tag(&mut self, column_name: &str, value: &str) -> Result<()> { let (idx, column) = self.mut_column(column_name, ColumnDataType::String, SemanticType::Tag); ensure!( - column.datatype == Some(ColumnDataType::String.into()), + column.datatype == ColumnDataType::String as i32, TypeMismatchSnafu { column_name, expected: "string", @@ -70,7 +70,7 @@ impl LinesWriter { let (idx, column) = self.mut_column(column_name, ColumnDataType::Uint64, SemanticType::Field); ensure!( - column.datatype == Some(ColumnDataType::Uint64.into()), + column.datatype == ColumnDataType::Uint64 as i32, TypeMismatchSnafu { column_name, expected: "u64", @@ -88,7 +88,7 @@ impl LinesWriter { let (idx, column) = self.mut_column(column_name, ColumnDataType::Int64, SemanticType::Field); ensure!( - column.datatype == Some(ColumnDataType::Int64.into()), + column.datatype == ColumnDataType::Int64 as i32, TypeMismatchSnafu { column_name, expected: "i64", @@ -106,7 +106,7 @@ impl LinesWriter { let (idx, column) = self.mut_column(column_name, ColumnDataType::Float64, SemanticType::Field); ensure!( - column.datatype == Some(ColumnDataType::Float64.into()), + column.datatype == ColumnDataType::Float64 as i32, TypeMismatchSnafu { column_name, expected: "f64", @@ -124,7 +124,7 @@ impl LinesWriter { let (idx, column) = self.mut_column(column_name, ColumnDataType::String, SemanticType::Field); ensure!( - column.datatype == Some(ColumnDataType::String.into()), + column.datatype == ColumnDataType::String as i32, TypeMismatchSnafu { column_name, expected: "string", @@ -142,7 +142,7 @@ impl LinesWriter { let (idx, column) = self.mut_column(column_name, ColumnDataType::Boolean, SemanticType::Field); ensure!( - column.datatype == Some(ColumnDataType::Boolean.into()), + column.datatype == ColumnDataType::Boolean as i32, TypeMismatchSnafu { column_name, expected: "boolean", @@ -197,7 +197,7 @@ impl LinesWriter { column_name: column_name.to_string(), semantic_type: semantic_type.into(), values: Some(Values::with_capacity(datatype, to_insert)), - datatype: Some(datatype.into()), + datatype: datatype as i32, null_mask: Vec::default(), }); column_names.insert(column_name.to_string(), new_idx); @@ -275,7 +275,7 @@ mod tests { let column = &columns[0]; assert_eq!("host", columns[0].column_name); - assert_eq!(Some(ColumnDataType::String as i32), column.datatype); + assert_eq!(ColumnDataType::String as i32, column.datatype); assert_eq!(SemanticType::Tag as i32, column.semantic_type); assert_eq!( vec!["host1", "host2", "host3"], @@ -285,28 +285,28 @@ mod tests { let column = &columns[1]; assert_eq!("cpu", column.column_name); - assert_eq!(Some(ColumnDataType::Float64 as i32), column.datatype); + assert_eq!(ColumnDataType::Float64 as i32, column.datatype); assert_eq!(SemanticType::Field as i32, column.semantic_type); assert_eq!(vec![0.5, 0.4], column.values.as_ref().unwrap().f64_values); verify_null_mask(&column.null_mask, vec![false, true, false]); let column = &columns[2]; assert_eq!("memory", column.column_name); - assert_eq!(Some(ColumnDataType::Float64 as i32), column.datatype); + assert_eq!(ColumnDataType::Float64 as i32, column.datatype); assert_eq!(SemanticType::Field as i32, column.semantic_type); assert_eq!(vec![0.4], column.values.as_ref().unwrap().f64_values); verify_null_mask(&column.null_mask, vec![false, true, true]); let column = &columns[3]; assert_eq!("name", column.column_name); - assert_eq!(Some(ColumnDataType::String as i32), column.datatype); + assert_eq!(ColumnDataType::String as i32, column.datatype); assert_eq!(SemanticType::Field as i32, column.semantic_type); assert_eq!(vec!["name1"], column.values.as_ref().unwrap().string_values); verify_null_mask(&column.null_mask, vec![false, true, true]); let column = &columns[4]; assert_eq!("ts", column.column_name); - assert_eq!(Some(ColumnDataType::Timestamp as i32), column.datatype); + assert_eq!(ColumnDataType::Timestamp as i32, column.datatype); assert_eq!(SemanticType::Timestamp as i32, column.semantic_type); assert_eq!( vec![101011000, 102011001, 103011002], @@ -316,28 +316,28 @@ mod tests { let column = &columns[5]; assert_eq!("enable_reboot", column.column_name); - assert_eq!(Some(ColumnDataType::Boolean as i32), column.datatype); + assert_eq!(ColumnDataType::Boolean as i32, column.datatype); assert_eq!(SemanticType::Field as i32, column.semantic_type); assert_eq!(vec![true], column.values.as_ref().unwrap().bool_values); verify_null_mask(&column.null_mask, vec![true, false, true]); let column = &columns[6]; assert_eq!("year_of_service", column.column_name); - assert_eq!(Some(ColumnDataType::Uint64 as i32), column.datatype); + assert_eq!(ColumnDataType::Uint64 as i32, column.datatype); assert_eq!(SemanticType::Field as i32, column.semantic_type); assert_eq!(vec![2], column.values.as_ref().unwrap().u64_values); verify_null_mask(&column.null_mask, vec![true, false, true]); let column = &columns[7]; assert_eq!("temperature", column.column_name); - assert_eq!(Some(ColumnDataType::Int64 as i32), column.datatype); + assert_eq!(ColumnDataType::Int64 as i32, column.datatype); assert_eq!(SemanticType::Field as i32, column.semantic_type); assert_eq!(vec![4], column.values.as_ref().unwrap().i64_values); verify_null_mask(&column.null_mask, vec![true, false, true]); let column = &columns[8]; assert_eq!("cpu_core_num", column.column_name); - assert_eq!(Some(ColumnDataType::Uint64 as i32), column.datatype); + assert_eq!(ColumnDataType::Uint64 as i32, column.datatype); assert_eq!(SemanticType::Field as i32, column.semantic_type); assert_eq!(vec![16], column.values.as_ref().unwrap().u64_values); verify_null_mask(&column.null_mask, vec![true, true, false]); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 9533a813c7..a6e15470e5 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -67,6 +67,9 @@ pub enum Error { #[snafu(display("Missing required field in protobuf, field: {}", field))] MissingField { field: String, backtrace: Backtrace }, + #[snafu(display("Missing timestamp column in request"))] + MissingTimestampColumn { backtrace: Backtrace }, + #[snafu(display( "Columns and values number mismatch, columns: {}, values: {}", columns, @@ -247,6 +250,17 @@ pub enum Error { #[snafu(backtrace)] source: datatypes::error::Error, }, + + #[snafu(display( + "Duplicated timestamp column in gRPC requests, exists {}, duplicated: {}", + exists, + duplicated + ))] + DuplicatedTimestampColumn { + exists: String, + duplicated: String, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -283,10 +297,13 @@ impl ErrorExt for Error { | Error::KeyColumnNotFound { .. } | Error::InvalidPrimaryKey { .. } | Error::MissingField { .. } + | Error::MissingTimestampColumn { .. } | Error::CatalogNotFound { .. } | Error::SchemaNotFound { .. } | Error::ConstraintNotSupported { .. } - | Error::ParseTimestamp { .. } => StatusCode::InvalidArguments, + | Error::ParseTimestamp { .. } + | Error::DuplicatedTimestampColumn { .. } => StatusCode::InvalidArguments, + // TODO(yingwen): Further categorize http error. Error::StartServer { .. } | Error::ParseAddr { .. } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 7eeb8b55c8..412a08a5b4 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,15 +1,15 @@ use std::{fs, path, sync::Arc}; use api::v1::{ - admin_expr, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, ObjectExpr, - ObjectResult, SelectExpr, + admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, + ObjectExpr, ObjectResult, SelectExpr, }; use async_trait::async_trait; use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::BoxedError; use common_error::status_code::StatusCode; use common_query::Output; -use common_telemetry::logging::{error, info}; +use common_telemetry::logging::{debug, error, info}; use common_telemetry::timer; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; use object_store::{backend::fs::Backend, util, ObjectStore}; @@ -18,6 +18,7 @@ use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler use snafu::prelude::*; use sql::statements::statement::Statement; use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl}; +use table::requests::AddColumnRequest; use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; @@ -29,7 +30,7 @@ use crate::error::{ use crate::metric; use crate::script::ScriptExecutor; use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; -use crate::server::grpc::insert::insertion_expr_to_request; +use crate::server::grpc::insert::{self, insertion_expr_to_request}; use crate::server::grpc::plan::PhysicalPlanner; use crate::server::grpc::select::to_object_result; use crate::sql::{SqlHandler, SqlRequest}; @@ -38,10 +39,8 @@ type DefaultEngine = MitoEngine>; // An abstraction to read/write services. pub struct Instance { - // Query service query_engine: QueryEngineRef, sql_handler: SqlHandler, - // Catalog list catalog_manager: CatalogManagerRef, physical_planner: PhysicalPlanner, script_executor: ScriptExecutor, @@ -82,6 +81,60 @@ impl Instance { }) } + async fn add_new_columns_to_table( + &self, + table_name: &str, + add_columns: Vec, + ) -> Result<()> { + let column_names = add_columns + .iter() + .map(|req| req.column_schema.name.clone()) + .collect::>(); + + let alter_request = insert::build_alter_table_request(table_name, add_columns); + + debug!( + "Adding new columns: {:?} to table: {}", + column_names, table_name + ); + + let _result = self + .sql_handler() + .execute(SqlRequest::Alter(alter_request)) + .await?; + + info!( + "Added new columns: {:?} to table: {}", + column_names, table_name + ); + Ok(()) + } + + async fn create_table_by_insert_batches( + &self, + table_name: &str, + insert_batches: &[InsertBatch], + ) -> Result<()> { + // Create table automatically, build schema from data. + let table_id = self.catalog_manager.next_table_id(); + let create_table_request = + insert::build_create_table_request(table_id, table_name, insert_batches)?; + + info!( + "Try to create table: {} automatically with request: {:?}", + table_name, create_table_request, + ); + + let _result = self + .sql_handler() + .execute(SqlRequest::Create(create_table_request)) + .await?; + + info!("Success to create table: {} automatically", table_name); + + Ok(()) + } + pub async fn execute_grpc_insert( &self, table_name: &str, @@ -94,11 +147,27 @@ impl Instance { .schema(DEFAULT_SCHEMA_NAME) .unwrap(); - let table = schema_provider - .table(table_name) - .context(TableNotFoundSnafu { table_name })?; + let insert_batches = insert::insert_batches(values.values)?; + ensure!(!insert_batches.is_empty(), error::IllegalInsertDataSnafu); - let insert = insertion_expr_to_request(table_name, values, table.clone())?; + let table = if let Some(table) = schema_provider.table(table_name) { + let schema = table.schema(); + if let Some(add_columns) = insert::find_new_columns(&schema, &insert_batches)? { + self.add_new_columns_to_table(table_name, add_columns) + .await?; + } + + table + } else { + self.create_table_by_insert_batches(table_name, &insert_batches) + .await?; + + schema_provider + .table(table_name) + .context(TableNotFoundSnafu { table_name })? + }; + + let insert = insertion_expr_to_request(table_name, insert_batches, table.clone())?; let affected_rows = table .insert(insert) diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 3c633ae42c..01868235d7 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -8,7 +8,7 @@ use datatypes::schema::ColumnDefaultConstraint; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use futures::TryFutureExt; use snafu::prelude::*; -use table::requests::{AlterKind, AlterTableRequest, CreateTableRequest}; +use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest}; use crate::error::{self, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result}; use crate::instance::Instance; @@ -96,8 +96,12 @@ impl Instance { let column_def = add_column.column_def.context(MissingFieldSnafu { field: "column_def", })?; - let alter_kind = AlterKind::AddColumn { - new_column: create_column_schema(&column_def)?, + let alter_kind = AlterKind::AddColumns { + columns: vec![AddColumnRequest { + column_schema: create_column_schema(&column_def)?, + // FIXME(dennis): supports adding key column + is_key: false, + }], }; let request = AlterTableRequest { catalog_name: expr.catalog_name, diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index e528ba3f9f..9f616da7c0 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -1,26 +1,184 @@ +use std::collections::HashSet; use std::{ collections::{hash_map::Entry, HashMap}, ops::Deref, sync::Arc, }; -use api::v1::{codec::InsertBatch, column::Values, insert_expr, Column}; +use api::{ + helper::ColumnDataTypeWrapper, + v1::{ + codec::InsertBatch, + column::{SemanticType, Values}, + Column, + }, +}; use common_base::BitVec; use common_time::timestamp::Timestamp; +use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder}; use snafu::{ensure, OptionExt, ResultExt}; -use table::{requests::InsertRequest, Table}; +use table::metadata::TableId; +use table::{ + requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest, InsertRequest}, + Table, +}; -use crate::error::{ColumnNotFoundSnafu, DecodeInsertSnafu, IllegalInsertDataSnafu, Result}; +use crate::error::{self, ColumnNotFoundSnafu, DecodeInsertSnafu, IllegalInsertDataSnafu, Result}; + +const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32; +const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32; + +#[inline] +fn build_column_schema(column_name: &str, datatype: i32, nullable: bool) -> Result { + let datatype_wrapper = + ColumnDataTypeWrapper::try_new(datatype).context(error::ColumnDataTypeSnafu)?; + + Ok(ColumnSchema::new( + column_name, + datatype_wrapper.into(), + nullable, + )) +} + +pub fn find_new_columns( + schema: &SchemaRef, + insert_batches: &[InsertBatch], +) -> Result>> { + let mut requests = Vec::default(); + let mut new_columns: HashSet = HashSet::default(); + + for InsertBatch { columns, row_count } in insert_batches { + if *row_count == 0 || columns.is_empty() { + continue; + } + + for Column { + column_name, + semantic_type, + datatype, + .. + } in columns + { + if schema.column_schema_by_name(column_name).is_none() + && !new_columns.contains(column_name) + { + let column_schema = build_column_schema(column_name, *datatype, true)?; + + requests.push(AddColumnRequest { + column_schema, + is_key: *semantic_type == TAG_SEMANTIC_TYPE, + }); + new_columns.insert(column_name.to_string()); + } + } + } + + if requests.is_empty() { + Ok(None) + } else { + Ok(Some(requests)) + } +} + +/// Build a alter table rqeusts that adding new columns. +#[inline] +pub fn build_alter_table_request( + table_name: &str, + columns: Vec, +) -> AlterTableRequest { + AlterTableRequest { + catalog_name: None, + schema_name: None, + table_name: table_name.to_string(), + alter_kind: AlterKind::AddColumns { columns }, + } +} + +/// Try to build create table request from insert data. +pub fn build_create_table_request( + table_id: TableId, + table_name: &str, + insert_batches: &[InsertBatch], +) -> Result { + let mut new_columns: HashSet = HashSet::default(); + let mut column_schemas = Vec::default(); + let mut primary_key_indices = Vec::default(); + let mut timestamp_index = usize::MAX; + + for InsertBatch { columns, row_count } in insert_batches { + if *row_count == 0 || columns.is_empty() { + continue; + } + + for Column { + column_name, + semantic_type, + datatype, + .. + } in columns + { + if !new_columns.contains(column_name) { + let mut column_schema = build_column_schema(column_name, *datatype, true)?; + + match *semantic_type { + TAG_SEMANTIC_TYPE => primary_key_indices.push(column_schemas.len()), + TIMESTAMP_SEMANTIC_TYPE => { + ensure!( + timestamp_index == usize::MAX, + error::DuplicatedTimestampColumnSnafu { + exists: &columns[timestamp_index].column_name, + duplicated: column_name, + } + ); + timestamp_index = column_schemas.len(); + // Timestamp column must not be null. + column_schema.is_nullable = false; + } + _ => {} + } + + column_schemas.push(column_schema); + new_columns.insert(column_name.to_string()); + } + } + + ensure!( + timestamp_index != usize::MAX, + error::MissingTimestampColumnSnafu + ); + + let schema = Arc::new( + SchemaBuilder::try_from(column_schemas) + .unwrap() + .timestamp_index(timestamp_index) + .build() + .context(error::CreateSchemaSnafu)?, + ); + + return Ok(CreateTableRequest { + id: table_id, + catalog_name: None, + schema_name: None, + table_name: table_name.to_string(), + desc: None, + schema, + create_if_not_exists: true, + primary_key_indices, + table_options: HashMap::new(), + }); + } + + error::IllegalInsertDataSnafu.fail() +} pub fn insertion_expr_to_request( table_name: &str, - values: insert_expr::Values, + insert_batches: Vec, table: Arc, ) -> Result { let schema = table.schema(); let mut columns_builders = HashMap::with_capacity(schema.column_schemas().len()); - let insert_batches = insert_batches(values.values)?; for InsertBatch { columns, row_count } in insert_batches { for Column { @@ -66,7 +224,8 @@ pub fn insertion_expr_to_request( }) } -fn insert_batches(bytes_vec: Vec>) -> Result> { +#[inline] +pub fn insert_batches(bytes_vec: Vec>) -> Result> { bytes_vec .iter() .map(|bytes| bytes.deref().try_into().context(DecodeInsertSnafu)) @@ -199,8 +358,8 @@ mod tests { use api::v1::{ codec::InsertBatch, - column::{self, Values}, - insert_expr, Column, + column::{self, SemanticType, Values}, + insert_expr, Column, ColumnDataType, }; use common_base::BitVec; use common_query::prelude::Expr; @@ -214,7 +373,85 @@ mod tests { use table::error::Result as TableResult; use table::Table; - use crate::server::grpc::insert::{convert_values, insertion_expr_to_request, is_null}; + use super::{ + build_column_schema, build_create_table_request, convert_values, find_new_columns, + insert_batches, insertion_expr_to_request, is_null, TAG_SEMANTIC_TYPE, + TIMESTAMP_SEMANTIC_TYPE, + }; + + #[test] + fn test_build_create_table_request() { + let table_id = 10; + let table_name = "test_metric"; + + assert!(build_create_table_request(table_id, table_name, &[]).is_err()); + + let insert_batches = insert_batches(mock_insert_batches()).unwrap(); + + let req = build_create_table_request(table_id, table_name, &insert_batches).unwrap(); + assert_eq!(table_id, req.id); + assert!(req.catalog_name.is_none()); + assert!(req.schema_name.is_none()); + assert_eq!(table_name, req.table_name); + assert!(req.desc.is_none()); + assert_eq!(vec![0], req.primary_key_indices); + + let schema = req.schema; + assert_eq!(Some(3), schema.timestamp_index()); + assert_eq!(4, schema.num_columns()); + assert_eq!( + ConcreteDataType::string_datatype(), + schema.column_schema_by_name("host").unwrap().data_type + ); + assert_eq!( + ConcreteDataType::float64_datatype(), + schema.column_schema_by_name("cpu").unwrap().data_type + ); + assert_eq!( + ConcreteDataType::float64_datatype(), + schema.column_schema_by_name("memory").unwrap().data_type + ); + assert_eq!( + ConcreteDataType::timestamp_millis_datatype(), + schema.column_schema_by_name("ts").unwrap().data_type + ); + } + + #[test] + fn test_find_new_columns() { + let mut columns = Vec::with_capacity(1); + let cpu_column = build_column_schema("cpu", 10, true).unwrap(); + let ts_column = build_column_schema("ts", 15, false).unwrap(); + columns.push(cpu_column); + columns.push(ts_column); + + let schema = Arc::new( + SchemaBuilder::try_from(columns) + .unwrap() + .timestamp_index(1) + .build() + .unwrap(), + ); + + assert!(find_new_columns(&schema, &[]).unwrap().is_none()); + + let insert_batches = insert_batches(mock_insert_batches()).unwrap(); + let new_columns = find_new_columns(&schema, &insert_batches).unwrap().unwrap(); + + assert_eq!(2, new_columns.len()); + let host_column = &new_columns[0]; + assert!(host_column.is_key); + assert_eq!( + ConcreteDataType::string_datatype(), + host_column.column_schema.data_type + ); + let memory_column = &new_columns[1]; + assert!(!memory_column.is_key); + assert_eq!( + ConcreteDataType::float64_datatype(), + memory_column.column_schema.data_type + ) + } #[test] fn test_insertion_expr_to_request() { @@ -223,7 +460,8 @@ mod tests { let values = insert_expr::Values { values: mock_insert_batches(), }; - let insert_req = insertion_expr_to_request("demo", values, table).unwrap(); + let insert_batches = insert_batches(values.values).unwrap(); + let insert_req = insertion_expr_to_request("demo", insert_batches, table).unwrap(); assert_eq!("demo", insert_req.table_name); @@ -313,10 +551,6 @@ mod tests { } fn mock_insert_batches() -> Vec> { - const SEMANTIC_TAG: i32 = 0; - const SEMANTIC_FIELD: i32 = 1; - const SEMANTIC_TS: i32 = 2; - let row_count = 2; let host_vals = column::Values { @@ -325,10 +559,10 @@ mod tests { }; let host_column = Column { column_name: "host".to_string(), - semantic_type: SEMANTIC_TAG, + semantic_type: TAG_SEMANTIC_TYPE, values: Some(host_vals), null_mask: vec![0], - ..Default::default() + datatype: ColumnDataType::String as i32, }; let cpu_vals = column::Values { @@ -337,10 +571,10 @@ mod tests { }; let cpu_column = Column { column_name: "cpu".to_string(), - semantic_type: SEMANTIC_FIELD, + semantic_type: SemanticType::Field as i32, values: Some(cpu_vals), null_mask: vec![2], - ..Default::default() + datatype: ColumnDataType::Float64 as i32, }; let mem_vals = column::Values { @@ -349,10 +583,10 @@ mod tests { }; let mem_column = Column { column_name: "memory".to_string(), - semantic_type: SEMANTIC_FIELD, + semantic_type: SemanticType::Field as i32, values: Some(mem_vals), null_mask: vec![1], - ..Default::default() + datatype: ColumnDataType::Float64 as i32, }; let ts_vals = column::Values { @@ -361,10 +595,10 @@ mod tests { }; let ts_column = Column { column_name: "ts".to_string(), - semantic_type: SEMANTIC_TS, + semantic_type: TIMESTAMP_SEMANTIC_TYPE, values: Some(ts_vals), null_mask: vec![0], - datatype: Some(15), + datatype: ColumnDataType::Timestamp as i32, }; let insert_batch = InsertBatch { diff --git a/src/datanode/src/server/grpc/select.rs b/src/datanode/src/server/grpc/select.rs index dab83caff0..5edf646ade 100644 --- a/src/datanode/src/server/grpc/select.rs +++ b/src/datanode/src/server/grpc/select.rs @@ -1,13 +1,14 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::{codec::SelectResult, column::Values, Column, ObjectResult}; +use api::v1::{codec::SelectResult, column::SemanticType, column::Values, Column, ObjectResult}; use arrow::array::{Array, BooleanArray, PrimitiveArray}; use common_base::BitVec; use common_error::status_code::StatusCode; use common_query::Output; use common_recordbatch::{util, RecordBatches, SendableRecordBatchStream}; use datatypes::arrow_array::{BinaryArray, StringArray}; +use datatypes::schema::SchemaRef; use snafu::{OptionExt, ResultExt}; use crate::error::{self, ConversionSnafu, Result}; @@ -49,6 +50,17 @@ fn build_result(recordbatches: RecordBatches) -> Result { Ok(object_result) } +#[inline] +fn get_semantic_type(schema: &SchemaRef, idx: usize) -> i32 { + if Some(idx) == schema.timestamp_index() { + SemanticType::Timestamp as i32 + } else { + // FIXME(dennis): set primary key's columns semantic type as Tag, + // but we can't get the table's schema here right now. + SemanticType::Field as i32 + } +} + fn try_convert(record_batches: RecordBatches) -> Result { let schema = record_batches.schema(); let record_batches = record_batches.take(); @@ -61,8 +73,8 @@ fn try_convert(record_batches: RecordBatches) -> Result { let schemas = schema.column_schemas(); let mut columns = Vec::with_capacity(schemas.len()); - for (idx, schema) in schemas.iter().enumerate() { - let column_name = schema.name.clone(); + for (idx, column_schema) in schemas.iter().enumerate() { + let column_name = column_schema.name.clone(); let arrays: Vec> = record_batches .iter() @@ -73,12 +85,10 @@ fn try_convert(record_batches: RecordBatches) -> Result { column_name, values: Some(values(&arrays)?), null_mask: null_mask(&arrays, row_count), - datatype: Some( - ColumnDataTypeWrapper::try_from(schema.data_type.clone()) - .context(error::ColumnDataTypeSnafu)? - .datatype() as i32, - ), - ..Default::default() + datatype: ColumnDataTypeWrapper::try_from(column_schema.data_type.clone()) + .context(error::ColumnDataTypeSnafu)? + .datatype() as i32, + semantic_type: get_semantic_type(&schema, idx), }; columns.push(column); } diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index 0806a4fc40..141c3a5c7d 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -3,7 +3,7 @@ use snafu::prelude::*; use sql::statements::alter::{AlterTable, AlterTableOperation}; use sql::statements::{column_def_to_schema, table_idents_to_full_name}; use table::engine::EngineContext; -use table::requests::{AlterKind, AlterTableRequest}; +use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest}; use crate::error::{self, Result}; use crate::sql::SqlHandler; @@ -34,8 +34,13 @@ impl SqlHandler { } .fail() } - AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumn { - new_column: column_def_to_schema(column_def).context(error::ParseSqlSnafu)?, + AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumns { + columns: vec![AddColumnRequest { + column_schema: column_def_to_schema(column_def) + .context(error::ParseSqlSnafu)?, + // FIXME(dennis): supports adding key column + is_key: false, + }], }, }; Ok(AlterTableRequest { @@ -80,13 +85,16 @@ mod tests { assert_eq!(req.table_name, "my_metric_1"); let alter_kind = req.alter_kind; - assert_matches!(alter_kind, AlterKind::AddColumn { .. }); + assert_matches!(alter_kind, AlterKind::AddColumns { .. }); match alter_kind { - AlterKind::AddColumn { new_column } => { + AlterKind::AddColumns { columns } => { + let new_column = &columns[0].column_schema; + assert_eq!(new_column.name, "tagk_i"); assert!(new_column.is_nullable); assert_eq!(new_column.data_type, ConcreteDataType::string_datatype()); } + _ => unreachable!(), } } } diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 5d2526d982..6999f02d11 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -6,8 +6,8 @@ use std::time::Duration; use api::v1::ColumnDataType; use api::v1::{ - admin_result, alter_expr::Kind, codec::InsertBatch, column, insert_expr, AddColumn, AlterExpr, - Column, ColumnDef, CreateExpr, InsertExpr, MutateResult, + admin_result, alter_expr::Kind, codec::InsertBatch, column, column::SemanticType, insert_expr, + AddColumn, AlterExpr, Column, ColumnDef, CreateExpr, InsertExpr, MutateResult, }; use client::admin::Admin; use client::{Client, Database, ObjectResult}; @@ -17,27 +17,38 @@ use servers::server::Server; use crate::instance::Instance; use crate::tests::test_util; -#[tokio::test] -async fn test_insert_and_select() { +async fn setup_grpc_server(port: usize) -> String { common_telemetry::init_default_ut_logging(); - let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let (mut opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let addr = format!("127.0.0.1:{}", port); + opts.rpc_addr = addr.clone(); let instance = Arc::new(Instance::new(&opts).await.unwrap()); instance.start().await.unwrap(); + let addr_cloned = addr.clone(); tokio::spawn(async move { let mut grpc_server = GrpcServer::new(instance.clone(), instance); - let addr = "127.0.0.1:3001".parse::().unwrap(); + let addr = addr_cloned.parse::().unwrap(); grpc_server.start(addr).await.unwrap() }); // wait for GRPC server to start tokio::time::sleep(Duration::from_secs(1)).await; + addr +} - let grpc_client = Client::connect("http://127.0.0.1:3001").await.unwrap(); - let db = Database::new("greptime", grpc_client.clone()); - let admin = Admin::new("greptime", grpc_client); +#[tokio::test] +async fn test_auto_create_table() { + let addr = setup_grpc_server(3991).await; + let grpc_client = Client::connect(format!("http://{}", addr)).await.unwrap(); + let db = Database::new("greptime", grpc_client); + + insert_and_assert(&db).await; +} + +fn expect_data() -> (Column, Column, Column, Column) { // testing data: let expected_host_col = Column { column_name: "host".to_string(), @@ -48,7 +59,8 @@ async fn test_insert_and_select() { .collect(), ..Default::default() }), - datatype: Some(12), // string + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::String as i32, ..Default::default() }; let expected_cpu_col = Column { @@ -58,8 +70,8 @@ async fn test_insert_and_select() { ..Default::default() }), null_mask: vec![2], - datatype: Some(10), // float64 - ..Default::default() + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, }; let expected_mem_col = Column { column_name: "memory".to_string(), @@ -68,8 +80,8 @@ async fn test_insert_and_select() { ..Default::default() }), null_mask: vec![4], - datatype: Some(10), // float64 - ..Default::default() + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, }; let expected_ts_col = Column { column_name: "ts".to_string(), @@ -77,10 +89,28 @@ async fn test_insert_and_select() { ts_millis_values: vec![100, 101, 102, 103], ..Default::default() }), - datatype: Some(15), // timestamp + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::Timestamp as i32, ..Default::default() }; + ( + expected_host_col, + expected_cpu_col, + expected_mem_col, + expected_ts_col, + ) +} + +#[tokio::test] +async fn test_insert_and_select() { + let addr = setup_grpc_server(3990).await; + + let grpc_client = Client::connect(format!("http://{}", addr)).await.unwrap(); + + let db = Database::new("greptime", grpc_client.clone()); + let admin = Admin::new("greptime", grpc_client); + // create let expr = testing_create_expr(); let result = admin.create(expr).await.unwrap(); @@ -112,6 +142,13 @@ async fn test_insert_and_select() { assert_eq!(result.result, None); // insert + insert_and_assert(&db).await; +} + +async fn insert_and_assert(db: &Database) { + // testing data: + let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data(); + let values = vec![InsertBatch { columns: vec![ expected_host_col.clone(), @@ -161,19 +198,19 @@ fn testing_create_expr() -> CreateExpr { let column_defs = vec![ ColumnDef { name: "host".to_string(), - datatype: 12, // string + datatype: ColumnDataType::String as i32, is_nullable: false, default_constraint: None, }, ColumnDef { name: "cpu".to_string(), - datatype: 10, // float64 + datatype: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: None, }, ColumnDef { name: "memory".to_string(), - datatype: 10, // float64 + datatype: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: None, }, diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 1da88769fc..a78cbff679 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -147,6 +147,11 @@ impl Schema { self.name_to_index.get(name).copied() } + #[inline] + pub fn contains_column(&self, name: &str) -> bool { + self.name_to_index.contains_key(name) + } + #[inline] pub fn num_columns(&self) -> usize { self.column_schemas.len() diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 47f1253fd0..0f7779b1ff 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -262,8 +262,8 @@ mod tests { use api::v1::codec::{InsertBatch, SelectResult}; use api::v1::{ - admin_expr, admin_result, column, object_expr, object_result, select_expr, Column, - ExprHeader, MutateResult, SelectExpr, + admin_expr, admin_result, column, column::SemanticType, object_expr, object_result, + select_expr, Column, ExprHeader, MutateResult, SelectExpr, }; use datafusion::arrow_print; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; @@ -367,7 +367,8 @@ mod tests { .collect(), ..Default::default() }), - datatype: Some(12), // string + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::String as i32, ..Default::default() }; let expected_cpu_col = Column { @@ -377,8 +378,8 @@ mod tests { ..Default::default() }), null_mask: vec![2], - datatype: Some(10), // float64 - ..Default::default() + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, }; let expected_mem_col = Column { column_name: "memory".to_string(), @@ -387,8 +388,8 @@ mod tests { ..Default::default() }), null_mask: vec![4], - datatype: Some(10), // float64 - ..Default::default() + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, }; let expected_disk_col = Column { column_name: "disk_util".to_string(), @@ -396,7 +397,8 @@ mod tests { f64_values: vec![9.9, 9.9, 9.9, 9.9], ..Default::default() }), - datatype: Some(10), // float64 + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, ..Default::default() }; let expected_ts_col = Column { @@ -405,7 +407,9 @@ mod tests { ts_millis_values: vec![1000, 2000, 3000, 4000], ..Default::default() }), - datatype: Some(15), // timestamp + // FIXME(dennis): looks like the read schema in table scan doesn't have timestamp index, we have to investigate it. + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Timestamp as i32, ..Default::default() }; @@ -495,25 +499,25 @@ mod tests { let column_defs = vec![ GrpcColumnDef { name: "host".to_string(), - datatype: 12, // string + datatype: ColumnDataType::String as i32, is_nullable: false, default_constraint: None, }, GrpcColumnDef { name: "cpu".to_string(), - datatype: 10, // float64 + datatype: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: None, }, GrpcColumnDef { name: "memory".to_string(), - datatype: 10, // float64 + datatype: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: None, }, GrpcColumnDef { name: "disk_util".to_string(), - datatype: 10, // float64 + datatype: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: Some( ColumnDefaultConstraint::Value(Value::from(9.9f64)) @@ -523,7 +527,7 @@ mod tests { }, GrpcColumnDef { name: "ts".to_string(), - datatype: 15, // timestamp + datatype: ColumnDataType::Timestamp as i32, is_nullable: true, default_constraint: None, }, @@ -533,6 +537,7 @@ mod tests { column_defs, time_index: "ts".to_string(), primary_keys: vec!["ts".to_string(), "host".to_string()], + create_if_not_exists: true, ..Default::default() } } diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 4c3b6564ed..9ed7bdd5e3 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -1,14 +1,8 @@ -use std::collections::HashMap; - -use api::v1::{alter_expr, AddColumn, AlterExpr, ColumnDataType, ColumnDef, CreateExpr}; use async_trait::async_trait; -use client::{Error as ClientError, ObjectResult}; -use common_error::prelude::{BoxedError, StatusCode}; -use common_telemetry::info; +use client::ObjectResult; +use common_error::prelude::BoxedError; use servers::error as server_error; -use servers::opentsdb::codec::{ - DataPoint, OPENTSDB_TIMESTAMP_COLUMN_NAME, OPENTSDB_VALUE_COLUMN_NAME, -}; +use servers::opentsdb::codec::DataPoint; use servers::query_handler::OpentsdbProtocolHandler; use snafu::prelude::*; @@ -38,26 +32,6 @@ impl Instance { let object_result = match result { Ok(result) => result, - Err(ClientError::Datanode { code, .. }) => { - let retry = if code == StatusCode::TableNotFound as u32 { - self.create_opentsdb_metric(data_point).await?; - true - } else if code == StatusCode::TableColumnNotFound as u32 { - self.create_opentsdb_tags(data_point).await?; - true - } else { - false - }; - if retry { - self.db - .insert(expr.clone()) - .await - .context(error::RequestDatanodeSnafu)? - } else { - // `unwrap_err` is safe because we are matching "result" in "Err" arm - return Err(result.context(error::RequestDatanodeSnafu).unwrap_err()); - } - } Err(_) => { return Err(result.context(error::RequestDatanodeSnafu).unwrap_err()); } @@ -76,116 +50,6 @@ impl Instance { } Ok(()) } - - async fn create_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> { - let mut column_defs = Vec::with_capacity(2 + data_point.tags().len()); - - let ts_column = ColumnDef { - name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Timestamp as i32, - is_nullable: false, - ..Default::default() - }; - column_defs.push(ts_column); - - let value_column = ColumnDef { - name: OPENTSDB_VALUE_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Float64 as i32, - is_nullable: false, - ..Default::default() - }; - column_defs.push(value_column); - - for (tagk, _) in data_point.tags().iter() { - column_defs.push(ColumnDef { - name: tagk.to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: true, - ..Default::default() - }) - } - - let expr = CreateExpr { - catalog_name: None, - schema_name: None, - table_name: data_point.metric().to_string(), - desc: Some(format!( - "Table for OpenTSDB metric: {}", - &data_point.metric() - )), - column_defs, - time_index: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(), - primary_keys: vec![], - create_if_not_exists: true, - table_options: HashMap::new(), - }; - - let result = self - .admin - .create(expr) - .await - .context(error::RequestDatanodeSnafu)?; - let header = result.header.context(error::IncompleteGrpcResultSnafu { - err_msg: "'header' is missing", - })?; - if header.code == (StatusCode::Success as u32) - || header.code == (StatusCode::TableAlreadyExists as u32) - { - info!( - "OpenTSDB metric table for \"{}\" is created!", - data_point.metric() - ); - Ok(()) - } else { - error::ExecOpentsdbPutSnafu { - reason: format!("error code: {}", header.code), - } - .fail() - } - } - - async fn create_opentsdb_tags(&self, data_point: &DataPoint) -> Result<()> { - // TODO(LFC): support adding columns in one request - for (tagk, _) in data_point.tags().iter() { - let tag_column = ColumnDef { - name: tagk.to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: true, - ..Default::default() - }; - let expr = AlterExpr { - catalog_name: None, - schema_name: None, - table_name: data_point.metric().to_string(), - kind: Some(alter_expr::Kind::AddColumn(AddColumn { - column_def: Some(tag_column), - })), - }; - - let result = self - .admin - .alter(expr) - .await - .context(error::RequestDatanodeSnafu)?; - let header = result.header.context(error::IncompleteGrpcResultSnafu { - err_msg: "'header' is missing", - })?; - if header.code != (StatusCode::Success as u32) - && header.code != (StatusCode::TableColumnExists as u32) - { - return error::ExecOpentsdbPutSnafu { - reason: format!("error code: {}", header.code), - } - .fail(); - } - info!( - "OpenTSDB tag \"{}\" for metric \"{}\" is added!", - tagk, - data_point.metric() - ); - } - Ok(()) - } } #[cfg(test)] diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index ffac778f9a..9b1a46f53f 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -262,7 +262,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; vals: Values, ) { assert_eq!(name, column.column_name); - assert_eq!(Some(datatype as i32), column.datatype); + assert_eq!(datatype as i32, column.datatype); assert_eq!(semantic_type as i32, column.semantic_type); verify_null_mask(&column.null_mask, null_mask); assert_eq!(Some(vals), column.values); diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index 41cd849c6e..5f0ff4dc5c 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -1,5 +1,5 @@ use api::v1::codec::InsertBatch; -use api::v1::{column, insert_expr, Column, InsertExpr}; +use api::v1::{column, column::SemanticType, insert_expr, Column, ColumnDataType, InsertExpr}; use crate::error::{self, Result}; @@ -119,6 +119,8 @@ impl DataPoint { ts_millis_values: vec![self.ts_millis], ..Default::default() }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::Timestamp as i32, ..Default::default() }; columns.push(ts_column); @@ -129,6 +131,8 @@ impl DataPoint { f64_values: vec![self.value], ..Default::default() }), + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, ..Default::default() }; columns.push(value_column); @@ -140,6 +144,8 @@ impl DataPoint { string_values: vec![tagv.to_string()], ..Default::default() }), + semantic_type: SemanticType::Tag as i32, + datatype: ColumnDataType::String as i32, ..Default::default() }); } diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 1097750e23..9f51b757f6 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -432,7 +432,7 @@ mod tests { use storage::EngineImpl; use store_api::manifest::Manifest; use store_api::storage::ReadContext; - use table::requests::{AlterKind, InsertRequest}; + use table::requests::{AddColumnRequest, AlterKind, InsertRequest}; use tempdir::TempDir; use super::*; @@ -831,8 +831,11 @@ mod tests { catalog_name: None, schema_name: None, table_name: TABLE_NAME.to_string(), - alter_kind: AlterKind::AddColumn { - new_column: new_column.clone(), + alter_kind: AlterKind::AddColumns { + columns: vec![AddColumnRequest { + column_schema: new_column.clone(), + is_key: false, + }], }, }; let table = table_engine diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index ae52ab1673..b03054103f 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -143,6 +143,13 @@ pub enum Error { table_name: String, }, + #[snafu(display("Columns {} not exist in table {}", column_names.join(","), table_name))] + ColumnsNotExist { + backtrace: Backtrace, + column_names: Vec, + table_name: String, + }, + #[snafu(display("Failed to build schema, msg: {}, source: {}", msg, source))] SchemaBuild { #[snafu(backtrace)] @@ -203,6 +210,8 @@ impl ErrorExt for Error { ColumnExists { .. } => StatusCode::TableColumnExists, + ColumnsNotExist { .. } => StatusCode::TableColumnNotFound, + TableInfoNotFound { .. } => StatusCode::Unexpected, ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable, diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index f68b4ec8b9..18167598c4 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -20,7 +20,7 @@ use datatypes::vectors::{ConstantVector, TimestampVector, VectorRef}; use futures::task::{Context, Poll}; use futures::Stream; use object_store::ObjectStore; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; use store_api::storage::{ AddColumn, AlterOperation, AlterRequest, ChunkReader, ColumnDescriptorBuilder, PutOperation, @@ -28,7 +28,7 @@ use store_api::storage::{ }; use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult}; use table::metadata::{FilterPushDownType, TableInfoRef, TableMetaBuilder}; -use table::requests::{AlterKind, AlterTableRequest, InsertRequest}; +use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest}; use table::{ metadata::{TableInfo, TableType}, table::Table, @@ -36,8 +36,9 @@ use table::{ use tokio::sync::Mutex; use crate::error::{ - self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, SchemaBuildSnafu, - TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu, UpdateTableManifestSnafu, + self, ColumnsNotExistSnafu, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, + SchemaBuildSnafu, TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu, + UpdateTableManifestSnafu, }; use crate::manifest::action::*; use crate::manifest::TableManifest; @@ -86,29 +87,51 @@ impl Table for MitoTable { //Add row key and columns for name in key_columns { - let vector = columns_values - .remove(name) - .or_else(|| { - Self::try_get_column_default_constraint_vector(&schema, name, rows_num).ok()? - }) - .context(MissingColumnSnafu { name }) - .map_err(TableError::from)?; + let column_schema = schema + .column_schema_by_name(name) + .expect("column schema not found"); - put_op - .add_key_column(name, vector) - .map_err(TableError::new)?; - } - // Add vaue columns - for name in value_columns { let vector = columns_values.remove(name).or_else(|| { - Self::try_get_column_default_constraint_vector(&schema, name, rows_num).ok()? + Self::try_get_column_default_constraint_vector(column_schema, rows_num).ok()? + }); + + if let Some(vector) = vector { + put_op + .add_key_column(name, vector) + .map_err(TableError::new)?; + } else if !column_schema.is_nullable { + return MissingColumnSnafu { name }.fail().map_err(TableError::from); + } + } + // Add value columns + for name in value_columns { + let column_schema = schema + .column_schema_by_name(name) + .expect("column schema not found"); + + let vector = columns_values.remove(name).or_else(|| { + Self::try_get_column_default_constraint_vector(column_schema, rows_num).ok()? }); if let Some(v) = vector { put_op.add_value_column(name, v).map_err(TableError::new)?; + } else if !column_schema.is_nullable { + return MissingColumnSnafu { name }.fail().map_err(TableError::from); } } + ensure!( + columns_values.is_empty(), + ColumnsNotExistSnafu { + table_name: &table_info.name, + column_names: columns_values + .keys() + .into_iter() + .map(|s| s.to_string()) + .collect::>(), + } + ); + logging::debug!( "Insert into table {} with put_op: {:?}", table_info.name, @@ -181,42 +204,58 @@ impl Table for MitoTable { let table_info = self.table_info(); let table_name = &table_info.name; let table_meta = &table_info.meta; - let (alter_op, table_schema) = match &req.alter_kind { - AlterKind::AddColumn { new_column } => { - let desc = ColumnDescriptorBuilder::new( - table_meta.next_column_id, - &new_column.name, - new_column.data_type.clone(), - ) - .is_nullable(new_column.is_nullable) - .default_constraint(new_column.default_constraint.clone()) - .build() - .context(error::BuildColumnDescriptorSnafu { - table_name, - column_name: &new_column.name, - })?; - let alter_op = AlterOperation::AddColumns { - columns: vec![AddColumn { - desc, - // TODO(yingwen): [alter] AlterTableRequest should be able to add a key column. - is_key: false, - }], - }; + + let (alter_op, table_schema, new_columns_num) = match &req.alter_kind { + AlterKind::AddColumns { + columns: new_columns, + } => { + let columns = new_columns + .iter() + .enumerate() + .map(|(i, add_column)| { + let new_column = &add_column.column_schema; + + let desc = ColumnDescriptorBuilder::new( + table_meta.next_column_id + i as u32, + &new_column.name, + new_column.data_type.clone(), + ) + .is_nullable(new_column.is_nullable) + .default_constraint(new_column.default_constraint.clone()) + .build() + .context(error::BuildColumnDescriptorSnafu { + table_name, + column_name: &new_column.name, + })?; + + Ok(AddColumn { + desc, + is_key: add_column.is_key, + }) + }) + .collect::>>()?; + + let alter_op = AlterOperation::AddColumns { columns }; // TODO(yingwen): [alter] Better way to alter the schema struct. In fact the column order // in table schema could differ from the region schema, so we could just push this column // to the back of the schema (as last column). - let table_schema = - build_table_schema_with_new_column(table_name, &table_meta.schema, new_column)?; + let table_schema = build_table_schema_with_new_columns( + table_name, + &table_meta.schema, + new_columns, + )?; - (alter_op, table_schema) + (alter_op, table_schema, new_columns.len() as u32) } + // TODO(dennis): supports removing columns etc. + _ => unimplemented!(), }; let new_meta = TableMetaBuilder::default() .schema(table_schema.clone()) .engine(&table_meta.engine) - .next_column_id(table_meta.next_column_id + 1) // Bump next column id. + .next_column_id(table_meta.next_column_id + new_columns_num) // Bump next column id. .primary_key_indices(table_meta.primary_key_indices.clone()) .build() .context(error::BuildTableMetaSnafu { table_name })?; @@ -272,24 +311,27 @@ impl Table for MitoTable { } } -fn build_table_schema_with_new_column( +fn build_table_schema_with_new_columns( table_name: &str, table_schema: &SchemaRef, - new_column: &ColumnSchema, + new_columns: &[AddColumnRequest], ) -> Result { - if table_schema - .column_schema_by_name(&new_column.name) - .is_some() - { - return error::ColumnExistsSnafu { - column_name: &new_column.name, - table_name, - } - .fail()?; - } - let mut columns = table_schema.column_schemas().to_vec(); - columns.push(new_column.clone()); + + for add_column in new_columns { + let new_column = &add_column.column_schema; + if table_schema + .column_schema_by_name(&new_column.name) + .is_some() + { + return error::ColumnExistsSnafu { + column_name: &new_column.name, + table_name, + } + .fail()?; + } + columns.push(new_column.clone()); + } // Right now we are not support adding the column // before or after some column, so just clone a new schema like this. @@ -307,7 +349,7 @@ fn build_table_schema_with_new_column( builder = builder.add_metadata(k, v); } let new_schema = Arc::new(builder.build().with_context(|_| error::SchemaBuildSnafu { - msg: format!("cannot add new column {:?}", new_column), + msg: format!("cannot add new columns {:?}", new_columns), })?); Ok(new_schema) } @@ -424,14 +466,10 @@ impl MitoTable { } fn try_get_column_default_constraint_vector( - schema: &SchemaRef, - name: &str, + column_schema: &ColumnSchema, rows_num: usize, ) -> TableResult> { // TODO(dennis): when we support altering schema, we should check the schemas difference between table and region - let column_schema = schema - .column_schema_by_name(name) - .expect("column schema not found"); if let Some(v) = &column_schema.default_constraint { assert!(rows_num > 0); @@ -572,7 +610,12 @@ mod tests { let table_schema = &table_meta.schema; let new_column = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true); - let result = build_table_schema_with_new_column(table_name, table_schema, &new_column); + + let new_columns = vec![AddColumnRequest { + column_schema: new_column, + is_key: false, + }]; + let result = build_table_schema_with_new_columns(table_name, table_schema, &new_columns); assert!(result.is_err()); assert!(result .unwrap_err() @@ -580,8 +623,12 @@ mod tests { .contains("Column host already exists in table demo")); let new_column = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true); + let new_columns = vec![AddColumnRequest { + column_schema: new_column.clone(), + is_key: false, + }]; let new_schema = - build_table_schema_with_new_column(table_name, table_schema, &new_column).unwrap(); + build_table_schema_with_new_columns(table_name, table_schema, &new_columns).unwrap(); assert_eq!(new_schema.num_columns(), table_schema.num_columns() + 1); assert_eq!( diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index ade7283a30..09e541cab0 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -45,9 +45,17 @@ pub struct AlterTableRequest { pub alter_kind: AlterKind, } +/// Add column request +#[derive(Debug)] +pub struct AddColumnRequest { + pub column_schema: ColumnSchema, + pub is_key: bool, +} + #[derive(Debug)] pub enum AlterKind { - AddColumn { new_column: ColumnSchema }, + AddColumns { columns: Vec }, + RemoveColumns { names: Vec }, } /// Drop table request