feat: impl alter table in distributed mode (#572)

This commit is contained in:
Lei, HUANG
2022-11-22 15:17:25 +08:00
committed by GitHub
parent 0791c65149
commit c144a1b20e
26 changed files with 425 additions and 221 deletions

9
Cargo.lock generated
View File

@@ -1045,7 +1045,7 @@ dependencies = [
"common-base",
"common-error",
"common-grpc",
"common-insert",
"common-grpc-expr",
"common-query",
"common-recordbatch",
"common-time",
@@ -1213,12 +1213,13 @@ dependencies = [
]
[[package]]
name = "common-insert"
name = "common-grpc-expr"
version = "0.1.0"
dependencies = [
"api",
"async-trait",
"common-base",
"common-catalog",
"common-error",
"common-query",
"common-telemetry",
@@ -1740,7 +1741,7 @@ dependencies = [
"common-catalog",
"common-error",
"common-grpc",
"common-insert",
"common-grpc-expr",
"common-query",
"common-recordbatch",
"common-runtime",
@@ -2119,7 +2120,7 @@ dependencies = [
"common-catalog",
"common-error",
"common-grpc",
"common-insert",
"common-grpc-expr",
"common-query",
"common-recordbatch",
"common-runtime",

View File

@@ -15,7 +15,7 @@ members = [
"src/common/recordbatch",
"src/common/runtime",
"src/common/substrait",
"src/common/insert",
"src/common/grpc-expr",
"src/common/telemetry",
"src/common/time",
"src/datanode",

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod column_def;
pub mod error;
pub mod helper;
pub mod prometheus;

View File

@@ -21,4 +21,5 @@ pub mod codec {
tonic::include_proto!("greptime.v1.codec");
}
mod column_def;
pub mod meta;

View File

@@ -13,7 +13,7 @@ common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-insert = { path = "../common/insert" }
common-grpc-expr = { path = "../common/grpc-expr" }
common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",

View File

@@ -23,7 +23,7 @@ use api::v1::{
};
use common_error::status_code::StatusCode;
use common_grpc::{AsExecutionPlan, DefaultAsPlanImpl};
use common_insert::column_to_vector;
use common_grpc_expr::column_to_vector;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use datafusion::physical_plan::ExecutionPlan;

View File

@@ -103,7 +103,7 @@ pub enum Error {
#[snafu(display("Failed to convert column to vector, source: {}", source))]
ColumnToVector {
#[snafu(backtrace)]
source: common_insert::error::Error,
source: common_grpc_expr::error::Error,
},
}

View File

@@ -97,10 +97,7 @@ mod tests {
#[test]
fn test_start_node_error() {
fn throw_datanode_error() -> StdResult<datanode::error::Error> {
datanode::error::MissingFieldSnafu {
field: "test_field",
}
.fail()
datanode::error::MissingNodeIdSnafu {}.fail()
}
let e = throw_datanode_error()

View File

@@ -1,5 +1,5 @@
[package]
name = "common-insert"
name = "common-grpc-expr"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
@@ -11,6 +11,7 @@ common-base = { path = "../base" }
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
common-time = { path = "../time" }
common-catalog = { path = "../catalog" }
common-query = { path = "../query" }
datatypes = { path = "../../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }

View File

@@ -0,0 +1,234 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::{AlterExpr, CreateExpr, DropColumns};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
use crate::error::{
ColumnNotFoundSnafu, CreateSchemaSnafu, InvalidColumnDefSnafu, MissingFieldSnafu,
MissingTimestampColumnSnafu, Result,
};
/// Translates a gRPC [`AlterExpr`] into a table-level [`AlterTableRequest`].
///
/// The catalog, schema and table names are carried over from the expression
/// unchanged; only the `kind` payload is converted into an [`AlterKind`].
///
/// # Returns
///
/// * `Ok(Some(request))` when the expression adds or drops columns.
/// * `Ok(None)` when the expression has no `kind` set.
///
/// # Errors
///
/// * `MissingField` if an added column has no `column_def`.
/// * `InvalidColumnDef` if a column definition cannot be converted into a
///   column schema.
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
    // Work out the alteration first so the request itself is assembled in one place.
    let alter_kind = match expr.kind {
        None => return Ok(None),
        Some(Kind::DropColumns(DropColumns { drop_columns })) => AlterKind::DropColumns {
            names: drop_columns
                .into_iter()
                .map(|dropped| dropped.name)
                .collect(),
        },
        Some(Kind::AddColumns(add_columns)) => {
            let mut columns = Vec::with_capacity(add_columns.add_columns.len());
            for add_column in add_columns.add_columns {
                let column_def = add_column.column_def.context(MissingFieldSnafu {
                    field: "column_def",
                })?;
                let column_schema =
                    column_def
                        .try_as_column_schema()
                        .context(InvalidColumnDefSnafu {
                            column: &column_def.name,
                        })?;
                columns.push(AddColumnRequest {
                    column_schema,
                    is_key: add_column.is_key,
                });
            }
            AlterKind::AddColumns { columns }
        }
    };

    Ok(Some(AlterTableRequest {
        catalog_name: expr.catalog_name,
        schema_name: expr.schema_name,
        table_name: expr.table_name,
        alter_kind,
    }))
}
/// Builds the table schema described by a [`CreateExpr`].
///
/// Every column definition in `expr.column_defs` is converted into a
/// [`ColumnSchema`]; the column whose name equals `expr.time_index` is then
/// flagged as the time index before the schema is built.
///
/// # Errors
///
/// * `InvalidColumnDef` if any column definition cannot be converted.
/// * `MissingTimestampColumn` if no column is named `expr.time_index`.
/// * `CreateSchema` if the schema builder rejects the columns.
pub fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
    // Phase 1: convert every definition; the first invalid one aborts.
    let mut converted: Vec<ColumnSchema> = Vec::with_capacity(expr.column_defs.len());
    for column_def in &expr.column_defs {
        let column_schema = column_def
            .try_as_column_schema()
            .context(InvalidColumnDefSnafu {
                column: &column_def.name,
            })?;
        converted.push(column_schema);
    }

    // Phase 2: the declared time index must be one of the converted columns.
    let has_time_index = converted
        .iter()
        .any(|column| column.name == expr.time_index);
    ensure!(
        has_time_index,
        MissingTimestampColumnSnafu {
            msg: format!("CreateExpr: {:?}", expr)
        }
    );

    // Phase 3: flag the time index column, leaving the others untouched.
    let mut column_schemas = Vec::with_capacity(converted.len());
    for column_schema in converted {
        let is_time_index = column_schema.name == expr.time_index;
        column_schemas.push(if is_time_index {
            column_schema.with_time_index(true)
        } else {
            column_schema
        });
    }

    let schema = SchemaBuilder::try_from(column_schemas)
        .context(CreateSchemaSnafu)?
        .build()
        .context(CreateSchemaSnafu)?;
    Ok(Arc::new(schema))
}
/// Converts a gRPC [`CreateExpr`] into a [`CreateTableRequest`] for the table
/// identified by `table_id`.
///
/// Defaults applied while converting:
///
/// * a missing catalog name becomes [`DEFAULT_CATALOG_NAME`];
/// * a missing schema name becomes [`DEFAULT_SCHEMA_NAME`];
/// * an empty `region_ids` list becomes the single region number `0`.
///
/// # Errors
///
/// * Any error returned by [`create_table_schema`].
/// * `ColumnNotFound` if a primary key names a column absent from the schema.
pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<CreateTableRequest> {
    let schema = create_table_schema(&expr)?;

    // Resolve each primary key name to its position in the schema.
    let mut primary_key_indices = Vec::with_capacity(expr.primary_keys.len());
    for key in &expr.primary_keys {
        let index = schema
            .column_index_by_name(key)
            .context(ColumnNotFoundSnafu {
                column_name: key,
                table_name: &expr.table_name,
            })?;
        primary_key_indices.push(index);
    }

    let catalog_name = match expr.catalog_name {
        Some(name) => name,
        None => DEFAULT_CATALOG_NAME.to_string(),
    };
    let schema_name = match expr.schema_name {
        Some(name) => name,
        None => DEFAULT_SCHEMA_NAME.to_string(),
    };

    // A request without explicit regions targets region 0.
    let mut region_numbers = expr.region_ids;
    if region_numbers.is_empty() {
        region_numbers.push(0);
    }

    Ok(CreateTableRequest {
        id: table_id,
        catalog_name,
        schema_name,
        table_name: expr.table_name,
        desc: expr.desc,
        schema,
        region_numbers,
        primary_key_indices,
        create_if_not_exists: expr.create_if_not_exists,
        table_options: expr.table_options,
    })
}
#[cfg(test)]
mod tests {
    use api::v1::{AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn};
    use datatypes::prelude::ConcreteDataType;

    use super::*;

    /// An `AddColumns` expression without catalog/schema names keeps them as
    /// `None` and yields exactly the requested column.
    #[test]
    fn test_alter_expr_to_request() {
        let mem_usage = ColumnDef {
            name: "mem_usage".to_string(),
            datatype: ColumnDataType::Float64 as i32,
            is_nullable: false,
            default_constraint: None,
        };
        let add_columns = AddColumns {
            add_columns: vec![AddColumn {
                column_def: Some(mem_usage),
                is_key: false,
            }],
        };
        let expr = AlterExpr {
            catalog_name: None,
            schema_name: None,
            table_name: "monitor".to_string(),
            kind: Some(Kind::AddColumns(add_columns)),
        };

        let request = alter_expr_to_request(expr).unwrap().unwrap();
        assert!(request.catalog_name.is_none());
        assert!(request.schema_name.is_none());
        assert_eq!(request.table_name, "monitor");

        let added = match request.alter_kind {
            AlterKind::AddColumns { columns } => columns.into_iter().next_back().unwrap(),
            _ => unreachable!(),
        };
        assert!(!added.is_key);
        assert_eq!(added.column_schema.name, "mem_usage");
        assert_eq!(
            added.column_schema.data_type,
            ConcreteDataType::float64_datatype()
        );
    }

    /// A `DropColumns` expression carries its catalog/schema names through and
    /// lists exactly the dropped column names.
    #[test]
    fn test_drop_column_expr() {
        let drop_columns = DropColumns {
            drop_columns: vec![DropColumn {
                name: "mem_usage".to_string(),
            }],
        };
        let expr = AlterExpr {
            catalog_name: Some("test_catalog".to_string()),
            schema_name: Some("test_schema".to_string()),
            table_name: "monitor".to_string(),
            kind: Some(Kind::DropColumns(drop_columns)),
        };

        let request = alter_expr_to_request(expr).unwrap().unwrap();
        assert_eq!(request.catalog_name.as_deref(), Some("test_catalog"));
        assert_eq!(request.schema_name.as_deref(), Some("test_schema"));
        assert_eq!(request.table_name, "monitor");

        let names = match request.alter_kind {
            AlterKind::DropColumns { names } => names,
            _ => unreachable!(),
        };
        assert_eq!(names, vec!["mem_usage".to_string()]);
    }
}

View File

@@ -22,7 +22,7 @@ use snafu::{Backtrace, ErrorCompat};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Column {} not found in table {}", column_name, table_name))]
#[snafu(display("Column `{}` not found in table `{}`", column_name, table_name))]
ColumnNotFound {
column_name: String,
table_name: String,
@@ -57,8 +57,8 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Missing timestamp column in request"))]
MissingTimestampColumn { backtrace: Backtrace },
#[snafu(display("Missing timestamp column, msg: {}", msg))]
MissingTimestampColumn { msg: String, backtrace: Backtrace },
#[snafu(display("Invalid column proto: {}", err_msg))]
InvalidColumnProto {
@@ -70,6 +70,26 @@ pub enum Error {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField { field: String, backtrace: Backtrace },
#[snafu(display("Invalid column default constraint, source: {}", source))]
ColumnDefaultConstraint {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display(
"Invalid column proto definition, column: {}, source: {}",
column,
source
))]
InvalidColumnDef {
column: String,
#[snafu(backtrace)]
source: api::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -87,6 +107,9 @@ impl ErrorExt for Error {
| Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments,
Error::InvalidColumnProto { .. } => StatusCode::InvalidArguments,
Error::CreateVector { .. } => StatusCode::InvalidArguments,
Error::MissingField { .. } => StatusCode::InvalidArguments,
Error::ColumnDefaultConstraint { source, .. } => source.status_code(),
Error::InvalidColumnDef { source, .. } => source.status_code(),
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {

View File

@@ -245,7 +245,10 @@ pub fn build_create_expr_from_insertion(
}
}
ensure!(timestamp_index != usize::MAX, MissingTimestampColumnSnafu);
ensure!(
timestamp_index != usize::MAX,
MissingTimestampColumnSnafu { msg: table_name }
);
let timestamp_field_name = columns[timestamp_index].column_name.clone();
let primary_keys = primary_key_indices

View File

@@ -1,3 +1,4 @@
#![feature(assert_matches)]
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,8 +13,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod alter;
pub mod error;
mod insert;
pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema};
pub use insert::{
build_alter_table_request, build_create_expr_from_insertion, column_to_vector,
find_new_columns, insert_batches, insertion_expr_to_request,

View File

@@ -23,7 +23,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
common-insert = { path = "../common/insert" }
common-grpc-expr = { path = "../common/grpc-expr" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }

View File

@@ -82,9 +82,6 @@ pub enum Error {
table_name: String,
},
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField { field: String, backtrace: Backtrace },
#[snafu(display("Missing timestamp column in request"))]
MissingTimestampColumn { backtrace: Backtrace },
@@ -202,21 +199,16 @@ pub enum Error {
source: common_grpc::Error,
},
#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
#[snafu(display("Failed to convert alter expr to request: {}", source))]
AlterExprToRequest {
#[snafu(backtrace)]
source: api::error::Error,
source: common_grpc_expr::error::Error,
},
#[snafu(display(
"Invalid column proto definition, column: {}, source: {}",
column,
source
))]
InvalidColumnDef {
column: String,
#[snafu(display("Failed to convert create expr to request: {}", source))]
CreateExprToRequest {
#[snafu(backtrace)]
source: api::error::Error,
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to parse SQL, source: {}", source))]
@@ -263,7 +255,7 @@ pub enum Error {
#[snafu(display("Failed to insert data, source: {}", source))]
InsertData {
#[snafu(backtrace)]
source: common_insert::error::Error,
source: common_grpc_expr::error::Error,
},
#[snafu(display("Insert batch is empty"))]
@@ -316,6 +308,8 @@ impl ErrorExt for Error {
source.status_code()
}
Error::AlterExprToRequest { source, .. }
| Error::CreateExprToRequest { source, .. } => source.status_code(),
Error::CreateSchema { source, .. }
| Error::ConvertSchema { source, .. }
| Error::VectorComputation { source } => source.status_code(),
@@ -324,7 +318,6 @@ impl ErrorExt for Error {
| Error::InvalidSql { .. }
| Error::KeyColumnNotFound { .. }
| Error::InvalidPrimaryKey { .. }
| Error::MissingField { .. }
| Error::MissingTimestampColumn { .. }
| Error::CatalogNotFound { .. }
| Error::SchemaNotFound { .. }
@@ -343,10 +336,6 @@ impl ErrorExt for Error {
| Error::UnsupportedExpr { .. }
| Error::Catalog { .. } => StatusCode::Internal,
Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
source.status_code()
}
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),
Error::StartScriptManager { source } => source.status_code(),

View File

@@ -22,7 +22,7 @@ use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_grpc::select::to_object_result;
use common_insert::insertion_expr_to_request;
use common_grpc_expr::insertion_expr_to_request;
use common_query::Output;
use query::plan::LogicalPlan;
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler};
@@ -56,7 +56,7 @@ impl Instance {
.context(SchemaNotFoundSnafu { name: schema_name })?;
let insert_batches =
common_insert::insert_batches(&values.values).context(InsertDataSnafu)?;
common_grpc_expr::insert_batches(&values.values).context(InsertDataSnafu)?;
ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu);

View File

@@ -12,22 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::result::AdminResultBuilder;
use api::v1::alter_expr::Kind;
use api::v1::{AdminResult, AlterExpr, CreateExpr, DropColumns};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use api::v1::{AdminResult, AlterExpr, CreateExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
use common_telemetry::{error, info};
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use futures::TryFutureExt;
use snafu::prelude::*;
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
use crate::error::{self, BumpTableIdSnafu, MissingFieldSnafu, Result};
use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu};
use crate::instance::Instance;
use crate::sql::SqlRequest;
@@ -75,7 +69,7 @@ impl Instance {
}
};
let request = create_expr_to_request(table_id, expr).await;
let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu);
let result = futures::future::ready(request)
.and_then(|request| self.sql_handler().execute(SqlRequest::CreateTable(request)))
.await;
@@ -94,14 +88,17 @@ impl Instance {
}
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult {
let request = match alter_expr_to_request(expr).transpose() {
Some(req) => req,
let request = match alter_expr_to_request(expr)
.context(AlterExprToRequestSnafu)
.transpose()
{
None => {
return AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(0, 0)
.build()
}
Some(req) => req,
};
let result = futures::future::ready(request)
@@ -121,154 +118,25 @@ impl Instance {
}
}
async fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr)?;
let primary_key_indices = expr
.primary_keys
.iter()
.map(|key| {
schema
.column_index_by_name(key)
.context(error::KeyColumnNotFoundSnafu { name: key })
})
.collect::<Result<Vec<usize>>>()?;
let catalog_name = expr
.catalog_name
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let schema_name = expr
.schema_name
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let region_ids = if expr.region_ids.is_empty() {
vec![0]
} else {
expr.region_ids
};
Ok(CreateTableRequest {
id: table_id,
catalog_name,
schema_name,
table_name: expr.table_name,
desc: expr.desc,
schema,
region_numbers: region_ids,
primary_key_indices,
create_if_not_exists: expr.create_if_not_exists,
table_options: expr.table_options,
})
}
fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
match expr.kind {
Some(Kind::AddColumns(add_columns)) => {
let mut add_column_requests = vec![];
for add_column_expr in add_columns.add_columns {
let column_def = add_column_expr.column_def.context(MissingFieldSnafu {
field: "column_def",
})?;
let schema =
column_def
.try_as_column_schema()
.context(error::InvalidColumnDefSnafu {
column: &column_def.name,
})?;
add_column_requests.push(AddColumnRequest {
column_schema: schema,
is_key: add_column_expr.is_key,
})
}
let alter_kind = AlterKind::AddColumns {
columns: add_column_requests,
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
alter_kind,
};
Ok(Some(request))
}
Some(Kind::DropColumns(DropColumns { drop_columns })) => {
let alter_kind = AlterKind::DropColumns {
names: drop_columns.into_iter().map(|c| c.name).collect(),
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
alter_kind,
};
Ok(Some(request))
}
None => Ok(None),
}
}
fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
let column_schemas = expr
.column_defs
.iter()
.map(|x| {
x.try_as_column_schema()
.context(error::InvalidColumnDefSnafu { column: &x.name })
})
.collect::<Result<Vec<ColumnSchema>>>()?;
ensure!(
column_schemas
.iter()
.any(|column| column.name == expr.time_index),
error::KeyColumnNotFoundSnafu {
name: &expr.time_index,
}
);
let column_schemas = column_schemas
.into_iter()
.map(|column_schema| {
if column_schema.name == expr.time_index {
column_schema.with_time_index(true)
} else {
column_schema
}
})
.collect::<Vec<_>>();
Ok(Arc::new(
SchemaBuilder::try_from(column_schemas)
.context(error::CreateSchemaSnafu)?
.build()
.context(error::CreateSchemaSnafu)?,
))
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::ColumnDef;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_grpc_expr::create_table_schema;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use super::*;
use crate::tests::test_util;
#[tokio::test(flavor = "multi_thread")]
async fn test_create_expr_to_request() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("create_expr_to_request");
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let expr = testing_create_expr();
let request = create_expr_to_request(1024, expr).await.unwrap();
assert_eq!(request.id, common_catalog::consts::MIN_USER_TABLE_ID);
let request = create_expr_to_request(1024, expr).unwrap();
assert_eq!(request.id, MIN_USER_TABLE_ID);
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
assert_eq!(request.table_name, "my-metrics");
@@ -279,12 +147,13 @@ mod tests {
let mut expr = testing_create_expr();
expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()];
let result = create_expr_to_request(1025, expr).await;
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Specified timestamp key or primary key column not found: not-exist-column"));
let result = create_expr_to_request(1025, expr);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"),
"{}",
err_msg
);
}
#[test]
@@ -295,11 +164,12 @@ mod tests {
expr.time_index = "not-exist-column".to_string();
let result = create_table_schema(&expr);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Specified timestamp key or primary key column not found: not-exist-column"));
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Missing timestamp column"),
"actual: {}",
err_msg
);
}
#[test]

View File

@@ -20,7 +20,7 @@ common-catalog = { path = "../common/catalog" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
common-insert = { path = "../common/insert" }
common-grpc-expr = { path = "../common/grpc-expr" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }

View File

@@ -277,25 +277,25 @@ pub enum Error {
#[snafu(display("Failed to build CreateExpr on insertion: {}", source))]
BuildCreateExprOnInsertion {
#[snafu(backtrace)]
source: common_insert::error::Error,
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to find new columns on insertion: {}", source))]
FindNewColumnsOnInsertion {
#[snafu(backtrace)]
source: common_insert::error::Error,
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to deserialize insert batching: {}", source))]
DeserializeInsertBatch {
#[snafu(backtrace)]
source: common_insert::error::Error,
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to deserialize insert batching: {}", source))]
InsertBatchToRequest {
#[snafu(backtrace)]
source: common_insert::error::Error,
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to find catalog by name: {}", catalog_name))]
@@ -427,6 +427,15 @@ pub enum Error {
#[snafu(display("Missing meta_client_opts section in config"))]
MissingMetasrvOpts { backtrace: Backtrace },
#[snafu(display("Failed to convert AlterExpr to AlterRequest, source: {}", source))]
AlterExprToRequest {
#[snafu(backtrace)]
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to find leaders when altering table, table: {}", table))]
LeaderNotFound { table: String, backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -514,6 +523,8 @@ impl ErrorExt for Error {
source.status_code()
}
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::AlterExprToRequest { source, .. } => source.status_code(),
Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -61,7 +61,7 @@ impl CreateExprFactory for DefaultCreateExprFactory {
batch: &[InsertBatch],
) -> Result<CreateExpr> {
let table_id = None;
let create_expr = common_insert::build_create_expr_from_insertion(
let create_expr = common_grpc_expr::build_create_expr_from_insertion(
catalog_name,
schema_name,
table_id,

View File

@@ -268,11 +268,15 @@ impl Instance {
/// Handle alter expr
pub async fn handle_alter(&self, expr: AlterExpr) -> Result<Output> {
self.admin(expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME))
.alter(expr)
.await
.and_then(admin_result_to_output)
.context(AlterTableSnafu)
match &self.dist_instance {
Some(dist_instance) => dist_instance.handle_alter_table(expr).await,
None => self
.admin(expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME))
.alter(expr)
.await
.and_then(admin_result_to_output)
.context(AlterTableSnafu),
}
}
/// Handle batch inserts
@@ -333,8 +337,8 @@ impl Instance {
region_number: u32,
values: &insert_expr::Values,
) -> Result<Output> {
let insert_batches =
common_insert::insert_batches(&values.values).context(DeserializeInsertBatchSnafu)?;
let insert_batches = common_grpc_expr::insert_batches(&values.values)
.context(DeserializeInsertBatchSnafu)?;
self.create_or_alter_table_on_demand(
catalog_name,
schema_name,
@@ -399,8 +403,9 @@ impl Instance {
}
Some(table) => {
let schema = table.schema();
if let Some(add_columns) = common_insert::find_new_columns(&schema, insert_batches)
.context(FindNewColumnsOnInsertionSnafu)?
if let Some(add_columns) =
common_grpc_expr::find_new_columns(&schema, insert_batches)
.context(FindNewColumnsOnInsertionSnafu)?
{
info!(
"Find new columns {:?} on insertion, try to alter table: {}.{}.{}",

View File

@@ -16,7 +16,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{CreateDatabaseExpr, CreateExpr};
use api::v1::{AlterExpr, CreateDatabaseExpr, CreateExpr};
use catalog::CatalogList;
use chrono::DateTime;
use client::admin::{admin_result_to_output, Admin};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -42,10 +43,12 @@ use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{
self, CatalogEntrySerdeSnafu, ColumnDataTypeSnafu, PrimaryKeyNotFoundSnafu, RequestMetaSnafu,
Result, StartMetaClientSnafu,
self, CatalogEntrySerdeSnafu, CatalogNotFoundSnafu, CatalogSnafu, ColumnDataTypeSnafu,
PrimaryKeyNotFoundSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu,
TableNotFoundSnafu,
};
use crate::partitioning::{PartitionBound, PartitionDef};
use crate::table::DistTable;
#[derive(Clone)]
pub(crate) struct DistInstance {
@@ -166,6 +169,34 @@ impl DistInstance {
Ok(Output::AffectedRows(1))
}
/// Handles an `ALTER TABLE` expression in distributed mode.
///
/// The target table is looked up in the frontend catalog manager, using
/// [`DEFAULT_CATALOG_NAME`] / [`DEFAULT_SCHEMA_NAME`] when the expression
/// leaves the catalog or schema unset. The expression is then handed to
/// [`DistTable::alter_by_expr`].
///
/// # Returns
///
/// `Output::AffectedRows(0)` once `alter_by_expr` has returned successfully.
///
/// # Errors
///
/// * `Catalog` if any catalog manager lookup fails.
/// * `CatalogNotFound`, `SchemaNotFound` or `TableNotFound` if the respective
///   entity does not exist.
/// * Any error returned by [`DistTable::alter_by_expr`].
///
/// # Panics
///
/// Panics if the resolved table is not a [`DistTable`].
pub async fn handle_alter_table(&self, expr: AlterExpr) -> Result<Output> {
    let catalog_name = expr.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
    let schema_name = expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
    let table_name = expr.table_name.as_str();

    let table = self
        .catalog_manager
        .catalog(catalog_name)
        .context(CatalogSnafu)?
        .context(CatalogNotFoundSnafu { catalog_name })?
        .schema(schema_name)
        .context(CatalogSnafu)?
        // Build the qualified names lazily: they are only needed on the error path.
        .with_context(|| SchemaNotFoundSnafu {
            schema_info: format!("{}.{}", catalog_name, schema_name),
        })?
        .table(table_name)
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
        })?;

    let dist_table = table
        .as_any()
        .downcast_ref::<DistTable>()
        .expect("Table impl must be DistTable in distributed mode");
    dist_table.alter_by_expr(expr).await?;
    Ok(Output::AffectedRows(0))
}
async fn create_table_in_meta(
&self,
create_table: &CreateExpr,

View File

@@ -20,7 +20,7 @@ use api::v1::InsertExpr;
use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
use common_insert::column_to_vector;
use common_grpc_expr::column_to_vector;
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::InfluxdbLineProtocolHandler;
use servers::{error as server_error, Mode};
@@ -66,7 +66,7 @@ impl Instance {
for insert in inserts {
let self_clone = self.clone();
let insert_batches = match &insert.expr.unwrap() {
Expr::Values(values) => common_insert::insert_batches(&values.values)
Expr::Values(values) => common_grpc_expr::insert_batches(&values.values)
.context(DeserializeInsertBatchSnafu)?,
Expr::Sql(_) => unreachable!(),
};

View File

@@ -18,13 +18,17 @@ use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::AlterExpr;
use async_trait::async_trait;
use client::admin::Admin;
use client::Database;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use common_telemetry::debug;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::Expr as DfExpr;
use datafusion::physical_plan::{
@@ -43,7 +47,7 @@ use table::Table;
use tokio::sync::RwLock;
use crate::datanode::DatanodeClients;
use crate::error::{self, Error, Result};
use crate::error::{self, Error, LeaderNotFoundSnafu, RequestDatanodeSnafu, Result};
use crate::partitioning::columns::RangeColumnsPartitionRule;
use crate::partitioning::range::RangePartitionRule;
use crate::partitioning::{
@@ -348,6 +352,36 @@ impl DistTable {
};
Ok(partition_rule)
}
/// Define a `alter_by_expr` instead of impl [`Table::alter`] to avoid redundant conversion between
/// [`table::requests::AlterTableRequest`] and [`AlterExpr`].
pub(crate) async fn alter_by_expr(&self, expr: AlterExpr) -> Result<()> {
let table_routes = self.table_routes.get_route(&self.table_name).await?;
let leaders = table_routes.find_leaders();
ensure!(
!leaders.is_empty(),
LeaderNotFoundSnafu {
table: format!(
"{:?}.{:?}.{}",
expr.catalog_name, expr.schema_name, expr.table_name
)
}
);
for datanode in leaders {
let admin = Admin::new(
DEFAULT_CATALOG_NAME,
self.datanode_clients.get_client(&datanode).await,
);
debug!("Sent alter table {:?} to {:?}", expr, admin);
let result = admin
.alter(expr.clone())
.await
.context(RequestDatanodeSnafu)?;
debug!("Alter table result: {:?}", result);
// TODO(hl): We should further check and track alter result in some global DDL task tracker
}
Ok(())
}
}
fn project_schema(table_schema: SchemaRef, projection: &Option<Vec<usize>>) -> SchemaRef {

View File

@@ -69,7 +69,8 @@ pub trait Table: Send + Sync {
Ok(FilterPushDownType::Unsupported)
}
async fn alter(&self, _request: AlterTableRequest) -> Result<()> {
async fn alter(&self, request: AlterTableRequest) -> Result<()> {
let _ = request;
unimplemented!()
}
}