Compare commits

..

19 Commits

Author SHA1 Message Date
Ning Sun
11bdb33d37 feat: sql query interceptor and plugin refactoring (#773)
* feat: let instance hold plugins

* feat: add sql query interceptor definition

* docs: add comments to key apis

* feat: add implementation for pre-parsing and post-parsing

* feat: add post_execute hook

* test: add tests for interceptor

* chore: add license header

* fix: clippy error

* Update src/cmd/src/frontend.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* refactor: batching post_parsing calls

* refactor: rename AnyMap2 to Plugins

* feat: call pre_execute with logical plan empty at the moment

Co-authored-by: LFC <bayinamine@gmail.com>
2022-12-23 15:22:12 +08:00
LFC
1daba75e7b refactor: use "USE" keyword (#785)
Co-authored-by: luofucong <luofucong@greptime.com>
2022-12-23 14:29:47 +08:00
LFC
dc52a51576 chore: upgrade to Arrow 29.0 and use workspace package and dependencies (#782)
* chore: upgrade to Arrow 29.0 and use workspace package and dependencies

* fix: resolve PR comments

Co-authored-by: luofucong <luofucong@greptime.com>
2022-12-23 14:28:37 +08:00
Ruihang Xia
26af9e6214 ci: setup secrets for setup-protoc job (#783)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-23 11:36:39 +08:00
fys
e07791c5e8 chore: make election mod public (#781) 2022-12-22 17:32:35 +08:00
Yingwen
b6d29afcd1 ci: Use lld for coverage (#778)
* ci: Use lld for coverage

* style: Fix clippy
2022-12-22 16:10:37 +08:00
LFC
ea9af42091 chore: upgrade Rust to nightly 2022-12-20 (#772)
* chore: upgrade Rust to nightly 2022-12-20

* chore: upgrade Rust to nightly 2022-12-20

Co-authored-by: luofucong <luofucong@greptime.com>
2022-12-21 19:32:30 +08:00
shuiyisong
d0ebcc3b5a chore: open userinfo constructor (#774) 2022-12-21 17:58:43 +08:00
LFC
77182f5024 chore: upgrade Arrow to version 28, and DataFusion to 15 (#771)
Co-authored-by: luofucong <luofucong@greptime.com>
2022-12-21 17:02:11 +08:00
Ning Sun
539ead5460 feat: check database existence on http api (#764)
* feat: check database existance on http api

* Update src/servers/src/http/handler.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* feat: use database not found status code

* test: add assertion for status code

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-21 10:28:45 +08:00
Ruihang Xia
bc0e4e2cb0 fix: fill NULL based on row_count (#765)
* fix: fill NULL based on row_count

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: replace set_len with resize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-20 12:12:48 +08:00
Ruihang Xia
7d29670c86 fix: consider null mask in sqlness display util (#763)
* fix: consider null mask in sqlness display util

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add test case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change placeholder to null

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-19 14:20:28 +08:00
LFC
afd88dd53a fix: test_dist_table_scan block (#761)
* fix: `test_dist_table_scan` block

* fix: resolve PR comments

Co-authored-by: luofucong <luofucong@greptime.com>
2022-12-19 11:20:51 +08:00
Ning Sun
efd85df6be feat: add schema check on postgres startup (#758)
* feat: add schema check on postgres startup

* chore: update pgwire to 0.6.3

* test: add test for unspecified db
2022-12-19 10:53:44 +08:00
Ning Sun
ea1896493b feat: allow multiple sql statements in query string (#699)
* feat: allow multiple sql statement in query string

* test: add a test for multiple statement call

* feat: add temprary workaround for standalone mode

* fix: resolve sql parser issue temporarily

* Update src/datanode/src/instance/sql.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: adopt new sql handler

* refactor: revert changes in query engine

* refactor: assume sql-statement 1-1 on datanode

* test: use frontend for integration test

* refactor: add statement execution api for explicit single statement call

* fix: typo

* refactor: rename query method

* test: add test case for error

* test: data type change adoption

* chore: add todo from review

* chore: remove obsolete comments

* fix: resolve resolve issues

Co-authored-by: Yingwen <realevenyag@gmail.com>
2022-12-16 19:50:20 +08:00
Jiachun Feng
66bca11401 refactor: remove optional from the protos (#756) 2022-12-16 15:47:51 +08:00
Yingwen
7c16a4a17b refactor(storage): Move write_batch::codec to a separate file (#757)
* refactor(storage): Move write_batch::codec to a separate file

* chore: move new_test_batch to write_batch mod
2022-12-16 15:32:59 +08:00
dennis zhuang
28bd7404ad feat: change column's default property to nullable (#751)
* feat: change column's default property to nullable

* chore: use all instead of any

* fix: compile error

* fix: dependencies order in cargo
2022-12-16 11:17:01 +08:00
Lei, HUANG
0653301754 feat: replace arrow2 with official implementation 🎉 (#753)
* chore: kick off. change datafusion/arrow/parquet to target version

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: replace one last datafusion dep

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: arrow_array switch to arrow

* chore: update dep of binary vector

* chore: fix wrong merge commit

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: Switch to datatypes2

* feat: Make recordbatch compile

* chore: sort Cargo.toml

* feat: Fix common::recordbatch compiler errors

* feat: Fix recordbatch test compiling issue

* fix: api crate (#708)

* fix: rename ConcreteDataType::timestamp_millis_type to ConcreteDataType::timestamp_millisecond_type. fix other warnings regarding timestamp

* fix: revert changes in datatypes2

* fix: helper

* chore: delete datatypes based on arrow2

* feat: Fix some compiler errors in common::query (#710)

* feat: Fix some compiler errors in common::query

* feat: test_collect use vectors api

* fix: common-query subcrate (#712)

* fix: record batch adapter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix error enum

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix common::query compiler errors (#713)

* feat: Move conversion to ScalarValue to value.rs

* fix: Fix common::query compiler errors

This commit also make InnerError pub(crate)

* feat: Implements diff accumulator using WrapperType (#715)

* feat: Remove usage of opaque error from common::recordbatch

* feat: Remove opaque error from common::query

* feat: Fix diff compiler errors

Now common_function just use common_query's Error and Result. Adds
a LargestType associated type to LogicalPrimitiveType to get the largest
type a logical primitive type can cast to.

* feat: Remove LargestType from NativeType trait

* chore: Update comments

* feat: Restrict Scalar::RefType of WrapperType to itself

Add trait bound `for<'a> Scalar<RefType<'a> = Self>` to WrapperType

* chore: Address CR comments

* chore: Format codes

* fix: fix compile error for mean/polyval/pow/interp ops

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Revert "fix: fix compile error for mean/polyval/pow/interp ops"

This reverts commit fb0b4eb826.

* fix: Fix compiler errors in argmax/rate/median/norm_cdf (#716)

* fix: Fix compiler errors in argmax/rate/median/norm_cdf

* chore: Address CR comments

* fix: fix compile error for mean/polyval/pow/interp ops (#717)

* fix: fix compile error for mean/polyval/pow/interp ops

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify type bounds

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: fix argmin/percentile/clip/interp/scipy_stats_norm_pdf errors (#718)

fix: fix argmin/percentile/clip/interp/scipy_stats_norm_pdf compiler errors

* fix: fix other compile error in common-function (#719)

* further fixing

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix all compile errors in common function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix tests and clippy for common-function subcrate (#726)

* further fixing

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix all compile errors in common function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert test changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: row group pruning (#725)

* fix: row group pruning

* chore: use macro to simplify stats implemetation

* fxi: CR comments

* fix: row group metadata length mismatch

* fix: simplify code

* fix: Fix common::grpc compiler errors (#722)

* fix: Fix common::grpc compiler errors

This commit refactors RecordBatch and holds vectors in the RecordBatch
struct, so we don't need to cast the array to vector when doing
serialization or iterating the batch.

Now we use the vector API instead of the arrow API in grpc crate.

* chore: Address CR comments

* fix common record batch

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix compile error in server subcrate (#727)

* fix: Fix compile error in server subcrate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused type alias

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* explicitly panic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/storage/src/sst/parquet.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: Fix common grpc expr (#730)

* fix compile errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename fn names

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix styles

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix wranings in common-time

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: pre-cast to avoid tremendous match arms (#734)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: upgrade storage crate to arrow and parquet offcial impl (#738)

* fix: compile erros

* fix: parquet reader and writer

* fix: parquet reader and writer

* fix: WriteBatch IPC encode/decode

* fix: clippy errors in storage subcrate

* chore: remove suspicious unwrap

* fix: some cr comments

* fix: CR comments

* fix: CR comments

* fix: Fix compiler errors in catalog and mito crates (#742)

* fix: Fix compiler errors in mito

* fix: Fix compiler errors in catalog crate

* style: Fix clippy

* chore: Fix use

* Merge pull request #745

* fix nyc-taxi and util

* Merge branch 'replace-arrow2' into fix-others

* fix substrait

* fix warnings and error in test

* fix: Fix imports in optimizer.rs

* fix: errors in optimzer

* fix: remove unwrap

* fix: Fix compiler errors in query crate (#746)

* fix: Fix compiler errors in state.rs

* fix: fix compiler errors in state

* feat: upgrade sqlparser to 0.26

* fix: fix datafusion engine compiler errors

* fix: Fix some tests in query crate

* fix: Fix all warnings in tests

* feat: Remove `Type` from timestamp's type name

* fix: fix query tests

Now datafusion already supports median, so this commit also remove the
median function

* style: Fix clippy

* feat: Remove RecordBatch::pretty_print

* chore: Address CR comments

* Update src/query/src/query_engine/state.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* fix: frontend compile errors (#747)

fix: fix compile errors in frontend

* fix: Fix compiler errors in script crate (#749)

* fix: Fix compiler errors in state.rs

* fix: fix compiler errors in state

* feat: upgrade sqlparser to 0.26

* fix: fix datafusion engine compiler errors

* fix: Fix some tests in query crate

* fix: Fix all warnings in tests

* feat: Remove `Type` from timestamp's type name

* fix: fix query tests

Now datafusion already supports median, so this commit also remove the
median function

* style: Fix clippy

* feat: Remove RecordBatch::pretty_print

* chore: Address CR comments

* feat: Add column_by_name to RecordBatch

* feat: modify select_from_rb

* feat: Fix some compiler errors in vector.rs

* feat: Fix more compiler errors in vector.rs

* fix: fix table.rs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix compiler errors in coprocessor

* fix: Fix some compiler errors

* fix: Fix compiler errors in script

* chore: Remove unused imports and format code

* test: disable interval tests

* test: Fix test_compile_execute test

* style: Fix clippy

* feat: Support interval

* feat: Add RecordBatch::columns and fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix All The Tests! (#752)

* fix: Fix several tests compile errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: some compile errors in tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: compile errors in frontend tests

* fix: compile errors in frontend tests

* test: Fix tests in api and common-query

* test: Fix test in sql crate

* fix: resolve substrait error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: add more test

* test: Fix tests in servers

* fix instance_test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test: Fix tests in tests-integration

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Lei, HUANG <mrsatangel@gmail.com>
Co-authored-by: evenyag <realevenyag@gmail.com>

* fix: clippy errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
2022-12-15 18:49:12 +08:00
224 changed files with 2986 additions and 1864 deletions

View File

@@ -24,7 +24,7 @@ on:
name: Code coverage
env:
RUST_TOOLCHAIN: nightly-2022-07-14
RUST_TOOLCHAIN: nightly-2022-12-20
jobs:
coverage:
@@ -34,6 +34,11 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: KyleMayes/install-llvm-action@v1
with:
version: "14.0"
- name: Install toolchain
uses: dtolnay/rust-toolchain@master
with:
@@ -48,6 +53,7 @@ jobs:
- name: Collect coverage data
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
GT_S3_BUCKET: ${{ secrets.S3_BUCKET }}

View File

@@ -23,7 +23,7 @@ on:
name: CI
env:
RUST_TOOLCHAIN: nightly-2022-07-14
RUST_TOOLCHAIN: nightly-2022-12-20
jobs:
typos:
@@ -41,6 +41,8 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
@@ -81,6 +83,8 @@ jobs:
# path: ./llvm
# key: llvm
# - uses: arduino/setup-protoc@v1
# with:
# repo-token: ${{ secrets.GITHUB_TOKEN }}
# - uses: KyleMayes/install-llvm-action@v1
# with:
# version: "14.0"
@@ -114,6 +118,8 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
@@ -131,6 +137,8 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}

View File

@@ -10,7 +10,7 @@ on:
name: Release
env:
RUST_TOOLCHAIN: nightly-2022-07-14
RUST_TOOLCHAIN: nightly-2022-12-20
# FIXME(zyy17): Would be better to use `gh release list -L 1 | cut -f 3` to get the latest release version tag, but for a long time, we will stay at 'v0.1.0-alpha-*'.
SCHEDULED_BUILD_VERSION_PREFIX: v0.1.0-alpha

534
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -39,5 +39,23 @@ members = [
"tests/runner",
]
[workspace.package]
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[workspace.dependencies]
arrow = "29.0"
arrow-schema = { version = "29.0", features = ["serde"] }
# TODO(LFC): Use released Datafusion when it officially dpendent on Arrow 29.0
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
parquet = "29.0"
sqlparser = "0.28"
[profile.release]
debug = true

View File

@@ -1,14 +1,14 @@
[package]
name = "benchmarks"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
arrow = "26.0.0"
arrow.workspace = true
clap = { version = "4.0", features = ["derive"] }
client = { path = "../src/client" }
indicatif = "0.17.1"
itertools = "0.10.5"
parquet = "26.0.0"
parquet.workspace = true
tokio = { version = "1.21", features = ["full"] }

View File

@@ -15,7 +15,6 @@
//! Use the taxi trip records from New York City dataset to bench. You can download the dataset from
//! [here](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).
#![feature(once_cell)]
#![allow(clippy::print_stdout)]
use std::collections::HashMap;
@@ -28,7 +27,7 @@ use arrow::record_batch::RecordBatch;
use clap::Parser;
use client::admin::Admin;
use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertExpr, TableId};
use client::{Client, Database, Select};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -94,7 +93,7 @@ async fn write_data(
.unwrap();
let progress_bar = mpb.add(ProgressBar::new(row_num as _));
progress_bar.set_style(pb_style);
progress_bar.set_message(format!("{:?}", path));
progress_bar.set_message(format!("{path:?}"));
let mut total_rpc_elapsed_ms = 0;
@@ -115,10 +114,7 @@ async fn write_data(
progress_bar.inc(row_count as _);
}
progress_bar.finish_with_message(format!(
"file {:?} done in {}ms",
path, total_rpc_elapsed_ms
));
progress_bar.finish_with_message(format!("file {path:?} done in {total_rpc_elapsed_ms}ms",));
total_rpc_elapsed_ms
}
@@ -219,126 +215,126 @@ fn build_values(column: &ArrayRef) -> Values {
}
}
fn create_table_expr() -> CreateExpr {
CreateExpr {
catalog_name: Some(CATALOG_NAME.to_string()),
schema_name: Some(SCHEMA_NAME.to_string()),
fn create_table_expr() -> CreateTableExpr {
CreateTableExpr {
catalog_name: CATALOG_NAME.to_string(),
schema_name: SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
desc: None,
desc: "".to_string(),
column_defs: vec![
ColumnDef {
name: "VendorID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "tpep_pickup_datetime".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "tpep_dropoff_datetime".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "passenger_count".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "trip_distance".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "RatecodeID".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "store_and_fwd_flag".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "PULocationID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "DOLocationID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "payment_type".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "fare_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "extra".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "mta_tax".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "tip_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "tolls_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "improvement_surcharge".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "total_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "congestion_surcharge".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "airport_fee".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
],
time_index: "tpep_pickup_datetime".to_string(),
@@ -346,7 +342,7 @@ fn create_table_expr() -> CreateExpr {
create_if_not_exists: false,
table_options: Default::default(),
region_ids: vec![0],
table_id: Some(0),
table_id: Some(TableId { id: 0 }),
}
}
@@ -355,12 +351,12 @@ fn query_set() -> HashMap<String, String> {
ret.insert(
"count_all".to_string(),
format!("SELECT COUNT(*) FROM {};", TABLE_NAME),
format!("SELECT COUNT(*) FROM {TABLE_NAME};"),
);
ret.insert(
"fare_amt_by_passenger".to_string(),
format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {} GROUP BY passenger_count",TABLE_NAME)
format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {TABLE_NAME} GROUP BY passenger_count")
);
ret
@@ -373,7 +369,7 @@ async fn do_write(args: &Args, client: &Client) {
let mut write_jobs = JoinSet::new();
let create_table_result = admin.create(create_table_expr()).await;
println!("Create table result: {:?}", create_table_result);
println!("Create table result: {create_table_result:?}");
let progress_bar_style = ProgressStyle::with_template(
"[{elapsed_precise}] {bar:60.cyan/blue} {pos:>7}/{len:7} {msg}",
@@ -406,7 +402,7 @@ async fn do_write(args: &Args, client: &Client) {
async fn do_query(num_iter: usize, db: &Database) {
for (query_name, query) in query_set() {
println!("Running query: {}", query);
println!("Running query: {query}");
for i in 0..num_iter {
let now = Instant::now();
let _res = db.select(Select::Sql(query.clone())).await.unwrap();

View File

@@ -1 +1 @@
nightly-2022-07-14
nightly-2022-12-20

View File

@@ -1,9 +1,8 @@
[package]
name = "api"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
common-base = { path = "../common/base" }

View File

@@ -17,7 +17,7 @@ message AdminResponse {
message AdminExpr {
ExprHeader header = 1;
oneof expr {
CreateExpr create = 2;
CreateTableExpr create_table = 2;
AlterExpr alter = 3;
CreateDatabaseExpr create_database = 4;
DropTableExpr drop_table = 5;
@@ -31,24 +31,23 @@ message AdminResult {
}
}
// TODO(hl): rename to CreateTableExpr
message CreateExpr {
optional string catalog_name = 1;
optional string schema_name = 2;
message CreateTableExpr {
string catalog_name = 1;
string schema_name = 2;
string table_name = 3;
optional string desc = 4;
string desc = 4;
repeated ColumnDef column_defs = 5;
string time_index = 6;
repeated string primary_keys = 7;
bool create_if_not_exists = 8;
map<string, string> table_options = 9;
optional uint32 table_id = 10;
TableId table_id = 10;
repeated uint32 region_ids = 11;
}
message AlterExpr {
optional string catalog_name = 1;
optional string schema_name = 2;
string catalog_name = 1;
string schema_name = 2;
string table_name = 3;
oneof kind {
AddColumns add_columns = 4;
@@ -62,6 +61,11 @@ message DropTableExpr {
string table_name = 3;
}
message CreateDatabaseExpr {
//TODO(hl): maybe rename to schema_name?
string database_name = 1;
}
message AddColumns {
repeated AddColumn add_columns = 1;
}
@@ -79,7 +83,6 @@ message DropColumn {
string name = 1;
}
message CreateDatabaseExpr {
//TODO(hl): maybe rename to schema_name?
string database_name = 1;
message TableId {
uint32 id = 1;
}

View File

@@ -59,7 +59,7 @@ message ColumnDef {
string name = 1;
ColumnDataType datatype = 2;
bool is_nullable = 3;
optional bytes default_constraint = 4;
bytes default_constraint = 4;
}
enum ColumnDataType {

View File

@@ -23,12 +23,13 @@ impl ColumnDef {
pub fn try_as_column_schema(&self) -> Result<ColumnSchema> {
let data_type = ColumnDataTypeWrapper::try_new(self.datatype)?;
let constraint = match &self.default_constraint {
None => None,
Some(v) => Some(
ColumnDefaultConstraint::try_from(&v[..])
let constraint = if self.default_constraint.is_empty() {
None
} else {
Some(
ColumnDefaultConstraint::try_from(self.default_constraint.as_slice())
.context(error::ConvertColumnDefaultConstraintSnafu { column: &self.name })?,
),
)
};
ColumnSchema::new(&self.name, data_type.into(), self.is_nullable)

View File

@@ -1,9 +1,8 @@
[package]
name = "catalog"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
api = { path = "../api" }
@@ -19,7 +18,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = "14.0.0"
datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"

View File

@@ -33,48 +33,38 @@ const ALPHANUMERICS_NAME_PATTERN: &str = "[a-zA-Z_][a-zA-Z0-9_]*";
lazy_static! {
static ref CATALOG_KEY_PATTERN: Regex = Regex::new(&format!(
"^{}-({})$",
CATALOG_KEY_PREFIX, ALPHANUMERICS_NAME_PATTERN
"^{CATALOG_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})$"
))
.unwrap();
}
lazy_static! {
static ref SCHEMA_KEY_PATTERN: Regex = Regex::new(&format!(
"^{}-({})-({})$",
SCHEMA_KEY_PREFIX, ALPHANUMERICS_NAME_PATTERN, ALPHANUMERICS_NAME_PATTERN
"^{SCHEMA_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})$"
))
.unwrap();
}
lazy_static! {
static ref TABLE_GLOBAL_KEY_PATTERN: Regex = Regex::new(&format!(
"^{}-({})-({})-({})$",
TABLE_GLOBAL_KEY_PREFIX,
ALPHANUMERICS_NAME_PATTERN,
ALPHANUMERICS_NAME_PATTERN,
ALPHANUMERICS_NAME_PATTERN
"^{TABLE_GLOBAL_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})$"
))
.unwrap();
}
lazy_static! {
static ref TABLE_REGIONAL_KEY_PATTERN: Regex = Regex::new(&format!(
"^{}-({})-({})-({})-([0-9]+)$",
TABLE_REGIONAL_KEY_PREFIX,
ALPHANUMERICS_NAME_PATTERN,
ALPHANUMERICS_NAME_PATTERN,
ALPHANUMERICS_NAME_PATTERN
"^{TABLE_REGIONAL_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})-([0-9]+)$"
))
.unwrap();
}
pub fn build_catalog_prefix() -> String {
format!("{}-", CATALOG_KEY_PREFIX)
format!("{CATALOG_KEY_PREFIX}-")
}
pub fn build_schema_prefix(catalog_name: impl AsRef<str>) -> String {
format!("{}-{}-", SCHEMA_KEY_PREFIX, catalog_name.as_ref())
format!("{SCHEMA_KEY_PREFIX}-{}-", catalog_name.as_ref())
}
pub fn build_table_global_prefix(
@@ -82,8 +72,7 @@ pub fn build_table_global_prefix(
schema_name: impl AsRef<str>,
) -> String {
format!(
"{}-{}-{}-",
TABLE_GLOBAL_KEY_PREFIX,
"{TABLE_GLOBAL_KEY_PREFIX}-{}-{}-",
catalog_name.as_ref(),
schema_name.as_ref()
)
@@ -378,7 +367,7 @@ mod tests {
table_info,
};
let serialized = serde_json::to_string(&value).unwrap();
let deserialized = TableGlobalValue::parse(&serialized).unwrap();
let deserialized = TableGlobalValue::parse(serialized).unwrap();
assert_eq!(value, deserialized);
}
}

View File

@@ -157,7 +157,7 @@ pub struct RegisterSchemaRequest {
/// Formats table fully-qualified name
pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String {
format!("{}.{}.{}", catalog, schema, table)
format!("{catalog}.{schema}.{table}")
}
pub trait CatalogProviderFactory {
@@ -187,8 +187,7 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
.await
.with_context(|_| CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id: {}",
catalog_name, schema_name, table_name, table_id,
"{catalog_name}.{schema_name}.{table_name}, id: {table_id}",
),
})?;
manager
@@ -200,7 +199,7 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
table: table.clone(),
})
.await?;
info!("Created and registered system table: {}", table_name);
info!("Created and registered system table: {table_name}");
table
};
if let Some(hook) = req.open_hook {

View File

@@ -338,7 +338,7 @@ impl CatalogManager for LocalCatalogManager {
let schema = catalog
.schema(schema_name)?
.with_context(|| SchemaNotFoundSnafu {
schema_info: format!("{}.{}", catalog_name, schema_name),
schema_info: format!("{catalog_name}.{schema_name}"),
})?;
{
@@ -452,7 +452,7 @@ impl CatalogManager for LocalCatalogManager {
let schema = catalog
.schema(schema_name)?
.with_context(|| SchemaNotFoundSnafu {
schema_info: format!("{}.{}", catalog_name, schema_name),
schema_info: format!("{catalog_name}.{schema_name}"),
})?;
schema.table(table_name)
}

View File

@@ -331,10 +331,7 @@ impl RemoteCatalogManager {
.open_table(&context, request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!(
"{}.{}.{}, id:{}",
catalog_name, schema_name, table_name, table_id
),
table_info: format!("{catalog_name}.{schema_name}.{table_name}, id:{table_id}"),
})? {
Some(table) => {
info!(
@@ -355,7 +352,7 @@ impl RemoteCatalogManager {
.clone()
.try_into()
.context(InvalidTableSchemaSnafu {
table_info: format!("{}.{}.{}", catalog_name, schema_name, table_name,),
table_info: format!("{catalog_name}.{schema_name}.{table_name}"),
schema: meta.schema.clone(),
})?;
let req = CreateTableRequest {
@@ -477,7 +474,7 @@ impl CatalogManager for RemoteCatalogManager {
let schema = catalog
.schema(schema_name)?
.with_context(|| SchemaNotFoundSnafu {
schema_info: format!("{}.{}", catalog_name, schema_name),
schema_info: format!("{catalog_name}.{schema_name}"),
})?;
schema.table(table_name)
}

View File

@@ -61,7 +61,7 @@ impl Table for SystemCatalogTable {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
@@ -129,7 +129,7 @@ impl SystemCatalogTable {
let ctx = SessionContext::new();
let scan = self
.table
.scan(&full_projection, &[], None)
.scan(full_projection, &[], None)
.await
.context(error::SystemCatalogTableScanSnafu)?;
let stream = scan
@@ -197,7 +197,7 @@ pub fn build_table_insert_request(full_table_name: String, table_id: TableId) ->
}
pub fn build_schema_insert_request(catalog_name: String, schema_name: String) -> InsertRequest {
let full_schema_name = format!("{}.{}", catalog_name, schema_name);
let full_schema_name = format!("{catalog_name}.{schema_name}");
build_insert_request(
EntryType::Schema,
full_schema_name.as_bytes(),
@@ -390,7 +390,7 @@ mod tests {
if let Entry::Catalog(e) = entry {
assert_eq!("some_catalog", e.catalog_name);
} else {
panic!("Unexpected type: {:?}", entry);
panic!("Unexpected type: {entry:?}");
}
}
@@ -407,7 +407,7 @@ mod tests {
assert_eq!("some_catalog", e.catalog_name);
assert_eq!("some_schema", e.schema_name);
} else {
panic!("Unexpected type: {:?}", entry);
panic!("Unexpected type: {entry:?}");
}
}
@@ -426,7 +426,7 @@ mod tests {
assert_eq!("some_table", e.table_name);
assert_eq!(42, e.table_id);
} else {
panic!("Unexpected type: {:?}", entry);
panic!("Unexpected type: {entry:?}");
}
}

View File

@@ -77,7 +77,7 @@ impl Table for Tables {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> table::error::Result<PhysicalPlanRef> {
@@ -370,7 +370,7 @@ mod tests {
.unwrap();
let tables = Tables::new(catalog_list, "test_engine".to_string());
let tables_stream = tables.scan(&None, &[], None).await.unwrap();
let tables_stream = tables.scan(None, &[], None).await.unwrap();
let session_ctx = SessionContext::new();
let mut tables_stream = tables_stream.execute(0, session_ctx.task_ctx()).unwrap();

View File

@@ -69,8 +69,7 @@ mod tests {
assert!(
err.to_string()
.contains("Table `greptime.public.test_table` already exists"),
"Actual error message: {}",
err
"Actual error message: {err}",
);
}

View File

@@ -189,10 +189,10 @@ impl TableEngine for MockTableEngine {
unimplemented!()
}
fn get_table<'a>(
fn get_table(
&self,
_ctx: &EngineContext,
table_ref: &'a TableReference,
table_ref: &TableReference,
) -> table::Result<Option<TableRef>> {
futures::executor::block_on(async {
Ok(self
@@ -204,7 +204,7 @@ impl TableEngine for MockTableEngine {
})
}
fn table_exists<'a>(&self, _ctx: &EngineContext, table_ref: &'a TableReference) -> bool {
fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
futures::executor::block_on(async {
self.tables
.read()

View File

@@ -1,9 +1,8 @@
[package]
name = "client"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
api = { path = "../api" }
@@ -15,7 +14,7 @@ common-grpc-expr = { path = "../common/grpc-expr" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-time = { path = "../common/time" }
datafusion = "14.0.0"
datafusion.workspace = true
datatypes = { path = "../datatypes" }
enum_dispatch = "0.3"
parking_lot = "0.12"

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{ColumnDataType, ColumnDef, CreateExpr};
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId};
use client::admin::Admin;
use client::{Client, Database};
use prost_09::Message;
@@ -33,36 +33,36 @@ fn main() {
async fn run() {
let client = Client::with_urls(vec!["127.0.0.1:3001"]);
let create_table_expr = CreateExpr {
catalog_name: Some("greptime".to_string()),
schema_name: Some("public".to_string()),
let create_table_expr = CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "test_logical_dist_exec".to_string(),
desc: None,
desc: "".to_string(),
column_defs: vec![
ColumnDef {
name: "timestamp".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "key".to_string(),
datatype: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "value".to_string(),
datatype: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
],
time_index: "timestamp".to_string(),
primary_keys: vec!["key".to_string()],
create_if_not_exists: false,
table_options: Default::default(),
table_id: Some(1024),
table_id: Some(TableId { id: 1024 }),
region_ids: vec![0],
};

View File

@@ -34,13 +34,13 @@ impl Admin {
}
}
pub async fn create(&self, expr: CreateExpr) -> Result<AdminResult> {
pub async fn create(&self, expr: CreateTableExpr) -> Result<AdminResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let expr = AdminExpr {
header: Some(header),
expr: Some(admin_expr::Expr::Create(expr)),
expr: Some(admin_expr::Expr::CreateTable(expr)),
};
self.do_request(expr).await
}

View File

@@ -1,9 +1,9 @@
[package]
name = "cmd"
version = "0.1.0"
edition = "2021"
version.workspace = true
edition.workspace = true
license.workspace = true
default-run = "greptime"
license = "Apache-2.0"
[[bin]]
name = "greptime"

View File

@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use anymap::AnyMap;
use std::sync::Arc;
use clap::Parser;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
@@ -21,6 +22,7 @@ use frontend::instance::Instance;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::Plugins;
use meta_client::MetaClientOpts;
use servers::auth::UserProviderRef;
use servers::http::HttpOptions;
@@ -86,21 +88,21 @@ pub struct StartCommand {
impl StartCommand {
async fn run(self) -> Result<()> {
let plugins = load_frontend_plugins(&self.user_provider)?;
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
let opts: FrontendOptions = self.try_into()?;
let mut frontend = Frontend::new(
opts.clone(),
Instance::try_new_distributed(&opts)
.await
.context(error::StartFrontendSnafu)?,
plugins,
);
let mut instance = Instance::try_new_distributed(&opts)
.await
.context(error::StartFrontendSnafu)?;
instance.set_plugins(plugins.clone());
let mut frontend = Frontend::new(opts, instance, plugins);
frontend.start().await.context(error::StartFrontendSnafu)
}
}
pub fn load_frontend_plugins(user_provider: &Option<String>) -> Result<AnyMap> {
let mut plugins = AnyMap::new();
pub fn load_frontend_plugins(user_provider: &Option<String>) -> Result<Plugins> {
let mut plugins = Plugins::new();
if let Some(provider) = user_provider {
let provider = auth::user_provider_from_option(provider).context(IllegalAuthConfigSnafu)?;

View File

@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use anymap::AnyMap;
use std::sync::Arc;
use clap::Parser;
use common_telemetry::info;
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig};
@@ -25,6 +26,7 @@ use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prometheus::PrometheusOptions;
use frontend::Plugins;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
@@ -150,7 +152,7 @@ impl StartCommand {
async fn run(self) -> Result<()> {
let enable_memory_catalog = self.enable_memory_catalog;
let config_file = self.config_file.clone();
let plugins = load_frontend_plugins(&self.user_provider)?;
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
let fe_opts = FrontendOptions::try_from(self)?;
let dn_opts: DatanodeOptions = {
let mut opts: StandaloneOptions = if let Some(path) = config_file {
@@ -187,11 +189,12 @@ impl StartCommand {
/// Build frontend instance in standalone mode
async fn build_frontend(
fe_opts: FrontendOptions,
plugins: AnyMap,
plugins: Arc<Plugins>,
datanode_instance: InstanceRef,
) -> Result<Frontend<FeInstance>> {
let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
frontend_instance.set_script_handler(datanode_instance);
frontend_instance.set_plugins(plugins.clone());
Ok(Frontend::new(fe_opts, frontend_instance, plugins))
}
@@ -221,8 +224,7 @@ impl TryFrom<StartCommand> for FrontendOptions {
if addr == datanode_grpc_addr {
return IllegalConfigSnafu {
msg: format!(
"gRPC listen address conflicts with datanode reserved gRPC addr: {}",
datanode_grpc_addr
"gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
),
}
.fail();

View File

@@ -1,8 +1,8 @@
[package]
name = "common-base"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
bitvec = "1.0"

View File

@@ -1,8 +1,8 @@
[package]
name = "common-catalog"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1"

View File

@@ -1,8 +1,8 @@
[package]
name = "common-error"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
snafu = { version = "0.7", features = ["backtraces"] }

View File

@@ -131,7 +131,7 @@ mod tests {
assert!(ErrorCompat::backtrace(&err).is_some());
let msg = format!("{:?}", err);
let msg = format!("{err:?}");
assert!(msg.contains("\nBacktrace:\n"));
let fmt_msg = format!("{:?}", DebugFormat::new(&err));
assert_eq!(msg, fmt_msg);
@@ -151,7 +151,7 @@ mod tests {
assert!(err.as_any().downcast_ref::<MockError>().is_some());
assert!(err.source().is_some());
let msg = format!("{:?}", err);
let msg = format!("{err:?}");
assert!(msg.contains("\nBacktrace:\n"));
assert!(msg.contains("Caused by"));

View File

@@ -31,11 +31,11 @@ impl<'a, E: ErrorExt + ?Sized> fmt::Debug for DebugFormat<'a, E> {
write!(f, "{}.", self.0)?;
if let Some(source) = self.0.source() {
// Source error use debug format for more verbose info.
write!(f, " Caused by: {:?}", source)?;
write!(f, " Caused by: {source:?}")?;
}
if let Some(backtrace) = self.0.backtrace_opt() {
// Add a newline to separate causes and backtrace.
write!(f, "\nBacktrace:\n{}", backtrace)?;
write!(f, "\nBacktrace:\n{backtrace}")?;
}
Ok(())

View File

@@ -51,6 +51,7 @@ pub enum StatusCode {
TableNotFound = 4001,
TableColumnNotFound = 4002,
TableColumnExists = 4003,
DatabaseNotFound = 4004,
// ====== End of catalog related status code =======
// ====== Begin of storage related status code =====
@@ -86,7 +87,7 @@ impl StatusCode {
impl fmt::Display for StatusCode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// The current debug format is suitable to display.
write!(f, "{:?}", self)
write!(f, "{self:?}")
}
}
@@ -95,7 +96,7 @@ mod tests {
use super::*;
fn assert_status_code_display(code: StatusCode, msg: &str) {
let code_msg = format!("{}", code);
let code_msg = format!("{code}");
assert_eq!(msg, code_msg);
}

View File

@@ -1,8 +1,8 @@
[package]
name = "common-function-macro"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[lib]
proc-macro = true

View File

@@ -1,8 +1,8 @@
[package]
edition = "2021"
name = "common-function"
version = "0.1.0"
license = "Apache-2.0"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
arc-swap = "1.0"
@@ -11,7 +11,7 @@ common-error = { path = "../error" }
common-function-macro = { path = "../function-macro" }
common-query = { path = "../query" }
common-time = { path = "../time" }
datafusion-common = "14.0.0"
datafusion.workspace = true
datatypes = { path = "../../datatypes" }
libc = "0.2"
num = "0.4"

View File

@@ -343,7 +343,7 @@ mod tests {
Arc::new(Int64Vector::from_vec(fp.clone())),
];
let vector = interp(&args).unwrap();
assert!(matches!(vector.get(0), Value::Float64(v) if v==x[0] as f64));
assert!(matches!(vector.get(0), Value::Float64(v) if v == x[0]));
// x=None output:Null
let input = vec![None, Some(0.0), Some(0.3)];

View File

@@ -127,12 +127,7 @@ mod tests {
assert_eq!(4, vec.len());
for i in 0..4 {
assert_eq!(
i == 0 || i == 3,
vec.get_data(i).unwrap(),
"failed at {}",
i
)
assert_eq!(i == 0 || i == 3, vec.get_data(i).unwrap(), "Failed at {i}",)
}
}
_ => unreachable!(),

View File

@@ -1,8 +1,8 @@
[package]
name = "common-grpc-expr"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
api = { path = "../../api" }

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::{AlterExpr, CreateExpr, DropColumns};
use api::v1::{AlterExpr, CreateTableExpr, DropColumns};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use snafu::{ensure, OptionExt, ResultExt};
@@ -29,6 +29,16 @@ use crate::error::{
/// Convert an [`AlterExpr`] to an optional [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
let catalog_name = if expr.catalog_name.is_empty() {
None
} else {
Some(expr.catalog_name)
};
let schema_name = if expr.schema_name.is_empty() {
None
} else {
Some(expr.schema_name)
};
match expr.kind {
Some(Kind::AddColumns(add_columns)) => {
let add_column_requests = add_columns
@@ -57,8 +67,8 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
catalog_name,
schema_name,
table_name: expr.table_name,
alter_kind,
};
@@ -70,8 +80,8 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
catalog_name,
schema_name,
table_name: expr.table_name,
alter_kind,
};
@@ -81,7 +91,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
}
}
pub fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
pub fn create_table_schema(expr: &CreateTableExpr) -> Result<SchemaRef> {
let column_schemas = expr
.column_defs
.iter()
@@ -96,7 +106,7 @@ pub fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
.iter()
.any(|column| column.name == expr.time_index),
MissingTimestampColumnSnafu {
msg: format!("CreateExpr: {:?}", expr)
msg: format!("CreateExpr: {expr:?}")
}
);
@@ -119,7 +129,10 @@ pub fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
))
}
pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<CreateTableRequest> {
pub fn create_expr_to_request(
table_id: TableId,
expr: CreateTableExpr,
) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr)?;
let primary_key_indices = expr
.primary_keys
@@ -134,12 +147,19 @@ pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<Cre
})
.collect::<Result<Vec<usize>>>()?;
let catalog_name = expr
.catalog_name
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let schema_name = expr
.schema_name
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let mut catalog_name = expr.catalog_name;
if catalog_name.is_empty() {
catalog_name = DEFAULT_CATALOG_NAME.to_string();
}
let mut schema_name = expr.schema_name;
if schema_name.is_empty() {
schema_name = DEFAULT_SCHEMA_NAME.to_string();
}
let desc = if expr.desc.is_empty() {
None
} else {
Some(expr.desc)
};
let region_ids = if expr.region_ids.is_empty() {
vec![0]
@@ -152,7 +172,7 @@ pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<Cre
catalog_name,
schema_name,
table_name: expr.table_name,
desc: expr.desc,
desc,
schema,
region_numbers: region_ids,
primary_key_indices,
@@ -171,8 +191,8 @@ mod tests {
#[test]
fn test_alter_expr_to_request() {
let expr = AlterExpr {
catalog_name: None,
schema_name: None,
catalog_name: "".to_string(),
schema_name: "".to_string(),
table_name: "monitor".to_string(),
kind: Some(Kind::AddColumns(AddColumns {
@@ -181,7 +201,7 @@ mod tests {
name: "mem_usage".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
}),
is_key: false,
}],
@@ -208,8 +228,8 @@ mod tests {
#[test]
fn test_drop_column_expr() {
let expr = AlterExpr {
catalog_name: Some("test_catalog".to_string()),
schema_name: Some("test_schema".to_string()),
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),
kind: Some(Kind::DropColumns(DropColumns {

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::column::{SemanticType, Values};
use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateExpr};
use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateTableExpr};
use common_base::BitVec;
use common_time::timestamp::Timestamp;
use common_time::{Date, DateTime};
@@ -45,7 +45,7 @@ fn build_column_def(column_name: &str, datatype: i32, nullable: bool) -> ColumnD
name: column_name.to_string(),
datatype,
is_nullable: nullable,
default_constraint: None,
default_constraint: vec![],
}
}
@@ -154,7 +154,7 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve
collect_values!(values.i32_values, |v| ValueRef::from(*v))
}
ColumnDataType::Int64 => {
collect_values!(values.i64_values, |v| ValueRef::from(*v as i64))
collect_values!(values.i64_values, |v| ValueRef::from(*v))
}
ColumnDataType::Uint8 => {
collect_values!(values.u8_values, |v| ValueRef::from(*v as u8))
@@ -166,7 +166,7 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve
collect_values!(values.u32_values, |v| ValueRef::from(*v))
}
ColumnDataType::Uint64 => {
collect_values!(values.u64_values, |v| ValueRef::from(*v as u64))
collect_values!(values.u64_values, |v| ValueRef::from(*v))
}
ColumnDataType::Float32 => collect_values!(values.f32_values, |v| ValueRef::from(*v)),
ColumnDataType::Float64 => collect_values!(values.f64_values, |v| ValueRef::from(*v)),
@@ -214,7 +214,7 @@ pub fn build_create_expr_from_insertion(
table_id: Option<TableId>,
table_name: &str,
columns: &[Column],
) -> Result<CreateExpr> {
) -> Result<CreateTableExpr> {
let mut new_columns: HashSet<String> = HashSet::default();
let mut column_defs = Vec::default();
let mut primary_key_indices = Vec::default();
@@ -263,17 +263,17 @@ pub fn build_create_expr_from_insertion(
.map(|idx| columns[*idx].column_name.clone())
.collect::<Vec<_>>();
let expr = CreateExpr {
catalog_name: Some(catalog_name.to_string()),
schema_name: Some(schema_name.to_string()),
let expr = CreateTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
desc: Some("Created on insertion".to_string()),
desc: "Created on insertion".to_string(),
column_defs,
time_index: timestamp_field_name,
primary_keys,
create_if_not_exists: true,
table_options: Default::default(),
table_id,
table_id: table_id.map(|id| api::v1::TableId { id }),
region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend
};
@@ -516,9 +516,9 @@ mod tests {
build_create_expr_from_insertion("", "", table_id, table_name, &insert_batch.0)
.unwrap();
assert_eq!(table_id, create_expr.table_id);
assert_eq!(table_id, create_expr.table_id.map(|x| x.id));
assert_eq!(table_name, create_expr.table_name);
assert_eq!(Some("Created on insertion".to_string()), create_expr.desc);
assert_eq!("Created on insertion".to_string(), create_expr.desc);
assert_eq!(
vec![create_expr.column_defs[0].name.clone()],
create_expr.primary_keys
@@ -725,7 +725,7 @@ mod tests {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {

View File

@@ -1,4 +1,3 @@
#![feature(assert_matches)]
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@@ -1,8 +1,8 @@
[package]
name = "common-grpc"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
api = { path = "../../api" }
@@ -13,7 +13,7 @@ common-query = { path = "../query" }
common-recordbatch = { path = "../recordbatch" }
common-runtime = { path = "../runtime" }
dashmap = "5.4"
datafusion = "14.0.0"
datafusion.workspace = true
datatypes = { path = "../../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }

View File

@@ -26,7 +26,7 @@ async fn do_bench_channel_manager() {
let join = tokio::spawn(async move {
for _ in 0..10000 {
let idx = rand::random::<usize>() % 100;
let ret = m_clone.get(format!("{}", idx));
let ret = m_clone.get(format!("{idx}"));
assert!(ret.is_ok());
}
});

View File

@@ -120,7 +120,7 @@ impl ChannelManager {
fn build_endpoint(&self, addr: &str) -> Result<Endpoint> {
let mut endpoint =
Endpoint::new(format!("http://{}", addr)).context(error::CreateChannelSnafu)?;
Endpoint::new(format!("http://{addr}")).context(error::CreateChannelSnafu)?;
if let Some(dur) = self.config.timeout {
endpoint = endpoint.timeout(dur);

View File

@@ -1,17 +1,17 @@
[package]
name = "common-query"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1"
common-error = { path = "../error" }
common-recordbatch = { path = "../recordbatch" }
common-time = { path = "../time" }
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datafusion-expr = "14.0.0"
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes = { path = "../../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
statrs = "0.15"

View File

@@ -161,12 +161,7 @@ mod tests {
assert_eq!(4, vec.len());
for i in 0..4 {
assert_eq!(
i == 0 || i == 3,
vec.get_data(i).unwrap(),
"failed at {}",
i
)
assert_eq!(i == 0 || i == 3, vec.get_data(i).unwrap(), "Failed at {i}")
}
}
_ => unreachable!(),

View File

@@ -18,7 +18,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use datafusion_common::Result as DfResult;
use datafusion_expr::{Accumulator as DfAccumulator, AggregateState};
use datafusion_expr::Accumulator as DfAccumulator;
use datatypes::arrow::array::ArrayRef;
use datatypes::prelude::*;
use datatypes::vectors::{Helper as VectorHelper, VectorRef};
@@ -126,24 +126,19 @@ impl DfAccumulatorAdaptor {
}
impl DfAccumulator for DfAccumulatorAdaptor {
fn state(&self) -> DfResult<Vec<AggregateState>> {
fn state(&self) -> DfResult<Vec<ScalarValue>> {
let state_values = self.accumulator.state()?;
let state_types = self.creator.state_types()?;
if state_values.len() != state_types.len() {
return error::BadAccumulatorImplSnafu {
err_msg: format!("Accumulator {:?} returned state values size do not match its state types size.", self),
err_msg: format!("Accumulator {self:?} returned state values size do not match its state types size."),
}
.fail()?;
}
Ok(state_values
.into_iter()
.zip(state_types.iter())
.map(|(v, t)| {
let scalar = v
.try_to_scalar_value(t)
.context(error::ToScalarValueSnafu)?;
Ok(AggregateState::Scalar(scalar))
})
.map(|(v, t)| v.try_to_scalar_value(t).context(error::ToScalarValueSnafu))
.collect::<Result<Vec<_>>>()?)
}
@@ -175,4 +170,9 @@ impl DfAccumulator for DfAccumulatorAdaptor {
.map_err(Error::from)?;
Ok(scalar_value)
}
fn size(&self) -> usize {
// TODO(LFC): Implement new "size" method for Accumulator.
0
}
}

View File

@@ -233,7 +233,7 @@ mod test {
async fn scan(
&self,
_ctx: &SessionState,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> DfResult<Arc<dyn DfPhysicalPlan>> {

View File

@@ -1,13 +1,13 @@
[package]
name = "common-recordbatch"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
common-error = { path = "../error" }
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datafusion.workspace = true
datafusion-common.workspace = true
datatypes = { path = "../../datatypes" }
futures = "0.3"
paste = "1.0"

View File

@@ -121,7 +121,8 @@ impl Stream for RecordBatchStreamAdapter {
enum AsyncRecordBatchStreamAdapterState {
Uninit(FutureStream),
Inited(std::result::Result<DfSendableRecordBatchStream, DataFusionError>),
Ready(DfSendableRecordBatchStream),
Failed,
}
pub struct AsyncRecordBatchStreamAdapter {
@@ -151,28 +152,26 @@ impl Stream for AsyncRecordBatchStreamAdapter {
loop {
match &mut self.state {
AsyncRecordBatchStreamAdapterState::Uninit(stream_future) => {
self.state = AsyncRecordBatchStreamAdapterState::Inited(ready!(Pin::new(
stream_future
)
.poll(cx)));
continue;
match ready!(Pin::new(stream_future).poll(cx)) {
Ok(stream) => {
self.state = AsyncRecordBatchStreamAdapterState::Ready(stream);
continue;
}
Err(e) => {
self.state = AsyncRecordBatchStreamAdapterState::Failed;
return Poll::Ready(Some(
Err(e).context(error::InitRecordbatchStreamSnafu),
));
}
};
}
AsyncRecordBatchStreamAdapterState::Inited(stream) => match stream {
Ok(stream) => {
return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)).map(|df| {
let df_record_batch = df.context(error::PollStreamSnafu)?;
RecordBatch::try_from_df_record_batch(self.schema(), df_record_batch)
}));
}
Err(e) => {
return Poll::Ready(Some(
error::CreateRecordBatchesSnafu {
reason: format!("Read error {:?} from stream", e),
}
.fail(),
))
}
},
AsyncRecordBatchStreamAdapterState::Ready(stream) => {
return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)).map(|x| {
let df_record_batch = x.context(error::PollStreamSnafu)?;
RecordBatch::try_from_df_record_batch(self.schema(), df_record_batch)
}))
}
AsyncRecordBatchStreamAdapterState::Failed => return Poll::Ready(None),
}
}
}
@@ -183,3 +182,104 @@ impl Stream for AsyncRecordBatchStreamAdapter {
(0, None)
}
}
#[cfg(test)]
mod test {
use common_error::mock::MockError;
use common_error::prelude::{BoxedError, StatusCode};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::Int32Vector;
use super::*;
use crate::RecordBatches;
#[tokio::test]
async fn test_async_recordbatch_stream_adaptor() {
struct MaybeErrorRecordBatchStream {
items: Vec<Result<RecordBatch>>,
}
impl RecordBatchStream for MaybeErrorRecordBatchStream {
fn schema(&self) -> SchemaRef {
unimplemented!()
}
}
impl Stream for MaybeErrorRecordBatchStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(batch) = self.items.pop() {
Poll::Ready(Some(Ok(batch?)))
} else {
Poll::Ready(None)
}
}
}
fn new_future_stream(
maybe_recordbatches: Result<Vec<Result<RecordBatch>>>,
) -> FutureStream {
Box::pin(async move {
maybe_recordbatches
.map(|items| {
Box::pin(DfRecordBatchStreamAdapter::new(Box::pin(
MaybeErrorRecordBatchStream { items },
))) as _
})
.map_err(|e| DataFusionError::External(Box::new(e)))
})
}
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::int32_datatype(),
false,
)]));
let batch1 = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice(&[1])) as _],
)
.unwrap();
let batch2 = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice(&[2])) as _],
)
.unwrap();
let success_stream = new_future_stream(Ok(vec![Ok(batch1.clone()), Ok(batch2.clone())]));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), success_stream);
let collected = RecordBatches::try_collect(Box::pin(adapter)).await.unwrap();
assert_eq!(
collected,
RecordBatches::try_new(schema.clone(), vec![batch2.clone(), batch1.clone()]).unwrap()
);
let poll_err_stream = new_future_stream(Ok(vec![
Ok(batch1.clone()),
Err(error::Error::External {
source: BoxedError::new(MockError::new(StatusCode::Unknown)),
}),
]));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream);
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"Failed to poll stream, source: External error: External error, source: Unknown"
);
let failed_to_init_stream = new_future_stream(Err(error::Error::External {
source: BoxedError::new(MockError::new(StatusCode::Internal)),
}));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream);
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"Failed to init Recordbatch stream, source: External error: External error, source: Internal"
);
}
}

View File

@@ -64,6 +64,12 @@ pub enum Error {
source: datatypes::arrow::error::ArrowError,
backtrace: Backtrace,
},
#[snafu(display("Failed to init Recordbatch stream, source: {}", source))]
InitRecordbatchStream {
source: datafusion_common::DataFusionError,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {
@@ -74,7 +80,8 @@ impl ErrorExt for Error {
Error::DataTypes { .. }
| Error::CreateRecordBatches { .. }
| Error::PollStream { .. }
| Error::Format { .. } => StatusCode::Internal,
| Error::Format { .. }
| Error::InitRecordbatchStream { .. } => StatusCode::Internal,
Error::External { source } => source.status_code(),

View File

@@ -231,8 +231,7 @@ mod tests {
assert_eq!(
result.unwrap_err().to_string(),
format!(
"Failed to create RecordBatches, reason: expect RecordBatch schema equals {:?}, actual: {:?}",
schema1, schema2
"Failed to create RecordBatches, reason: expect RecordBatch schema equals {schema1:?}, actual: {schema2:?}",
)
);

View File

@@ -227,7 +227,7 @@ mod tests {
let output = serde_json::to_string(&batch).unwrap();
assert_eq!(
r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
output
);
}

View File

@@ -1,8 +1,8 @@
[package]
name = "common-runtime"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
common-error = { path = "../error" }

View File

@@ -1,8 +1,8 @@
[package]
name = "substrait"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
bytes = "1.1"
@@ -10,8 +10,8 @@ catalog = { path = "../../catalog" }
common-catalog = { path = "../catalog" }
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
datafusion = "14.0.0"
datafusion-expr = "14.0.0"
datafusion.workspace = true
datafusion-expr.workspace = true
datatypes = { path = "../../datatypes" }
futures = "0.3"
prost = "0.9"

View File

@@ -16,6 +16,7 @@ use std::collections::VecDeque;
use std::str::FromStr;
use datafusion::common::Column;
use datafusion_expr::expr::Sort;
use datafusion_expr::{expr_fn, lit, Between, BinaryExpr, BuiltinScalarFunction, Expr, Operator};
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt};
@@ -61,7 +62,7 @@ pub(crate) fn to_df_expr(
| RexType::Cast(_)
| RexType::Subquery(_)
| RexType::Enum(_) => UnsupportedExprSnafu {
name: format!("substrait expression {:?}", expr_rex_type),
name: format!("substrait expression {expr_rex_type:?}"),
}
.fail()?,
}
@@ -109,7 +110,7 @@ pub fn convert_scalar_function(
let fn_name = ctx
.find_scalar_fn(anchor)
.with_context(|| InvalidParametersSnafu {
reason: format!("Unregistered scalar function reference: {}", anchor),
reason: format!("Unregistered scalar function reference: {anchor}"),
})?;
// convenient util
@@ -331,19 +332,19 @@ pub fn convert_scalar_function(
// skip Cast and TryCast, is covered in substrait::Cast.
"sort" | "sort_des" => {
ensure_arg_len(1)?;
Expr::Sort {
Expr::Sort(Sort {
expr: Box::new(inputs.pop_front().unwrap()),
asc: false,
nulls_first: false,
}
})
}
"sort_asc" => {
ensure_arg_len(1)?;
Expr::Sort {
Expr::Sort(Sort {
expr: Box::new(inputs.pop_front().unwrap()),
asc: true,
nulls_first: false,
}
})
}
// those are datafusion built-in "scalar functions".
"abs"
@@ -435,7 +436,7 @@ pub fn convert_scalar_function(
// skip Wildcard, unimplemented.
// end other direct expr
_ => UnsupportedExprSnafu {
name: format!("scalar function {}", fn_name),
name: format!("scalar function {fn_name}"),
}
.fail()?,
};
@@ -537,11 +538,11 @@ pub fn expression_from_df_expr(
name: expr.to_string(),
}
.fail()?,
Expr::Sort {
Expr::Sort(Sort {
expr,
asc,
nulls_first: _,
} => {
}) => {
let expr = expression_from_df_expr(ctx, expr, schema)?;
let arguments = utils::expression_to_argument(vec![expr]);
let op_name = if *asc { "sort_asc" } else { "sort_des" };
@@ -577,6 +578,7 @@ pub fn expression_from_df_expr(
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(..)
| Expr::Placeholder { .. }
| Expr::QualifiedWildcard { .. } => todo!(),
Expr::GroupingSet(_) => UnsupportedExprSnafu {
name: expr.to_string(),
@@ -595,8 +597,8 @@ pub fn convert_column(column: &Column, schema: &Schema) -> Result<FieldReference
schema
.column_index_by_name(column_name)
.with_context(|| MissingFieldSnafu {
field: format!("{:?}", column),
plan: format!("schema: {:?}", schema),
field: format!("{column:?}"),
plan: format!("schema: {schema:?}"),
})?;
Ok(FieldReference {
@@ -646,6 +648,8 @@ mod utils {
Operator::BitwiseShiftRight => "bitwise_shift_right",
Operator::BitwiseShiftLeft => "bitwise_shift_left",
Operator::StringConcat => "string_concat",
Operator::ILike => "i_like",
Operator::NotILike => "not_i_like",
}
}

View File

@@ -19,7 +19,7 @@ use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use common_telemetry::debug;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::common::ToDFSchema;
use datafusion::common::{DFField, DFSchema};
use datafusion::datasource::DefaultTableSource;
use datafusion::physical_plan::project_schema;
use datafusion_expr::{Filter, LogicalPlan, TableScan, TableSource};
@@ -236,7 +236,7 @@ impl DFLogicalSubstraitConvertor {
.map_err(BoxedError::new)
.context(InternalSnafu)?
.context(TableNotFoundSnafu {
name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
name: format!("{catalog_name}.{schema_name}.{table_name}"),
})?;
let adapter = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(table_ref),
@@ -262,16 +262,26 @@ impl DFLogicalSubstraitConvertor {
};
// Calculate the projected schema
let projected_schema = project_schema(&stored_schema, projection.as_ref())
.context(DFInternalSnafu)?
.to_dfschema_ref()
.context(DFInternalSnafu)?;
let qualified = &format!("{catalog_name}.{schema_name}.{table_name}");
let projected_schema = Arc::new(
project_schema(&stored_schema, projection.as_ref())
.and_then(|x| {
DFSchema::new_with_metadata(
x.fields()
.iter()
.map(|f| DFField::from_qualified(qualified, f.clone()))
.collect(),
x.metadata().clone(),
)
})
.context(DFInternalSnafu)?,
);
ctx.set_df_schema(projected_schema.clone());
// TODO(ruihang): Support limit(fetch)
Ok(LogicalPlan::TableScan(TableScan {
table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
table_name: format!("{catalog_name}.{schema_name}.{table_name}"),
source: adapter,
projection,
projected_schema,
@@ -385,10 +395,10 @@ impl DFLogicalSubstraitConvertor {
| LogicalPlan::Values(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Extension(_) => InvalidParametersSnafu {
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_) => InvalidParametersSnafu {
reason: format!(
"Trying to convert DDL/DML plan to substrait proto, plan: {:?}",
plan
"Trying to convert DDL/DML plan to substrait proto, plan: {plan:?}",
),
}
.fail()?,
@@ -562,7 +572,7 @@ mod test {
let proto = convertor.encode(plan.clone()).unwrap();
let tripped_plan = convertor.decode(proto, catalog).unwrap();
assert_eq!(format!("{:?}", plan), format!("{:?}", tripped_plan));
assert_eq!(format!("{plan:?}"), format!("{tripped_plan:?}"));
}
#[tokio::test]
@@ -596,8 +606,7 @@ mod test {
let table_scan_plan = LogicalPlan::TableScan(TableScan {
table_name: format!(
"{}.{}.{}",
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_TABLE_NAME
"{DEFAULT_CATALOG_NAME}.{DEFAULT_SCHEMA_NAME}.{DEFAULT_TABLE_NAME}",
),
source: adapter,
projection: Some(projection),

View File

@@ -87,7 +87,7 @@ pub fn to_concrete_type(ty: &SType) -> Result<(ConcreteDataType, bool)> {
| Kind::List(_)
| Kind::Map(_)
| Kind::UserDefinedTypeReference(_) => UnsupportedSubstraitTypeSnafu {
ty: format!("{:?}", kind),
ty: format!("{kind:?}"),
}
.fail(),
}
@@ -154,7 +154,7 @@ pub(crate) fn scalar_value_as_literal_type(v: &ScalarValue) -> Result<LiteralTyp
// TODO(LFC): Implement other conversions: ScalarValue => LiteralType
_ => {
return error::UnsupportedExprSnafu {
name: format!("{:?}", v),
name: format!("{v:?}"),
}
.fail()
}
@@ -177,7 +177,7 @@ pub(crate) fn literal_type_to_scalar_value(t: LiteralType) -> Result<ScalarValue
// TODO(LFC): Implement other conversions: Kind => ScalarValue
_ => {
return error::UnsupportedSubstraitTypeSnafu {
ty: format!("{:?}", kind),
ty: format!("{kind:?}"),
}
.fail()
}
@@ -194,7 +194,7 @@ pub(crate) fn literal_type_to_scalar_value(t: LiteralType) -> Result<ScalarValue
// TODO(LFC): Implement other conversions: LiteralType => ScalarValue
_ => {
return error::UnsupportedSubstraitTypeSnafu {
ty: format!("{:?}", t),
ty: format!("{t:?}"),
}
.fail()
}

View File

@@ -1,8 +1,8 @@
[package]
name = "common-telemetry"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[features]
console = ["console-subscriber"]

View File

@@ -28,7 +28,7 @@ pub fn set_panic_hook() {
let default_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic| {
let backtrace = Backtrace::new();
let backtrace = format!("{:?}", backtrace);
let backtrace = format!("{backtrace:?}");
if let Some(location) = panic.location() {
tracing::error!(
message = %panic,

View File

@@ -1,8 +1,8 @@
[package]
name = "common-time"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
chrono = "0.4"

View File

@@ -1,8 +1,8 @@
[package]
name = "datanode"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[features]
default = ["python"]
@@ -25,7 +25,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = "14.0.0"
datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
@@ -57,5 +57,5 @@ tower-http = { version = "0.3", features = ["full"] }
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
client = { path = "../client" }
common-query = { path = "../common/query" }
datafusion-common = "14.0.0"
datafusion-common.workspace = true
tempdir = "0.3"

View File

@@ -155,7 +155,7 @@ pub enum Error {
#[snafu(display("Failed to init backend, config: {:#?}, source: {}", config, source))]
InitBackend {
config: ObjectStoreConfig,
config: Box<ObjectStoreConfig>,
source: object_store::Error,
backtrace: Backtrace,
},

View File

@@ -233,7 +233,7 @@ pub(crate) async fn new_fs_object_store(data_dir: &str) -> Result<ObjectStore> {
.context(error::CreateDirSnafu { dir: &data_dir })?;
info!("The file storage directory is: {}", &data_dir);
let atomic_write_dir = format!("{}/.tmp/", data_dir);
let atomic_write_dir = format!("{data_dir}/.tmp/");
let accessor = FsBuilder::default()
.root(&data_dir)

View File

@@ -117,7 +117,7 @@ impl Instance {
}
Some(select_expr::Expr::LogicalPlan(plan)) => self.execute_logical(plan).await,
_ => UnsupportedExprSnafu {
name: format!("{:?}", expr),
name: format!("{expr:?}"),
}
.fail(),
}
@@ -175,7 +175,7 @@ impl GrpcQueryHandler for Instance {
Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await,
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),
feat: format!("{other:?}"),
}
.fail();
}
@@ -188,7 +188,9 @@ impl GrpcQueryHandler for Instance {
impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result<AdminResult> {
let admin_resp = match expr.expr {
Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await,
Some(admin_expr::Expr::CreateTable(create_expr)) => {
self.handle_create(create_expr).await
}
Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await,
Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => {
self.execute_create_database(create_database_expr).await
@@ -198,7 +200,7 @@ impl GrpcAdminHandler for Instance {
}
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),
feat: format!("{other:?}"),
}
.fail();
}

View File

@@ -33,12 +33,11 @@ use crate::metric;
use crate::sql::SqlRequest;
impl Instance {
pub async fn execute_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = self
.query_engine
.sql_to_statement(sql)
.context(ExecuteSqlSnafu)?;
pub async fn execute_stmt(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Output> {
match stmt {
Statement::Query(_) => {
let logical_plan = self
@@ -153,6 +152,14 @@ impl Instance {
}
}
}
pub async fn execute_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = self
.query_engine
.sql_to_statement(sql)
.context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}
}
// TODO(LFC): Refactor consideration: move this function to some helper mod,
@@ -180,8 +187,7 @@ fn table_idents_to_full_name(
)),
_ => error::InvalidSqlSnafu {
msg: format!(
"expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {}",
obj_name
"expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
),
}.fail(),
}
@@ -193,15 +199,40 @@ impl SqlQueryHandler for Instance {
&self,
query: &str,
query_ctx: QueryContextRef,
) -> servers::error::Result<Output> {
) -> Vec<servers::error::Result<Output>> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
self.execute_sql(query, query_ctx)
// we assume sql string has only 1 statement in datanode
let result = self
.execute_sql(query, query_ctx)
.await
.map_err(|e| {
error!(e; "Instance failed to execute sql");
BoxedError::new(e)
})
.context(servers::error::ExecuteQuerySnafu { query })
.context(servers::error::ExecuteQuerySnafu { query });
vec![result]
}
async fn do_statement_query(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> servers::error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
self.execute_stmt(stmt, query_ctx)
.await
.map_err(|e| {
error!(e; "Instance failed to execute sql");
BoxedError::new(e)
})
.context(servers::error::ExecuteStatementSnafu)
}
fn is_valid_schema(&self, catalog: &str, schema: &str) -> servers::error::Result<bool> {
self.catalog_manager
.schema(catalog, schema)
.map(|s| s.is_some())
.context(servers::error::CatalogSnafu)
}
}

View File

@@ -40,7 +40,7 @@ impl Services {
pub async fn try_new(instance: InstanceRef, opts: &DatanodeOptions) -> Result<Self> {
let grpc_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.rpc_runtime_size as usize)
.worker_threads(opts.rpc_runtime_size)
.thread_name("grpc-io-handlers")
.build()
.context(RuntimeResourceSnafu)?,
@@ -54,7 +54,7 @@ impl Services {
Mode::Distributed => {
let mysql_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.mysql_runtime_size as usize)
.worker_threads(opts.mysql_runtime_size)
.thread_name("mysql-io-handlers")
.build()
.context(RuntimeResourceSnafu)?,

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::result::AdminResultBuilder;
use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr};
use api::v1::{AdminResult, AlterExpr, CreateTableExpr, DropTableExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
@@ -31,15 +31,15 @@ use crate::sql::SqlRequest;
impl Instance {
/// Handle gRPC create table requests.
pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult {
pub(crate) async fn handle_create(&self, expr: CreateTableExpr) -> AdminResult {
// Respect CreateExpr's table id and region ids if present, or allocate table id
// from local table id provider and set region id to 0.
let table_id = if let Some(table_id) = expr.table_id {
let table_id = if let Some(table_id) = &expr.table_id {
info!(
"Creating table {:?}.{:?}.{:?} with table id from frontend: {}",
expr.catalog_name, expr.schema_name, expr.table_name, table_id
expr.catalog_name, expr.schema_name, expr.table_name, table_id.id
);
table_id
table_id.id
} else {
match self.table_id_provider.as_ref() {
None => {
@@ -157,7 +157,7 @@ impl Instance {
mod tests {
use std::sync::Arc;
use api::v1::{ColumnDataType, ColumnDef};
use api::v1::{ColumnDataType, ColumnDef, TableId};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_grpc_expr::create_table_schema;
use datatypes::prelude::ConcreteDataType;
@@ -175,7 +175,7 @@ mod tests {
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
assert_eq!(request.table_name, "my-metrics");
assert_eq!(request.desc, Some("blabla".to_string()));
assert_eq!(request.desc, Some("blabla little magic fairy".to_string()));
assert_eq!(request.schema, expected_table_schema());
assert_eq!(request.primary_key_indices, vec![1, 0]);
assert!(request.create_if_not_exists);
@@ -202,8 +202,7 @@ mod tests {
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Missing timestamp column"),
"actual: {}",
err_msg
"actual: {err_msg}",
);
}
@@ -214,7 +213,7 @@ mod tests {
name: "a".to_string(),
datatype: 1024,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
};
let result = column_def.try_as_column_schema();
assert!(matches!(
@@ -226,7 +225,7 @@ mod tests {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
};
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
@@ -238,7 +237,7 @@ mod tests {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: Some(default_constraint.clone().try_into().unwrap()),
default_constraint: default_constraint.clone().try_into().unwrap(),
};
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
@@ -250,44 +249,46 @@ mod tests {
);
}
fn testing_create_expr() -> CreateExpr {
fn testing_create_expr() -> CreateTableExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float32 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
CreateTableExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
table_name: "my-metrics".to_string(),
desc: Some("blabla".to_string()),
desc: "blabla little magic fairy".to_string(),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: Default::default(),
table_id: Some(MIN_USER_TABLE_ID),
table_id: Some(TableId {
id: MIN_USER_TABLE_ID,
}),
region_ids: vec![0],
}
}

View File

@@ -96,7 +96,7 @@ impl SqlHandler {
result
}
pub(crate) fn get_table<'a>(&self, table_ref: &'a TableReference) -> Result<TableRef> {
pub(crate) fn get_table(&self, table_ref: &TableReference) -> Result<TableRef> {
self.table_engine
.get_table(&EngineContext::default(), table_ref)
.with_context(|_| GetTableSnafu {
@@ -176,7 +176,7 @@ mod tests {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {

View File

@@ -61,7 +61,7 @@ impl SqlHandler {
let alter_kind = match alter_table.alter_operation() {
AlterTableOperation::AddConstraint(table_constraint) => {
return error::InvalidSqlSnafu {
msg: format!("unsupported table constraint {}", table_constraint),
msg: format!("unsupported table constraint {table_constraint}"),
}
.fail()
}

View File

@@ -143,7 +143,7 @@ impl SqlHandler {
)?;
} else {
return error::InvalidSqlSnafu {
msg: format!("Cannot recognize named UNIQUE constraint: {}", name),
msg: format!("Cannot recognize named UNIQUE constraint: {name}"),
}
.fail();
}
@@ -158,8 +158,7 @@ impl SqlHandler {
} else {
return error::InvalidSqlSnafu {
msg: format!(
"Unrecognized non-primary unnamed UNIQUE constraint: {:?}",
name
"Unrecognized non-primary unnamed UNIQUE constraint: {name:?}",
),
}
.fail();
@@ -167,7 +166,7 @@ impl SqlHandler {
}
_ => {
return ConstraintNotSupportedSnafu {
constraint: format!("{:?}", c),
constraint: format!("{c:?}"),
}
.fail();
}

View File

@@ -487,12 +487,11 @@ async fn test_insert_with_default_value_for_type(type_name: &str) {
let create_sql = format!(
r#"create table test_table(
host string,
ts {} DEFAULT CURRENT_TIMESTAMP,
ts {type_name} DEFAULT CURRENT_TIMESTAMP,
cpu double default 0,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#,
type_name
);
let output = execute_sql(&instance, &create_sql).await;
assert!(matches!(output, Output::AffectedRows(1)));

View File

@@ -40,8 +40,8 @@ pub struct TestGuard {
}
pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{}", name)).unwrap();
let data_tmp_dir = TempDir::new(&format!("gt_data_{}", name)).unwrap();
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{name}")).unwrap();
let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap();
let opts = DatanodeOptions {
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {

View File

@@ -1,20 +1,20 @@
[package]
name = "datatypes"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[features]
default = []
test = []
[dependencies]
arrow = { version = "26.0" }
arrow-schema = { version = "26.0", features = ["serde"] }
arrow.workspace = true
arrow-schema.workspace = true
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datafusion-common = "14.0"
datafusion-common.workspace = true
enum_dispatch = "0.3"
num = "0.4"
num-traits = "0.2"

View File

@@ -139,7 +139,7 @@ mod tests {
map.insert(false, 2);
let result = serde_json::to_string(&map).context(SerializeSnafu);
assert!(result.is_err(), "serialize result is: {:?}", result);
assert!(result.is_err(), "serialize result is: {result:?}");
let err = serde_json::to_string(&map)
.context(SerializeSnafu)
.err()

View File

@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(generic_associated_types)]
#![feature(assert_matches)]
pub mod arrow_array;
pub mod data_type;
pub mod error;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::collections::HashMap;
use arrow::datatypes::Field;
use serde::{Deserialize, Serialize};
@@ -23,7 +23,7 @@ use crate::error::{self, Error, Result};
use crate::schema::constraint::ColumnDefaultConstraint;
use crate::vectors::VectorRef;
pub type Metadata = BTreeMap<String, String>;
pub type Metadata = HashMap<String, String>;
/// Key used to store whether the column is time index in arrow field's metadata.
const TIME_INDEX_KEY: &str = "greptime:time_index";
@@ -131,7 +131,7 @@ impl TryFrom<&Field> for ColumnSchema {
fn try_from(field: &Field) -> Result<ColumnSchema> {
let data_type = ConcreteDataType::try_from(field.data_type())?;
let mut metadata = field.metadata().cloned().unwrap_or_default();
let mut metadata = field.metadata().clone();
let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) {
Some(json) => {
Some(serde_json::from_str(&json).context(error::DeserializeSnafu { json })?)
@@ -176,7 +176,7 @@ impl TryFrom<&ColumnSchema> for Field {
column_schema.data_type.as_arrow_type(),
column_schema.is_nullable(),
)
.with_metadata(Some(metadata)))
.with_metadata(metadata))
}
}
@@ -215,11 +215,7 @@ mod tests {
assert!(field.is_nullable());
assert_eq!(
"{\"Value\":{\"Int32\":99}}",
field
.metadata()
.unwrap()
.get(DEFAULT_CONSTRAINT_KEY)
.unwrap()
field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap()
);
let new_column_schema = ColumnSchema::try_from(&field).unwrap();
@@ -241,12 +237,8 @@ mod tests {
.is_none());
let field = Field::try_from(&column_schema).unwrap();
assert_eq!("v1", field.metadata().unwrap().get("k1").unwrap());
assert!(field
.metadata()
.unwrap()
.get(DEFAULT_CONSTRAINT_KEY)
.is_some());
assert_eq!("v1", field.metadata().get("k1").unwrap());
assert!(field.metadata().get(DEFAULT_CONSTRAINT_KEY).is_some());
let new_column_schema = ColumnSchema::try_from(&field).unwrap();
assert_eq!(column_schema, new_column_schema);

View File

@@ -57,8 +57,8 @@ impl TryFrom<ColumnDefaultConstraint> for Vec<u8> {
impl Display for ColumnDefaultConstraint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ColumnDefaultConstraint::Function(expr) => write!(f, "{}", expr),
ColumnDefaultConstraint::Value(v) => write!(f, "{}", v),
ColumnDefaultConstraint::Function(expr) => write!(f, "{expr}"),
ColumnDefaultConstraint::Value(v) => write!(f, "{v}"),
}
}
}
@@ -172,10 +172,7 @@ fn create_current_timestamp_vector(
std::iter::repeat(util::current_time_millis()).take(num_rows),
))),
_ => error::DefaultValueTypeSnafu {
reason: format!(
"Not support to assign current timestamp to {:?} type",
data_type
),
reason: format!("Not support to assign current timestamp to {data_type:?} type",),
}
.fail(),
}
@@ -301,6 +298,6 @@ mod tests {
let err = constraint
.create_default_vector(&data_type, false, 4)
.unwrap_err();
assert!(matches!(err, Error::DefaultValueType { .. }), "{:?}", err);
assert!(matches!(err, Error::DefaultValueType { .. }), "{err:?}");
}
}

View File

@@ -83,7 +83,7 @@ impl LogicalPrimitiveType for DateType {
ValueRef::Null => Ok(None),
ValueRef::Date(v) => Ok(Some(v)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast value {:?} to Date", other,),
msg: format!("Failed to cast value {other:?} to Date"),
}
.fail(),
}

View File

@@ -84,7 +84,7 @@ impl LogicalPrimitiveType for DateTimeType {
ValueRef::Null => Ok(None),
ValueRef::DateTime(v) => Ok(Some(v)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast value {:?} to DateTime", other,),
msg: format!("Failed to cast value {other:?} to DateTime"),
}
.fail(),
}

View File

@@ -49,7 +49,7 @@ impl DataType for NullType {
}
fn create_mutable_vector(&self, _capacity: usize) -> Box<dyn MutableVector> {
Box::new(NullVectorBuilder::default())
Box::<NullVectorBuilder>::default()
}
fn is_timestamp_compatible(&self) -> bool {

View File

@@ -70,31 +70,31 @@ impl Display for Value {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Value::Null => write!(f, "{}", self.data_type().name()),
Value::Boolean(v) => write!(f, "{}", v),
Value::UInt8(v) => write!(f, "{}", v),
Value::UInt16(v) => write!(f, "{}", v),
Value::UInt32(v) => write!(f, "{}", v),
Value::UInt64(v) => write!(f, "{}", v),
Value::Int8(v) => write!(f, "{}", v),
Value::Int16(v) => write!(f, "{}", v),
Value::Int32(v) => write!(f, "{}", v),
Value::Int64(v) => write!(f, "{}", v),
Value::Float32(v) => write!(f, "{}", v),
Value::Float64(v) => write!(f, "{}", v),
Value::Boolean(v) => write!(f, "{v}"),
Value::UInt8(v) => write!(f, "{v}"),
Value::UInt16(v) => write!(f, "{v}"),
Value::UInt32(v) => write!(f, "{v}"),
Value::UInt64(v) => write!(f, "{v}"),
Value::Int8(v) => write!(f, "{v}"),
Value::Int16(v) => write!(f, "{v}"),
Value::Int32(v) => write!(f, "{v}"),
Value::Int64(v) => write!(f, "{v}"),
Value::Float32(v) => write!(f, "{v}"),
Value::Float64(v) => write!(f, "{v}"),
Value::String(v) => write!(f, "{}", v.as_utf8()),
Value::Binary(v) => {
let hex = v
.iter()
.map(|b| format!("{:02x}", b))
.map(|b| format!("{b:02x}"))
.collect::<Vec<String>>()
.join("");
write!(f, "{}", hex)
write!(f, "{hex}")
}
Value::Date(v) => write!(f, "{}", v),
Value::DateTime(v) => write!(f, "{}", v),
Value::Date(v) => write!(f, "{v}"),
Value::DateTime(v) => write!(f, "{v}"),
Value::Timestamp(v) => write!(f, "{}", v.to_iso8601_string()),
Value::List(v) => {
let default = Box::new(vec![]);
let default = Box::<Vec<Value>>::default();
let items = v.items().as_ref().unwrap_or(&default);
let items = items
.iter()
@@ -146,7 +146,7 @@ impl Value {
Value::Null => Ok(None),
Value::List(v) => Ok(Some(v)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast {:?} to list value", other),
msg: format!("Failed to cast {other:?} to list value"),
}
.fail(),
}
@@ -214,8 +214,7 @@ impl Value {
output_type_id == value_type_id || self.is_null(),
error::ToScalarValueSnafu {
reason: format!(
"expect value to return output_type {:?}, actual: {:?}",
output_type_id, value_type_id,
"expect value to return output_type {output_type_id:?}, actual: {value_type_id:?}",
),
}
);
@@ -544,12 +543,15 @@ impl TryFrom<ScalarValue> for Value {
.map(|x| Value::Timestamp(Timestamp::new(x, TimeUnit::Nanosecond)))
.unwrap_or(Value::Null),
ScalarValue::Decimal128(_, _, _)
| ScalarValue::Time64(_)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Dictionary(_, _) => {
| ScalarValue::Dictionary(_, _)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_) => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: v.get_datatype(),
}
@@ -1342,7 +1344,7 @@ mod tests {
);
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(vec![])),
Some(Box::default()),
ConcreteDataType::timestamp_second_datatype(),
))
.to_string(),
@@ -1350,7 +1352,7 @@ mod tests {
);
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(vec![])),
Some(Box::default()),
ConcreteDataType::timestamp_millisecond_datatype(),
))
.to_string(),
@@ -1358,7 +1360,7 @@ mod tests {
);
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(vec![])),
Some(Box::default()),
ConcreteDataType::timestamp_microsecond_datatype(),
))
.to_string(),
@@ -1366,7 +1368,7 @@ mod tests {
);
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(vec![])),
Some(Box::default()),
ConcreteDataType::timestamp_nanosecond_datatype(),
))
.to_string(),

View File

@@ -252,7 +252,7 @@ mod tests {
#[test]
fn test_serialize_binary_vector_to_json() {
let vector = BinaryVector::from(BinaryArray::from_iter_values(&[
let vector = BinaryVector::from(BinaryArray::from_iter_values([
vec![1, 2, 3],
vec![1, 2, 3],
]));
@@ -281,7 +281,7 @@ mod tests {
#[test]
fn test_from_arrow_array() {
let arrow_array = BinaryArray::from_iter_values(&[vec![1, 2, 3], vec![1, 2, 3]]);
let arrow_array = BinaryArray::from_iter_values([vec![1, 2, 3], vec![1, 2, 3]]);
let original = BinaryArray::from(arrow_array.data().clone());
let vector = BinaryVector::from(arrow_array);
assert_eq!(original, vector.array);

View File

@@ -296,7 +296,7 @@ mod tests {
let vec = BooleanVector::from(input.clone());
assert_eq!(4, vec.len());
for (i, v) in input.into_iter().enumerate() {
assert_eq!(Some(v), vec.get_data(i), "failed at {}", i)
assert_eq!(Some(v), vec.get_data(i), "Failed at {i}")
}
}
@@ -306,7 +306,7 @@ mod tests {
let vec = input.iter().collect::<BooleanVector>();
assert_eq!(4, vec.len());
for (i, v) in input.into_iter().enumerate() {
assert_eq!(v, vec.get_data(i), "failed at {}", i)
assert_eq!(v, vec.get_data(i), "Failed at {i}")
}
}
@@ -316,7 +316,7 @@ mod tests {
let vec = BooleanVector::from(input.clone());
assert_eq!(4, vec.len());
for (i, v) in input.into_iter().enumerate() {
assert_eq!(v, vec.get_data(i), "failed at {}", i)
assert_eq!(v, vec.get_data(i), "failed at {i}")
}
}

View File

@@ -203,7 +203,7 @@ mod tests {
let a = Int32Vector::from_slice(vec![1]);
let c = ConstantVector::new(Arc::new(a), 10);
let s = format!("{:?}", c);
let s = format!("{c:?}");
assert_eq!(s, "ConstantVector([Int32(1); 10])");
}

View File

@@ -37,7 +37,7 @@ mod tests {
#[test]
fn test_datetime_vector() {
let v = DateTimeVector::new(PrimitiveArray::from_slice(&[1, 2, 3]));
let v = DateTimeVector::new(PrimitiveArray::from_slice([1, 2, 3]));
assert_eq!(ConcreteDataType::datetime_datatype(), v.data_type());
assert_eq!(3, v.len());
assert_eq!("DateTimeVector", v.vector_type_name());

View File

@@ -195,14 +195,17 @@ impl Helper {
ConstantVector::new(Arc::new(TimestampNanosecondVector::from(vec![v])), length)
}
ScalarValue::Decimal128(_, _, _)
| ScalarValue::Time64(_)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Dictionary(_, _) => {
| ScalarValue::Dictionary(_, _)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_) => {
return error::ConversionSnafu {
from: format!("Unsupported scalar value: {}", value),
from: format!("Unsupported scalar value: {value}"),
}
.fail()
}

View File

@@ -157,10 +157,7 @@ impl From<ListArray> for ListVector {
fn from(array: ListArray) -> Self {
let item_type = ConcreteDataType::from_arrow_type(match array.data_type() {
ArrowDataType::List(field) => field.data_type(),
other => panic!(
"Try to create ListVector from an arrow array with type {:?}",
other
),
other => panic!("Try to create ListVector from an arrow array with type {other:?}"),
});
Self { array, item_type }
}

View File

@@ -167,7 +167,7 @@ impl MutableVector for NullVectorBuilder {
ensure!(
value.is_null(),
error::CastTypeSnafu {
msg: format!("Failed to cast value ref {:?} to null", value),
msg: format!("Failed to cast value ref {value:?} to null"),
}
);
@@ -243,7 +243,7 @@ mod tests {
#[test]
fn test_debug_null_vector() {
let array = NullVector::new(1024 * 1024);
assert_eq!(format!("{:?}", array), "NullVector(1048576)");
assert_eq!(format!("{array:?}"), "NullVector(1048576)");
}
#[test]

View File

@@ -45,11 +45,11 @@ mod tests {
};
fn check_filter_primitive(expect: &[i32], input: &[i32], filter: &[bool]) {
let v = Int32Vector::from_slice(&input);
let v = Int32Vector::from_slice(input);
let filter = BooleanVector::from_slice(filter);
let out = v.filter(&filter).unwrap();
let expect: VectorRef = Arc::new(Int32Vector::from_slice(&expect));
let expect: VectorRef = Arc::new(Int32Vector::from_slice(expect));
assert_eq!(expect, out);
}

View File

@@ -365,7 +365,7 @@ pub(crate) fn replicate_primitive<T: LogicalPrimitiveType>(
return vector.get_slice(0, 0);
}
let mut builder = PrimitiveVectorBuilder::<T>::with_capacity(*offsets.last().unwrap() as usize);
let mut builder = PrimitiveVectorBuilder::<T>::with_capacity(*offsets.last().unwrap());
let mut previous_offset = 0;

View File

@@ -1,8 +1,8 @@
[package]
name = "frontend"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
anymap = "1.0.0-beta.2"
@@ -22,9 +22,9 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datafusion-expr = "14.0.0"
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datanode = { path = "../datanode" }
datatypes = { path = "../datatypes" }
futures = "0.3"

View File

@@ -291,7 +291,7 @@ impl SchemaProvider for FrontendSchemaProvider {
}
Some(r) => r,
};
let val = TableGlobalValue::from_bytes(&res.1).context(InvalidCatalogValueSnafu)?;
let val = TableGlobalValue::from_bytes(res.1).context(InvalidCatalogValueSnafu)?;
let table = Arc::new(DistTable::new(
table_name,

View File

@@ -387,6 +387,12 @@ pub enum Error {
source: query::error::Error,
},
#[snafu(display("Failed to execute statement, source: {}", source))]
ExecuteStatement {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Failed to do vector computation, source: {}", source))]
VectorComputation {
#[snafu(backtrace)]
@@ -536,6 +542,7 @@ impl ErrorExt for Error {
Error::DeserializeInsertBatch { source, .. } => source.status_code(),
Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
Error::ExecuteSql { source, .. } => source.status_code(),
Error::ExecuteStatement { source, .. } => source.status_code(),
Error::InsertBatchToRequest { source, .. } => source.status_code(),
Error::CollectRecordbatchStream { source } | Error::CreateRecordbatches { source } => {
source.status_code()

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{Column, ColumnDataType, CreateExpr};
use api::v1::{Column, ColumnDataType, CreateTableExpr};
use datatypes::schema::ColumnSchema;
use snafu::{ensure, ResultExt};
use sql::ast::{ColumnDef, TableConstraint};
@@ -32,7 +32,7 @@ pub type CreateExprFactoryRef = Arc<dyn CreateExprFactory + Send + Sync>;
#[async_trait::async_trait]
pub trait CreateExprFactory {
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr>;
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateTableExpr>;
async fn create_expr_by_columns(
&self,
@@ -40,7 +40,7 @@ pub trait CreateExprFactory {
schema_name: &str,
table_name: &str,
columns: &[Column],
) -> crate::error::Result<CreateExpr>;
) -> crate::error::Result<CreateTableExpr>;
}
#[derive(Debug)]
@@ -48,7 +48,7 @@ pub struct DefaultCreateExprFactory;
#[async_trait::async_trait]
impl CreateExprFactory for DefaultCreateExprFactory {
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr> {
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateTableExpr> {
create_to_expr(None, vec![0], stmt)
}
@@ -58,7 +58,7 @@ impl CreateExprFactory for DefaultCreateExprFactory {
schema_name: &str,
table_name: &str,
columns: &[Column],
) -> Result<CreateExpr> {
) -> Result<CreateTableExpr> {
let table_id = None;
let create_expr = common_grpc_expr::build_create_expr_from_insertion(
catalog_name,
@@ -78,23 +78,23 @@ fn create_to_expr(
table_id: Option<u32>,
region_ids: Vec<u32>,
create: &CreateTable,
) -> Result<CreateExpr> {
) -> Result<CreateTableExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name).context(ParseSqlSnafu)?;
let time_index = find_time_index(&create.constraints)?;
let expr = CreateExpr {
catalog_name: Some(catalog_name),
schema_name: Some(schema_name),
let expr = CreateTableExpr {
catalog_name,
schema_name,
table_name,
desc: None,
desc: "".to_string(),
column_defs: columns_to_expr(&create.columns, &time_index)?,
time_index,
primary_keys: find_primary_keys(&create.constraints)?,
create_if_not_exists: create.if_not_exists,
// TODO(LFC): Fill in other table options.
table_options: HashMap::from([("engine".to_string(), create.engine.clone())]),
table_id,
table_id: table_id.map(|id| api::v1::TableId { id }),
region_ids,
};
Ok(expr)
@@ -171,12 +171,14 @@ fn columns_to_expr(
datatype: datatype as i32,
is_nullable: schema.is_nullable(),
default_constraint: match schema.default_constraint() {
None => None,
Some(v) => Some(v.clone().try_into().context(
ConvertColumnDefaultConstraintSnafu {
column_name: &schema.name,
},
)?),
None => vec![],
Some(v) => {
v.clone()
.try_into()
.context(ConvertColumnDefaultConstraintSnafu {
column_name: &schema.name,
})?
}
},
})
})

View File

@@ -14,7 +14,6 @@
use std::sync::Arc;
use anymap::AnyMap;
use meta_client::MetaClientOpts;
use serde::{Deserialize, Serialize};
use servers::auth::UserProviderRef;
@@ -31,6 +30,7 @@ use crate::opentsdb::OpentsdbOptions;
use crate::postgres::PostgresOptions;
use crate::prometheus::PrometheusOptions;
use crate::server::Services;
use crate::Plugins;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FrontendOptions {
@@ -67,11 +67,11 @@ where
{
opts: FrontendOptions,
instance: Option<T>,
plugins: AnyMap,
plugins: Arc<Plugins>,
}
impl<T: FrontendInstance> Frontend<T> {
pub fn new(opts: FrontendOptions, instance: T, plugins: AnyMap) -> Self {
pub fn new(opts: FrontendOptions, instance: T, plugins: Arc<Plugins>) -> Self {
Self {
opts,
instance: Some(instance),
@@ -90,6 +90,7 @@ impl<T: FrontendInstance> Frontend<T> {
let instance = Arc::new(instance);
// TODO(sunng87): merge this into instance
let provider = self.plugins.get::<UserProviderRef>().cloned();
Services::start(&self.opts, instance, provider).await

View File

@@ -25,7 +25,7 @@ use api::v1::alter_expr::Kind;
use api::v1::object_expr::Expr;
use api::v1::{
admin_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr,
CreateExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr,
CreateTableExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr,
ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
@@ -43,6 +43,7 @@ use datanode::instance::InstanceRef as DnInstanceRef;
use distributed::DistInstance;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::query_handler::{
GrpcAdminHandler, GrpcAdminHandlerRef, GrpcQueryHandler, GrpcQueryHandlerRef,
InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler,
@@ -69,6 +70,7 @@ use crate::frontend::FrontendOptions;
use crate::sql::insert_to_request;
use crate::table::insert::insert_request_to_insert_batch;
use crate::table::route::TableRoutes;
use crate::Plugins;
#[async_trait]
pub trait FrontendInstance:
@@ -105,6 +107,10 @@ pub struct Instance {
sql_handler: SqlQueryHandlerRef,
grpc_query_handler: GrpcQueryHandlerRef,
grpc_admin_handler: GrpcAdminHandlerRef,
/// plugins: this map holds extensions to customize query or auth
/// behaviours.
plugins: Arc<Plugins>,
}
impl Instance {
@@ -135,6 +141,7 @@ impl Instance {
sql_handler: dist_instance_ref.clone(),
grpc_query_handler: dist_instance_ref.clone(),
grpc_admin_handler: dist_instance_ref,
plugins: Default::default(),
})
}
@@ -178,6 +185,7 @@ impl Instance {
sql_handler: dn_instance.clone(),
grpc_query_handler: dn_instance.clone(),
grpc_admin_handler: dn_instance,
plugins: Default::default(),
}
}
@@ -196,7 +204,7 @@ impl Instance {
/// Handle create expr.
pub async fn handle_create_table(
&self,
mut expr: CreateExpr,
mut expr: CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<Output> {
if let Some(v) = &self.dist_instance {
@@ -206,7 +214,7 @@ impl Instance {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::Create(expr)),
expr: Some(admin_expr::Expr::CreateTable(expr)),
};
let result = self
.grpc_admin_handler
@@ -359,8 +367,8 @@ impl Instance {
);
let expr = AlterExpr {
table_name: table_name.to_string(),
schema_name: Some(schema_name.to_string()),
catalog_name: Some(catalog_name.to_string()),
schema_name: schema_name.to_string(),
catalog_name: catalog_name.to_string(),
kind: Some(Kind::AddColumns(add_columns)),
};
@@ -451,6 +459,14 @@ impl Instance {
Ok(Output::RecordBatches(RecordBatches::empty()))
}
pub fn set_plugins(&mut self, map: Arc<Plugins>) {
self.plugins = map;
}
pub fn plugins(&self) -> Arc<Plugins> {
self.plugins.clone()
}
}
#[async_trait]
@@ -461,31 +477,18 @@ impl FrontendInstance for Instance {
}
}
fn parse_stmt(sql: &str) -> Result<Statement> {
let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {})
.context(error::ParseSqlSnafu)?;
// TODO(LFC): Support executing multiple SQL queries,
// which seems to be a major change to our whole server framework?
ensure!(
stmt.len() == 1,
error::InvalidSqlSnafu {
err_msg: "Currently executing multiple SQL queries are not supported."
}
);
Ok(stmt.remove(0))
fn parse_stmt(sql: &str) -> Result<Vec<Statement>> {
ParserContext::create_with_dialect(sql, &GenericDialect {}).context(error::ParseSqlSnafu)
}
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(
impl Instance {
async fn query_statement(
&self,
query: &str,
stmt: Statement,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
let stmt = parse_stmt(query)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
// TODO(sunng87): provide a better form to log or track statement
let query = &format!("{:?}", &stmt);
match stmt {
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
@@ -494,7 +497,7 @@ impl SqlQueryHandler for Instance {
| Statement::DescribeTable(_)
| Statement::Explain(_)
| Statement::Query(_) => {
return self.sql_handler.do_query(query, query_ctx).await;
return self.sql_handler.do_statement_query(stmt, query_ctx).await;
}
Statement::Insert(insert) => match self.mode {
Mode::Standalone => {
@@ -569,6 +572,78 @@ impl SqlQueryHandler for Instance {
}
}
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<server_error::Result<Output>> {
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef>();
let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
Ok(q) => q,
Err(e) => return vec![Err(e)],
};
match parse_stmt(query.as_ref())
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
.and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
{
Ok(stmts) => {
let mut results = Vec::with_capacity(stmts.len());
for stmt in stmts {
// TODO(sunng87): figure out at which stage we can call
// this hook after ArrowFlight adoption. We need to provide
// LogicalPlan as to this hook.
if let Err(e) = query_interceptor.pre_execute(&stmt, None, query_ctx.clone()) {
results.push(Err(e));
break;
}
match self.query_statement(stmt, query_ctx.clone()).await {
Ok(output) => {
let output_result =
query_interceptor.post_execute(output, query_ctx.clone());
results.push(output_result);
}
Err(e) => {
results.push(Err(e));
break;
}
}
}
results
}
Err(e) => {
vec![Err(e)]
}
}
}
async fn do_statement_query(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef>();
// TODO(sunng87): figure out at which stage we can call
// this hook after ArrowFlight adoption. We need to provide
// LogicalPlan as to this hook.
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
self.query_statement(stmt, query_ctx.clone())
.await
.and_then(|output| query_interceptor.post_execute(output, query_ctx.clone()))
}
fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
self.catalog_manager
.schema(catalog, schema)
.map(|s| s.is_some())
.context(server_error::CatalogSnafu)
}
}
#[async_trait]
impl ScriptHandler for Instance {
async fn insert_script(&self, name: &str, script: &str) -> server_error::Result<()> {
@@ -610,7 +685,7 @@ impl GrpcQueryHandler for Instance {
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", insert_expr),
query: format!("{insert_expr:?}"),
})?;
let object_result = match output {
Output::AffectedRows(rows) => ObjectResultBuilder::default()
@@ -630,7 +705,7 @@ impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, mut expr: AdminExpr) -> server_error::Result<AdminResult> {
// Force the default to be `None` rather than `Some(0)` comes from gRPC decode.
// Related issue: #480
if let Some(api::v1::admin_expr::Expr::Create(create)) = &mut expr.expr {
if let Some(api::v1::admin_expr::Expr::CreateTable(create)) = &mut expr.expr {
create.table_id = None;
}
self.grpc_admin_handler.exec_admin_request(expr).await
@@ -640,6 +715,8 @@ impl GrpcAdminHandler for Instance {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::borrow::Cow;
use std::sync::atomic::AtomicU32;
use api::v1::codec::SelectResult;
use api::v1::column::SemanticType;
@@ -671,6 +748,7 @@ mod tests {
) engine=mito with(regions=1);"#;
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 1),
@@ -684,6 +762,7 @@ mod tests {
"#;
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 3),
@@ -693,6 +772,7 @@ mod tests {
let sql = "select * from demo";
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::RecordBatches(_) => {
@@ -720,6 +800,7 @@ mod tests {
let sql = "select * from demo where ts>cast(1000000000 as timestamp)"; // use nanoseconds as where condition
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::RecordBatches(_) => {
@@ -808,7 +889,7 @@ mod tests {
let create_expr = create_expr();
let admin_expr = AdminExpr {
header: Some(ExprHeader::default()),
expr: Some(admin_expr::Expr::Create(create_expr)),
expr: Some(admin_expr::Expr::CreateTable(create_expr)),
};
let result = GrpcAdminHandler::exec_admin_request(&*instance, admin_expr)
.await
@@ -886,48 +967,46 @@ mod tests {
}
}
fn create_expr() -> CreateExpr {
fn create_expr() -> CreateTableExpr {
let column_defs = vec![
GrpcColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
GrpcColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
GrpcColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
GrpcColumnDef {
name: "disk_util".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: Some(
ColumnDefaultConstraint::Value(Value::from(9.9f64))
.try_into()
.unwrap(),
),
default_constraint: ColumnDefaultConstraint::Value(Value::from(9.9f64))
.try_into()
.unwrap(),
},
GrpcColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
CreateTableExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
table_name: "demo".to_string(),
desc: None,
desc: "".to_string(),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["host".to_string()],
@@ -937,4 +1016,164 @@ mod tests {
region_ids: vec![0],
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_sql_interceptor_plugin() {
#[derive(Default)]
struct AssertionHook {
pub(crate) c: AtomicU32,
}
impl SqlQueryInterceptor for AssertionHook {
fn pre_parsing<'a>(
&self,
query: &'a str,
_query_ctx: QueryContextRef,
) -> server_error::Result<std::borrow::Cow<'a, str>> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert!(query.starts_with("CREATE TABLE demo"));
Ok(Cow::Borrowed(query))
}
fn post_parsing(
&self,
statements: Vec<Statement>,
_query_ctx: QueryContextRef,
) -> server_error::Result<Vec<Statement>> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert!(matches!(statements[0], Statement::CreateTable(_)));
Ok(statements)
}
fn pre_execute(
&self,
_statement: &Statement,
_plan: Option<&query::plan::LogicalPlan>,
_query_ctx: QueryContextRef,
) -> server_error::Result<()> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(())
}
fn post_execute(
&self,
mut output: Output,
_query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
match &mut output {
Output::AffectedRows(rows) => {
assert_eq!(*rows, 1);
// update output result
*rows = 10;
}
_ => unreachable!(),
}
Ok(output)
}
}
let query_ctx = Arc::new(QueryContext::new());
let (mut instance, _guard) = tests::create_frontend_instance("test_hook").await;
let mut plugins = Plugins::new();
let counter_hook = Arc::new(AssertionHook::default());
plugins.insert::<SqlQueryInterceptorRef>(counter_hook.clone());
Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins));
let sql = r#"CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
// assert that the hook is called 3 times
assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 10),
_ => unreachable!(),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_disable_db_operation_plugin() {
#[derive(Default)]
struct DisableDBOpHook;
impl SqlQueryInterceptor for DisableDBOpHook {
fn post_parsing(
&self,
statements: Vec<Statement>,
_query_ctx: QueryContextRef,
) -> server_error::Result<Vec<Statement>> {
for s in &statements {
match s {
Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
return Err(server_error::Error::NotSupported {
feat: "Database operations".to_owned(),
})
}
_ => {}
}
}
Ok(statements)
}
}
let query_ctx = Arc::new(QueryContext::new());
let (mut instance, _guard) = tests::create_frontend_instance("test_db_hook").await;
let mut plugins = Plugins::new();
let hook = Arc::new(DisableDBOpHook::default());
plugins.insert::<SqlQueryInterceptorRef>(hook.clone());
Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins));
let sql = r#"CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 1),
_ => unreachable!(),
}
let sql = r#"CREATE DATABASE tomcat"#;
if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
{
assert!(matches!(e, server_error::Error::NotSupported { .. }));
} else {
unreachable!();
}
let sql = r#"SELECT 1; SHOW DATABASES"#;
if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
{
assert!(matches!(e, server_error::Error::NotSupported { .. }));
} else {
unreachable!();
}
}
}

View File

@@ -18,12 +18,12 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::result::AdminResultBuilder;
use api::v1::{
admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateExpr, ObjectExpr,
ObjectResult,
admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateTableExpr, ObjectExpr,
ObjectResult, TableId,
};
use async_trait::async_trait;
use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
use catalog::CatalogList;
use catalog::{CatalogList, CatalogManager};
use chrono::DateTime;
use client::admin::{admin_result_to_output, Admin};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -86,7 +86,7 @@ impl DistInstance {
pub(crate) async fn create_table(
&self,
create_table: &mut CreateExpr,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<Output> {
let response = self.create_table_in_meta(create_table, partitions).await?;
@@ -112,7 +112,9 @@ impl DistInstance {
table_name: create_table.table_name.to_string()
}
);
create_table.table_id = Some(table_route.table.id as u32);
create_table.table_id = Some(TableId {
id: table_route.table.id as u32,
});
self.put_table_global_meta(create_table, table_route)
.await?;
@@ -140,14 +142,17 @@ impl DistInstance {
Ok(Output::AffectedRows(0))
}
async fn handle_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = parse_stmt(sql)?;
async fn handle_statement(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Output> {
match stmt {
Statement::Query(_) => {
let plan = self
.query_engine
.statement_to_plan(stmt, query_ctx)
.context(error::ExecuteSqlSnafu { sql })?;
.context(error::ExecuteStatementSnafu {})?;
self.query_engine.execute(&plan).await
}
Statement::CreateDatabase(stmt) => {
@@ -171,7 +176,30 @@ impl DistInstance {
}
_ => unreachable!(),
}
.context(error::ExecuteSqlSnafu { sql })
.context(error::ExecuteStatementSnafu)
}
async fn handle_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let stmts = parse_stmt(sql);
match stmts {
Ok(stmts) => {
let mut results = Vec::with_capacity(stmts.len());
for stmt in stmts {
let result = self.handle_statement(stmt, query_ctx.clone()).await;
let is_err = result.is_err();
results.push(result);
if is_err {
break;
}
}
results
}
Err(e) => vec![Err(e)],
}
}
/// Handles distributed database creation
@@ -194,8 +222,16 @@ impl DistInstance {
}
async fn handle_alter_table(&self, expr: AlterExpr) -> Result<AdminResult> {
let catalog_name = expr.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
let schema_name = expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME
} else {
expr.catalog_name.as_str()
};
let schema_name = if expr.schema_name.is_empty() {
DEFAULT_SCHEMA_NAME
} else {
expr.schema_name.as_str()
};
let table_name = expr.table_name.as_str();
let table = self
.catalog_manager
@@ -205,12 +241,12 @@ impl DistInstance {
.schema(schema_name)
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu {
schema_info: format!("{}.{}", catalog_name, schema_name),
schema_info: format!("{catalog_name}.{schema_name}"),
})?
.table(table_name)
.context(CatalogSnafu)?
.context(TableNotFoundSnafu {
table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
table_name: format!("{catalog_name}.{schema_name}.{table_name}"),
})?;
let dist_table = table
@@ -223,20 +259,18 @@ impl DistInstance {
async fn create_table_in_meta(
&self,
create_table: &CreateExpr,
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<RouteResponse> {
let table_name = TableName::new(
create_table
.catalog_name
.clone()
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()),
create_table
.schema_name
.clone()
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()),
create_table.table_name.clone(),
);
let mut catalog_name = create_table.catalog_name.clone();
if catalog_name.is_empty() {
catalog_name = DEFAULT_CATALOG_NAME.to_string();
}
let mut schema_name = create_table.schema_name.clone();
if schema_name.is_empty() {
schema_name = DEFAULT_SCHEMA_NAME.to_string();
}
let table_name = TableName::new(catalog_name, schema_name, create_table.table_name.clone());
let partitions = parse_partitions(create_table, partitions)?;
let request = MetaCreateRequest {
@@ -252,7 +286,7 @@ impl DistInstance {
// TODO(LFC): Maybe move this to FrontendCatalogManager's "register_table" method?
async fn put_table_global_meta(
&self,
create_table: &CreateExpr,
create_table: &CreateTableExpr,
table_route: &TableRoute,
) -> Result<()> {
let table_name = &table_route.table.table_name;
@@ -274,10 +308,12 @@ impl DistInstance {
.await
.context(CatalogSnafu)?
{
let existing_bytes = existing.unwrap(); //this unwrap is safe since we compare with empty bytes and failed
let existing_bytes = existing.unwrap(); // this unwrap is safe since we compare with empty bytes and failed
let existing_value =
TableGlobalValue::from_bytes(&existing_bytes).context(CatalogEntrySerdeSnafu)?;
if existing_value.table_info.ident.table_id != create_table.table_id.unwrap() {
if existing_value.table_info.ident.table_id
!= create_table.table_id.as_ref().unwrap().id
{
error!(
"Table with name {} already exists, value in catalog: {:?}",
key, existing_bytes
@@ -300,11 +336,33 @@ impl SqlQueryHandler for DistInstance {
&self,
query: &str,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
) -> Vec<server_error::Result<Output>> {
self.handle_sql(query, query_ctx)
.await
.into_iter()
.map(|r| {
r.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
})
.collect()
}
async fn do_statement_query(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
self.handle_statement(stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
.context(server_error::ExecuteStatementSnafu)
}
fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
self.catalog_manager
.schema(catalog, schema)
.map(|s| s.is_some())
.context(server_error::CatalogSnafu)
}
}
@@ -334,13 +392,13 @@ impl GrpcAdminHandler for DistInstance {
}
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu {
query: format!("{:?}", query),
query: format!("{query:?}"),
})
}
}
fn create_table_global_value(
create_table: &CreateExpr,
create_table: &CreateTableExpr,
table_route: &TableRoute,
) -> Result<TableGlobalValue> {
let table_name = &table_route.table.table_name;
@@ -419,13 +477,19 @@ fn create_table_global_value(
created_on: DateTime::default(),
};
let desc = if create_table.desc.is_empty() {
None
} else {
Some(create_table.desc.clone())
};
let table_info = RawTableInfo {
ident: TableIdent {
table_id: table_route.table.id as u32,
version: 0,
},
name: table_name.table_name.clone(),
desc: create_table.desc.clone(),
desc,
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
meta,
@@ -440,7 +504,7 @@ fn create_table_global_value(
}
fn parse_partitions(
create_table: &CreateExpr,
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<Vec<MetaPartition>> {
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
@@ -455,7 +519,7 @@ fn parse_partitions(
}
fn find_partition_entries(
create_table: &CreateExpr,
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
partition_columns: &[String],
) -> Result<Vec<Vec<PartitionBound>>> {
@@ -505,7 +569,7 @@ fn find_partition_entries(
}
fn find_partition_columns(
create_table: &CreateExpr,
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
) -> Result<Vec<String>> {
let columns = if let Some(partitions) = partitions {
@@ -539,7 +603,7 @@ mod test {
let cases = [
(
r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b) (
PARTITION r0 VALUES LESS THAN ('hz'),
PARTITION r1 VALUES LESS THAN ('sh'),
@@ -585,6 +649,7 @@ ENGINE=mito",
let output = dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 1),
@@ -595,6 +660,7 @@ ENGINE=mito",
let output = dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
match output {
Output::RecordBatches(r) => {
@@ -633,6 +699,7 @@ ENGINE=mito",
dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
let sql = "
@@ -651,11 +718,16 @@ ENGINE=mito",
dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
async fn assert_show_tables(instance: SqlQueryHandlerRef) {
let sql = "show tables in test_show_tables";
let output = instance.do_query(sql, QueryContext::arc()).await.unwrap();
let output = instance
.do_query(sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
match output {
Output::RecordBatches(r) => {
let expected = r#"+--------------+

View File

@@ -51,7 +51,7 @@ impl Instance {
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", data_point),
query: format!("{data_point:?}"),
})?;
Ok(())
}
@@ -130,6 +130,7 @@ mod tests {
Arc::new(QueryContext::new()),
)
.await
.remove(0)
.unwrap();
match output {
Output::Stream(stream) => {

View File

@@ -27,9 +27,9 @@ use servers::prometheus::{self, Metrics};
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use servers::Mode;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use crate::instance::Instance;
use crate::instance::{parse_stmt, Instance};
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
@@ -54,8 +54,7 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<Resp
.find(|t| is_supported(**t))
.with_context(|| error::NotSupportedSnafu {
feat: format!(
"server does not support any of the requested response types: {:?}",
accepted_response_types
"server does not support any of the requested response types: {accepted_response_types:?}",
),
})?;
@@ -94,13 +93,26 @@ impl Instance {
);
let query_ctx = Arc::new(QueryContext::with_current_schema(db.to_string()));
let output = self.sql_handler.do_query(&sql, query_ctx).await;
let mut stmts = parse_stmt(&sql)
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu { query: &sql })?;
ensure!(
stmts.len() == 1,
error::InvalidQuerySnafu {
reason: "The sql has multiple statements".to_string()
}
);
let stmt = stmts.remove(0);
let output = self.sql_handler.do_statement_query(stmt, query_ctx).await;
let object_result = to_object_result(output)
.await
.try_into()
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu { query: sql })?;
.context(error::ExecuteQuerySnafu { query: &sql })?;
results.push((table_name, object_result));
}
@@ -118,7 +130,7 @@ impl PrometheusProtocolHandler for Instance {
.await
.map_err(BoxedError::new)
.with_context(|_| error::ExecuteInsertSnafu {
msg: format!("{:?}", request),
msg: format!("{request:?}"),
})?;
}
Mode::Distributed => {
@@ -126,7 +138,7 @@ impl PrometheusProtocolHandler for Instance {
.await
.map_err(BoxedError::new)
.with_context(|_| error::ExecuteInsertSnafu {
msg: format!("{:?}", request),
msg: format!("{request:?}"),
})?;
}
}

Some files were not shown because too many files have changed in this diff Show More