Compare commits

..

5 Commits

Author SHA1 Message Date
Ning Sun
ea1896493b feat: allow multiple sql statements in query string (#699)
* feat: allow multiple sql statements in query string

* test: add a test for multiple statement call

* feat: add temporary workaround for standalone mode

* fix: resolve sql parser issue temporarily

* Update src/datanode/src/instance/sql.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: adopt new sql handler

* refactor: revert changes in query engine

* refactor: assume sql-statement 1-1 on datanode

* test: use frontend for integration test

* refactor: add statement execution api for explicit single statement call

* fix: typo

* refactor: rename query method

* test: add test case for error

* test: data type change adoption

* chore: add todo from review

* chore: remove obsolete comments

* fix: resolve issues

Co-authored-by: Yingwen <realevenyag@gmail.com>
2022-12-16 19:50:20 +08:00
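A minimal sketch of the per-statement execution loop this commit introduces, using stand-in names: `Statement`, `Output`, `Error`, and the `parse`/`execute_stmt` helpers below are illustrative placeholders, not the exact GreptimeDB APIs. The actual change (see the frontend diff further down) parses the query string into statements and executes them in order, stopping at the first error and returning one result per statement.

```rust
// Sketch only: these types and helpers are illustrative stand-ins for the
// real sql/query-engine types touched by this commit.
#[derive(Debug)]
struct Statement(String);
#[derive(Debug)]
struct Output(usize);
#[derive(Debug)]
struct Error(String);

// Split the query string into individual statements (the real code uses the
// SQL parser, not a naive split on ';').
fn parse(sql: &str) -> Result<Vec<Statement>, Error> {
    Ok(sql
        .split(';')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(|s| Statement(s.to_string()))
        .collect())
}

// Placeholder for the explicit single-statement execution API added here.
fn execute_stmt(stmt: &Statement) -> Result<Output, Error> {
    Ok(Output(stmt.0.len()))
}

// Execute every statement in order and return one result per statement,
// stopping at the first failure, as the new `do_query` in the diff does.
fn do_query(sql: &str) -> Vec<Result<Output, Error>> {
    match parse(sql) {
        Ok(stmts) => {
            let mut results = Vec::with_capacity(stmts.len());
            for stmt in stmts {
                let result = execute_stmt(&stmt);
                let failed = result.is_err();
                results.push(result);
                if failed {
                    break;
                }
            }
            results
        }
        Err(e) => vec![Err(e)],
    }
}

fn main() {
    for result in do_query("CREATE TABLE t (ts TIMESTAMP); INSERT INTO t VALUES (1)") {
        println!("{result:?}");
    }
}
```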
Jiachun Feng
66bca11401 refactor: remove optional from the protos (#756) 2022-12-16 15:47:51 +08:00
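The hunks below show what removing `optional` means in practice: string and bytes fields become plain proto3 fields, so an empty value now stands for "not set" and the Rust side maps it back to an `Option`. A minimal sketch of that convention, assuming a trimmed-down stand-in for the generated `CreateTableExpr` message rather than the full prost output:

```rust
// Sketch: with `optional` removed from the proto, "" and empty bytes mean
// "not set". `CreateTableExpr` here is a trimmed stand-in for the generated
// message.
struct CreateTableExpr {
    catalog_name: String,
    default_constraint: Vec<u8>,
}

// Map the empty-value sentinel back to an Option, mirroring what the
// alter/create conversion code in the diffs below does.
fn catalog_name(expr: &CreateTableExpr) -> Option<&str> {
    if expr.catalog_name.is_empty() {
        None
    } else {
        Some(expr.catalog_name.as_str())
    }
}

fn default_constraint(expr: &CreateTableExpr) -> Option<&[u8]> {
    if expr.default_constraint.is_empty() {
        None
    } else {
        Some(expr.default_constraint.as_slice())
    }
}

fn main() {
    let expr = CreateTableExpr {
        catalog_name: String::new(),
        default_constraint: vec![],
    };
    assert!(catalog_name(&expr).is_none());
    assert!(default_constraint(&expr).is_none());
}
```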
Yingwen
7c16a4a17b refactor(storage): Move write_batch::codec to a separate file (#757)
* refactor(storage): Move write_batch::codec to a separate file

* chore: move new_test_batch to write_batch mod
2022-12-16 15:32:59 +08:00
dennis zhuang
28bd7404ad feat: change column's default property to nullable (#751)
* feat: change column's default property to nullable

* chore: use all instead of any

* fix: compile error

* fix: dependencies order in cargo
2022-12-16 11:17:01 +08:00
Lei, HUANG
0653301754 feat: replace arrow2 with official implementation 🎉 (#753)
* chore: kick off. change datafusion/arrow/parquet to target version

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: replace one last datafusion dep

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: arrow_array switch to arrow

* chore: update dep of binary vector

* chore: fix wrong merge commit

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: Switch to datatypes2

* feat: Make recordbatch compile

* chore: sort Cargo.toml

* feat: Fix common::recordbatch compiler errors

* feat: Fix recordbatch test compiling issue

* fix: api crate (#708)

* fix: rename ConcreteDataType::timestamp_millis_type to ConcreteDataType::timestamp_millisecond_type; fix other warnings regarding timestamps

* fix: revert changes in datatypes2

* fix: helper

* chore: delete datatypes based on arrow2

* feat: Fix some compiler errors in common::query (#710)

* feat: Fix some compiler errors in common::query

* feat: test_collect use vectors api

* fix: common-query subcrate (#712)

* fix: record batch adapter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix error enum

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix common::query compiler errors (#713)

* feat: Move conversion to ScalarValue to value.rs

* fix: Fix common::query compiler errors

This commit also makes InnerError pub(crate)

* feat: Implements diff accumulator using WrapperType (#715)

* feat: Remove usage of opaque error from common::recordbatch

* feat: Remove opaque error from common::query

* feat: Fix diff compiler errors

Now common_function just uses common_query's Error and Result. Adds
a LargestType associated type to LogicalPrimitiveType to get the largest
type a logical primitive type can cast to.

* feat: Remove LargestType from NativeType trait

* chore: Update comments

* feat: Restrict Scalar::RefType of WrapperType to itself

Add trait bound `for<'a> Scalar<RefType<'a> = Self>` to WrapperType

* chore: Address CR comments

* chore: Format codes

* fix: fix compile error for mean/polyval/pow/interp ops

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Revert "fix: fix compile error for mean/polyval/pow/interp ops"

This reverts commit fb0b4eb826.

* fix: Fix compiler errors in argmax/rate/median/norm_cdf (#716)

* fix: Fix compiler errors in argmax/rate/median/norm_cdf

* chore: Address CR comments

* fix: fix compile error for mean/polyval/pow/interp ops (#717)

* fix: fix compile error for mean/polyval/pow/interp ops

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify type bounds

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: fix argmin/percentile/clip/interp/scipy_stats_norm_pdf errors (#718)

fix: fix argmin/percentile/clip/interp/scipy_stats_norm_pdf compiler errors

* fix: fix other compile error in common-function (#719)

* further fixing

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix all compile errors in common function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix tests and clippy for common-function subcrate (#726)

* further fixing

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix all compile errors in common function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert test changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: row group pruning (#725)

* fix: row group pruning

* chore: use macro to simplify stats implementation

* fix: CR comments

* fix: row group metadata length mismatch

* fix: simplify code

* fix: Fix common::grpc compiler errors (#722)

* fix: Fix common::grpc compiler errors

This commit refactors RecordBatch to hold vectors in the RecordBatch
struct, so we don't need to cast arrays to vectors when doing
serialization or iterating over the batch.

Now we use the vector API instead of the arrow API in grpc crate.

* chore: Address CR comments

* fix common record batch

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix compile error in server subcrate (#727)

* fix: Fix compile error in server subcrate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused type alias

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* explicitly panic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/storage/src/sst/parquet.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: Fix common grpc expr (#730)

* fix compile errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename fn names

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix styles

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix warnings in common-time

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: pre-cast to avoid tremendous match arms (#734)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: upgrade storage crate to arrow and parquet official impl (#738)

* fix: compile errors

* fix: parquet reader and writer

* fix: parquet reader and writer

* fix: WriteBatch IPC encode/decode

* fix: clippy errors in storage subcrate

* chore: remove suspicious unwrap

* fix: some cr comments

* fix: CR comments

* fix: CR comments

* fix: Fix compiler errors in catalog and mito crates (#742)

* fix: Fix compiler errors in mito

* fix: Fix compiler errors in catalog crate

* style: Fix clippy

* chore: Fix use

* Merge pull request #745

* fix nyc-taxi and util

* Merge branch 'replace-arrow2' into fix-others

* fix substrait

* fix warnings and error in test

* fix: Fix imports in optimizer.rs

* fix: errors in optimizer

* fix: remove unwrap

* fix: Fix compiler errors in query crate (#746)

* fix: Fix compiler errors in state.rs

* fix: fix compiler errors in state

* feat: upgrade sqlparser to 0.26

* fix: fix datafusion engine compiler errors

* fix: Fix some tests in query crate

* fix: Fix all warnings in tests

* feat: Remove `Type` from timestamp's type name

* fix: fix query tests

Now datafusion already supports median, so this commit also removes the
median function

* style: Fix clippy

* feat: Remove RecordBatch::pretty_print

* chore: Address CR comments

* Update src/query/src/query_engine/state.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* fix: frontend compile errors (#747)

fix: fix compile errors in frontend

* fix: Fix compiler errors in script crate (#749)

* fix: Fix compiler errors in state.rs

* fix: fix compiler errors in state

* feat: upgrade sqlparser to 0.26

* fix: fix datafusion engine compiler errors

* fix: Fix some tests in query crate

* fix: Fix all warnings in tests

* feat: Remove `Type` from timestamp's type name

* fix: fix query tests

Now datafusion already supports median, so this commit also removes the
median function

* style: Fix clippy

* feat: Remove RecordBatch::pretty_print

* chore: Address CR comments

* feat: Add column_by_name to RecordBatch

* feat: modify select_from_rb

* feat: Fix some compiler errors in vector.rs

* feat: Fix more compiler errors in vector.rs

* fix: fix table.rs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix compiler errors in coprocessor

* fix: Fix some compiler errors

* fix: Fix compiler errors in script

* chore: Remove unused imports and format code

* test: disable interval tests

* test: Fix test_compile_execute test

* style: Fix clippy

* feat: Support interval

* feat: Add RecordBatch::columns and fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix All The Tests! (#752)

* fix: Fix several tests compile errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: some compile errors in tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: compile errors in frontend tests

* fix: compile errors in frontend tests

* test: Fix tests in api and common-query

* test: Fix test in sql crate

* fix: resolve substrait error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: add more test

* test: Fix tests in servers

* fix instance_test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test: Fix tests in tests-integration

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Lei, HUANG <mrsatangel@gmail.com>
Co-authored-by: evenyag <realevenyag@gmail.com>

* fix: clippy errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
2022-12-15 18:49:12 +08:00
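One design note buried in the #753 commit message above is the new `LargestType` associated type on `LogicalPrimitiveType`, used to name the widest type a logical primitive type can cast to. A minimal sketch of that shape, with illustrative type names rather than the real datatypes API:

```rust
// Sketch of the `LargestType` idea from the commit message; trait and type
// names are illustrative, not the exact datatypes API.
trait LogicalPrimitiveType {
    /// The widest type that values of this type can be cast into,
    /// e.g. for accumulators that need extra headroom.
    type LargestType: LogicalPrimitiveType;
}

struct Int8Type;
struct Int64Type;
struct Float64Type;

impl LogicalPrimitiveType for Int8Type {
    type LargestType = Int64Type;
}
impl LogicalPrimitiveType for Int64Type {
    type LargestType = Int64Type;
}
impl LogicalPrimitiveType for Float64Type {
    type LargestType = Float64Type;
}

// Generic code can name the widened target without knowing the concrete type.
fn widened_name<T: LogicalPrimitiveType>() -> &'static str {
    std::any::type_name::<T::LargestType>()
}

fn main() {
    println!("{}", widened_name::<Int8Type>());    // ...::Int64Type
    println!("{}", widened_name::<Float64Type>()); // ...::Float64Type
}
```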
41 changed files with 1080 additions and 712 deletions

Cargo.lock generated
View File

@@ -6312,6 +6312,7 @@ dependencies = [
"sha1",
"snafu",
"snap",
"sql",
"strum",
"table",
"tempdir",

View File

@@ -28,7 +28,7 @@ use arrow::record_batch::RecordBatch;
use clap::Parser;
use client::admin::Admin;
use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertExpr, TableId};
use client::{Client, Database, Select};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -219,126 +219,126 @@ fn build_values(column: &ArrayRef) -> Values {
}
}
fn create_table_expr() -> CreateExpr {
CreateExpr {
catalog_name: Some(CATALOG_NAME.to_string()),
schema_name: Some(SCHEMA_NAME.to_string()),
fn create_table_expr() -> CreateTableExpr {
CreateTableExpr {
catalog_name: CATALOG_NAME.to_string(),
schema_name: SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
desc: None,
desc: "".to_string(),
column_defs: vec![
ColumnDef {
name: "VendorID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "tpep_pickup_datetime".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "tpep_dropoff_datetime".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "passenger_count".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "trip_distance".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "RatecodeID".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "store_and_fwd_flag".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "PULocationID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "DOLocationID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "payment_type".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "fare_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "extra".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "mta_tax".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "tip_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "tolls_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "improvement_surcharge".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "total_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "congestion_surcharge".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "airport_fee".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
],
time_index: "tpep_pickup_datetime".to_string(),
@@ -346,7 +346,7 @@ fn create_table_expr() -> CreateExpr {
create_if_not_exists: false,
table_options: Default::default(),
region_ids: vec![0],
table_id: Some(0),
table_id: Some(TableId { id: 0 }),
}
}

View File

@@ -17,7 +17,7 @@ message AdminResponse {
message AdminExpr {
ExprHeader header = 1;
oneof expr {
CreateExpr create = 2;
CreateTableExpr create_table = 2;
AlterExpr alter = 3;
CreateDatabaseExpr create_database = 4;
DropTableExpr drop_table = 5;
@@ -31,24 +31,23 @@ message AdminResult {
}
}
// TODO(hl): rename to CreateTableExpr
message CreateExpr {
optional string catalog_name = 1;
optional string schema_name = 2;
message CreateTableExpr {
string catalog_name = 1;
string schema_name = 2;
string table_name = 3;
optional string desc = 4;
string desc = 4;
repeated ColumnDef column_defs = 5;
string time_index = 6;
repeated string primary_keys = 7;
bool create_if_not_exists = 8;
map<string, string> table_options = 9;
optional uint32 table_id = 10;
TableId table_id = 10;
repeated uint32 region_ids = 11;
}
message AlterExpr {
optional string catalog_name = 1;
optional string schema_name = 2;
string catalog_name = 1;
string schema_name = 2;
string table_name = 3;
oneof kind {
AddColumns add_columns = 4;
@@ -62,6 +61,11 @@ message DropTableExpr {
string table_name = 3;
}
message CreateDatabaseExpr {
//TODO(hl): maybe rename to schema_name?
string database_name = 1;
}
message AddColumns {
repeated AddColumn add_columns = 1;
}
@@ -79,7 +83,6 @@ message DropColumn {
string name = 1;
}
message CreateDatabaseExpr {
//TODO(hl): maybe rename to schema_name?
string database_name = 1;
message TableId {
uint32 id = 1;
}

View File

@@ -59,7 +59,7 @@ message ColumnDef {
string name = 1;
ColumnDataType datatype = 2;
bool is_nullable = 3;
optional bytes default_constraint = 4;
bytes default_constraint = 4;
}
enum ColumnDataType {

View File

@@ -23,12 +23,13 @@ impl ColumnDef {
pub fn try_as_column_schema(&self) -> Result<ColumnSchema> {
let data_type = ColumnDataTypeWrapper::try_new(self.datatype)?;
let constraint = match &self.default_constraint {
None => None,
Some(v) => Some(
ColumnDefaultConstraint::try_from(&v[..])
let constraint = if self.default_constraint.is_empty() {
None
} else {
Some(
ColumnDefaultConstraint::try_from(self.default_constraint.as_slice())
.context(error::ConvertColumnDefaultConstraintSnafu { column: &self.name })?,
),
)
};
ColumnSchema::new(&self.name, data_type.into(), self.is_nullable)

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{ColumnDataType, ColumnDef, CreateExpr};
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId};
use client::admin::Admin;
use client::{Client, Database};
use prost_09::Message;
@@ -33,36 +33,36 @@ fn main() {
async fn run() {
let client = Client::with_urls(vec!["127.0.0.1:3001"]);
let create_table_expr = CreateExpr {
catalog_name: Some("greptime".to_string()),
schema_name: Some("public".to_string()),
let create_table_expr = CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "test_logical_dist_exec".to_string(),
desc: None,
desc: "".to_string(),
column_defs: vec![
ColumnDef {
name: "timestamp".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "key".to_string(),
datatype: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "value".to_string(),
datatype: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
],
time_index: "timestamp".to_string(),
primary_keys: vec!["key".to_string()],
create_if_not_exists: false,
table_options: Default::default(),
table_id: Some(1024),
table_id: Some(TableId { id: 1024 }),
region_ids: vec![0],
};

View File

@@ -34,13 +34,13 @@ impl Admin {
}
}
pub async fn create(&self, expr: CreateExpr) -> Result<AdminResult> {
pub async fn create(&self, expr: CreateTableExpr) -> Result<AdminResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let expr = AdminExpr {
header: Some(header),
expr: Some(admin_expr::Expr::Create(expr)),
expr: Some(admin_expr::Expr::CreateTable(expr)),
};
self.do_request(expr).await
}

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::{AlterExpr, CreateExpr, DropColumns};
use api::v1::{AlterExpr, CreateTableExpr, DropColumns};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use snafu::{ensure, OptionExt, ResultExt};
@@ -29,6 +29,16 @@ use crate::error::{
/// Convert an [`AlterExpr`] to an optional [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
let catalog_name = if expr.catalog_name.is_empty() {
None
} else {
Some(expr.catalog_name)
};
let schema_name = if expr.schema_name.is_empty() {
None
} else {
Some(expr.schema_name)
};
match expr.kind {
Some(Kind::AddColumns(add_columns)) => {
let add_column_requests = add_columns
@@ -57,8 +67,8 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
catalog_name,
schema_name,
table_name: expr.table_name,
alter_kind,
};
@@ -70,8 +80,8 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
catalog_name,
schema_name,
table_name: expr.table_name,
alter_kind,
};
@@ -81,7 +91,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
}
}
pub fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
pub fn create_table_schema(expr: &CreateTableExpr) -> Result<SchemaRef> {
let column_schemas = expr
.column_defs
.iter()
@@ -119,7 +129,10 @@ pub fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
))
}
pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<CreateTableRequest> {
pub fn create_expr_to_request(
table_id: TableId,
expr: CreateTableExpr,
) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr)?;
let primary_key_indices = expr
.primary_keys
@@ -134,12 +147,19 @@ pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<Cre
})
.collect::<Result<Vec<usize>>>()?;
let catalog_name = expr
.catalog_name
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let schema_name = expr
.schema_name
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let mut catalog_name = expr.catalog_name;
if catalog_name.is_empty() {
catalog_name = DEFAULT_CATALOG_NAME.to_string();
}
let mut schema_name = expr.schema_name;
if schema_name.is_empty() {
schema_name = DEFAULT_SCHEMA_NAME.to_string();
}
let desc = if expr.desc.is_empty() {
None
} else {
Some(expr.desc)
};
let region_ids = if expr.region_ids.is_empty() {
vec![0]
@@ -152,7 +172,7 @@ pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<Cre
catalog_name,
schema_name,
table_name: expr.table_name,
desc: expr.desc,
desc,
schema,
region_numbers: region_ids,
primary_key_indices,
@@ -171,8 +191,8 @@ mod tests {
#[test]
fn test_alter_expr_to_request() {
let expr = AlterExpr {
catalog_name: None,
schema_name: None,
catalog_name: "".to_string(),
schema_name: "".to_string(),
table_name: "monitor".to_string(),
kind: Some(Kind::AddColumns(AddColumns {
@@ -181,7 +201,7 @@ mod tests {
name: "mem_usage".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
}),
is_key: false,
}],
@@ -208,8 +228,8 @@ mod tests {
#[test]
fn test_drop_column_expr() {
let expr = AlterExpr {
catalog_name: Some("test_catalog".to_string()),
schema_name: Some("test_schema".to_string()),
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),
kind: Some(Kind::DropColumns(DropColumns {

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::column::{SemanticType, Values};
use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateExpr};
use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateTableExpr};
use common_base::BitVec;
use common_time::timestamp::Timestamp;
use common_time::{Date, DateTime};
@@ -45,7 +45,7 @@ fn build_column_def(column_name: &str, datatype: i32, nullable: bool) -> ColumnD
name: column_name.to_string(),
datatype,
is_nullable: nullable,
default_constraint: None,
default_constraint: vec![],
}
}
@@ -214,7 +214,7 @@ pub fn build_create_expr_from_insertion(
table_id: Option<TableId>,
table_name: &str,
columns: &[Column],
) -> Result<CreateExpr> {
) -> Result<CreateTableExpr> {
let mut new_columns: HashSet<String> = HashSet::default();
let mut column_defs = Vec::default();
let mut primary_key_indices = Vec::default();
@@ -263,17 +263,17 @@ pub fn build_create_expr_from_insertion(
.map(|idx| columns[*idx].column_name.clone())
.collect::<Vec<_>>();
let expr = CreateExpr {
catalog_name: Some(catalog_name.to_string()),
schema_name: Some(schema_name.to_string()),
let expr = CreateTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
desc: Some("Created on insertion".to_string()),
desc: "Created on insertion".to_string(),
column_defs,
time_index: timestamp_field_name,
primary_keys,
create_if_not_exists: true,
table_options: Default::default(),
table_id,
table_id: table_id.map(|id| api::v1::TableId { id }),
region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend
};
@@ -516,9 +516,9 @@ mod tests {
build_create_expr_from_insertion("", "", table_id, table_name, &insert_batch.0)
.unwrap();
assert_eq!(table_id, create_expr.table_id);
assert_eq!(table_id, create_expr.table_id.map(|x| x.id));
assert_eq!(table_name, create_expr.table_name);
assert_eq!(Some("Created on insertion".to_string()), create_expr.desc);
assert_eq!("Created on insertion".to_string(), create_expr.desc);
assert_eq!(
vec![create_expr.column_defs[0].name.clone()],
create_expr.primary_keys

View File

@@ -188,7 +188,9 @@ impl GrpcQueryHandler for Instance {
impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result<AdminResult> {
let admin_resp = match expr.expr {
Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await,
Some(admin_expr::Expr::CreateTable(create_expr)) => {
self.handle_create(create_expr).await
}
Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await,
Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => {
self.execute_create_database(create_database_expr).await

View File

@@ -33,12 +33,11 @@ use crate::metric;
use crate::sql::SqlRequest;
impl Instance {
pub async fn execute_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = self
.query_engine
.sql_to_statement(sql)
.context(ExecuteSqlSnafu)?;
pub async fn execute_stmt(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Output> {
match stmt {
Statement::Query(_) => {
let logical_plan = self
@@ -153,6 +152,14 @@ impl Instance {
}
}
}
pub async fn execute_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = self
.query_engine
.sql_to_statement(sql)
.context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}
}
// TODO(LFC): Refactor consideration: move this function to some helper mod,
@@ -193,15 +200,33 @@ impl SqlQueryHandler for Instance {
&self,
query: &str,
query_ctx: QueryContextRef,
) -> servers::error::Result<Output> {
) -> Vec<servers::error::Result<Output>> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
self.execute_sql(query, query_ctx)
// we assume sql string has only 1 statement in datanode
let result = self
.execute_sql(query, query_ctx)
.await
.map_err(|e| {
error!(e; "Instance failed to execute sql");
BoxedError::new(e)
})
.context(servers::error::ExecuteQuerySnafu { query })
.context(servers::error::ExecuteQuerySnafu { query });
vec![result]
}
async fn do_statement_query(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> servers::error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
self.execute_stmt(stmt, query_ctx)
.await
.map_err(|e| {
error!(e; "Instance failed to execute sql");
BoxedError::new(e)
})
.context(servers::error::ExecuteStatementSnafu)
}
}

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::result::AdminResultBuilder;
use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr};
use api::v1::{AdminResult, AlterExpr, CreateTableExpr, DropTableExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
@@ -31,15 +31,15 @@ use crate::sql::SqlRequest;
impl Instance {
/// Handle gRPC create table requests.
pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult {
pub(crate) async fn handle_create(&self, expr: CreateTableExpr) -> AdminResult {
// Respect CreateExpr's table id and region ids if present, or allocate table id
// from local table id provider and set region id to 0.
let table_id = if let Some(table_id) = expr.table_id {
let table_id = if let Some(table_id) = &expr.table_id {
info!(
"Creating table {:?}.{:?}.{:?} with table id from frontend: {}",
expr.catalog_name, expr.schema_name, expr.table_name, table_id
expr.catalog_name, expr.schema_name, expr.table_name, table_id.id
);
table_id
table_id.id
} else {
match self.table_id_provider.as_ref() {
None => {
@@ -157,7 +157,7 @@ impl Instance {
mod tests {
use std::sync::Arc;
use api::v1::{ColumnDataType, ColumnDef};
use api::v1::{ColumnDataType, ColumnDef, TableId};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_grpc_expr::create_table_schema;
use datatypes::prelude::ConcreteDataType;
@@ -175,7 +175,7 @@ mod tests {
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
assert_eq!(request.table_name, "my-metrics");
assert_eq!(request.desc, Some("blabla".to_string()));
assert_eq!(request.desc, Some("blabla little magic fairy".to_string()));
assert_eq!(request.schema, expected_table_schema());
assert_eq!(request.primary_key_indices, vec![1, 0]);
assert!(request.create_if_not_exists);
@@ -214,7 +214,7 @@ mod tests {
name: "a".to_string(),
datatype: 1024,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
};
let result = column_def.try_as_column_schema();
assert!(matches!(
@@ -226,7 +226,7 @@ mod tests {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
};
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
@@ -238,7 +238,7 @@ mod tests {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: Some(default_constraint.clone().try_into().unwrap()),
default_constraint: default_constraint.clone().try_into().unwrap(),
};
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
@@ -250,44 +250,46 @@ mod tests {
);
}
fn testing_create_expr() -> CreateExpr {
fn testing_create_expr() -> CreateTableExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float32 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
CreateTableExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
table_name: "my-metrics".to_string(),
desc: Some("blabla".to_string()),
desc: "blabla little magic fairy".to_string(),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: Default::default(),
table_id: Some(MIN_USER_TABLE_ID),
table_id: Some(TableId {
id: MIN_USER_TABLE_ID,
}),
region_ids: vec![0],
}
}

View File

@@ -387,6 +387,12 @@ pub enum Error {
source: query::error::Error,
},
#[snafu(display("Failed to execute statement, source: {}", source))]
ExecuteStatement {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Failed to do vector computation, source: {}", source))]
VectorComputation {
#[snafu(backtrace)]
@@ -536,6 +542,7 @@ impl ErrorExt for Error {
Error::DeserializeInsertBatch { source, .. } => source.status_code(),
Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
Error::ExecuteSql { source, .. } => source.status_code(),
Error::ExecuteStatement { source, .. } => source.status_code(),
Error::InsertBatchToRequest { source, .. } => source.status_code(),
Error::CollectRecordbatchStream { source } | Error::CreateRecordbatches { source } => {
source.status_code()

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{Column, ColumnDataType, CreateExpr};
use api::v1::{Column, ColumnDataType, CreateTableExpr};
use datatypes::schema::ColumnSchema;
use snafu::{ensure, ResultExt};
use sql::ast::{ColumnDef, TableConstraint};
@@ -32,7 +32,7 @@ pub type CreateExprFactoryRef = Arc<dyn CreateExprFactory + Send + Sync>;
#[async_trait::async_trait]
pub trait CreateExprFactory {
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr>;
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateTableExpr>;
async fn create_expr_by_columns(
&self,
@@ -40,7 +40,7 @@ pub trait CreateExprFactory {
schema_name: &str,
table_name: &str,
columns: &[Column],
) -> crate::error::Result<CreateExpr>;
) -> crate::error::Result<CreateTableExpr>;
}
#[derive(Debug)]
@@ -48,7 +48,7 @@ pub struct DefaultCreateExprFactory;
#[async_trait::async_trait]
impl CreateExprFactory for DefaultCreateExprFactory {
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr> {
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateTableExpr> {
create_to_expr(None, vec![0], stmt)
}
@@ -58,7 +58,7 @@ impl CreateExprFactory for DefaultCreateExprFactory {
schema_name: &str,
table_name: &str,
columns: &[Column],
) -> Result<CreateExpr> {
) -> Result<CreateTableExpr> {
let table_id = None;
let create_expr = common_grpc_expr::build_create_expr_from_insertion(
catalog_name,
@@ -78,23 +78,23 @@ fn create_to_expr(
table_id: Option<u32>,
region_ids: Vec<u32>,
create: &CreateTable,
) -> Result<CreateExpr> {
) -> Result<CreateTableExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name).context(ParseSqlSnafu)?;
let time_index = find_time_index(&create.constraints)?;
let expr = CreateExpr {
catalog_name: Some(catalog_name),
schema_name: Some(schema_name),
let expr = CreateTableExpr {
catalog_name,
schema_name,
table_name,
desc: None,
desc: "".to_string(),
column_defs: columns_to_expr(&create.columns, &time_index)?,
time_index,
primary_keys: find_primary_keys(&create.constraints)?,
create_if_not_exists: create.if_not_exists,
// TODO(LFC): Fill in other table options.
table_options: HashMap::from([("engine".to_string(), create.engine.clone())]),
table_id,
table_id: table_id.map(|id| api::v1::TableId { id }),
region_ids,
};
Ok(expr)
@@ -171,12 +171,14 @@ fn columns_to_expr(
datatype: datatype as i32,
is_nullable: schema.is_nullable(),
default_constraint: match schema.default_constraint() {
None => None,
Some(v) => Some(v.clone().try_into().context(
ConvertColumnDefaultConstraintSnafu {
column_name: &schema.name,
},
)?),
None => vec![],
Some(v) => {
v.clone()
.try_into()
.context(ConvertColumnDefaultConstraintSnafu {
column_name: &schema.name,
})?
}
},
})
})

View File

@@ -25,7 +25,7 @@ use api::v1::alter_expr::Kind;
use api::v1::object_expr::Expr;
use api::v1::{
admin_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr,
CreateExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr,
CreateTableExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr,
ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
@@ -196,7 +196,7 @@ impl Instance {
/// Handle create expr.
pub async fn handle_create_table(
&self,
mut expr: CreateExpr,
mut expr: CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<Output> {
if let Some(v) = &self.dist_instance {
@@ -206,7 +206,7 @@ impl Instance {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::Create(expr)),
expr: Some(admin_expr::Expr::CreateTable(expr)),
};
let result = self
.grpc_admin_handler
@@ -359,8 +359,8 @@ impl Instance {
);
let expr = AlterExpr {
table_name: table_name.to_string(),
schema_name: Some(schema_name.to_string()),
catalog_name: Some(catalog_name.to_string()),
schema_name: schema_name.to_string(),
catalog_name: catalog_name.to_string(),
kind: Some(Kind::AddColumns(add_columns)),
};
@@ -461,31 +461,18 @@ impl FrontendInstance for Instance {
}
}
fn parse_stmt(sql: &str) -> Result<Statement> {
let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {})
.context(error::ParseSqlSnafu)?;
// TODO(LFC): Support executing multiple SQL queries,
// which seems to be a major change to our whole server framework?
ensure!(
stmt.len() == 1,
error::InvalidSqlSnafu {
err_msg: "Currently executing multiple SQL queries are not supported."
}
);
Ok(stmt.remove(0))
fn parse_stmt(sql: &str) -> Result<Vec<Statement>> {
ParserContext::create_with_dialect(sql, &GenericDialect {}).context(error::ParseSqlSnafu)
}
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(
impl Instance {
async fn query_statement(
&self,
query: &str,
stmt: Statement,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
let stmt = parse_stmt(query)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
// TODO(sunng87): provide a better form to log or track statement
let query = &format!("{:?}", &stmt);
match stmt {
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
@@ -494,7 +481,7 @@ impl SqlQueryHandler for Instance {
| Statement::DescribeTable(_)
| Statement::Explain(_)
| Statement::Query(_) => {
return self.sql_handler.do_query(query, query_ctx).await;
return self.sql_handler.do_statement_query(stmt, query_ctx).await;
}
Statement::Insert(insert) => match self.mode {
Mode::Standalone => {
@@ -569,6 +556,45 @@ impl SqlQueryHandler for Instance {
}
}
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<server_error::Result<Output>> {
match parse_stmt(query)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
{
Ok(stmts) => {
let mut results = Vec::with_capacity(stmts.len());
for stmt in stmts {
match self.query_statement(stmt, query_ctx.clone()).await {
Ok(output) => results.push(Ok(output)),
Err(e) => {
results.push(Err(e));
break;
}
}
}
results
}
Err(e) => {
vec![Err(e)]
}
}
}
async fn do_statement_query(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
self.query_statement(stmt, query_ctx).await
}
}
#[async_trait]
impl ScriptHandler for Instance {
async fn insert_script(&self, name: &str, script: &str) -> server_error::Result<()> {
@@ -630,7 +656,7 @@ impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, mut expr: AdminExpr) -> server_error::Result<AdminResult> {
// Force the default to be `None` rather than `Some(0)` comes from gRPC decode.
// Related issue: #480
if let Some(api::v1::admin_expr::Expr::Create(create)) = &mut expr.expr {
if let Some(api::v1::admin_expr::Expr::CreateTable(create)) = &mut expr.expr {
create.table_id = None;
}
self.grpc_admin_handler.exec_admin_request(expr).await
@@ -671,6 +697,7 @@ mod tests {
) engine=mito with(regions=1);"#;
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 1),
@@ -684,6 +711,7 @@ mod tests {
"#;
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 3),
@@ -693,6 +721,7 @@ mod tests {
let sql = "select * from demo";
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::RecordBatches(_) => {
@@ -720,6 +749,7 @@ mod tests {
let sql = "select * from demo where ts>cast(1000000000 as timestamp)"; // use nanoseconds as where condition
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::RecordBatches(_) => {
@@ -808,7 +838,7 @@ mod tests {
let create_expr = create_expr();
let admin_expr = AdminExpr {
header: Some(ExprHeader::default()),
expr: Some(admin_expr::Expr::Create(create_expr)),
expr: Some(admin_expr::Expr::CreateTable(create_expr)),
};
let result = GrpcAdminHandler::exec_admin_request(&*instance, admin_expr)
.await
@@ -886,48 +916,46 @@ mod tests {
}
}
fn create_expr() -> CreateExpr {
fn create_expr() -> CreateTableExpr {
let column_defs = vec![
GrpcColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
GrpcColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
GrpcColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
GrpcColumnDef {
name: "disk_util".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: Some(
ColumnDefaultConstraint::Value(Value::from(9.9f64))
.try_into()
.unwrap(),
),
default_constraint: ColumnDefaultConstraint::Value(Value::from(9.9f64))
.try_into()
.unwrap(),
},
GrpcColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
CreateTableExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
table_name: "demo".to_string(),
desc: None,
desc: "".to_string(),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["host".to_string()],

View File

@@ -18,8 +18,8 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::result::AdminResultBuilder;
use api::v1::{
admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateExpr, ObjectExpr,
ObjectResult,
admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateTableExpr, ObjectExpr,
ObjectResult, TableId,
};
use async_trait::async_trait;
use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
@@ -86,7 +86,7 @@ impl DistInstance {
pub(crate) async fn create_table(
&self,
create_table: &mut CreateExpr,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<Output> {
let response = self.create_table_in_meta(create_table, partitions).await?;
@@ -112,7 +112,9 @@ impl DistInstance {
table_name: create_table.table_name.to_string()
}
);
create_table.table_id = Some(table_route.table.id as u32);
create_table.table_id = Some(TableId {
id: table_route.table.id as u32,
});
self.put_table_global_meta(create_table, table_route)
.await?;
@@ -140,14 +142,17 @@ impl DistInstance {
Ok(Output::AffectedRows(0))
}
async fn handle_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = parse_stmt(sql)?;
async fn handle_statement(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Output> {
match stmt {
Statement::Query(_) => {
let plan = self
.query_engine
.statement_to_plan(stmt, query_ctx)
.context(error::ExecuteSqlSnafu { sql })?;
.context(error::ExecuteStatementSnafu {})?;
self.query_engine.execute(&plan).await
}
Statement::CreateDatabase(stmt) => {
@@ -171,7 +176,30 @@ impl DistInstance {
}
_ => unreachable!(),
}
.context(error::ExecuteSqlSnafu { sql })
.context(error::ExecuteStatementSnafu)
}
async fn handle_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let stmts = parse_stmt(sql);
match stmts {
Ok(stmts) => {
let mut results = Vec::with_capacity(stmts.len());
for stmt in stmts {
let result = self.handle_statement(stmt, query_ctx.clone()).await;
let is_err = result.is_err();
results.push(result);
if is_err {
break;
}
}
results
}
Err(e) => vec![Err(e)],
}
}
/// Handles distributed database creation
@@ -194,8 +222,16 @@ impl DistInstance {
}
async fn handle_alter_table(&self, expr: AlterExpr) -> Result<AdminResult> {
let catalog_name = expr.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
let schema_name = expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME
} else {
expr.catalog_name.as_str()
};
let schema_name = if expr.schema_name.is_empty() {
DEFAULT_SCHEMA_NAME
} else {
expr.schema_name.as_str()
};
let table_name = expr.table_name.as_str();
let table = self
.catalog_manager
@@ -223,20 +259,18 @@ impl DistInstance {
async fn create_table_in_meta(
&self,
create_table: &CreateExpr,
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<RouteResponse> {
let table_name = TableName::new(
create_table
.catalog_name
.clone()
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()),
create_table
.schema_name
.clone()
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()),
create_table.table_name.clone(),
);
let mut catalog_name = create_table.catalog_name.clone();
if catalog_name.is_empty() {
catalog_name = DEFAULT_CATALOG_NAME.to_string();
}
let mut schema_name = create_table.schema_name.clone();
if schema_name.is_empty() {
schema_name = DEFAULT_SCHEMA_NAME.to_string();
}
let table_name = TableName::new(catalog_name, schema_name, create_table.table_name.clone());
let partitions = parse_partitions(create_table, partitions)?;
let request = MetaCreateRequest {
@@ -252,7 +286,7 @@ impl DistInstance {
// TODO(LFC): Maybe move this to FrontendCatalogManager's "register_table" method?
async fn put_table_global_meta(
&self,
create_table: &CreateExpr,
create_table: &CreateTableExpr,
table_route: &TableRoute,
) -> Result<()> {
let table_name = &table_route.table.table_name;
@@ -274,10 +308,12 @@ impl DistInstance {
.await
.context(CatalogSnafu)?
{
let existing_bytes = existing.unwrap(); //this unwrap is safe since we compare with empty bytes and failed
let existing_bytes = existing.unwrap(); // this unwrap is safe since we compare with empty bytes and failed
let existing_value =
TableGlobalValue::from_bytes(&existing_bytes).context(CatalogEntrySerdeSnafu)?;
if existing_value.table_info.ident.table_id != create_table.table_id.unwrap() {
if existing_value.table_info.ident.table_id
!= create_table.table_id.as_ref().unwrap().id
{
error!(
"Table with name {} already exists, value in catalog: {:?}",
key, existing_bytes
@@ -300,11 +336,26 @@ impl SqlQueryHandler for DistInstance {
&self,
query: &str,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
) -> Vec<server_error::Result<Output>> {
self.handle_sql(query, query_ctx)
.await
.into_iter()
.map(|r| {
r.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
})
.collect()
}
async fn do_statement_query(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
self.handle_statement(stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
.context(server_error::ExecuteStatementSnafu)
}
}
@@ -340,7 +391,7 @@ impl GrpcAdminHandler for DistInstance {
}
fn create_table_global_value(
create_table: &CreateExpr,
create_table: &CreateTableExpr,
table_route: &TableRoute,
) -> Result<TableGlobalValue> {
let table_name = &table_route.table.table_name;
@@ -419,13 +470,19 @@ fn create_table_global_value(
created_on: DateTime::default(),
};
let desc = if create_table.desc.is_empty() {
None
} else {
Some(create_table.desc.clone())
};
let table_info = RawTableInfo {
ident: TableIdent {
table_id: table_route.table.id as u32,
version: 0,
},
name: table_name.table_name.clone(),
desc: create_table.desc.clone(),
desc,
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
meta,
@@ -440,7 +497,7 @@ fn create_table_global_value(
}
fn parse_partitions(
create_table: &CreateExpr,
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<Vec<MetaPartition>> {
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
@@ -455,7 +512,7 @@ fn parse_partitions(
}
fn find_partition_entries(
create_table: &CreateExpr,
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
partition_columns: &[String],
) -> Result<Vec<Vec<PartitionBound>>> {
@@ -505,7 +562,7 @@ fn find_partition_entries(
}
fn find_partition_columns(
create_table: &CreateExpr,
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
) -> Result<Vec<String>> {
let columns = if let Some(partitions) = partitions {
@@ -539,7 +596,7 @@ mod test {
let cases = [
(
r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b) (
PARTITION r0 VALUES LESS THAN ('hz'),
PARTITION r1 VALUES LESS THAN ('sh'),
@@ -585,6 +642,7 @@ ENGINE=mito",
let output = dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 1),
@@ -595,6 +653,7 @@ ENGINE=mito",
let output = dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
match output {
Output::RecordBatches(r) => {
@@ -633,6 +692,7 @@ ENGINE=mito",
dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
let sql = "
@@ -651,11 +711,16 @@ ENGINE=mito",
dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
async fn assert_show_tables(instance: SqlQueryHandlerRef) {
let sql = "show tables in test_show_tables";
let output = instance.do_query(sql, QueryContext::arc()).await.unwrap();
let output = instance
.do_query(sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
match output {
Output::RecordBatches(r) => {
let expected = r#"+--------------+

View File

@@ -130,6 +130,7 @@ mod tests {
Arc::new(QueryContext::new()),
)
.await
.remove(0)
.unwrap();
match output {
Output::Stream(stream) => {

View File

@@ -27,9 +27,9 @@ use servers::prometheus::{self, Metrics};
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use servers::Mode;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use crate::instance::Instance;
use crate::instance::{parse_stmt, Instance};
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
@@ -94,13 +94,26 @@ impl Instance {
);
let query_ctx = Arc::new(QueryContext::with_current_schema(db.to_string()));
let output = self.sql_handler.do_query(&sql, query_ctx).await;
let mut stmts = parse_stmt(&sql)
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu { query: &sql })?;
ensure!(
stmts.len() == 1,
error::InvalidQuerySnafu {
reason: "The sql has multiple statements".to_string()
}
);
let stmt = stmts.remove(0);
let output = self.sql_handler.do_statement_query(stmt, query_ctx).await;
let object_result = to_object_result(output)
.await
.try_into()
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu { query: sql })?;
.context(error::ExecuteQuerySnafu { query: &sql })?;
results.push((table_name, object_result));
}

View File

@@ -354,7 +354,7 @@ impl DistTable {
Ok(partition_rule)
}
/// Define a `alter_by_expr` instead of impl [`Table::alter`] to avoid redundant conversion between
/// Define a `alter_by_expr` instead of impl [`Table::alter`] to avoid redundant conversion between
/// [`table::requests::AlterTableRequest`] and [`AlterExpr`].
pub(crate) async fn alter_by_expr(&self, expr: AlterExpr) -> Result<()> {
let table_routes = self.table_routes.get_route(&self.table_name).await?;
@@ -735,6 +735,7 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn test_dist_table_scan() {
common_telemetry::init_default_ut_logging();
let table = Arc::new(new_dist_table().await);
// should scan all regions
// select a, row_id from numbers
@@ -896,8 +897,8 @@ mod test {
async fn new_dist_table() -> DistTable {
let column_schemas = vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas.clone()));

View File

@@ -33,7 +33,7 @@ use common_telemetry::timer;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
@@ -72,8 +72,7 @@ impl QueryEngine for DatafusionQueryEngine {
fn sql_to_statement(&self, sql: &str) -> Result<Statement> {
let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {})
.context(error::ParseSqlSnafu)?;
// TODO(dennis): supports multi statement in one sql?
assert!(1 == statement.len());
ensure!(1 == statement.len(), error::MultipleStatementsSnafu { sql });
Ok(statement.remove(0))
}
@@ -280,6 +279,7 @@ mod tests {
.sql_to_plan(sql, Arc::new(QueryContext::new()))
.unwrap();
// TODO(sunng87): do not rely on to_string for compare
assert_eq!(
format!("{:?}", plan),
r#"DfPlan(Limit: skip=0, fetch=20
@@ -297,6 +297,7 @@ mod tests {
let plan = engine
.sql_to_plan(sql, Arc::new(QueryContext::new()))
.unwrap();
let output = engine.execute(&plan).await.unwrap();
match output {

View File

@@ -40,6 +40,9 @@ pub enum InnerError {
source: sql::error::Error,
},
#[snafu(display("The SQL string has multiple statements, sql: {}", sql))]
MultipleStatements { sql: String, backtrace: Backtrace },
#[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
PlanSql {
sql: String,
@@ -90,6 +93,7 @@ impl ErrorExt for InnerError {
PlanSql { .. } => StatusCode::PlanQuery,
ConvertDfRecordBatchStream { source } => source.status_code(),
ExecutePhysicalPlan { source } => source.status_code(),
MultipleStatements { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -47,6 +47,7 @@ session = { path = "../session" }
sha1 = "0.10"
snafu = { version = "0.7", features = ["backtraces"] }
snap = "1"
sql = { path = "../sql" }
strum = { version = "0.24", features = ["derive"] }
table = { path = "../table" }
tokio = { version = "1.20", features = ["full"] }

View File

@@ -78,6 +78,12 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to execute sql statement, source: {}", source))]
ExecuteStatement {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to execute insert: {}, source: {}", msg, source))]
ExecuteInsert {
msg: String,
@@ -257,6 +263,7 @@ impl ErrorExt for Error {
InsertScript { source, .. }
| ExecuteScript { source, .. }
| ExecuteQuery { source, .. }
| ExecuteStatement { source, .. }
| ExecuteInsert { source, .. }
| ExecuteAlter { source, .. }
| PutOpentsdbDataPoint { source, .. } => source.status_code(),

View File

@@ -232,28 +232,52 @@ impl JsonResponse {
}
/// Create a json response from query result
async fn from_output(output: Result<Output>) -> Self {
match output {
Ok(Output::AffectedRows(rows)) => {
Self::with_output(Some(vec![JsonOutput::AffectedRows(rows)]))
}
Ok(Output::Stream(stream)) => match util::collect(stream).await {
Ok(rows) => match HttpRecordsOutput::try_from(rows) {
Ok(rows) => Self::with_output(Some(vec![JsonOutput::Records(rows)])),
Err(err) => Self::with_error(err, StatusCode::Internal),
async fn from_output(outputs: Vec<Result<Output>>) -> Self {
// TODO(sunng87): this api response structure cannot represent error
// well. It hides successful execution results from error response
let mut results = Vec::with_capacity(outputs.len());
for out in outputs {
match out {
Ok(Output::AffectedRows(rows)) => {
results.push(JsonOutput::AffectedRows(rows));
}
Ok(Output::Stream(stream)) => {
// TODO(sunng87): streaming response
match util::collect(stream).await {
Ok(rows) => match HttpRecordsOutput::try_from(rows) {
Ok(rows) => {
results.push(JsonOutput::Records(rows));
}
Err(err) => {
return Self::with_error(err, StatusCode::Internal);
}
},
Err(e) => {
return Self::with_error(
format!("Recordbatch error: {}", e),
e.status_code(),
);
}
}
}
Ok(Output::RecordBatches(rbs)) => match HttpRecordsOutput::try_from(rbs.take()) {
Ok(rows) => {
results.push(JsonOutput::Records(rows));
}
Err(err) => {
return Self::with_error(err, StatusCode::Internal);
}
},
Err(e) => Self::with_error(format!("Recordbatch error: {}", e), e.status_code()),
},
Ok(Output::RecordBatches(recordbatches)) => {
match HttpRecordsOutput::try_from(recordbatches.take()) {
Ok(rows) => Self::with_output(Some(vec![JsonOutput::Records(rows)])),
Err(err) => Self::with_error(err, StatusCode::Internal),
Err(e) => {
return Self::with_error(
format!("Query engine output error: {}", e),
e.status_code(),
);
}
}
Err(e) => {
Self::with_error(format!("Query engine output error: {}", e), e.status_code())
}
}
Self::with_output(Some(results))
}
pub fn code(&self) -> u32 {
@@ -519,7 +543,15 @@ mod test {
#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Result<Output> {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}
async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: QueryContextRef,
) -> Result<Output> {
unimplemented!()
}
}
@@ -582,7 +614,8 @@ mod test {
let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch]).unwrap();
let json_resp = JsonResponse::from_output(Ok(Output::RecordBatches(recordbatches))).await;
let json_resp =
JsonResponse::from_output(vec![Ok(Output::RecordBatches(recordbatches))]).await;
let json_output = &json_resp.output.unwrap()[0];
if let JsonOutput::Records(r) = json_output {
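Editor's note: the new call shape follows the updated test above: every `Ok(Output)` in the input vector becomes one `JsonOutput` entry, while an `Err` anywhere short-circuits into an error response (the limitation noted in the TODO). A small hedged sketch of that behaviour:

```rust
// `recordbatches` is assumed to be built as in the test above.
let json_resp = JsonResponse::from_output(vec![
    Ok(Output::AffectedRows(1)),
    Ok(Output::RecordBatches(recordbatches)),
])
.await;
// Two statements, two entries in `output`; an Err in the input would instead
// yield an error response and drop the earlier results.
assert_eq!(json_resp.output.unwrap().len(), 2);
```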

View File

@@ -45,8 +45,11 @@ pub async fn sql(
let sql_handler = &state.sql_handler;
let start = Instant::now();
let resp = if let Some(sql) = &params.sql {
// TODO(LFC): Sessions in http server.
let query_ctx = Arc::new(QueryContext::new());
if let Some(db) = params.database {
query_ctx.set_current_schema(db.as_ref());
}
JsonResponse::from_output(sql_handler.do_query(sql, query_ctx).await).await
} else {
JsonResponse::with_error(
@@ -78,8 +81,8 @@ pub struct HealthQuery {}
#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
pub struct HealthResponse {}
/// Handler to expose the health check
///
/// Handler to expose the health check
///
/// Currently simply returns status "200 OK" (default) with an empty JSON payload "{}"
#[axum_macros::debug_handler]
pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {

View File

@@ -91,7 +91,7 @@ pub async fn run_script(
}
let output = script_handler.execute_script(name.unwrap()).await;
let resp = JsonResponse::from_output(output).await;
let resp = JsonResponse::from_output(vec![output]).await;
Json(resp.with_execution_time(start.elapsed().as_millis()))
} else {

View File

@@ -73,7 +73,7 @@ impl MysqlInstanceShim {
}
}
async fn do_query(&self, query: &str) -> Result<Output> {
async fn do_query(&self, query: &str) -> Vec<Result<Output>> {
debug!("Start executing query: '{}'", query);
let start = Instant::now();
@@ -82,7 +82,7 @@ impl MysqlInstanceShim {
// components, this is quick and dirty, there must be a better way to do it.
let output =
if let Some(output) = crate::mysql::federated::check(query, self.session.context()) {
Ok(output)
vec![Ok(output)]
} else {
self.query_handler
.do_query(query, self.session.context())
@@ -193,14 +193,17 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
query: &'a str,
writer: QueryResultWriter<'a, W>,
) -> Result<()> {
let output = self.do_query(query).await;
let outputs = self.do_query(query).await;
let mut writer = MysqlResultWriter::new(writer);
writer.write(query, output).await
for output in outputs {
writer.write(query, output).await?;
}
Ok(())
}
async fn on_init<'a>(&'a mut self, database: &'a str, w: InitWriter<'a, W>) -> Result<()> {
let query = format!("USE {}", database.trim());
let output = self.do_query(&query).await;
let output = self.do_query(&query).await.remove(0);
if let Err(e) = output {
w.error(ErrorKind::ER_UNKNOWN_ERROR, e.to_string().as_bytes())
.await

View File

@@ -26,7 +26,7 @@ use pgwire::api::portal::Portal;
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
use pgwire::api::results::{text_query_response, FieldInfo, Response, Tag, TextDataRowEncoder};
use pgwire::api::{ClientInfo, Type};
use pgwire::error::{PgWireError, PgWireResult};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use session::context::QueryContext;
use crate::error::{self, Error, Result};
@@ -61,36 +61,43 @@ impl SimpleQueryHandler for PostgresServerHandler {
C: ClientInfo + Unpin + Send + Sync,
{
let query_ctx = query_context_from_client_info(client);
let output = self
.query_handler
.do_query(query, query_ctx)
.await
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
let outputs = self.query_handler.do_query(query, query_ctx).await;
match output {
Output::AffectedRows(rows) => Ok(vec![Response::Execution(Tag::new_for_execution(
"OK",
Some(rows),
))]),
Output::Stream(record_stream) => {
let schema = record_stream.schema();
recordbatches_to_query_response(record_stream, schema)
}
Output::RecordBatches(recordbatches) => {
let schema = recordbatches.schema();
recordbatches_to_query_response(
stream::iter(recordbatches.take().into_iter().map(Ok)),
schema,
)
}
let mut results = Vec::with_capacity(outputs.len());
for output in outputs {
let resp = match output {
Ok(Output::AffectedRows(rows)) => {
Response::Execution(Tag::new_for_execution("OK", Some(rows)))
}
Ok(Output::Stream(record_stream)) => {
let schema = record_stream.schema();
recordbatches_to_query_response(record_stream, schema)?
}
Ok(Output::RecordBatches(recordbatches)) => {
let schema = recordbatches.schema();
recordbatches_to_query_response(
stream::iter(recordbatches.take().into_iter().map(Ok)),
schema,
)?
}
Err(e) => Response::Error(Box::new(ErrorInfo::new(
"ERROR".to_string(),
"XX000".to_string(),
e.to_string(),
))),
};
results.push(resp);
}
Ok(results)
}
}
fn recordbatches_to_query_response<S>(
recordbatches_stream: S,
schema: SchemaRef,
) -> PgWireResult<Vec<Response>>
) -> PgWireResult<Response>
where
S: Stream<Item = RecordBatchResult<RecordBatch>> + Send + Unpin + 'static,
{
@@ -121,10 +128,10 @@ where
})
});
Ok(vec![Response::Query(text_query_response(
Ok(Response::Query(text_query_response(
pg_schema,
data_row_stream,
))])
)))
}
fn schema_to_pg(origin: SchemaRef) -> Result<Vec<FieldInfo>> {

View File

@@ -19,6 +19,7 @@ use api::v1::{AdminExpr, AdminResult, ObjectExpr, ObjectResult};
use async_trait::async_trait;
use common_query::Output;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use crate::error::Result;
use crate::influxdb::InfluxdbRequest;
@@ -45,7 +46,13 @@ pub type ScriptHandlerRef = Arc<dyn ScriptHandler + Send + Sync>;
#[async_trait]
pub trait SqlQueryHandler {
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Result<Output>;
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>>;
async fn do_statement_query(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Output>;
}
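Editor's note: a possible caller-side view of the new contract (not part of this change): `do_query` yields one `Result` per statement in the original string, and `do_statement_query` serves callers that already hold a parsed `Statement`. The sketch below simply collects the per-statement results, stopping at the first failure.

```rust
// Hedged sketch: collect per-statement outputs from a SqlQueryHandler,
// propagating the first error encountered.
async fn collect_outputs(
    handler: &dyn SqlQueryHandler,
    sql: &str,
    ctx: QueryContextRef,
) -> Result<Vec<Output>> {
    let mut outputs = Vec::new();
    for result in handler.do_query(sql, ctx).await {
        outputs.push(result?);
    }
    Ok(outputs)
}
```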
#[async_trait]

View File

@@ -46,7 +46,15 @@ impl InfluxdbLineProtocolHandler for DummyInstance {
#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Result<Output> {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}
async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: QueryContextRef,
) -> Result<Output> {
unimplemented!()
}
}

View File

@@ -45,7 +45,15 @@ impl OpentsdbProtocolHandler for DummyInstance {
#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Result<Output> {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}
async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: QueryContextRef,
) -> Result<Output> {
unimplemented!()
}
}

View File

@@ -70,7 +70,15 @@ impl PrometheusProtocolHandler for DummyInstance {
#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Result<Output> {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}
async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: QueryContextRef,
) -> Result<Output> {
unimplemented!()
}
}

View File

@@ -54,9 +54,18 @@ impl DummyInstance {
#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Result<Output> {
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let plan = self.query_engine.sql_to_plan(query, query_ctx).unwrap();
Ok(self.query_engine.execute(&plan).await.unwrap())
let output = self.query_engine.execute(&plan).await.unwrap();
vec![Ok(output)]
}
async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: QueryContextRef,
) -> Result<Output> {
unimplemented!()
}
}

View File

@@ -245,7 +245,8 @@ pub fn column_def_to_schema(column_def: &ColumnDef, is_time_index: bool) -> Resu
let is_nullable = column_def
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::Null));
.all(|o| !matches!(o.option, ColumnOption::NotNull))
&& !is_time_index;
let name = column_def.name.value.clone();
let data_type = sql_data_type_to_concrete_data_type(&column_def.data_type)?;
@@ -265,10 +266,10 @@ pub fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result<api::v1::Colu
let name = col.name.value.clone();
let data_type = sql_data_type_to_concrete_data_type(&col.data_type)?;
let nullable = !col
let is_nullable = col
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::NotNull));
.all(|o| !matches!(o.option, ColumnOption::NotNull));
let default_constraint = parse_column_default_constraint(&name, &data_type, &col.options)?
.map(ColumnDefaultConstraint::try_into) // serialize default constraint to bytes
@@ -281,8 +282,8 @@ pub fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result<api::v1::Colu
Ok(api::v1::ColumnDef {
name,
datatype: data_type,
is_nullable: nullable,
default_constraint,
is_nullable,
default_constraint: default_constraint.unwrap_or_default(),
})
}
@@ -587,7 +588,7 @@ mod tests {
assert_eq!("col", grpc_column_def.name);
assert!(grpc_column_def.is_nullable); // nullable when options are empty
assert_eq!(ColumnDataType::Float64 as i32, grpc_column_def.datatype);
assert_eq!(None, grpc_column_def.default_constraint);
assert!(grpc_column_def.default_constraint.is_empty());
// test not null
let column_def = ColumnDef {
@@ -603,4 +604,51 @@ mod tests {
let grpc_column_def = sql_column_def_to_grpc_column_def(column_def).unwrap();
assert!(!grpc_column_def.is_nullable);
}
#[test]
pub fn test_column_def_to_schema() {
let column_def = ColumnDef {
name: "col".into(),
data_type: SqlDataType::Double,
collation: None,
options: vec![],
};
let column_schema = column_def_to_schema(&column_def, false).unwrap();
assert_eq!("col", column_schema.name);
assert_eq!(
ConcreteDataType::float64_datatype(),
column_schema.data_type
);
assert!(column_schema.is_nullable());
assert!(!column_schema.is_time_index());
let column_schema = column_def_to_schema(&column_def, true).unwrap();
assert_eq!("col", column_schema.name);
assert_eq!(
ConcreteDataType::float64_datatype(),
column_schema.data_type
);
assert!(!column_schema.is_nullable());
assert!(column_schema.is_time_index());
let column_def = ColumnDef {
name: "col2".into(),
data_type: SqlDataType::String,
collation: None,
options: vec![ColumnOptionDef {
name: None,
option: ColumnOption::NotNull,
}],
};
let column_schema = column_def_to_schema(&column_def, false).unwrap();
assert_eq!("col2", column_schema.name);
assert_eq!(ConcreteDataType::string_datatype(), column_schema.data_type);
assert!(!column_schema.is_nullable());
assert!(!column_schema.is_time_index());
}
}
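Editor's note: the rule the new test exercises can be stated compactly: a column is nullable unless it carries an explicit `NOT NULL` option or serves as the time index. A hedged restatement as a free function (the import path is an assumption; the real code keeps this logic inline in `column_def_to_schema` and `sql_column_def_to_grpc_column_def`):

```rust
use sqlparser::ast::{ColumnOption, ColumnOptionDef};

// Nullable by default; only an explicit NOT NULL option or the time index
// flag makes a column non-nullable.
fn is_nullable(options: &[ColumnOptionDef], is_time_index: bool) -> bool {
    !is_time_index && options.iter().all(|o| !matches!(o.option, ColumnOption::NotNull))
}
```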

View File

@@ -56,7 +56,7 @@ impl TryFrom<AlterTable> for AlterExpr {
type Error = crate::error::Error;
fn try_from(value: AlterTable) -> Result<Self, Self::Error> {
let (catalog, schema, table) = table_idents_to_full_name(&value.table_name)?;
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&value.table_name)?;
let kind = match value.alter_operation {
AlterTableOperation::AddConstraint(_) => {
@@ -80,9 +80,9 @@ impl TryFrom<AlterTable> for AlterExpr {
}
};
let expr = AlterExpr {
catalog_name: Some(catalog),
schema_name: Some(schema),
table_name: table,
catalog_name,
schema_name,
table_name,
kind: Some(kind),
};

View File

@@ -21,8 +21,8 @@ futures = "0.3"
futures-util = "0.3"
lazy_static = "1.4"
object-store = { path = "../object-store" }
paste = "1.0"
parquet = { version = "26", features = ["async"] }
paste = "1.0"
planus = "0.2"
prost = "0.11"
regex = "1.5"

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod codec;
mod compat;
use std::any::Any;
@@ -499,280 +500,23 @@ impl<'a> IntoIterator for &'a WriteBatch {
}
}
pub mod codec {
#[cfg(test)]
pub(crate) fn new_test_batch() -> WriteBatch {
use datatypes::type_id::LogicalTypeId;
use std::io::Cursor;
use std::sync::Arc;
use crate::test_util::write_batch_util;
use datatypes::arrow::ipc::reader::StreamReader;
use datatypes::arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use prost::Message;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::WriteRequest;
use crate::codec::{Decoder, Encoder};
use crate::proto::wal::MutationType;
use crate::proto::write_batch::{self, gen_columns, gen_put_data_vector};
use crate::write_batch::{
DataCorruptedSnafu, DecodeArrowSnafu, DecodeProtobufSnafu, DecodeVectorSnafu,
EncodeArrowSnafu, EncodeProtobufSnafu, Error as WriteBatchError, FromProtobufSnafu,
MissingColumnSnafu, Mutation, ParseSchemaSnafu, PutData, Result, ToProtobufSnafu,
WriteBatch,
};
// TODO(jiachun): We can make a comparison with protobuf, including performance, storage cost,
// CPU consumption, etc
#[derive(Default)]
pub struct WriteBatchArrowEncoder {}
impl WriteBatchArrowEncoder {
pub fn new() -> Self {
Self::default()
}
}
impl Encoder for WriteBatchArrowEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let item_schema = item.schema();
let arrow_schema = item_schema.arrow_schema();
let opts = IpcWriteOptions::default();
let mut writer = StreamWriter::try_new_with_options(dst, arrow_schema, opts)
.context(EncodeArrowSnafu)?;
for mutation in item.iter() {
let rb = match mutation {
Mutation::Put(put) => {
let arrays = item_schema
.column_schemas()
.iter()
.map(|column_schema| {
let vector = put.column_by_name(&column_schema.name).context(
MissingColumnSnafu {
name: &column_schema.name,
},
)?;
Ok(vector.to_arrow_array())
})
.collect::<Result<Vec<_>>>()?;
RecordBatch::try_new(arrow_schema.clone(), arrays)
.context(EncodeArrowSnafu)?
}
};
writer.write(&rb).context(EncodeArrowSnafu)?;
}
writer.finish().context(EncodeArrowSnafu)?;
Ok(())
}
}
pub struct WriteBatchArrowDecoder {
mutation_types: Vec<i32>,
}
impl WriteBatchArrowDecoder {
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}
impl Decoder for WriteBatchArrowDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let reader = Cursor::new(src);
let mut reader = StreamReader::try_new(reader, None).context(DecodeArrowSnafu)?;
let arrow_schema = reader.schema();
let mut chunks = Vec::with_capacity(self.mutation_types.len());
for maybe_record_batch in reader.by_ref() {
let record_batch = maybe_record_batch.context(DecodeArrowSnafu)?;
chunks.push(record_batch);
}
// check if exactly finished
ensure!(
reader.is_finished(),
DataCorruptedSnafu {
message: "Impossible, the num of data chunks is different than expected."
}
);
ensure!(
chunks.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: format!(
"expected {} mutations, but got {}",
self.mutation_types.len(),
chunks.len()
)
}
);
let schema = Arc::new(Schema::try_from(arrow_schema).context(ParseSchemaSnafu)?);
let mut write_batch = WriteBatch::new(schema.clone());
for (mutation_type, record_batch) in self.mutation_types.iter().zip(chunks.into_iter())
{
match MutationType::from_i32(*mutation_type) {
Some(MutationType::Put) => {
let mut put_data = PutData::with_num_columns(schema.num_columns());
for (column_schema, array) in schema
.column_schemas()
.iter()
.zip(record_batch.columns().iter())
{
let vector =
Helper::try_into_vector(array).context(DecodeVectorSnafu)?;
put_data.add_column_by_name(&column_schema.name, vector)?;
}
write_batch.put(put_data)?;
}
Some(MutationType::Delete) => {
unimplemented!("delete mutation is not implemented")
}
_ => {
return DataCorruptedSnafu {
message: format!("Unexpceted mutation type: {}", mutation_type),
}
.fail()
}
}
}
Ok(write_batch)
}
}
pub struct WriteBatchProtobufEncoder {}
impl Encoder for WriteBatchProtobufEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let schema = item.schema().into();
let mutations = item
.iter()
.map(|mtn| match mtn {
Mutation::Put(put_data) => item
.schema()
.column_schemas()
.iter()
.map(|cs| {
let vector = put_data
.column_by_name(&cs.name)
.context(MissingColumnSnafu { name: &cs.name })?;
gen_columns(vector).context(ToProtobufSnafu)
})
.collect::<Result<Vec<_>>>(),
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|columns| write_batch::Mutation {
mutation: Some(write_batch::mutation::Mutation::Put(write_batch::Put {
columns,
})),
})
.collect();
let write_batch = write_batch::WriteBatch {
schema: Some(schema),
mutations,
};
write_batch.encode(dst).context(EncodeProtobufSnafu)
}
}
pub struct WriteBatchProtobufDecoder {
mutation_types: Vec<i32>,
}
impl WriteBatchProtobufDecoder {
#[allow(dead_code)]
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}
impl Decoder for WriteBatchProtobufDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let write_batch = write_batch::WriteBatch::decode(src).context(DecodeProtobufSnafu)?;
let schema = write_batch.schema.context(DataCorruptedSnafu {
message: "schema required",
})?;
let schema = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;
ensure!(
write_batch.mutations.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: &format!(
"expected {} mutations, but got {}",
self.mutation_types.len(),
write_batch.mutations.len()
)
}
);
let mutations = write_batch
.mutations
.into_iter()
.map(|mtn| match mtn.mutation {
Some(write_batch::mutation::Mutation::Put(put)) => {
let mut put_data = PutData::with_num_columns(put.columns.len());
let res = schema
.column_schemas()
.iter()
.map(|column| (column.name.clone(), column.data_type.clone()))
.zip(put.columns.into_iter())
.map(|((name, data_type), column)| {
gen_put_data_vector(data_type, column)
.map(|vector| (name, vector))
.context(FromProtobufSnafu)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|(name, vector)| put_data.add_column_by_name(&name, vector))
.collect::<Result<Vec<_>>>();
res.map(|_| Mutation::Put(put_data))
}
Some(write_batch::mutation::Mutation::Delete(_)) => todo!(),
_ => DataCorruptedSnafu {
message: "invalid mutation type",
}
.fail(),
})
.collect::<Result<Vec<_>>>()?;
let mut write_batch = WriteBatch::new(schema);
mutations
.into_iter()
.try_for_each(|mutation| match mutation {
Mutation::Put(put_data) => write_batch.put(put_data),
})?;
Ok(write_batch)
}
}
write_batch_util::new_write_batch(
&[
("k1", LogicalTypeId::UInt64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("ts", LogicalTypeId::TimestampMillisecond, false),
("v1", LogicalTypeId::Boolean, true),
],
Some(2),
)
}
#[cfg(test)]
mod tests {
use std::iter;
@@ -785,8 +529,6 @@ mod tests {
};
use super::*;
use crate::codec::{Decoder, Encoder};
use crate::proto;
use crate::test_util::write_batch_util;
#[test]
@@ -824,18 +566,6 @@ mod tests {
assert!(put_data.is_empty());
}
fn new_test_batch() -> WriteBatch {
write_batch_util::new_write_batch(
&[
("k1", LogicalTypeId::UInt64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("ts", LogicalTypeId::TimestampMillisecond, false),
("v1", LogicalTypeId::Boolean, true),
],
Some(2),
)
}
#[test]
fn test_write_batch_put() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
@@ -959,7 +689,7 @@ mod tests {
}
#[test]
pub fn test_align_timestamp() {
fn test_align_timestamp() {
let duration_millis = 20;
let ts = [-21, -20, -19, -1, 0, 5, 15, 19, 20, 21];
let res = ts.map(|t| align_timestamp(t, duration_millis));
@@ -967,7 +697,7 @@ mod tests {
}
#[test]
pub fn test_align_timestamp_overflow() {
fn test_align_timestamp_overflow() {
assert_eq!(Some(i64::MIN), align_timestamp(i64::MIN, 1));
assert_eq!(Some(-9223372036854775808), align_timestamp(i64::MIN, 2));
assert_eq!(
@@ -982,7 +712,7 @@ mod tests {
}
#[test]
pub fn test_write_batch_time_range() {
fn test_write_batch_time_range() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3, 4, 5, 6]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![
-21, -20, -1, 0, 1, 20,
@@ -1011,7 +741,7 @@ mod tests {
}
#[test]
pub fn test_write_batch_time_range_const_vector() {
fn test_write_batch_time_range_const_vector() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3, 4, 5, 6]));
let tsv = Arc::new(ConstantVector::new(
Arc::new(TimestampMillisecondVector::from_vec(vec![20])),
@@ -1039,111 +769,4 @@ mod tests {
ranges.as_slice()
)
}
fn gen_new_batch_and_types() -> (WriteBatch, Vec<i32>) {
let mut batch = new_test_batch();
for i in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let boolv = Arc::new(BooleanVector::from(vec![Some(true), Some(false), None]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![i, i, i]));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
batch.put(put_data).unwrap();
}
let types = proto::wal::gen_mutation_types(&batch);
(batch, types)
}
#[test]
fn test_codec_arrow() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types();
let encoder = codec::WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchArrowDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
#[test]
fn test_codec_protobuf() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types();
let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
fn gen_new_batch_and_types_with_none_column() -> (WriteBatch, Vec<i32>) {
let mut batch = new_test_batch();
for _ in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![0, 0, 0]));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
batch.put(put_data).unwrap();
}
let types = proto::wal::gen_mutation_types(&batch);
(batch, types)
}
#[test]
fn test_codec_with_none_column_arrow() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();
let encoder = codec::WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchArrowDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
#[test]
fn test_codec_with_none_column_protobuf() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();
let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
encoder.encode(&batch, &mut dst).unwrap();
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
}

View File

@@ -0,0 +1,399 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::Cursor;
use std::sync::Arc;
use datatypes::arrow::ipc::reader::StreamReader;
use datatypes::arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use prost::Message;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::WriteRequest;
use crate::codec::{Decoder, Encoder};
use crate::proto::wal::MutationType;
use crate::proto::write_batch::{self, gen_columns, gen_put_data_vector};
use crate::write_batch::{
DataCorruptedSnafu, DecodeArrowSnafu, DecodeProtobufSnafu, DecodeVectorSnafu, EncodeArrowSnafu,
EncodeProtobufSnafu, Error as WriteBatchError, FromProtobufSnafu, MissingColumnSnafu, Mutation,
ParseSchemaSnafu, PutData, Result, ToProtobufSnafu, WriteBatch,
};
// TODO(jiachun): We can make a comparison with protobuf, including performance, storage cost,
// CPU consumption, etc
#[derive(Default)]
pub struct WriteBatchArrowEncoder {}
impl WriteBatchArrowEncoder {
pub fn new() -> Self {
Self::default()
}
}
impl Encoder for WriteBatchArrowEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let item_schema = item.schema();
let arrow_schema = item_schema.arrow_schema();
let opts = IpcWriteOptions::default();
let mut writer = StreamWriter::try_new_with_options(dst, arrow_schema, opts)
.context(EncodeArrowSnafu)?;
for mutation in item.iter() {
let rb = match mutation {
Mutation::Put(put) => {
let arrays = item_schema
.column_schemas()
.iter()
.map(|column_schema| {
let vector = put.column_by_name(&column_schema.name).context(
MissingColumnSnafu {
name: &column_schema.name,
},
)?;
Ok(vector.to_arrow_array())
})
.collect::<Result<Vec<_>>>()?;
RecordBatch::try_new(arrow_schema.clone(), arrays).context(EncodeArrowSnafu)?
}
};
writer.write(&rb).context(EncodeArrowSnafu)?;
}
writer.finish().context(EncodeArrowSnafu)?;
Ok(())
}
}
pub struct WriteBatchArrowDecoder {
mutation_types: Vec<i32>,
}
impl WriteBatchArrowDecoder {
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}
impl Decoder for WriteBatchArrowDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let reader = Cursor::new(src);
let mut reader = StreamReader::try_new(reader, None).context(DecodeArrowSnafu)?;
let arrow_schema = reader.schema();
let mut chunks = Vec::with_capacity(self.mutation_types.len());
for maybe_record_batch in reader.by_ref() {
let record_batch = maybe_record_batch.context(DecodeArrowSnafu)?;
chunks.push(record_batch);
}
// check if exactly finished
ensure!(
reader.is_finished(),
DataCorruptedSnafu {
message: "Impossible, the num of data chunks is different than expected."
}
);
ensure!(
chunks.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: format!(
"expected {} mutations, but got {}",
self.mutation_types.len(),
chunks.len()
)
}
);
let schema = Arc::new(Schema::try_from(arrow_schema).context(ParseSchemaSnafu)?);
let mut write_batch = WriteBatch::new(schema.clone());
for (mutation_type, record_batch) in self.mutation_types.iter().zip(chunks.into_iter()) {
match MutationType::from_i32(*mutation_type) {
Some(MutationType::Put) => {
let mut put_data = PutData::with_num_columns(schema.num_columns());
for (column_schema, array) in schema
.column_schemas()
.iter()
.zip(record_batch.columns().iter())
{
let vector = Helper::try_into_vector(array).context(DecodeVectorSnafu)?;
put_data.add_column_by_name(&column_schema.name, vector)?;
}
write_batch.put(put_data)?;
}
Some(MutationType::Delete) => {
unimplemented!("delete mutation is not implemented")
}
_ => {
return DataCorruptedSnafu {
message: format!("Unexpceted mutation type: {}", mutation_type),
}
.fail()
}
}
}
Ok(write_batch)
}
}
pub struct WriteBatchProtobufEncoder {}
impl Encoder for WriteBatchProtobufEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let schema = item.schema().into();
let mutations = item
.iter()
.map(|mtn| match mtn {
Mutation::Put(put_data) => item
.schema()
.column_schemas()
.iter()
.map(|cs| {
let vector = put_data
.column_by_name(&cs.name)
.context(MissingColumnSnafu { name: &cs.name })?;
gen_columns(vector).context(ToProtobufSnafu)
})
.collect::<Result<Vec<_>>>(),
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|columns| write_batch::Mutation {
mutation: Some(write_batch::mutation::Mutation::Put(write_batch::Put {
columns,
})),
})
.collect();
let write_batch = write_batch::WriteBatch {
schema: Some(schema),
mutations,
};
write_batch.encode(dst).context(EncodeProtobufSnafu)
}
}
pub struct WriteBatchProtobufDecoder {
mutation_types: Vec<i32>,
}
impl WriteBatchProtobufDecoder {
#[allow(dead_code)]
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}
impl Decoder for WriteBatchProtobufDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let write_batch = write_batch::WriteBatch::decode(src).context(DecodeProtobufSnafu)?;
let schema = write_batch.schema.context(DataCorruptedSnafu {
message: "schema required",
})?;
let schema = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;
ensure!(
write_batch.mutations.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: &format!(
"expected {} mutations, but got {}",
self.mutation_types.len(),
write_batch.mutations.len()
)
}
);
let mutations = write_batch
.mutations
.into_iter()
.map(|mtn| match mtn.mutation {
Some(write_batch::mutation::Mutation::Put(put)) => {
let mut put_data = PutData::with_num_columns(put.columns.len());
let res = schema
.column_schemas()
.iter()
.map(|column| (column.name.clone(), column.data_type.clone()))
.zip(put.columns.into_iter())
.map(|((name, data_type), column)| {
gen_put_data_vector(data_type, column)
.map(|vector| (name, vector))
.context(FromProtobufSnafu)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|(name, vector)| put_data.add_column_by_name(&name, vector))
.collect::<Result<Vec<_>>>();
res.map(|_| Mutation::Put(put_data))
}
Some(write_batch::mutation::Mutation::Delete(_)) => todo!(),
_ => DataCorruptedSnafu {
message: "invalid mutation type",
}
.fail(),
})
.collect::<Result<Vec<_>>>()?;
let mut write_batch = WriteBatch::new(schema);
mutations
.into_iter()
.try_for_each(|mutation| match mutation {
Mutation::Put(put_data) => write_batch.put(put_data),
})?;
Ok(write_batch)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::{BooleanVector, TimestampMillisecondVector, UInt64Vector};
use store_api::storage::PutOperation;
use super::*;
use crate::{proto, write_batch};
fn gen_new_batch_and_types() -> (WriteBatch, Vec<i32>) {
let mut batch = write_batch::new_test_batch();
for i in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let boolv = Arc::new(BooleanVector::from(vec![Some(true), Some(false), None]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![i, i, i]));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
batch.put(put_data).unwrap();
}
let types = proto::wal::gen_mutation_types(&batch);
(batch, types)
}
#[test]
fn test_codec_arrow() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types();
let encoder = WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = WriteBatchArrowDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
#[test]
fn test_codec_protobuf() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types();
let encoder = WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
fn gen_new_batch_and_types_with_none_column() -> (WriteBatch, Vec<i32>) {
let mut batch = write_batch::new_test_batch();
for _ in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![0, 0, 0]));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
batch.put(put_data).unwrap();
}
let types = proto::wal::gen_mutation_types(&batch);
(batch, types)
}
#[test]
fn test_codec_with_none_column_arrow() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();
let encoder = WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = WriteBatchArrowDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
#[test]
fn test_codec_with_none_column_protobuf() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();
let encoder = WriteBatchProtobufEncoder {};
let mut dst = vec![];
encoder.encode(&batch, &mut dst).unwrap();
let decoder = WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
}
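Editor's note: outside the tests, using the relocated codec follows the same round trip: generate the mutation types alongside the encoded bytes, then hand both to the matching decoder. A minimal sketch distilled from `test_codec_arrow`, assuming it lives inside the storage crate so the crate-private paths resolve:

```rust
use crate::codec::{Decoder, Encoder};
use crate::proto;
use crate::write_batch::codec::{WriteBatchArrowDecoder, WriteBatchArrowEncoder};
use crate::write_batch::{Result, WriteBatch};

// Encode a WriteBatch to Arrow IPC bytes and decode it back; the mutation
// types produced by the WAL helper must accompany the encoded payload.
fn roundtrip_arrow(batch: &WriteBatch) -> Result<WriteBatch> {
    let mutation_types = proto::wal::gen_mutation_types(batch);
    let mut buf = Vec::new();
    WriteBatchArrowEncoder::new().encode(batch, &mut buf)?;
    WriteBatchArrowDecoder::new(mutation_types).decode(&buf)
}
```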

View File

@@ -27,6 +27,6 @@ tokio = { version = "1.18", features = ["full"] }
[dev-dependencies]
datafusion-expr = "14.0.0"
parquet = { version = "26", features = ["async"] }
tempdir = "0.3"
tokio-util = { version = "0.7", features = ["compat"] }
parquet = { version = "26", features = ["async"] }

View File

@@ -15,7 +15,7 @@ use api::v1::alter_expr::Kind;
use api::v1::column::SemanticType;
use api::v1::{
admin_result, column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateExpr, InsertExpr, MutateResult,
CreateTableExpr, InsertExpr, MutateResult, TableId,
};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
@@ -151,7 +151,7 @@ pub async fn test_insert_and_select(store_type: StorageType) {
name: "test_column".to_string(),
datatype: ColumnDataType::Int64.into(),
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
};
let kind = Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
@@ -161,8 +161,8 @@ pub async fn test_insert_and_select(store_type: StorageType) {
});
let expr = AlterExpr {
table_name: "test_table".to_string(),
catalog_name: None,
schema_name: None,
catalog_name: "".to_string(),
schema_name: "".to_string(),
kind: Some(kind),
};
let result = admin.alter(expr).await.unwrap();
@@ -222,44 +222,46 @@ async fn insert_and_assert(db: &Database) {
}
}
fn testing_create_expr() -> CreateExpr {
fn testing_create_expr() -> CreateTableExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32, // timestamp
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
CreateTableExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
table_name: "demo".to_string(),
desc: Some("blabla".to_string()),
desc: "blabla little magic fairy".to_string(),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["host".to_string()],
create_if_not_exists: true,
table_options: Default::default(),
table_id: Some(MIN_USER_TABLE_ID),
table_id: Some(TableId {
id: MIN_USER_TABLE_ID,
}),
region_ids: vec![0],
}
}

View File

@@ -59,13 +59,12 @@ macro_rules! http_tests {
pub async fn test_sql_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_app(store_type, "sql_api").await;
let (app, mut guard) = setup_test_app_with_frontend(store_type, "sql_api").await;
let client = TestClient::new(app);
let res = client.get("/v1/sql").send().await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json: r#"{"code":1004,"error":"sql parameter is required."}"#
assert_eq!(body.code(), 1004);
assert_eq!(body.error().unwrap(), "sql parameter is required.");
assert!(body.execution_time_ms().is_some());
@@ -77,9 +76,6 @@ pub async fn test_sql_api(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
@@ -107,7 +103,6 @@ pub async fn test_sql_api(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json: r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[["host",66.6,1024.0,0]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
@@ -128,8 +123,6 @@ pub async fn test_sql_api(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
@@ -150,8 +143,6 @@ pub async fn test_sql_api(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
@@ -163,6 +154,44 @@ pub async fn test_sql_api(store_type: StorageType) {
})).unwrap()
);
// test multi-statement
let res = client
.get("/v1/sql?sql=select cpu, ts from demo limit 1;select cpu, ts from demo where ts > 0;")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let outputs = body.output().unwrap();
assert_eq!(outputs.len(), 2);
assert_eq!(
outputs[0],
serde_json::from_value::<JsonOutput>(json!({
"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
})).unwrap()
);
assert_eq!(
outputs[1],
serde_json::from_value::<JsonOutput>(json!({
"records":{"rows":[]}
}))
.unwrap()
);
// test multi-statement with error
let res = client
.get("/v1/sql?sql=select cpu, ts from demo limit 1;select cpu, ts from demo2 where ts > 0;")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
assert!(!body.success());
assert!(body.execution_time_ms().is_some());
assert!(body.error().unwrap().contains("not found"));
guard.remove_all().await;
}
@@ -206,7 +235,6 @@ def test(n):
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json: r#"{"code":0}"#
assert_eq!(body.code(), 0);
assert!(body.output().is_none());
@@ -215,8 +243,6 @@ def test(n):
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]}}]}"#
assert_eq!(body.code(), 0);
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();