feat: create table and add new columns automatically in gRPC (#310)

* fix: readme

* feat: change Column's datatype in protobuf from optional to required

* feat: supports creating table and adding new columns automatically in gRPC, #279, #283

* fix: test

* refactor: execute_grpc_insert

* refactor: clean code and add test

* fix: test after rebasing develop branch

* test: test grpc server with different ports

* fix: typo

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* fix: typo

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: minor changes

* chore: build_alter_table_request

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
dennis zhuang
2022-10-17 10:34:52 +08:00
committed by GitHub
parent 494a93c4f2
commit 25a16875b6
20 changed files with 642 additions and 323 deletions

View File

@@ -125,7 +125,7 @@ cargo run -- --log-dir=logs --log-level=debug frontend start -c ./config/fronten
cpu DOUBLE DEFAULT 0,
memory DOUBLE,
TIME INDEX (ts),
PRIMARY KEY(ts,host)) ENGINE=mito WITH(regions=1);
PRIMARY KEY(host)) ENGINE=mito WITH(regions=1);
```
3. Insert data:

View File

@@ -49,7 +49,7 @@ message Column {
bytes null_mask = 4;
// Helpful in creating vector from column.
optional ColumnDataType datatype = 5;
ColumnDataType datatype = 5;
}
message ColumnDef {

View File

@@ -23,10 +23,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::error;
use crate::{
error::{
ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu,
MissingFieldSnafu,
},
error::{ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu},
Client, Result,
};
@@ -240,12 +237,8 @@ impl TryFrom<ObjectResult> for Output {
}
fn column_to_vector(column: &Column, rows: u32) -> Result<VectorRef> {
let wrapper = ColumnDataTypeWrapper::try_new(
column
.datatype
.context(MissingFieldSnafu { field: "datatype" })?,
)
.context(error::ColumnDataTypeSnafu)?;
let wrapper =
ColumnDataTypeWrapper::try_new(column.datatype).context(error::ColumnDataTypeSnafu)?;
let column_datatype = wrapper.datatype();
let rows = rows as usize;
@@ -348,7 +341,7 @@ mod tests {
#[test]
fn test_column_to_vector() {
let mut column = create_test_column(Arc::new(BooleanVector::from(vec![true])));
column.datatype = Some(-100);
column.datatype = -100;
let result = column_to_vector(&column, 1);
assert!(result.is_err());
assert_eq!(
@@ -426,7 +419,7 @@ mod tests {
semantic_type: 1,
values: Some(values(&[array.clone()]).unwrap()),
null_mask: null_mask(&vec![array], vector.len()),
datatype: Some(wrapper.datatype() as i32),
datatype: wrapper.datatype() as i32,
}
}
}

View File

@@ -35,7 +35,7 @@ impl LinesWriter {
SemanticType::Timestamp,
);
ensure!(
column.datatype == Some(ColumnDataType::Timestamp.into()),
column.datatype == ColumnDataType::Timestamp as i32,
TypeMismatchSnafu {
column_name,
expected: "timestamp",
@@ -52,7 +52,7 @@ impl LinesWriter {
pub fn write_tag(&mut self, column_name: &str, value: &str) -> Result<()> {
let (idx, column) = self.mut_column(column_name, ColumnDataType::String, SemanticType::Tag);
ensure!(
column.datatype == Some(ColumnDataType::String.into()),
column.datatype == ColumnDataType::String as i32,
TypeMismatchSnafu {
column_name,
expected: "string",
@@ -70,7 +70,7 @@ impl LinesWriter {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::Uint64, SemanticType::Field);
ensure!(
column.datatype == Some(ColumnDataType::Uint64.into()),
column.datatype == ColumnDataType::Uint64 as i32,
TypeMismatchSnafu {
column_name,
expected: "u64",
@@ -88,7 +88,7 @@ impl LinesWriter {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::Int64, SemanticType::Field);
ensure!(
column.datatype == Some(ColumnDataType::Int64.into()),
column.datatype == ColumnDataType::Int64 as i32,
TypeMismatchSnafu {
column_name,
expected: "i64",
@@ -106,7 +106,7 @@ impl LinesWriter {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::Float64, SemanticType::Field);
ensure!(
column.datatype == Some(ColumnDataType::Float64.into()),
column.datatype == ColumnDataType::Float64 as i32,
TypeMismatchSnafu {
column_name,
expected: "f64",
@@ -124,7 +124,7 @@ impl LinesWriter {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::String, SemanticType::Field);
ensure!(
column.datatype == Some(ColumnDataType::String.into()),
column.datatype == ColumnDataType::String as i32,
TypeMismatchSnafu {
column_name,
expected: "string",
@@ -142,7 +142,7 @@ impl LinesWriter {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::Boolean, SemanticType::Field);
ensure!(
column.datatype == Some(ColumnDataType::Boolean.into()),
column.datatype == ColumnDataType::Boolean as i32,
TypeMismatchSnafu {
column_name,
expected: "boolean",
@@ -197,7 +197,7 @@ impl LinesWriter {
column_name: column_name.to_string(),
semantic_type: semantic_type.into(),
values: Some(Values::with_capacity(datatype, to_insert)),
datatype: Some(datatype.into()),
datatype: datatype as i32,
null_mask: Vec::default(),
});
column_names.insert(column_name.to_string(), new_idx);
@@ -275,7 +275,7 @@ mod tests {
let column = &columns[0];
assert_eq!("host", columns[0].column_name);
assert_eq!(Some(ColumnDataType::String as i32), column.datatype);
assert_eq!(ColumnDataType::String as i32, column.datatype);
assert_eq!(SemanticType::Tag as i32, column.semantic_type);
assert_eq!(
vec!["host1", "host2", "host3"],
@@ -285,28 +285,28 @@ mod tests {
let column = &columns[1];
assert_eq!("cpu", column.column_name);
assert_eq!(Some(ColumnDataType::Float64 as i32), column.datatype);
assert_eq!(ColumnDataType::Float64 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![0.5, 0.4], column.values.as_ref().unwrap().f64_values);
verify_null_mask(&column.null_mask, vec![false, true, false]);
let column = &columns[2];
assert_eq!("memory", column.column_name);
assert_eq!(Some(ColumnDataType::Float64 as i32), column.datatype);
assert_eq!(ColumnDataType::Float64 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![0.4], column.values.as_ref().unwrap().f64_values);
verify_null_mask(&column.null_mask, vec![false, true, true]);
let column = &columns[3];
assert_eq!("name", column.column_name);
assert_eq!(Some(ColumnDataType::String as i32), column.datatype);
assert_eq!(ColumnDataType::String as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec!["name1"], column.values.as_ref().unwrap().string_values);
verify_null_mask(&column.null_mask, vec![false, true, true]);
let column = &columns[4];
assert_eq!("ts", column.column_name);
assert_eq!(Some(ColumnDataType::Timestamp as i32), column.datatype);
assert_eq!(ColumnDataType::Timestamp as i32, column.datatype);
assert_eq!(SemanticType::Timestamp as i32, column.semantic_type);
assert_eq!(
vec![101011000, 102011001, 103011002],
@@ -316,28 +316,28 @@ mod tests {
let column = &columns[5];
assert_eq!("enable_reboot", column.column_name);
assert_eq!(Some(ColumnDataType::Boolean as i32), column.datatype);
assert_eq!(ColumnDataType::Boolean as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![true], column.values.as_ref().unwrap().bool_values);
verify_null_mask(&column.null_mask, vec![true, false, true]);
let column = &columns[6];
assert_eq!("year_of_service", column.column_name);
assert_eq!(Some(ColumnDataType::Uint64 as i32), column.datatype);
assert_eq!(ColumnDataType::Uint64 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![2], column.values.as_ref().unwrap().u64_values);
verify_null_mask(&column.null_mask, vec![true, false, true]);
let column = &columns[7];
assert_eq!("temperature", column.column_name);
assert_eq!(Some(ColumnDataType::Int64 as i32), column.datatype);
assert_eq!(ColumnDataType::Int64 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![4], column.values.as_ref().unwrap().i64_values);
verify_null_mask(&column.null_mask, vec![true, false, true]);
let column = &columns[8];
assert_eq!("cpu_core_num", column.column_name);
assert_eq!(Some(ColumnDataType::Uint64 as i32), column.datatype);
assert_eq!(ColumnDataType::Uint64 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![16], column.values.as_ref().unwrap().u64_values);
verify_null_mask(&column.null_mask, vec![true, true, false]);

View File

@@ -67,6 +67,9 @@ pub enum Error {
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField { field: String, backtrace: Backtrace },
#[snafu(display("Missing timestamp column in request"))]
MissingTimestampColumn { backtrace: Backtrace },
#[snafu(display(
"Columns and values number mismatch, columns: {}, values: {}",
columns,
@@ -247,6 +250,17 @@ pub enum Error {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display(
"Duplicated timestamp column in gRPC requests, exists {}, duplicated: {}",
exists,
duplicated
))]
DuplicatedTimestampColumn {
exists: String,
duplicated: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -283,10 +297,13 @@ impl ErrorExt for Error {
| Error::KeyColumnNotFound { .. }
| Error::InvalidPrimaryKey { .. }
| Error::MissingField { .. }
| Error::MissingTimestampColumn { .. }
| Error::CatalogNotFound { .. }
| Error::SchemaNotFound { .. }
| Error::ConstraintNotSupported { .. }
| Error::ParseTimestamp { .. } => StatusCode::InvalidArguments,
| Error::ParseTimestamp { .. }
| Error::DuplicatedTimestampColumn { .. } => StatusCode::InvalidArguments,
// TODO(yingwen): Further categorize http error.
Error::StartServer { .. }
| Error::ParseAddr { .. }

View File

@@ -1,15 +1,15 @@
use std::{fs, path, sync::Arc};
use api::v1::{
admin_expr, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, ObjectExpr,
ObjectResult, SelectExpr,
admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult,
ObjectExpr, ObjectResult, SelectExpr,
};
use async_trait::async_trait;
use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_telemetry::logging::{error, info};
use common_telemetry::logging::{debug, error, info};
use common_telemetry::timer;
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use object_store::{backend::fs::Backend, util, ObjectStore};
@@ -18,6 +18,7 @@ use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler
use snafu::prelude::*;
use sql::statements::statement::Statement;
use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
use table::requests::AddColumnRequest;
use table_engine::config::EngineConfig as TableEngineConfig;
use table_engine::engine::MitoEngine;
@@ -29,7 +30,7 @@ use crate::error::{
use crate::metric;
use crate::script::ScriptExecutor;
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
use crate::server::grpc::insert::insertion_expr_to_request;
use crate::server::grpc::insert::{self, insertion_expr_to_request};
use crate::server::grpc::plan::PhysicalPlanner;
use crate::server::grpc::select::to_object_result;
use crate::sql::{SqlHandler, SqlRequest};
@@ -38,10 +39,8 @@ type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
// An abstraction to read/write services.
pub struct Instance {
// Query service
query_engine: QueryEngineRef,
sql_handler: SqlHandler,
// Catalog list
catalog_manager: CatalogManagerRef,
physical_planner: PhysicalPlanner,
script_executor: ScriptExecutor,
@@ -82,6 +81,60 @@ impl Instance {
})
}
async fn add_new_columns_to_table(
&self,
table_name: &str,
add_columns: Vec<AddColumnRequest>,
) -> Result<()> {
let column_names = add_columns
.iter()
.map(|req| req.column_schema.name.clone())
.collect::<Vec<_>>();
let alter_request = insert::build_alter_table_request(table_name, add_columns);
debug!(
"Adding new columns: {:?} to table: {}",
column_names, table_name
);
let _result = self
.sql_handler()
.execute(SqlRequest::Alter(alter_request))
.await?;
info!(
"Added new columns: {:?} to table: {}",
column_names, table_name
);
Ok(())
}
async fn create_table_by_insert_batches(
&self,
table_name: &str,
insert_batches: &[InsertBatch],
) -> Result<()> {
// Create table automatically, build schema from data.
let table_id = self.catalog_manager.next_table_id();
let create_table_request =
insert::build_create_table_request(table_id, table_name, insert_batches)?;
info!(
"Try to create table: {} automatically with request: {:?}",
table_name, create_table_request,
);
let _result = self
.sql_handler()
.execute(SqlRequest::Create(create_table_request))
.await?;
info!("Success to create table: {} automatically", table_name);
Ok(())
}
pub async fn execute_grpc_insert(
&self,
table_name: &str,
@@ -94,11 +147,27 @@ impl Instance {
.schema(DEFAULT_SCHEMA_NAME)
.unwrap();
let table = schema_provider
.table(table_name)
.context(TableNotFoundSnafu { table_name })?;
let insert_batches = insert::insert_batches(values.values)?;
ensure!(!insert_batches.is_empty(), error::IllegalInsertDataSnafu);
let insert = insertion_expr_to_request(table_name, values, table.clone())?;
let table = if let Some(table) = schema_provider.table(table_name) {
let schema = table.schema();
if let Some(add_columns) = insert::find_new_columns(&schema, &insert_batches)? {
self.add_new_columns_to_table(table_name, add_columns)
.await?;
}
table
} else {
self.create_table_by_insert_batches(table_name, &insert_batches)
.await?;
schema_provider
.table(table_name)
.context(TableNotFoundSnafu { table_name })?
};
let insert = insertion_expr_to_request(table_name, insert_batches, table.clone())?;
let affected_rows = table
.insert(insert)

View File

@@ -8,7 +8,7 @@ use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use futures::TryFutureExt;
use snafu::prelude::*;
use table::requests::{AlterKind, AlterTableRequest, CreateTableRequest};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
use crate::error::{self, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result};
use crate::instance::Instance;
@@ -96,8 +96,12 @@ impl Instance {
let column_def = add_column.column_def.context(MissingFieldSnafu {
field: "column_def",
})?;
let alter_kind = AlterKind::AddColumn {
new_column: create_column_schema(&column_def)?,
let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: create_column_schema(&column_def)?,
// FIXME(dennis): supports adding key column
is_key: false,
}],
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,

View File

@@ -1,26 +1,184 @@
use std::collections::HashSet;
use std::{
collections::{hash_map::Entry, HashMap},
ops::Deref,
sync::Arc,
};
use api::v1::{codec::InsertBatch, column::Values, insert_expr, Column};
use api::{
helper::ColumnDataTypeWrapper,
v1::{
codec::InsertBatch,
column::{SemanticType, Values},
Column,
},
};
use common_base::BitVec;
use common_time::timestamp::Timestamp;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder};
use snafu::{ensure, OptionExt, ResultExt};
use table::{requests::InsertRequest, Table};
use table::metadata::TableId;
use table::{
requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest, InsertRequest},
Table,
};
use crate::error::{ColumnNotFoundSnafu, DecodeInsertSnafu, IllegalInsertDataSnafu, Result};
use crate::error::{self, ColumnNotFoundSnafu, DecodeInsertSnafu, IllegalInsertDataSnafu, Result};
const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32;
const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32;
#[inline]
fn build_column_schema(column_name: &str, datatype: i32, nullable: bool) -> Result<ColumnSchema> {
let datatype_wrapper =
ColumnDataTypeWrapper::try_new(datatype).context(error::ColumnDataTypeSnafu)?;
Ok(ColumnSchema::new(
column_name,
datatype_wrapper.into(),
nullable,
))
}
pub fn find_new_columns(
schema: &SchemaRef,
insert_batches: &[InsertBatch],
) -> Result<Option<Vec<AddColumnRequest>>> {
let mut requests = Vec::default();
let mut new_columns: HashSet<String> = HashSet::default();
for InsertBatch { columns, row_count } in insert_batches {
if *row_count == 0 || columns.is_empty() {
continue;
}
for Column {
column_name,
semantic_type,
datatype,
..
} in columns
{
if schema.column_schema_by_name(column_name).is_none()
&& !new_columns.contains(column_name)
{
let column_schema = build_column_schema(column_name, *datatype, true)?;
requests.push(AddColumnRequest {
column_schema,
is_key: *semantic_type == TAG_SEMANTIC_TYPE,
});
new_columns.insert(column_name.to_string());
}
}
}
if requests.is_empty() {
Ok(None)
} else {
Ok(Some(requests))
}
}
/// Build a alter table rqeusts that adding new columns.
#[inline]
pub fn build_alter_table_request(
table_name: &str,
columns: Vec<AddColumnRequest>,
) -> AlterTableRequest {
AlterTableRequest {
catalog_name: None,
schema_name: None,
table_name: table_name.to_string(),
alter_kind: AlterKind::AddColumns { columns },
}
}
/// Try to build create table request from insert data.
pub fn build_create_table_request(
table_id: TableId,
table_name: &str,
insert_batches: &[InsertBatch],
) -> Result<CreateTableRequest> {
let mut new_columns: HashSet<String> = HashSet::default();
let mut column_schemas = Vec::default();
let mut primary_key_indices = Vec::default();
let mut timestamp_index = usize::MAX;
for InsertBatch { columns, row_count } in insert_batches {
if *row_count == 0 || columns.is_empty() {
continue;
}
for Column {
column_name,
semantic_type,
datatype,
..
} in columns
{
if !new_columns.contains(column_name) {
let mut column_schema = build_column_schema(column_name, *datatype, true)?;
match *semantic_type {
TAG_SEMANTIC_TYPE => primary_key_indices.push(column_schemas.len()),
TIMESTAMP_SEMANTIC_TYPE => {
ensure!(
timestamp_index == usize::MAX,
error::DuplicatedTimestampColumnSnafu {
exists: &columns[timestamp_index].column_name,
duplicated: column_name,
}
);
timestamp_index = column_schemas.len();
// Timestamp column must not be null.
column_schema.is_nullable = false;
}
_ => {}
}
column_schemas.push(column_schema);
new_columns.insert(column_name.to_string());
}
}
ensure!(
timestamp_index != usize::MAX,
error::MissingTimestampColumnSnafu
);
let schema = Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.timestamp_index(timestamp_index)
.build()
.context(error::CreateSchemaSnafu)?,
);
return Ok(CreateTableRequest {
id: table_id,
catalog_name: None,
schema_name: None,
table_name: table_name.to_string(),
desc: None,
schema,
create_if_not_exists: true,
primary_key_indices,
table_options: HashMap::new(),
});
}
error::IllegalInsertDataSnafu.fail()
}
pub fn insertion_expr_to_request(
table_name: &str,
values: insert_expr::Values,
insert_batches: Vec<InsertBatch>,
table: Arc<dyn Table>,
) -> Result<InsertRequest> {
let schema = table.schema();
let mut columns_builders = HashMap::with_capacity(schema.column_schemas().len());
let insert_batches = insert_batches(values.values)?;
for InsertBatch { columns, row_count } in insert_batches {
for Column {
@@ -66,7 +224,8 @@ pub fn insertion_expr_to_request(
})
}
fn insert_batches(bytes_vec: Vec<Vec<u8>>) -> Result<Vec<InsertBatch>> {
#[inline]
pub fn insert_batches(bytes_vec: Vec<Vec<u8>>) -> Result<Vec<InsertBatch>> {
bytes_vec
.iter()
.map(|bytes| bytes.deref().try_into().context(DecodeInsertSnafu))
@@ -199,8 +358,8 @@ mod tests {
use api::v1::{
codec::InsertBatch,
column::{self, Values},
insert_expr, Column,
column::{self, SemanticType, Values},
insert_expr, Column, ColumnDataType,
};
use common_base::BitVec;
use common_query::prelude::Expr;
@@ -214,7 +373,85 @@ mod tests {
use table::error::Result as TableResult;
use table::Table;
use crate::server::grpc::insert::{convert_values, insertion_expr_to_request, is_null};
use super::{
build_column_schema, build_create_table_request, convert_values, find_new_columns,
insert_batches, insertion_expr_to_request, is_null, TAG_SEMANTIC_TYPE,
TIMESTAMP_SEMANTIC_TYPE,
};
#[test]
fn test_build_create_table_request() {
let table_id = 10;
let table_name = "test_metric";
assert!(build_create_table_request(table_id, table_name, &[]).is_err());
let insert_batches = insert_batches(mock_insert_batches()).unwrap();
let req = build_create_table_request(table_id, table_name, &insert_batches).unwrap();
assert_eq!(table_id, req.id);
assert!(req.catalog_name.is_none());
assert!(req.schema_name.is_none());
assert_eq!(table_name, req.table_name);
assert!(req.desc.is_none());
assert_eq!(vec![0], req.primary_key_indices);
let schema = req.schema;
assert_eq!(Some(3), schema.timestamp_index());
assert_eq!(4, schema.num_columns());
assert_eq!(
ConcreteDataType::string_datatype(),
schema.column_schema_by_name("host").unwrap().data_type
);
assert_eq!(
ConcreteDataType::float64_datatype(),
schema.column_schema_by_name("cpu").unwrap().data_type
);
assert_eq!(
ConcreteDataType::float64_datatype(),
schema.column_schema_by_name("memory").unwrap().data_type
);
assert_eq!(
ConcreteDataType::timestamp_millis_datatype(),
schema.column_schema_by_name("ts").unwrap().data_type
);
}
#[test]
fn test_find_new_columns() {
let mut columns = Vec::with_capacity(1);
let cpu_column = build_column_schema("cpu", 10, true).unwrap();
let ts_column = build_column_schema("ts", 15, false).unwrap();
columns.push(cpu_column);
columns.push(ts_column);
let schema = Arc::new(
SchemaBuilder::try_from(columns)
.unwrap()
.timestamp_index(1)
.build()
.unwrap(),
);
assert!(find_new_columns(&schema, &[]).unwrap().is_none());
let insert_batches = insert_batches(mock_insert_batches()).unwrap();
let new_columns = find_new_columns(&schema, &insert_batches).unwrap().unwrap();
assert_eq!(2, new_columns.len());
let host_column = &new_columns[0];
assert!(host_column.is_key);
assert_eq!(
ConcreteDataType::string_datatype(),
host_column.column_schema.data_type
);
let memory_column = &new_columns[1];
assert!(!memory_column.is_key);
assert_eq!(
ConcreteDataType::float64_datatype(),
memory_column.column_schema.data_type
)
}
#[test]
fn test_insertion_expr_to_request() {
@@ -223,7 +460,8 @@ mod tests {
let values = insert_expr::Values {
values: mock_insert_batches(),
};
let insert_req = insertion_expr_to_request("demo", values, table).unwrap();
let insert_batches = insert_batches(values.values).unwrap();
let insert_req = insertion_expr_to_request("demo", insert_batches, table).unwrap();
assert_eq!("demo", insert_req.table_name);
@@ -313,10 +551,6 @@ mod tests {
}
fn mock_insert_batches() -> Vec<Vec<u8>> {
const SEMANTIC_TAG: i32 = 0;
const SEMANTIC_FIELD: i32 = 1;
const SEMANTIC_TS: i32 = 2;
let row_count = 2;
let host_vals = column::Values {
@@ -325,10 +559,10 @@ mod tests {
};
let host_column = Column {
column_name: "host".to_string(),
semantic_type: SEMANTIC_TAG,
semantic_type: TAG_SEMANTIC_TYPE,
values: Some(host_vals),
null_mask: vec![0],
..Default::default()
datatype: ColumnDataType::String as i32,
};
let cpu_vals = column::Values {
@@ -337,10 +571,10 @@ mod tests {
};
let cpu_column = Column {
column_name: "cpu".to_string(),
semantic_type: SEMANTIC_FIELD,
semantic_type: SemanticType::Field as i32,
values: Some(cpu_vals),
null_mask: vec![2],
..Default::default()
datatype: ColumnDataType::Float64 as i32,
};
let mem_vals = column::Values {
@@ -349,10 +583,10 @@ mod tests {
};
let mem_column = Column {
column_name: "memory".to_string(),
semantic_type: SEMANTIC_FIELD,
semantic_type: SemanticType::Field as i32,
values: Some(mem_vals),
null_mask: vec![1],
..Default::default()
datatype: ColumnDataType::Float64 as i32,
};
let ts_vals = column::Values {
@@ -361,10 +595,10 @@ mod tests {
};
let ts_column = Column {
column_name: "ts".to_string(),
semantic_type: SEMANTIC_TS,
semantic_type: TIMESTAMP_SEMANTIC_TYPE,
values: Some(ts_vals),
null_mask: vec![0],
datatype: Some(15),
datatype: ColumnDataType::Timestamp as i32,
};
let insert_batch = InsertBatch {

View File

@@ -1,13 +1,14 @@
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{codec::SelectResult, column::Values, Column, ObjectResult};
use api::v1::{codec::SelectResult, column::SemanticType, column::Values, Column, ObjectResult};
use arrow::array::{Array, BooleanArray, PrimitiveArray};
use common_base::BitVec;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::{util, RecordBatches, SendableRecordBatchStream};
use datatypes::arrow_array::{BinaryArray, StringArray};
use datatypes::schema::SchemaRef;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, ConversionSnafu, Result};
@@ -49,6 +50,17 @@ fn build_result(recordbatches: RecordBatches) -> Result<ObjectResult> {
Ok(object_result)
}
#[inline]
fn get_semantic_type(schema: &SchemaRef, idx: usize) -> i32 {
if Some(idx) == schema.timestamp_index() {
SemanticType::Timestamp as i32
} else {
// FIXME(dennis): set primary key's columns semantic type as Tag,
// but we can't get the table's schema here right now.
SemanticType::Field as i32
}
}
fn try_convert(record_batches: RecordBatches) -> Result<SelectResult> {
let schema = record_batches.schema();
let record_batches = record_batches.take();
@@ -61,8 +73,8 @@ fn try_convert(record_batches: RecordBatches) -> Result<SelectResult> {
let schemas = schema.column_schemas();
let mut columns = Vec::with_capacity(schemas.len());
for (idx, schema) in schemas.iter().enumerate() {
let column_name = schema.name.clone();
for (idx, column_schema) in schemas.iter().enumerate() {
let column_name = column_schema.name.clone();
let arrays: Vec<Arc<dyn Array>> = record_batches
.iter()
@@ -73,12 +85,10 @@ fn try_convert(record_batches: RecordBatches) -> Result<SelectResult> {
column_name,
values: Some(values(&arrays)?),
null_mask: null_mask(&arrays, row_count),
datatype: Some(
ColumnDataTypeWrapper::try_from(schema.data_type.clone())
.context(error::ColumnDataTypeSnafu)?
.datatype() as i32,
),
..Default::default()
datatype: ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
.context(error::ColumnDataTypeSnafu)?
.datatype() as i32,
semantic_type: get_semantic_type(&schema, idx),
};
columns.push(column);
}

View File

@@ -3,7 +3,7 @@ use snafu::prelude::*;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
use table::engine::EngineContext;
use table::requests::{AlterKind, AlterTableRequest};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
use crate::error::{self, Result};
use crate::sql::SqlHandler;
@@ -34,8 +34,13 @@ impl SqlHandler {
}
.fail()
}
AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumn {
new_column: column_def_to_schema(column_def).context(error::ParseSqlSnafu)?,
AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: column_def_to_schema(column_def)
.context(error::ParseSqlSnafu)?,
// FIXME(dennis): supports adding key column
is_key: false,
}],
},
};
Ok(AlterTableRequest {
@@ -80,13 +85,16 @@ mod tests {
assert_eq!(req.table_name, "my_metric_1");
let alter_kind = req.alter_kind;
assert_matches!(alter_kind, AlterKind::AddColumn { .. });
assert_matches!(alter_kind, AlterKind::AddColumns { .. });
match alter_kind {
AlterKind::AddColumn { new_column } => {
AlterKind::AddColumns { columns } => {
let new_column = &columns[0].column_schema;
assert_eq!(new_column.name, "tagk_i");
assert!(new_column.is_nullable);
assert_eq!(new_column.data_type, ConcreteDataType::string_datatype());
}
_ => unreachable!(),
}
}
}

View File

@@ -6,8 +6,8 @@ use std::time::Duration;
use api::v1::ColumnDataType;
use api::v1::{
admin_result, alter_expr::Kind, codec::InsertBatch, column, insert_expr, AddColumn, AlterExpr,
Column, ColumnDef, CreateExpr, InsertExpr, MutateResult,
admin_result, alter_expr::Kind, codec::InsertBatch, column, column::SemanticType, insert_expr,
AddColumn, AlterExpr, Column, ColumnDef, CreateExpr, InsertExpr, MutateResult,
};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
@@ -17,27 +17,38 @@ use servers::server::Server;
use crate::instance::Instance;
use crate::tests::test_util;
#[tokio::test]
async fn test_insert_and_select() {
async fn setup_grpc_server(port: usize) -> String {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let (mut opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let addr = format!("127.0.0.1:{}", port);
opts.rpc_addr = addr.clone();
let instance = Arc::new(Instance::new(&opts).await.unwrap());
instance.start().await.unwrap();
let addr_cloned = addr.clone();
tokio::spawn(async move {
let mut grpc_server = GrpcServer::new(instance.clone(), instance);
let addr = "127.0.0.1:3001".parse::<SocketAddr>().unwrap();
let addr = addr_cloned.parse::<SocketAddr>().unwrap();
grpc_server.start(addr).await.unwrap()
});
// wait for GRPC server to start
tokio::time::sleep(Duration::from_secs(1)).await;
addr
}
let grpc_client = Client::connect("http://127.0.0.1:3001").await.unwrap();
let db = Database::new("greptime", grpc_client.clone());
let admin = Admin::new("greptime", grpc_client);
#[tokio::test]
async fn test_auto_create_table() {
let addr = setup_grpc_server(3991).await;
let grpc_client = Client::connect(format!("http://{}", addr)).await.unwrap();
let db = Database::new("greptime", grpc_client);
insert_and_assert(&db).await;
}
fn expect_data() -> (Column, Column, Column, Column) {
// testing data:
let expected_host_col = Column {
column_name: "host".to_string(),
@@ -48,7 +59,8 @@ async fn test_insert_and_select() {
.collect(),
..Default::default()
}),
datatype: Some(12), // string
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
};
let expected_cpu_col = Column {
@@ -58,8 +70,8 @@ async fn test_insert_and_select() {
..Default::default()
}),
null_mask: vec![2],
datatype: Some(10), // float64
..Default::default()
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
};
let expected_mem_col = Column {
column_name: "memory".to_string(),
@@ -68,8 +80,8 @@ async fn test_insert_and_select() {
..Default::default()
}),
null_mask: vec![4],
datatype: Some(10), // float64
..Default::default()
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
};
let expected_ts_col = Column {
column_name: "ts".to_string(),
@@ -77,10 +89,28 @@ async fn test_insert_and_select() {
ts_millis_values: vec![100, 101, 102, 103],
..Default::default()
}),
datatype: Some(15), // timestamp
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::Timestamp as i32,
..Default::default()
};
(
expected_host_col,
expected_cpu_col,
expected_mem_col,
expected_ts_col,
)
}
#[tokio::test]
async fn test_insert_and_select() {
let addr = setup_grpc_server(3990).await;
let grpc_client = Client::connect(format!("http://{}", addr)).await.unwrap();
let db = Database::new("greptime", grpc_client.clone());
let admin = Admin::new("greptime", grpc_client);
// create
let expr = testing_create_expr();
let result = admin.create(expr).await.unwrap();
@@ -112,6 +142,13 @@ async fn test_insert_and_select() {
assert_eq!(result.result, None);
// insert
insert_and_assert(&db).await;
}
async fn insert_and_assert(db: &Database) {
// testing data:
let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data();
let values = vec![InsertBatch {
columns: vec![
expected_host_col.clone(),
@@ -161,19 +198,19 @@ fn testing_create_expr() -> CreateExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: 12, // string
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "cpu".to_string(),
datatype: 10, // float64
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "memory".to_string(),
datatype: 10, // float64
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},

View File

@@ -147,6 +147,11 @@ impl Schema {
self.name_to_index.get(name).copied()
}
#[inline]
pub fn contains_column(&self, name: &str) -> bool {
self.name_to_index.contains_key(name)
}
#[inline]
pub fn num_columns(&self) -> usize {
self.column_schemas.len()

View File

@@ -262,8 +262,8 @@ mod tests {
use api::v1::codec::{InsertBatch, SelectResult};
use api::v1::{
admin_expr, admin_result, column, object_expr, object_result, select_expr, Column,
ExprHeader, MutateResult, SelectExpr,
admin_expr, admin_result, column, column::SemanticType, object_expr, object_result,
select_expr, Column, ExprHeader, MutateResult, SelectExpr,
};
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
@@ -367,7 +367,8 @@ mod tests {
.collect(),
..Default::default()
}),
datatype: Some(12), // string
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
};
let expected_cpu_col = Column {
@@ -377,8 +378,8 @@ mod tests {
..Default::default()
}),
null_mask: vec![2],
datatype: Some(10), // float64
..Default::default()
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
};
let expected_mem_col = Column {
column_name: "memory".to_string(),
@@ -387,8 +388,8 @@ mod tests {
..Default::default()
}),
null_mask: vec![4],
datatype: Some(10), // float64
..Default::default()
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
};
let expected_disk_col = Column {
column_name: "disk_util".to_string(),
@@ -396,7 +397,8 @@ mod tests {
f64_values: vec![9.9, 9.9, 9.9, 9.9],
..Default::default()
}),
datatype: Some(10), // float64
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
..Default::default()
};
let expected_ts_col = Column {
@@ -405,7 +407,9 @@ mod tests {
ts_millis_values: vec![1000, 2000, 3000, 4000],
..Default::default()
}),
datatype: Some(15), // timestamp
// FIXME(dennis): looks like the read schema in table scan doesn't have timestamp index, we have to investigate it.
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Timestamp as i32,
..Default::default()
};
@@ -495,25 +499,25 @@ mod tests {
let column_defs = vec![
GrpcColumnDef {
name: "host".to_string(),
datatype: 12, // string
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
},
GrpcColumnDef {
name: "cpu".to_string(),
datatype: 10, // float64
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
GrpcColumnDef {
name: "memory".to_string(),
datatype: 10, // float64
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
GrpcColumnDef {
name: "disk_util".to_string(),
datatype: 10, // float64
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: Some(
ColumnDefaultConstraint::Value(Value::from(9.9f64))
@@ -523,7 +527,7 @@ mod tests {
},
GrpcColumnDef {
name: "ts".to_string(),
datatype: 15, // timestamp
datatype: ColumnDataType::Timestamp as i32,
is_nullable: true,
default_constraint: None,
},
@@ -533,6 +537,7 @@ mod tests {
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
..Default::default()
}
}

View File

@@ -1,14 +1,8 @@
use std::collections::HashMap;
use api::v1::{alter_expr, AddColumn, AlterExpr, ColumnDataType, ColumnDef, CreateExpr};
use async_trait::async_trait;
use client::{Error as ClientError, ObjectResult};
use common_error::prelude::{BoxedError, StatusCode};
use common_telemetry::info;
use client::ObjectResult;
use common_error::prelude::BoxedError;
use servers::error as server_error;
use servers::opentsdb::codec::{
DataPoint, OPENTSDB_TIMESTAMP_COLUMN_NAME, OPENTSDB_VALUE_COLUMN_NAME,
};
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::OpentsdbProtocolHandler;
use snafu::prelude::*;
@@ -38,26 +32,6 @@ impl Instance {
let object_result = match result {
Ok(result) => result,
Err(ClientError::Datanode { code, .. }) => {
let retry = if code == StatusCode::TableNotFound as u32 {
self.create_opentsdb_metric(data_point).await?;
true
} else if code == StatusCode::TableColumnNotFound as u32 {
self.create_opentsdb_tags(data_point).await?;
true
} else {
false
};
if retry {
self.db
.insert(expr.clone())
.await
.context(error::RequestDatanodeSnafu)?
} else {
// `unwrap_err` is safe because we are matching "result" in "Err" arm
return Err(result.context(error::RequestDatanodeSnafu).unwrap_err());
}
}
Err(_) => {
return Err(result.context(error::RequestDatanodeSnafu).unwrap_err());
}
@@ -76,116 +50,6 @@ impl Instance {
}
Ok(())
}
async fn create_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> {
let mut column_defs = Vec::with_capacity(2 + data_point.tags().len());
let ts_column = ColumnDef {
name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Timestamp as i32,
is_nullable: false,
..Default::default()
};
column_defs.push(ts_column);
let value_column = ColumnDef {
name: OPENTSDB_VALUE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: false,
..Default::default()
};
column_defs.push(value_column);
for (tagk, _) in data_point.tags().iter() {
column_defs.push(ColumnDef {
name: tagk.to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
..Default::default()
})
}
let expr = CreateExpr {
catalog_name: None,
schema_name: None,
table_name: data_point.metric().to_string(),
desc: Some(format!(
"Table for OpenTSDB metric: {}",
&data_point.metric()
)),
column_defs,
time_index: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(),
primary_keys: vec![],
create_if_not_exists: true,
table_options: HashMap::new(),
};
let result = self
.admin
.create(expr)
.await
.context(error::RequestDatanodeSnafu)?;
let header = result.header.context(error::IncompleteGrpcResultSnafu {
err_msg: "'header' is missing",
})?;
if header.code == (StatusCode::Success as u32)
|| header.code == (StatusCode::TableAlreadyExists as u32)
{
info!(
"OpenTSDB metric table for \"{}\" is created!",
data_point.metric()
);
Ok(())
} else {
error::ExecOpentsdbPutSnafu {
reason: format!("error code: {}", header.code),
}
.fail()
}
}
async fn create_opentsdb_tags(&self, data_point: &DataPoint) -> Result<()> {
// TODO(LFC): support adding columns in one request
for (tagk, _) in data_point.tags().iter() {
let tag_column = ColumnDef {
name: tagk.to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
..Default::default()
};
let expr = AlterExpr {
catalog_name: None,
schema_name: None,
table_name: data_point.metric().to_string(),
kind: Some(alter_expr::Kind::AddColumn(AddColumn {
column_def: Some(tag_column),
})),
};
let result = self
.admin
.alter(expr)
.await
.context(error::RequestDatanodeSnafu)?;
let header = result.header.context(error::IncompleteGrpcResultSnafu {
err_msg: "'header' is missing",
})?;
if header.code != (StatusCode::Success as u32)
&& header.code != (StatusCode::TableColumnExists as u32)
{
return error::ExecOpentsdbPutSnafu {
reason: format!("error code: {}", header.code),
}
.fail();
}
info!(
"OpenTSDB tag \"{}\" for metric \"{}\" is added!",
tagk,
data_point.metric()
);
}
Ok(())
}
}
#[cfg(test)]

View File

@@ -262,7 +262,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
vals: Values,
) {
assert_eq!(name, column.column_name);
assert_eq!(Some(datatype as i32), column.datatype);
assert_eq!(datatype as i32, column.datatype);
assert_eq!(semantic_type as i32, column.semantic_type);
verify_null_mask(&column.null_mask, null_mask);
assert_eq!(Some(vals), column.values);

View File

@@ -1,5 +1,5 @@
use api::v1::codec::InsertBatch;
use api::v1::{column, insert_expr, Column, InsertExpr};
use api::v1::{column, column::SemanticType, insert_expr, Column, ColumnDataType, InsertExpr};
use crate::error::{self, Result};
@@ -119,6 +119,8 @@ impl DataPoint {
ts_millis_values: vec![self.ts_millis],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::Timestamp as i32,
..Default::default()
};
columns.push(ts_column);
@@ -129,6 +131,8 @@ impl DataPoint {
f64_values: vec![self.value],
..Default::default()
}),
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
..Default::default()
};
columns.push(value_column);
@@ -140,6 +144,8 @@ impl DataPoint {
string_values: vec![tagv.to_string()],
..Default::default()
}),
semantic_type: SemanticType::Tag as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
});
}

View File

@@ -432,7 +432,7 @@ mod tests {
use storage::EngineImpl;
use store_api::manifest::Manifest;
use store_api::storage::ReadContext;
use table::requests::{AlterKind, InsertRequest};
use table::requests::{AddColumnRequest, AlterKind, InsertRequest};
use tempdir::TempDir;
use super::*;
@@ -831,8 +831,11 @@ mod tests {
catalog_name: None,
schema_name: None,
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::AddColumn {
new_column: new_column.clone(),
alter_kind: AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: new_column.clone(),
is_key: false,
}],
},
};
let table = table_engine

View File

@@ -143,6 +143,13 @@ pub enum Error {
table_name: String,
},
#[snafu(display("Columns {} not exist in table {}", column_names.join(","), table_name))]
ColumnsNotExist {
backtrace: Backtrace,
column_names: Vec<String>,
table_name: String,
},
#[snafu(display("Failed to build schema, msg: {}, source: {}", msg, source))]
SchemaBuild {
#[snafu(backtrace)]
@@ -203,6 +210,8 @@ impl ErrorExt for Error {
ColumnExists { .. } => StatusCode::TableColumnExists,
ColumnsNotExist { .. } => StatusCode::TableColumnNotFound,
TableInfoNotFound { .. } => StatusCode::Unexpected,
ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable,

View File

@@ -20,7 +20,7 @@ use datatypes::vectors::{ConstantVector, TimestampVector, VectorRef};
use futures::task::{Context, Poll};
use futures::Stream;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, ChunkReader, ColumnDescriptorBuilder, PutOperation,
@@ -28,7 +28,7 @@ use store_api::storage::{
};
use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
use table::metadata::{FilterPushDownType, TableInfoRef, TableMetaBuilder};
use table::requests::{AlterKind, AlterTableRequest, InsertRequest};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest};
use table::{
metadata::{TableInfo, TableType},
table::Table,
@@ -36,8 +36,9 @@ use table::{
use tokio::sync::Mutex;
use crate::error::{
self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, SchemaBuildSnafu,
TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu, UpdateTableManifestSnafu,
self, ColumnsNotExistSnafu, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu,
SchemaBuildSnafu, TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu,
UpdateTableManifestSnafu,
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
@@ -86,29 +87,51 @@ impl<R: Region> Table for MitoTable<R> {
//Add row key and columns
for name in key_columns {
let vector = columns_values
.remove(name)
.or_else(|| {
Self::try_get_column_default_constraint_vector(&schema, name, rows_num).ok()?
})
.context(MissingColumnSnafu { name })
.map_err(TableError::from)?;
let column_schema = schema
.column_schema_by_name(name)
.expect("column schema not found");
put_op
.add_key_column(name, vector)
.map_err(TableError::new)?;
}
// Add vaue columns
for name in value_columns {
let vector = columns_values.remove(name).or_else(|| {
Self::try_get_column_default_constraint_vector(&schema, name, rows_num).ok()?
Self::try_get_column_default_constraint_vector(column_schema, rows_num).ok()?
});
if let Some(vector) = vector {
put_op
.add_key_column(name, vector)
.map_err(TableError::new)?;
} else if !column_schema.is_nullable {
return MissingColumnSnafu { name }.fail().map_err(TableError::from);
}
}
// Add value columns
for name in value_columns {
let column_schema = schema
.column_schema_by_name(name)
.expect("column schema not found");
let vector = columns_values.remove(name).or_else(|| {
Self::try_get_column_default_constraint_vector(column_schema, rows_num).ok()?
});
if let Some(v) = vector {
put_op.add_value_column(name, v).map_err(TableError::new)?;
} else if !column_schema.is_nullable {
return MissingColumnSnafu { name }.fail().map_err(TableError::from);
}
}
ensure!(
columns_values.is_empty(),
ColumnsNotExistSnafu {
table_name: &table_info.name,
column_names: columns_values
.keys()
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>(),
}
);
logging::debug!(
"Insert into table {} with put_op: {:?}",
table_info.name,
@@ -181,42 +204,58 @@ impl<R: Region> Table for MitoTable<R> {
let table_info = self.table_info();
let table_name = &table_info.name;
let table_meta = &table_info.meta;
let (alter_op, table_schema) = match &req.alter_kind {
AlterKind::AddColumn { new_column } => {
let desc = ColumnDescriptorBuilder::new(
table_meta.next_column_id,
&new_column.name,
new_column.data_type.clone(),
)
.is_nullable(new_column.is_nullable)
.default_constraint(new_column.default_constraint.clone())
.build()
.context(error::BuildColumnDescriptorSnafu {
table_name,
column_name: &new_column.name,
})?;
let alter_op = AlterOperation::AddColumns {
columns: vec![AddColumn {
desc,
// TODO(yingwen): [alter] AlterTableRequest should be able to add a key column.
is_key: false,
}],
};
let (alter_op, table_schema, new_columns_num) = match &req.alter_kind {
AlterKind::AddColumns {
columns: new_columns,
} => {
let columns = new_columns
.iter()
.enumerate()
.map(|(i, add_column)| {
let new_column = &add_column.column_schema;
let desc = ColumnDescriptorBuilder::new(
table_meta.next_column_id + i as u32,
&new_column.name,
new_column.data_type.clone(),
)
.is_nullable(new_column.is_nullable)
.default_constraint(new_column.default_constraint.clone())
.build()
.context(error::BuildColumnDescriptorSnafu {
table_name,
column_name: &new_column.name,
})?;
Ok(AddColumn {
desc,
is_key: add_column.is_key,
})
})
.collect::<Result<Vec<_>>>()?;
let alter_op = AlterOperation::AddColumns { columns };
// TODO(yingwen): [alter] Better way to alter the schema struct. In fact the column order
// in table schema could differ from the region schema, so we could just push this column
// to the back of the schema (as last column).
let table_schema =
build_table_schema_with_new_column(table_name, &table_meta.schema, new_column)?;
let table_schema = build_table_schema_with_new_columns(
table_name,
&table_meta.schema,
new_columns,
)?;
(alter_op, table_schema)
(alter_op, table_schema, new_columns.len() as u32)
}
// TODO(dennis): supports removing columns etc.
_ => unimplemented!(),
};
let new_meta = TableMetaBuilder::default()
.schema(table_schema.clone())
.engine(&table_meta.engine)
.next_column_id(table_meta.next_column_id + 1) // Bump next column id.
.next_column_id(table_meta.next_column_id + new_columns_num) // Bump next column id.
.primary_key_indices(table_meta.primary_key_indices.clone())
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
@@ -272,24 +311,27 @@ impl<R: Region> Table for MitoTable<R> {
}
}
fn build_table_schema_with_new_column(
fn build_table_schema_with_new_columns(
table_name: &str,
table_schema: &SchemaRef,
new_column: &ColumnSchema,
new_columns: &[AddColumnRequest],
) -> Result<SchemaRef> {
if table_schema
.column_schema_by_name(&new_column.name)
.is_some()
{
return error::ColumnExistsSnafu {
column_name: &new_column.name,
table_name,
}
.fail()?;
}
let mut columns = table_schema.column_schemas().to_vec();
columns.push(new_column.clone());
for add_column in new_columns {
let new_column = &add_column.column_schema;
if table_schema
.column_schema_by_name(&new_column.name)
.is_some()
{
return error::ColumnExistsSnafu {
column_name: &new_column.name,
table_name,
}
.fail()?;
}
columns.push(new_column.clone());
}
// Right now we are not support adding the column
// before or after some column, so just clone a new schema like this.
@@ -307,7 +349,7 @@ fn build_table_schema_with_new_column(
builder = builder.add_metadata(k, v);
}
let new_schema = Arc::new(builder.build().with_context(|_| error::SchemaBuildSnafu {
msg: format!("cannot add new column {:?}", new_column),
msg: format!("cannot add new columns {:?}", new_columns),
})?);
Ok(new_schema)
}
@@ -424,14 +466,10 @@ impl<R: Region> MitoTable<R> {
}
fn try_get_column_default_constraint_vector(
schema: &SchemaRef,
name: &str,
column_schema: &ColumnSchema,
rows_num: usize,
) -> TableResult<Option<VectorRef>> {
// TODO(dennis): when we support altering schema, we should check the schemas difference between table and region
let column_schema = schema
.column_schema_by_name(name)
.expect("column schema not found");
if let Some(v) = &column_schema.default_constraint {
assert!(rows_num > 0);
@@ -572,7 +610,12 @@ mod tests {
let table_schema = &table_meta.schema;
let new_column = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true);
let result = build_table_schema_with_new_column(table_name, table_schema, &new_column);
let new_columns = vec![AddColumnRequest {
column_schema: new_column,
is_key: false,
}];
let result = build_table_schema_with_new_columns(table_name, table_schema, &new_columns);
assert!(result.is_err());
assert!(result
.unwrap_err()
@@ -580,8 +623,12 @@ mod tests {
.contains("Column host already exists in table demo"));
let new_column = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_columns = vec![AddColumnRequest {
column_schema: new_column.clone(),
is_key: false,
}];
let new_schema =
build_table_schema_with_new_column(table_name, table_schema, &new_column).unwrap();
build_table_schema_with_new_columns(table_name, table_schema, &new_columns).unwrap();
assert_eq!(new_schema.num_columns(), table_schema.num_columns() + 1);
assert_eq!(

View File

@@ -45,9 +45,17 @@ pub struct AlterTableRequest {
pub alter_kind: AlterKind,
}
/// Add column request
#[derive(Debug)]
pub struct AddColumnRequest {
pub column_schema: ColumnSchema,
pub is_key: bool,
}
#[derive(Debug)]
pub enum AlterKind {
AddColumn { new_column: ColumnSchema },
AddColumns { columns: Vec<AddColumnRequest> },
RemoveColumns { names: Vec<String> },
}
/// Drop table request