feat: support auto table creation in distributed mode (#489)

Author: Lei, Huang
Date: 2022-11-14 19:53:35 +08:00
Committed by: GitHub
Parent: d10e45f4aa
Commit: c90832ea6c
7 changed files with 350 additions and 256 deletions

View File

@@ -344,6 +344,9 @@ pub enum Error {
source: client::Error,
},
#[snafu(display("Cannot find primary key column by name: {}", msg))]
PrimaryKeyNotFound { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to execute sql: {}, source: {}", sql, source))]
ExecuteSql {
sql: String,
@@ -422,7 +425,7 @@ impl ErrorExt for Error {
Error::Select { source, .. } => source.status_code(),
Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),
Error::DeserializeInsertBatch { source, .. } => source.status_code(),
Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
Error::ExecuteSql { source, .. } => source.status_code(),
}
}
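For reference, a hedged sketch of how the new PrimaryKeyNotFound variant gets raised through the context selector that snafu derives for it; the lookup map here is illustrative, and the real call site appears in distributed.rs further down:

use std::collections::HashMap;

use snafu::OptionExt;

use crate::error::{PrimaryKeyNotFoundSnafu, Result};

// Hedged sketch: resolve a primary key column name to its index in the
// schema, turning a missing name into Error::PrimaryKeyNotFound.
fn primary_key_index(name_to_index: &HashMap<String, usize>, name: &str) -> Result<usize> {
    name_to_index
        .get(name)
        .cloned()
        .context(PrimaryKeyNotFoundSnafu { msg: name })
}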

View File

@@ -0,0 +1,173 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::codec::InsertBatch;
use api::v1::{ColumnDataType, CreateExpr};
use datatypes::schema::ColumnSchema;
use snafu::{ensure, ResultExt};
use sql::statements::create::{CreateTable, TIME_INDEX};
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
use sqlparser::ast::{ColumnDef, TableConstraint};
use crate::error::InvalidSqlSnafu;
use crate::error::Result;
use crate::error::{
BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
ParseSqlSnafu,
};
pub type CreateExprFactoryRef = Arc<dyn CreateExprFactory + Send + Sync>;
#[async_trait::async_trait]
pub trait CreateExprFactory {
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr>;
async fn create_expr_by_insert_batch(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
batch: &[InsertBatch],
) -> crate::error::Result<CreateExpr>;
}
#[derive(Debug)]
pub struct DefaultCreateExprFactory;
#[async_trait::async_trait]
impl CreateExprFactory for DefaultCreateExprFactory {
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr> {
create_to_expr(None, vec![0], stmt)
}
async fn create_expr_by_insert_batch(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
batch: &[InsertBatch],
) -> Result<CreateExpr> {
let table_id = None;
let create_expr = common_insert::build_create_expr_from_insertion(
catalog_name,
schema_name,
table_id,
table_name,
batch,
)
.context(BuildCreateExprOnInsertionSnafu)?;
Ok(create_expr)
}
}
/// Convert `CreateTable` statement to `CreateExpr` gRPC request.
fn create_to_expr(
table_id: Option<u32>,
region_ids: Vec<u32>,
create: &CreateTable,
) -> Result<CreateExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name).context(ParseSqlSnafu)?;
let time_index = find_time_index(&create.constraints)?;
let expr = CreateExpr {
catalog_name: Some(catalog_name),
schema_name: Some(schema_name),
table_name,
desc: None,
column_defs: columns_to_expr(&create.columns, &time_index)?,
time_index,
primary_keys: find_primary_keys(&create.constraints)?,
create_if_not_exists: create.if_not_exists,
// TODO(LFC): Fill in other table options.
table_options: HashMap::from([("engine".to_string(), create.engine.clone())]),
table_id,
region_ids,
};
Ok(expr)
}
fn find_primary_keys(constraints: &[TableConstraint]) -> Result<Vec<String>> {
let primary_keys = constraints
.iter()
.filter_map(|constraint| match constraint {
TableConstraint::Unique {
name: _,
columns,
is_primary: true,
} => Some(columns.iter().map(|ident| ident.value.clone())),
_ => None,
})
.flatten()
.collect::<Vec<String>>();
Ok(primary_keys)
}
pub fn find_time_index(constraints: &[TableConstraint]) -> crate::error::Result<String> {
let time_index = constraints
.iter()
.filter_map(|constraint| match constraint {
TableConstraint::Unique {
name: Some(name),
columns,
is_primary: false,
} => {
if name.value == TIME_INDEX {
Some(columns.iter().map(|ident| &ident.value))
} else {
None
}
}
_ => None,
})
.flatten()
.collect::<Vec<&String>>();
ensure!(
time_index.len() == 1,
InvalidSqlSnafu {
err_msg: "must have one and only one TimeIndex columns",
}
);
Ok(time_index.first().unwrap().to_string())
}
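As a hedged illustration of the constraint shape this function matches (the parser lowering is assumed from the surrounding code, not shown in this commit): TIME INDEX (ts) arrives as a named, non-primary unique constraint whose name is the TIME_INDEX marker:

use sql::statements::create::TIME_INDEX;
use sqlparser::ast::{Ident, TableConstraint};

// Hedged sketch: the TableConstraint produced for `TIME INDEX (ts)`,
// i.e. exactly the arm that find_time_index() filters for.
fn time_index_constraint() -> TableConstraint {
    TableConstraint::Unique {
        name: Some(Ident::new(TIME_INDEX)),
        columns: vec![Ident::new("ts")],
        is_primary: false,
    }
}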
fn columns_to_expr(
column_defs: &[ColumnDef],
time_index: &str,
) -> crate::error::Result<Vec<api::v1::ColumnDef>> {
let column_schemas = column_defs
.iter()
.map(|c| column_def_to_schema(c, c.name.to_string() == time_index).context(ParseSqlSnafu))
.collect::<Result<Vec<ColumnSchema>>>()?;
let column_datatypes = column_schemas
.iter()
.map(|c| {
ColumnDataTypeWrapper::try_from(c.data_type.clone())
.map(|w| w.datatype())
.context(ColumnDataTypeSnafu)
})
.collect::<Result<Vec<ColumnDataType>>>()?;
column_schemas
.iter()
.zip(column_datatypes.into_iter())
.map(|(schema, datatype)| {
Ok(api::v1::ColumnDef {
name: schema.name.clone(),
datatype: datatype as i32,
is_nullable: schema.is_nullable(),
default_constraint: match schema.default_constraint() {
None => None,
Some(v) => Some(v.clone().try_into().context(
ConvertColumnDefaultConstraintSnafu {
column_name: &schema.name,
},
)?),
},
})
})
.collect()
}
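Putting the factory together, a hedged usage sketch; the parsing pattern mirrors the one used in this commit's tests, and error handling on the parse step is elided for brevity:

use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use sqlparser::dialect::GenericDialect;

use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};

// Hedged sketch: parse a CREATE TABLE statement and turn it into the
// CreateExpr gRPC request via the default factory.
async fn expr_from_sql(sql: &str) -> crate::error::Result<api::v1::CreateExpr> {
    let stmts = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
    match &stmts[0] {
        Statement::CreateTable(c) => DefaultCreateExprFactory.create_expr_by_stmt(c).await,
        _ => unreachable!("expected a CREATE TABLE statement"),
    }
}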

View File

@@ -7,15 +7,13 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::helper::ColumnDataTypeWrapper;
use api::result::ObjectResultBuilder;
use api::v1::alter_expr::Kind;
use api::v1::codec::InsertBatch;
use api::v1::object_expr::Expr;
use api::v1::{
insert_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, ColumnDataType,
ColumnDef as GrpcColumnDef, CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr,
ObjectResult as GrpcObjectResult,
insert_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateExpr,
InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
@@ -26,7 +24,6 @@ use common_error::prelude::{BoxedError, StatusCode};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_query::Output;
use common_telemetry::{debug, error, info};
use datatypes::schema::ColumnSchema;
use distributed::DistInstance;
use meta_client::client::MetaClientBuilder;
use meta_client::MetaClientOpts;
@@ -36,22 +33,19 @@ use servers::query_handler::{
PrometheusProtocolHandler, SqlQueryHandler,
};
use snafu::prelude::*;
use sql::ast::{ColumnDef, TableConstraint};
use sql::statements::create::{CreateTable, TIME_INDEX};
use sql::statements::create::Partitions;
use sql::statements::insert::Insert;
use sql::statements::statement::Statement;
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
use sql::{dialect::GenericDialect, parser::ParserContext};
use table::table::TableIdProviderRef;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{
self, AlterTableOnInsertionSnafu, AlterTableSnafu, BuildCreateExprOnInsertionSnafu,
BumpTableIdSnafu, CatalogNotFoundSnafu, CatalogSnafu, ConvertColumnDefaultConstraintSnafu,
CreateTableOnInsertionSnafu, CreateTableSnafu, DeserializeInsertBatchSnafu,
FindNewColumnsOnInsertionSnafu, InsertSnafu, Result, SchemaNotFoundSnafu, SelectSnafu,
self, AlterTableOnInsertionSnafu, AlterTableSnafu, CatalogNotFoundSnafu, CatalogSnafu,
CreateTableSnafu, DeserializeInsertBatchSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu,
Result, SchemaNotFoundSnafu, SelectSnafu,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::{FrontendOptions, Mode};
use crate::sql::insert_to_request;
use crate::table::route::TableRoutes;
@@ -80,9 +74,7 @@ pub struct Instance {
client: Client,
/// The catalog manager is `None` in standalone mode; each datanode keeps its own.
catalog_manager: Option<CatalogManagerRef>,
/// Table id provider; left as `None` in standalone mode. In distributed mode,
/// the table id is generated by metasrv.
table_id_provider: Option<TableIdProviderRef>,
create_expr_factory: CreateExprFactoryRef,
// TODO(fys): this should be a trait with two implementations, Standalone and
// Distributed, so the code that follows doesn't need so many match statements.
@@ -96,7 +88,7 @@ impl Default for Instance {
Self {
client: Client::default(),
catalog_manager: None,
table_id_provider: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory {}),
mode: Mode::Standalone,
dist_instance: None,
}
@@ -189,42 +181,24 @@ impl Instance {
}
}
/// Convert `CreateTable` statement to `CreateExpr` gRPC request.
fn create_to_expr(
table_id: Option<u32>,
region_ids: Vec<u32>,
create: &CreateTable,
) -> Result<CreateExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?;
let time_index = find_time_index(&create.constraints)?;
let expr = CreateExpr {
catalog_name: Some(catalog_name),
schema_name: Some(schema_name),
table_name,
desc: None,
column_defs: columns_to_expr(&create.columns, &time_index)?,
time_index,
primary_keys: find_primary_keys(&create.constraints)?,
create_if_not_exists: create.if_not_exists,
// TODO(LFC): Fill in other table options.
table_options: HashMap::from([("engine".to_string(), create.engine.clone())]),
table_id,
region_ids,
};
Ok(expr)
}
/// Handle create expr.
pub async fn handle_create_table(&self, expr: CreateExpr) -> Result<Output> {
let result = self.admin().create(expr.clone()).await;
if let Err(e) = &result {
error!(e; "Failed to create table by expr: {:?}", expr);
pub async fn handle_create_table(
&self,
mut expr: CreateExpr,
partitions: Option<Partitions>,
) -> Result<Output> {
if let Some(v) = &self.dist_instance {
v.create_table(&mut expr, partitions).await
} else {
// Currently, standalone mode does not support multiple partitions/regions.
let result = self.admin().create(expr.clone()).await;
if let Err(e) = &result {
error!(e; "Failed to create table by expr: {:?}", expr);
}
result
.and_then(admin_result_to_output)
.context(CreateTableSnafu)
}
result
.and_then(admin_result_to_output)
.context(CreateTableSnafu)
}
/// Handle create database expr.
@@ -397,29 +371,17 @@ impl Instance {
insert_batches: &[InsertBatch],
) -> Result<Output> {
// Create table automatically, build schema from data.
let table_id = match &self.table_id_provider {
Some(provider) => Some(provider.next_table_id().await.context(BumpTableIdSnafu)?),
None => None,
};
let create_expr = common_insert::build_create_expr_from_insertion(
catalog_name,
schema_name,
table_id,
table_name,
insert_batches,
)
.context(BuildCreateExprOnInsertionSnafu)?;
let create_expr = self
.create_expr_factory
.create_expr_by_insert_batch(catalog_name, schema_name, table_name, insert_batches)
.await?;
info!(
"Try to create table: {} automatically with request: {:?}",
table_name, create_expr,
);
self.admin()
.create(create_expr)
.await
.and_then(admin_result_to_output)
.context(CreateTableOnInsertionSnafu)
// Create-on-insert does not support partitioning by other columns yet.
self.handle_create_table(create_expr, None).await
}
async fn add_new_columns_to_table(
@@ -501,7 +463,7 @@ impl Instance {
Self {
client,
catalog_manager: Some(catalog),
table_id_provider: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Standalone,
dist_instance: None,
}
@@ -564,32 +526,17 @@ impl SqlQueryHandler for Instance {
}
},
Statement::CreateTable(create) => {
if let Some(dist_instance) = &self.dist_instance {
dist_instance
.create_table(&create)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
} else {
let table_id = match &self.table_id_provider {
Some(provider) => Some(
provider
.next_table_id()
.await
.context(BumpTableIdSnafu)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?,
),
None => None,
};
let expr = Self::create_to_expr(table_id, vec![0], &create)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
self.handle_create_table(expr)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
let create_expr = self
.create_expr_factory
.create_expr_by_stmt(&create)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
self.handle_create_table(create_expr, create.partitions)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
Statement::ShowDatabases(_) | Statement::ShowTables(_) => self
@@ -639,88 +586,6 @@ impl SqlQueryHandler for Instance {
}
}
fn find_primary_keys(constraints: &[TableConstraint]) -> Result<Vec<String>> {
let primary_keys = constraints
.iter()
.filter_map(|constraint| match constraint {
TableConstraint::Unique {
name: _,
columns,
is_primary: true,
} => Some(columns.iter().map(|ident| ident.value.clone())),
_ => None,
})
.flatten()
.collect::<Vec<String>>();
Ok(primary_keys)
}
fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
let time_index = constraints
.iter()
.filter_map(|constraint| match constraint {
TableConstraint::Unique {
name: Some(name),
columns,
is_primary: false,
} => {
if name.value == TIME_INDEX {
Some(columns.iter().map(|ident| &ident.value))
} else {
None
}
}
_ => None,
})
.flatten()
.collect::<Vec<&String>>();
ensure!(
time_index.len() == 1,
error::InvalidSqlSnafu {
err_msg: "must have one and only one TimeIndex columns",
}
);
Ok(time_index.first().unwrap().to_string())
}
fn columns_to_expr(column_defs: &[ColumnDef], time_index: &str) -> Result<Vec<GrpcColumnDef>> {
let column_schemas = column_defs
.iter()
.map(|c| {
column_def_to_schema(c, c.name.to_string() == time_index).context(error::ParseSqlSnafu)
})
.collect::<Result<Vec<ColumnSchema>>>()?;
let column_datatypes = column_schemas
.iter()
.map(|c| {
ColumnDataTypeWrapper::try_from(c.data_type.clone())
.map(|w| w.datatype())
.context(error::ColumnDataTypeSnafu)
})
.collect::<Result<Vec<ColumnDataType>>>()?;
column_schemas
.iter()
.zip(column_datatypes.into_iter())
.map(|(schema, datatype)| {
Ok(GrpcColumnDef {
name: schema.name.clone(),
datatype: datatype as i32,
is_nullable: schema.is_nullable(),
default_constraint: match schema.default_constraint() {
None => None,
Some(v) => Some(v.clone().try_into().context(
ConvertColumnDefaultConstraintSnafu {
column_name: &schema.name,
},
)?),
},
})
})
.collect()
}
#[async_trait]
impl GrpcQueryHandler for Instance {
async fn do_query(&self, query: ObjectExpr) -> server_error::Result<GrpcObjectResult> {
@@ -785,7 +650,8 @@ mod tests {
use api::v1::codec::{InsertBatch, SelectResult};
use api::v1::{
admin_expr, admin_result, column, column::SemanticType, object_expr, object_result,
select_expr, Column, ExprHeader, MutateResult, SelectExpr,
select_expr, Column, ColumnDataType, ColumnDef as GrpcColumnDef, ExprHeader, MutateResult,
SelectExpr,
};
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::value::Value;

View File

@@ -1,13 +1,17 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::CreateExpr;
use chrono::DateTime;
use client::admin::{admin_result_to_output, Admin};
use client::Select;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{TableGlobalKey, TableGlobalValue};
use common_query::Output;
use common_telemetry::debug;
use datatypes::schema::RawSchema;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema};
use meta_client::client::MetaClient;
use meta_client::rpc::{
CreateRequest as MetaCreateRequest, Partition as MetaPartition, RouteResponse, TableName,
@@ -15,19 +19,16 @@ use meta_client::rpc::{
};
use query::{QueryEngineFactory, QueryEngineRef};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::CreateTable;
use sql::statements::{
column_def_to_schema, sql_data_type_to_concrete_data_type, sql_value_to_value,
table_idents_to_full_name,
};
use sqlparser::ast::ColumnDef;
use sql::statements::create::Partitions;
use sql::statements::sql_value_to_value;
use sqlparser::ast::Value as SqlValue;
use table::metadata::RawTableMeta;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{self, Result};
use crate::instance::{find_primary_keys, find_time_index, Instance};
use crate::error::{
self, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu, PrimaryKeyNotFoundSnafu, Result,
};
use crate::partitioning::{PartitionBound, PartitionDef};
#[derive(Clone)]
@@ -53,14 +54,17 @@ impl DistInstance {
}
}
pub(crate) async fn create_table(&self, create_table: &CreateTable) -> Result<Output> {
let response = self.create_table_in_meta(create_table).await?;
pub(crate) async fn create_table(
&self,
create_table: &mut CreateExpr,
partitions: Option<Partitions>,
) -> Result<Output> {
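// Hedged summary of the flow below: ask metasrv for the table's routes
// (which allocates the table id), record that id on the expr, persist the
// table's global metadata, then fan out a region-scoped clone of the
// CreateExpr to each leader datanode.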
let response = self.create_table_in_meta(create_table, partitions).await?;
let table_routes = response.table_routes;
ensure!(
table_routes.len() == 1,
error::FindTableRoutesSnafu {
table_name: create_table.name.to_string()
table_name: create_table.table_name.to_string()
}
);
let table_route = table_routes.first().unwrap();
@@ -69,10 +73,10 @@ impl DistInstance {
ensure!(
!region_routes.is_empty(),
error::FindRegionRoutesSnafu {
table_name: create_table.name.to_string()
table_name: create_table.table_name.to_string()
}
);
create_table.table_id = Some(table_route.table.id as u32);
self.put_table_global_meta(create_table, table_route)
.await?;
@@ -81,18 +85,16 @@ impl DistInstance {
let client = Admin::new("greptime", client);
let regions = table_route.find_leader_regions(&datanode);
let create_expr = Instance::create_to_expr(
Some(table_route.table.id as u32),
regions.clone(),
create_table,
)?;
let mut create_expr_for_region = create_table.clone();
create_expr_for_region.region_ids = regions;
debug!(
"Creating table {:?} on Datanode {:?} with regions {:?}",
create_table, datanode, regions,
create_table, datanode, create_expr_for_region.region_ids,
);
client
.create(create_expr)
.create(create_expr_for_region)
.await
.and_then(admin_result_to_output)
.context(error::InvalidAdminResultSnafu)?;
@@ -113,13 +115,24 @@ impl DistInstance {
.context(error::ExecuteSqlSnafu { sql })
}
async fn create_table_in_meta(&self, create_table: &CreateTable) -> Result<RouteResponse> {
let (catalog, schema, table) =
table_idents_to_full_name(&create_table.name).context(error::ParseSqlSnafu)?;
let table_name = TableName::new(catalog, schema, table);
let partitions = parse_partitions(create_table)?;
async fn create_table_in_meta(
&self,
create_table: &CreateExpr,
partitions: Option<Partitions>,
) -> Result<RouteResponse> {
let table_name = TableName::new(
create_table
.catalog_name
.clone()
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()),
create_table
.schema_name
.clone()
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()),
create_table.table_name.clone(),
);
let partitions = parse_partitions(create_table, partitions)?;
let request = MetaCreateRequest {
table_name,
partitions,
@@ -133,7 +146,7 @@ impl DistInstance {
// TODO(LFC): Maybe move this to FrontendCatalogManager's "register_table" method?
async fn put_table_global_meta(
&self,
create_table: &CreateTable,
create_table: &CreateExpr,
table_route: &TableRoute,
) -> Result<()> {
let table_name = &table_route.table.table_name;
@@ -156,7 +169,7 @@ impl DistInstance {
}
fn create_table_global_value(
create_table: &CreateTable,
create_table: &CreateExpr,
table_route: &TableRoute,
) -> Result<TableGlobalValue> {
let table_name = &table_route.table.table_name;
@@ -171,43 +184,40 @@ fn create_table_global_value(
})?
.id;
let mut column_schemas = Vec::with_capacity(create_table.columns.len());
let time_index = find_time_index(&create_table.constraints)?;
for column in create_table.columns.iter() {
column_schemas.push(
column_def_to_schema(column, column.name.value == time_index)
.context(error::ParseSqlSnafu)?,
);
let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
let mut column_name_to_index_map = HashMap::new();
for (idx, column) in create_table.column_defs.iter().enumerate() {
column_schemas.push(create_column_schema(column)?);
column_name_to_index_map.insert(column.name.clone(), idx);
}
let timestamp_index = column_schemas.iter().enumerate().find_map(|(i, c)| {
if c.name == time_index {
Some(i)
} else {
None
}
});
let timestamp_index = column_name_to_index_map
.get(&create_table.time_index)
.cloned();
let raw_schema = RawSchema {
column_schemas: column_schemas.clone(),
timestamp_index,
version: 0,
};
let primary_key_indices = find_primary_keys(&create_table.constraints)?
let primary_key_indices = create_table
.primary_keys
.iter()
.map(|k| {
column_schemas
.iter()
.enumerate()
.find_map(|(i, c)| if &c.name == k { Some(i) } else { None })
.unwrap() // unwrap is safe because the primary key's column name must have been defined
.map(|name| {
column_name_to_index_map
.get(name)
.cloned()
.context(PrimaryKeyNotFoundSnafu { msg: name })
})
.collect::<Vec<usize>>();
.collect::<Result<Vec<_>>>()?;
let meta = RawTableMeta {
schema: raw_schema,
primary_key_indices,
value_indices: vec![],
engine: create_table.engine.clone(),
engine: "mito".to_string(),
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),
@@ -223,11 +233,37 @@ fn create_table_global_value(
})
}
fn parse_partitions(create_table: &CreateTable) -> Result<Vec<MetaPartition>> {
// TODO: remove this duplication in the future.
fn create_column_schema(column_def: &api::v1::ColumnDef) -> Result<ColumnSchema> {
let data_type =
ColumnDataTypeWrapper::try_new(column_def.datatype).context(error::ColumnDataTypeSnafu)?;
let default_constraint = match &column_def.default_constraint {
None => None,
Some(v) => Some(ColumnDefaultConstraint::try_from(&v[..]).context(
ConvertColumnDefaultConstraintSnafu {
column_name: &column_def.name,
},
)?),
};
ColumnSchema::new(
column_def.name.clone(),
data_type.into(),
column_def.is_nullable,
)
.with_default_constraint(default_constraint)
.context(ConvertColumnDefaultConstraintSnafu {
column_name: &column_def.name,
})
}
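A hedged usage sketch of the conversion above; the column definition is illustrative, with ColumnDataType::Int64 chosen only for the example:

use api::v1::ColumnDataType;

// Hedged sketch: build a gRPC ColumnDef by hand and convert it back into
// a datatypes ColumnSchema.
fn sample_column_schema() -> Result<ColumnSchema> {
    let col_def = api::v1::ColumnDef {
        name: "host".to_string(),
        datatype: ColumnDataType::Int64 as i32,
        is_nullable: true,
        default_constraint: None,
    };
    let schema = create_column_schema(&col_def)?;
    assert_eq!("host", schema.name);
    assert!(schema.is_nullable());
    Ok(schema)
}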
fn parse_partitions(
create_table: &CreateExpr,
partitions: Option<Partitions>,
) -> Result<Vec<MetaPartition>> {
// If partitions are not defined by the user, use the timestamp column (which must exist)
// as the partition column, and create only one partition.
let partition_columns = find_partition_columns(create_table)?;
let partition_entries = find_partition_entries(create_table, &partition_columns)?;
let partition_columns = find_partition_columns(create_table, &partitions)?;
let partition_entries = find_partition_entries(create_table, &partitions, &partition_columns)?;
partition_entries
.into_iter()
@@ -236,26 +272,28 @@ fn parse_partitions(create_table: &CreateTable) -> Result<Vec<MetaPartition>> {
}
fn find_partition_entries(
create_table: &CreateTable,
create_table: &CreateExpr,
partitions: &Option<Partitions>,
partition_columns: &[String],
) -> Result<Vec<Vec<PartitionBound>>> {
let entries = if let Some(partitions) = &create_table.partitions {
let entries = if let Some(partitions) = partitions {
let column_defs = partition_columns
.iter()
.map(|pc| {
create_table
.columns
.column_defs
.iter()
.find(|c| &c.name.value == pc)
.find(|c| &c.name == pc)
// unwrap is safe here because we have checked that partition columns are defined
.unwrap()
})
.collect::<Vec<&ColumnDef>>();
.collect::<Vec<_>>();
let mut column_name_and_type = Vec::with_capacity(column_defs.len());
for column in column_defs {
let column_name = &column.name.value;
let data_type = sql_data_type_to_concrete_data_type(&column.data_type)
.context(error::ParseSqlSnafu)?;
let column_name = &column.name;
let data_type = ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(column.datatype).context(ColumnDataTypeSnafu)?,
);
column_name_and_type.push((column_name, data_type));
}
@@ -283,34 +321,38 @@ fn find_partition_entries(
Ok(entries)
}
fn find_partition_columns(create_table: &CreateTable) -> Result<Vec<String>> {
let columns = if let Some(partitions) = &create_table.partitions {
fn find_partition_columns(
create_table: &CreateExpr,
partitions: &Option<Partitions>,
) -> Result<Vec<String>> {
let columns = if let Some(partitions) = partitions {
partitions
.column_list
.iter()
.map(|x| x.value.clone())
.collect::<Vec<String>>()
.collect::<Vec<_>>()
} else {
vec![find_time_index(&create_table.constraints)?]
vec![create_table.time_index.clone()]
};
Ok(columns)
}
#[cfg(test)]
mod test {
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use sqlparser::dialect::GenericDialect;
use super::*;
use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};
#[test]
fn test_parse_partitions() {
#[tokio::test]
async fn test_parse_partitions() {
common_telemetry::init_default_ut_logging();
let cases = [
(
r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b) (
PARTITION r0 VALUES LESS THAN ('hz'),
PARTITION r1 VALUES LESS THAN ('sh'),
@@ -321,7 +363,7 @@ ENGINE=mito",
),
(
r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 10),
PARTITION r1 VALUES LESS THAN ('sh', 20),
@@ -335,7 +377,10 @@ ENGINE=mito",
let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
match &result[0] {
Statement::CreateTable(c) => {
let partitions = parse_partitions(c).unwrap();
common_telemetry::info!("{}", sql);
let factory = DefaultCreateExprFactory {};
let expr = factory.create_expr_by_stmt(c).await.unwrap();
let partitions = parse_partitions(&expr, c.partitions.clone()).unwrap();
let json = serde_json::to_string(&partitions).unwrap();
assert_eq!(json, expected);
}

View File

@@ -3,6 +3,7 @@
mod catalog;
mod datanode;
pub mod error;
mod expr_factory;
pub mod frontend;
pub mod grpc;
pub mod influxdb;

View File

@@ -465,6 +465,7 @@ mod test {
use super::*;
use crate::catalog::FrontendCatalogManager;
use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};
use crate::instance::distributed::DistInstance;
use crate::partitioning::range::RangePartitionRule;
@@ -810,7 +811,12 @@ mod test {
wait_datanodes_alive(kv_store).await;
let _result = dist_instance.create_table(&create_table).await.unwrap();
let factory = DefaultCreateExprFactory {};
let mut expr = factory.create_expr_by_stmt(&create_table).await.unwrap();
let _result = dist_instance
.create_table(&mut expr, create_table.partitions)
.await
.unwrap();
let table_route = table_routes.get_route(&table_name).await.unwrap();
println!("{}", serde_json::to_string_pretty(&table_route).unwrap());

View File

@@ -242,7 +242,7 @@ pub fn column_def_to_schema(column_def: &ColumnDef, is_time_index: bool) -> Resu
}
/// Convert `ColumnDef` in sqlparser to `ColumnDef` in gRPC proto.
fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result<api::v1::ColumnDef> {
pub fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result<api::v1::ColumnDef> {
let name = col.name.value.clone();
let data_type = sql_data_type_to_concrete_data_type(&col.data_type)?;
let nullable = col