Compare commits

45 Commits

Author SHA1 Message Date
Lei, HUANG
a8630cdb38 fix: clippy errors 2022-12-15 18:12:05 +08:00
Ruihang Xia
0f3dcc1b38 fix: Fix All The Tests! (#752)
* fix: Fix several tests compile errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: some compile errors in tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: compile errors in frontend tests

* fix: compile errors in frontend tests

* test: Fix tests in api and common-query

* test: Fix test in sql crate

* fix: resolve substrait error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: add more test

* test: Fix tests in servers

* fix instance_test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test: Fix tests in tests-integration

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Lei, HUANG <mrsatangel@gmail.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
2022-12-15 17:47:14 +08:00
evenyag
7c696dae08 Merge branch 'develop' into replace-arrow2 2022-12-15 15:29:35 +08:00
Yingwen
142dee41d6 fix: Fix compiler errors in script crate (#749)
* fix: Fix compiler errors in state.rs

* fix: fix compiler errors in state

* feat: upgrade sqlparser to 0.26

* fix: fix datafusion engine compiler errors

* fix: Fix some tests in query crate

* fix: Fix all warnings in tests

* feat: Remove `Type` from timestamp's type name

* fix: fix query tests

Now datafusion already supports median, so this commit also remove the
median function

* style: Fix clippy

* feat: Remove RecordBatch::pretty_print

* chore: Address CR comments

* feat: Add column_by_name to RecordBatch

* feat: modify select_from_rb

* feat: Fix some compiler errors in vector.rs

* feat: Fix more compiler errors in vector.rs

* fix: fix table.rs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: Fix compiler errors in coprocessor

* fix: Fix some compiler errors

* fix: Fix compiler errors in script

* chore: Remove unused imports and format code

* test: disable interval tests

* test: Fix test_compile_execute test

* style: Fix clippy

* feat: Support interval

* feat: Add RecordBatch::columns and fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-15 14:20:35 +08:00
Lei, HUANG
ce6d1cb7d1 fix: frontend compile errors (#747)
fix: fix compile errors in frontend
2022-12-14 18:30:16 +08:00
Yingwen
dbb3034ecb fix: Fix compiler errors in query crate (#746)
* fix: Fix compiler errors in state.rs

* fix: fix compiler errors in state

* feat: upgrade sqlparser to 0.26

* fix: fix datafusion engine compiler errors

* fix: Fix some tests in query crate

* fix: Fix all warnings in tests

* feat: Remove `Type` from timestamp's type name

* fix: fix query tests

Now datafusion already supports median, so this commit also remove the
median function

* style: Fix clippy

* feat: Remove RecordBatch::pretty_print

* chore: Address CR comments

* Update src/query/src/query_engine/state.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-14 17:42:07 +08:00
Lei, HUANG
652d59a643 fix: remove unwrap 2022-12-13 17:51:14 +08:00
Lei, HUANG
fa971c6513 fix: errors in optimzer 2022-12-13 17:44:37 +08:00
evenyag
36c929e1a7 fix: Fix imports in optimizer.rs 2022-12-13 17:27:44 +08:00
Ruihang Xia
a712382fba Merge pull request #745
* fix nyc-taxi and util

* Merge branch 'replace-arrow2' into fix-others

* fix substrait

* fix warnings and error in test
2022-12-13 16:59:28 +08:00
Yingwen
4b644aa482 fix: Fix compiler errors in catalog and mito crates (#742)
* fix: Fix compiler errors in mito

* fix: Fix compiler errors in catalog crate

* style: Fix clippy

* chore: Fix use
2022-12-13 15:53:55 +08:00
Lei, HUANG
4defde055c feat: upgrade storage crate to arrow and parquet offcial impl (#738)
* fix: compile erros

* fix: parquet reader and writer

* fix: parquet reader and writer

* fix: WriteBatch IPC encode/decode

* fix: clippy errors in storage subcrate

* chore: remove suspicious unwrap

* fix: some cr comments

* fix: CR comments

* fix: CR comments
2022-12-13 11:58:50 +08:00
Ruihang Xia
95b2d8654f fix: pre-cast to avoid tremendous match arms (#734)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-09 17:20:03 +08:00
Ruihang Xia
42fdc7251a fix: Fix common grpc expr (#730)
* fix compile errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename fn names

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix styles

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix wranings in common-time

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-09 14:24:04 +08:00
Ruihang Xia
d0892bf0b7 fix: Fix compile error in server subcrate (#727)
* fix: Fix compile error in server subcrate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused type alias

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* explicitly panic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/storage/src/sst/parquet.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2022-12-08 20:27:53 +08:00
Ruihang Xia
fff530cb50 fix common record batch
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-08 17:58:53 +08:00
Yingwen
b936d8b18a fix: Fix common::grpc compiler errors (#722)
* fix: Fix common::grpc compiler errors

This commit refactors RecordBatch and holds vectors in the RecordBatch
struct, so we don't need to cast the array to vector when doing
serialization or iterating the batch.

Now we use the vector API instead of the arrow API in grpc crate.

* chore: Address CR comments
2022-12-08 17:51:20 +08:00
Lei, HUANG
1bde1ba399 fix: row group pruning (#725)
* fix: row group pruning

* chore: use macro to simplify stats implemetation

* fxi: CR comments

* fix: row group metadata length mismatch

* fix: simplify code
2022-12-08 17:44:04 +08:00
Ruihang Xia
3687bc7346 fix: Fix tests and clippy for common-function subcrate (#726)
* further fixing

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix all compile errors in common function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert test changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-08 17:01:54 +08:00
Ruihang Xia
587bdc9800 fix: fix other compile error in common-function (#719)
* further fixing

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix all compile errors in common function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-08 11:38:07 +08:00
Yingwen
58c26def6b fix: fix argmin/percentile/clip/interp/scipy_stats_norm_pdf errors (#718)
fix: fix argmin/percentile/clip/interp/scipy_stats_norm_pdf compiler errors
2022-12-07 19:55:07 +08:00
Ruihang Xia
6f3baf96b0 fix: fix compile error for mean/polyval/pow/interp ops (#717)
* fix: fix compile error for mean/polyval/pow/interp ops

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify type bounds

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-07 16:38:43 +08:00
Yingwen
a898f846d1 fix: Fix compiler errors in argmax/rate/median/norm_cdf (#716)
* fix: Fix compiler errors in argmax/rate/median/norm_cdf

* chore: Address CR comments
2022-12-07 15:28:27 +08:00
Ruihang Xia
a562199455 Revert "fix: fix compile error for mean/polyval/pow/interp ops"
This reverts commit fb0b4eb826.
2022-12-07 15:13:58 +08:00
Ruihang Xia
fb0b4eb826 fix: fix compile error for mean/polyval/pow/interp ops
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-07 15:12:28 +08:00
Yingwen
2ba99259e1 feat: Implements diff accumulator using WrapperType (#715)
* feat: Remove usage of opaque error from common::recordbatch

* feat: Remove opaque error from common::query

* feat: Fix diff compiler errors

Now common_function just use common_query's Error and Result. Adds
a LargestType associated type to LogicalPrimitiveType to get the largest
type a logical primitive type can cast to.

* feat: Remove LargestType from NativeType trait

* chore: Update comments

* feat: Restrict Scalar::RefType of WrapperType to itself

Add trait bound `for<'a> Scalar<RefType<'a> = Self>` to WrapperType

* chore: Address CR comments

* chore: Format codes
2022-12-07 11:13:24 +08:00
Ruihang Xia
551cde23b1 Merge branch 'dev' into replace-arrow2 2022-12-07 10:50:27 +08:00
Yingwen
653906d4fa fix: Fix common::query compiler errors (#713)
* feat: Move conversion to ScalarValue to value.rs

* fix: Fix common::query compiler errors

This commit also make InnerError pub(crate)
2022-12-06 16:45:54 +08:00
Ruihang Xia
829ff491c4 fix: common-query subcrate (#712)
* fix: record batch adapter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix error enum

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-06 16:32:52 +08:00
Yingwen
b32438e78c feat: Fix some compiler errors in common::query (#710)
* feat: Fix some compiler errors in common::query

* feat: test_collect use vectors api
2022-12-06 15:32:12 +08:00
Lei, HUANG
0ccb8b4302 chore: delete datatypes based on arrow2 2022-12-06 15:01:57 +08:00
Lei, HUANG
b48ae21b71 fix: api crate (#708)
* fix: rename ConcreteDataType::timestamp_millis_type to ConcreteDataType::timestamp_millisecond_type. fix other warnings regarding timestamp

* fix: revert changes in datatypes2

* fix: helper
2022-12-06 14:56:59 +08:00
evenyag
3c0adb00f3 feat: Fix recordbatch test compiling issue 2022-12-06 12:03:06 +08:00
evenyag
8c66b7d000 feat: Fix common::recordbatch compiler errors 2022-12-06 11:55:19 +08:00
evenyag
99371fd31b chore: sort Cargo.toml 2022-12-06 11:39:15 +08:00
evenyag
fe505fecfd feat: Make recordbatch compile 2022-12-06 11:38:59 +08:00
evenyag
cc1ec26416 feat: Switch to datatypes2 2022-12-05 20:30:47 +08:00
Ruihang Xia
504059a699 chore: fix wrong merge commit
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-12-05 20:11:22 +08:00
Ruihang Xia
7151deb4ed Merge branch 'dev' into replace-arrow2 2022-12-05 20:10:37 +08:00
Ruihang Xia
d0686f9c19 Merge branch 'replace-arrow2' of github.com:GreptimeTeam/greptimedb into replace-arrow2 2022-11-21 17:43:40 +08:00
Ruihang Xia
221f3e9d2e Merge branch 'dev' into replace-arrow2 2022-11-21 17:42:15 +08:00
evenyag
61c4a3691a chore: update dep of binary vector 2022-11-21 15:55:07 +08:00
evenyag
d7626fd6af feat: arrow_array switch to arrow 2022-11-21 15:39:41 +08:00
Ruihang Xia
e3201a4705 chore: replace one last datafusion dep
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-11-21 14:29:59 +08:00
Ruihang Xia
571a84d91b chore: kick off. change datafusion/arrow/parquet to target version
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2022-11-21 14:19:39 +08:00
286 changed files with 6429 additions and 4893 deletions

View File

@@ -24,7 +24,7 @@ on:
name: Code coverage
env:
RUST_TOOLCHAIN: nightly-2022-12-20
RUST_TOOLCHAIN: nightly-2022-07-14
jobs:
coverage:
@@ -34,11 +34,6 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: KyleMayes/install-llvm-action@v1
with:
version: "14.0"
- name: Install toolchain
uses: dtolnay/rust-toolchain@master
with:
@@ -53,7 +48,6 @@ jobs:
- name: Collect coverage data
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
GT_S3_BUCKET: ${{ secrets.S3_BUCKET }}

View File

@@ -23,7 +23,7 @@ on:
name: CI
env:
RUST_TOOLCHAIN: nightly-2022-12-20
RUST_TOOLCHAIN: nightly-2022-07-14
jobs:
typos:
@@ -41,8 +41,6 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
@@ -83,8 +81,6 @@ jobs:
# path: ./llvm
# key: llvm
# - uses: arduino/setup-protoc@v1
# with:
# repo-token: ${{ secrets.GITHUB_TOKEN }}
# - uses: KyleMayes/install-llvm-action@v1
# with:
# version: "14.0"
@@ -118,8 +114,6 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
@@ -137,8 +131,6 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}

View File

@@ -10,7 +10,7 @@ on:
name: Release
env:
RUST_TOOLCHAIN: nightly-2022-12-20
RUST_TOOLCHAIN: nightly-2022-07-14
# FIXME(zyy17): Would be better to use `gh release list -L 1 | cut -f 3` to get the latest release version tag, but for a long time, we will stay at 'v0.1.0-alpha-*'.
SCHEDULED_BUILD_VERSION_PREFIX: v0.1.0-alpha

692
Cargo.lock generated

File diff suppressed because it is too large

View File

@@ -39,23 +39,5 @@ members = [
"tests/runner",
]
[workspace.package]
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[workspace.dependencies]
arrow = "29.0"
arrow-schema = { version = "29.0", features = ["serde"] }
# TODO(LFC): Use released Datafusion when it officially dpendent on Arrow 29.0
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" }
parquet = "29.0"
sqlparser = "0.28"
[profile.release]
debug = true

View File

@@ -1,14 +1,14 @@
[package]
name = "benchmarks"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
arrow.workspace = true
arrow = "26.0.0"
clap = { version = "4.0", features = ["derive"] }
client = { path = "../src/client" }
indicatif = "0.17.1"
itertools = "0.10.5"
parquet.workspace = true
parquet = "26.0.0"
tokio = { version = "1.21", features = ["full"] }

View File

@@ -15,6 +15,7 @@
//! Use the taxi trip records from New York City dataset to bench. You can download the dataset from
//! [here](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).
#![feature(once_cell)]
#![allow(clippy::print_stdout)]
use std::collections::HashMap;
@@ -27,7 +28,7 @@ use arrow::record_batch::RecordBatch;
use clap::Parser;
use client::admin::Admin;
use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertExpr, TableId};
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
use client::{Client, Database, Select};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -93,7 +94,7 @@ async fn write_data(
.unwrap();
let progress_bar = mpb.add(ProgressBar::new(row_num as _));
progress_bar.set_style(pb_style);
progress_bar.set_message(format!("{path:?}"));
progress_bar.set_message(format!("{:?}", path));
let mut total_rpc_elapsed_ms = 0;
@@ -114,7 +115,10 @@ async fn write_data(
progress_bar.inc(row_count as _);
}
progress_bar.finish_with_message(format!("file {path:?} done in {total_rpc_elapsed_ms}ms",));
progress_bar.finish_with_message(format!(
"file {:?} done in {}ms",
path, total_rpc_elapsed_ms
));
total_rpc_elapsed_ms
}
@@ -215,126 +219,126 @@ fn build_values(column: &ArrayRef) -> Values {
}
}
fn create_table_expr() -> CreateTableExpr {
CreateTableExpr {
catalog_name: CATALOG_NAME.to_string(),
schema_name: SCHEMA_NAME.to_string(),
fn create_table_expr() -> CreateExpr {
CreateExpr {
catalog_name: Some(CATALOG_NAME.to_string()),
schema_name: Some(SCHEMA_NAME.to_string()),
table_name: TABLE_NAME.to_string(),
desc: "".to_string(),
desc: None,
column_defs: vec![
ColumnDef {
name: "VendorID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "tpep_pickup_datetime".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "tpep_dropoff_datetime".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "passenger_count".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "trip_distance".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "RatecodeID".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "store_and_fwd_flag".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "PULocationID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "DOLocationID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "payment_type".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "fare_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "extra".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "mta_tax".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "tip_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "tolls_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "improvement_surcharge".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "total_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "congestion_surcharge".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "airport_fee".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
],
time_index: "tpep_pickup_datetime".to_string(),
@@ -342,7 +346,7 @@ fn create_table_expr() -> CreateTableExpr {
create_if_not_exists: false,
table_options: Default::default(),
region_ids: vec![0],
table_id: Some(TableId { id: 0 }),
table_id: Some(0),
}
}
@@ -351,12 +355,12 @@ fn query_set() -> HashMap<String, String> {
ret.insert(
"count_all".to_string(),
format!("SELECT COUNT(*) FROM {TABLE_NAME};"),
format!("SELECT COUNT(*) FROM {};", TABLE_NAME),
);
ret.insert(
"fare_amt_by_passenger".to_string(),
format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {TABLE_NAME} GROUP BY passenger_count")
format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {} GROUP BY passenger_count",TABLE_NAME)
);
ret
@@ -369,7 +373,7 @@ async fn do_write(args: &Args, client: &Client) {
let mut write_jobs = JoinSet::new();
let create_table_result = admin.create(create_table_expr()).await;
println!("Create table result: {create_table_result:?}");
println!("Create table result: {:?}", create_table_result);
let progress_bar_style = ProgressStyle::with_template(
"[{elapsed_precise}] {bar:60.cyan/blue} {pos:>7}/{len:7} {msg}",
@@ -402,7 +406,7 @@ async fn do_write(args: &Args, client: &Client) {
async fn do_query(num_iter: usize, db: &Database) {
for (query_name, query) in query_set() {
println!("Running query: {query}");
println!("Running query: {}", query);
for i in 0..num_iter {
let now = Instant::now();
let _res = db.select(Select::Sql(query.clone())).await.unwrap();

View File

@@ -7,14 +7,6 @@ mysql_addr = '127.0.0.1:4406'
mysql_runtime_size = 4
enable_memory_catalog = false
[wal]
dir = "/tmp/greptimedb/wal"
file_size = 1073741824
purge_interval = 600
purge_threshold = 53687091200
read_batch_size = 128
sync_write = false
[storage]
type = 'File'
data_dir = '/tmp/greptimedb/data/'

View File

@@ -1,20 +1,12 @@
node_id = 0
mode = 'standalone'
wal_dir = '/tmp/greptimedb/wal/'
enable_memory_catalog = false
[http_options]
addr = '127.0.0.1:4000'
timeout = "30s"
[wal]
dir = "/tmp/greptimedb/wal"
file_size = 1073741824
purge_interval = 600
purge_threshold = 53687091200
read_batch_size = 128
sync_write = false
[storage]
type = 'File'
data_dir = '/tmp/greptimedb/data/'

View File

@@ -24,8 +24,6 @@ RUN cargo build --release
# TODO(zyy17): Maybe should use the more secure container image.
FROM ubuntu:22.04 as base
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates
WORKDIR /greptime
COPY --from=builder /greptimedb/target/release/greptime /greptime/bin/
ENV PATH /greptime/bin/:$PATH

View File

@@ -1,7 +1,5 @@
FROM ubuntu:22.04
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates
ARG TARGETARCH
ADD $TARGETARCH/greptime /greptime/bin/

Binary file not shown (before: 34 KiB).

Binary file not shown (before: 58 KiB).

Binary file not shown (before: 35 KiB).

Binary file not shown (before: 46 KiB).

View File

@@ -1,175 +0,0 @@
---
Feature Name: "promql-in-rust"
Tracking Issue: https://github.com/GreptimeTeam/greptimedb/issues/596
Date: 2022-12-20
Author: "Ruihang Xia <waynestxia@gmail.com>"
---
Rewrite PromQL in Rust
----------------------
# Summary
A Rust native implementation of PromQL, for GreptimeDB.
# Motivation
Prometheus and its query language PromQL prevail in the cloud-native observability area, which is an important scenario for time series databases like GreptimeDB. We already support its remote read and write protocols. Users can now integrate GreptimeDB as the storage backend of an existing Prometheus deployment, but cannot run PromQL queries directly on GreptimeDB the way they run SQL.
This RFC proposes to add support for PromQL. Because Prometheus is written in Go, we can't easily reuse the existing code. For interoperability, performance and extensibility, porting its logic to Rust is a good choice.
# Details
## Overview
One of the goals is to make use of our existing basic operators, execution model and runtime to reduce the work. So the entire proposal is built on top of Apache Arrow DataFusion. The rewritten PromQL logic is expressed as `Expr` or `Execution Plan` in DataFusion, and both the intermediate data structures and the result are in the format of `Arrow`'s `RecordBatch`.
The following sections are organized in a top-down manner. They start with the evaluation procedure, then introduce the building blocks of our new PromQL operations, follow with an explanation of the data model, and end with an example logic plan.
*This RFC is heavily related to Prometheus and PromQL. It won't repeat their basic concepts.*
## Evaluation
The original implementation is like an interpreter of the parsed PromQL AST. It has two characteristics: (1) Operations are evaluated in place after they are parsed to AST, and some key parameters are separated from the AST because they are not present in the query but come from other places, like another field in the HTTP payload. (2) Calculation is performed per timestamp. You can see this pattern many times:
```go
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {}
```
These lead to two differences in the proposed implementation. First, to make it more general and clear, the evaluation procedure is reorganized into several phases (the same ones as DataFusion's). Second, data are evaluated by time series (corresponding to "columnar calculation", if we think of the timestamp as the row number).
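As a rough illustration of the second difference, the sketch below walks one series once and emits a value for every evaluation step, instead of re-running the whole query per timestamp. The function name `select_instants`, the millisecond units and the half-open window `(step - lookback, step]` are assumptions made for this example, not part of the proposal.

```rust
/// Hypothetical helper: for every evaluation step in `start..=end`
/// (stepping by `interval`), pick the most recent sample whose timestamp
/// lies in `(step - lookback, step]`.
/// `timestamps` must be sorted ascending; all times are in milliseconds.
fn select_instants(
    timestamps: &[i64],
    values: &[f64],
    start: i64,
    end: i64,
    interval: i64,
    lookback: i64,
) -> Vec<Option<f64>> {
    assert_eq!(timestamps.len(), values.len());
    assert!(interval > 0);
    let mut out = Vec::new();
    // Number of samples whose timestamp is <= the current step.
    // It only moves forward, so the series is scanned once.
    let mut cursor = 0;
    let mut step = start;
    while step <= end {
        while cursor < timestamps.len() && timestamps[cursor] <= step {
            cursor += 1;
        }
        let picked = if cursor > 0 && timestamps[cursor - 1] > step - lookback {
            Some(values[cursor - 1])
        } else {
            None // no sample inside the lookback window
        };
        out.push(picked);
        step += interval;
    }
    out
}
```

The output has one entry per step, which is why data after a selector are usually shorter (or at least differently shaped) than the input series.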
```
Logic
Query AST Plan
─────────► Parser ───────► Logical ────────► Physical ────┐
Planner Planner │
◄───────────────────────────── Executor ◄────────────────┘
Evaluation Result Execution
Plan
```
- Parser
Provided by [`promql-parser`](https://github.com/GreptimeTeam/promql-parser) crate. Same as the original implementation.
- Logical Planner
Generates a logical plan with all the needed parameters. It should accept something like `EvalStmt` in Go's implementation, which contains query time range, evaluation interval and lookback range.
Another important thing done here is assembling the logic plan, with all the operations baked in logically: what the filter and time range to read are, how the data then flows through a selector into a binary operation, and what the output schema of every single step is. The generated logic plan is deterministic without variables, and can be `EXPLAIN`ed clearly.
- Physical Planner
This step converts a logic plan into an executable execution plan. It has few special concerns compared with the previous step, except when a query is going to be executed in a distributed manner. In this case, a logic plan will be divided into several parts and sent to several nodes. One physical planner only sees its own part.
- Executor
As its name shows, this step computes the result from data. All new calculation logic, the implementation of PromQL in Rust, is placed here. The rewritten functions use `RecordBatch` and `Array` from `Arrow` as the intermediate data structure.
Each "batch" contains only data from a single time series. This comes from the underlying storage implementation. Though it's not a requirement of this RFC, having this property can simplify some functions.
Another thing to mention is that the rewritten functions are not aware of timestamp or value columns; they are defined only in terms of the input data types. For example, the `increase()` function in PromQL calculates the unbiased delta of data, and its implementation here only does this single thing. Let's compare the signatures of the two implementations:
- Go
```go
func funcIncrease(vals []parser.Value, args parser.Expressions) Vector {}
```
- Rust
```rust
fn prom_increase(input: Array) -> Array {}
```
Some unimportant parameters are omitted. The original Go version only writes the logic for `Point`'s value, either float or histogram. But the proposed rewritten one accepts a generic `Array` as input, which can be any type that suits, from `i8` to `u64` to `TimestampNanosecond`.
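To make the "defined only on input data types" idea concrete, here is a minimal sketch of a type-generic delta over one window of samples. It is not the RFC's `prom_increase`: the name `raw_increase`, the slice-based signature and the counter-reset rule (a drop in value is treated as a restart from zero) are assumptions for illustration, and extrapolation is deliberately left out.

```rust
use std::ops::{Add, Sub};

/// Hypothetical sketch: sum of the deltas between consecutive samples in
/// one window. A sample lower than its predecessor is assumed to be a
/// counter reset, so the sample itself is counted as the delta.
/// Returns `None` when the window has fewer than two samples.
fn raw_increase<T>(window: &[T]) -> Option<T>
where
    T: Copy + PartialOrd + Add<Output = T> + Sub<Output = T> + Default,
{
    if window.len() < 2 {
        return None;
    }
    // `T::default()` is zero for the primitive numeric types.
    let mut total = T::default();
    for pair in window.windows(2) {
        let (prev, cur) = (pair[0], pair[1]);
        total = total + if cur >= prev { cur - prev } else { cur };
    }
    Some(total)
}
```

Because the function only requires ordering and arithmetic on `T`, the same body works for `f64`, `u64` or `i64` input without knowing which column it came from.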
## Plan and Expression
They are structures to express logic from PromQL. The proposed implementation is built on top of DataFusion, thus our plan and expression are in the form of `ExtensionPlan` and `ScalarUDF`. The only difference between them in this context is the return type: a plan returns a record batch while an expression returns a single column.
This RFC proposes to add four new plans. They are fundamental building blocks that mainly handle the data selection logic in PromQL, preparing input for the calculation expressions that follow.
- `SeriesNormalize`
Sort data inside one series on the timestamp column, and apply the "offset" bias if there is one. This plan usually comes after the `TableScan` (or `TableScan` and `Filter`) plan.
- `VectorManipulator` and `MatrixManipulator`
Corresponding to `InstantSelector` and `RangeSelector`. We don't calculate timestamp by timestamp, so we use "vector" instead of "instant"; this image shows the difference. "Matrix" is another name for "range vector", chosen to avoid confusion with our "vector". The following section will detail how they are implemented using Arrow.
![instant_and_vector](instant-and-vector.png)
Due to the "interval" parameter in PromQL, data after a "selector" (or "manipulator" here) are usually shorter than the input. We have to modify the entire record batch to shorten the timestamp, value and tag columns together, so they are formed as plans.
- `PromAggregator`
The carrier of aggregator expressions. This should not be very different from the DataFusion built-in `Aggregate` plan, except PromQL can use "group without" to do reverse selection.
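The "group without" reverse selection mentioned for `PromAggregator` can be sketched as follows. All names here (`Grouping`, `group_key`, `sum_grouped`, `labels`) are hypothetical and use plain standard-library maps rather than DataFusion types; the point is only that `by` keeps the listed tag columns while `without` keeps every tag column except the listed ones.

```rust
use std::collections::BTreeMap;

type Labels = BTreeMap<String, String>;

/// Hypothetical grouping modifier: keep only the listed labels (`By`)
/// or keep everything except the listed labels (`Without`).
enum Grouping<'a> {
    By(&'a [&'a str]),
    Without(&'a [&'a str]),
}

/// Small helper to build a label set for the example.
fn labels(pairs: &[(&str, &str)]) -> Labels {
    pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}

/// Project a series' labels onto the grouping key.
fn group_key(labels: &Labels, grouping: &Grouping<'_>) -> Vec<(String, String)> {
    labels
        .iter()
        .filter(|(name, _)| match grouping {
            Grouping::By(keep) => keep.contains(&name.as_str()),
            Grouping::Without(removed) => !removed.contains(&name.as_str()),
        })
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect()
}

/// Sum one value per series into its group.
fn sum_grouped(
    series: &[(Labels, f64)],
    grouping: &Grouping<'_>,
) -> BTreeMap<Vec<(String, String)>, f64> {
    let mut out = BTreeMap::new();
    for (labels, value) in series {
        *out.entry(group_key(labels, grouping)).or_insert(0.0) += value;
    }
    out
}
```

For series tagged only with `idc` and `host`, grouping `by (idc)` and grouping `without (host)` produce the same groups, which is the equivalence the built-in `Aggregate` plan cannot express directly.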
PromQL has around 70 expressions and functions. Luckily we can reuse lots of them from DataFusion, like unary expressions, binary expressions and aggregators. We only need to implement the PromQL-specific expressions, like `rate` or `percentile`. The following table lists some typical functions in PromQL and their signatures in the proposed implementation. Other functions should follow the same pattern.
| Name | In Param(s) | Out Param(s) | Explain |
|-------------------- |------------------------------------------------------ |-------------- |-------------------- |
| instant_delta | Matrix T | Array T | idelta in PromQL |
| increase | Matrix T | Array T | increase in PromQL |
| extrapolate_factor | - Matrix T<br>- Array Timestamp<br>- Array Timestamp | Array T | * |
*: *`extrapolate_factor` is one of the "dark sides" in PromQL. In short it's a translation of this [paragraph](https://github.com/prometheus/prometheus/blob/0372e259baf014bbade3134fd79bcdfd8cbdef2c/promql/functions.go#L134-L159)*
To reuse this common calculation logic, we can break functions into several expressions and assemble them in the logic planning phase. For example, `rate()` in PromQL can be represented as `increase / extrapolate_factor`.
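A toy sketch of that assembly step is shown below. `PromExpr`, `rate_expr` and `display` are hypothetical names for this example only; a real planner would build DataFusion expressions instead. The sketch shows just the shape of the composition: `rate` is not a primitive but a division of two smaller expressions.

```rust
/// Hypothetical miniature expression tree (not DataFusion's `Expr`).
enum PromExpr {
    Column(&'static str),
    Call(&'static str, Vec<PromExpr>),
    Div(Box<PromExpr>, Box<PromExpr>),
}

/// Assemble `rate` from the two reusable building blocks.
fn rate_expr() -> PromExpr {
    PromExpr::Div(
        Box::new(PromExpr::Call("increase", vec![PromExpr::Column("value")])),
        Box::new(PromExpr::Call(
            "extrapolate_factor",
            vec![PromExpr::Column("timestamp")],
        )),
    )
}

/// Render the tree in a prefix form for inspection.
fn display(expr: &PromExpr) -> String {
    match expr {
        PromExpr::Column(name) => name.to_string(),
        PromExpr::Call(func, args) => {
            let args: Vec<String> = args.iter().map(display).collect();
            format!("{}({})", func, args.join(", "))
        }
        PromExpr::Div(lhs, rhs) => format!("div({}, {})", display(lhs), display(rhs)),
    }
}
```

Rendering `rate_expr()` yields `div(increase(value), extrapolate_factor(timestamp))`, the same expression string that appears in the example plan later in this RFC.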
## Data Model
This part explains how data is represented. Following the data model in GreptimeDB, all the data are stored as tables, with tag columns, a timestamp column and a value column. Converting a table to record batches is very straightforward. So an instant vector can be thought of as a row (though as said before, we don't use instant vectors) in the table. Of the four basic types in PromQL (scalar, string, instant vector and range vector), only the last, "range vector", needs some tricks to adapt to our columnar calculation.
A range vector is a sort of matrix. It consists of small one-dimensional vectors, each being an input of a range function. Applying a range function to a range vector can be thought of as a kind of convolution.
![range-vector-with-matrix](range-vector-with-matrix.png)
(The left is an illustration of a range vector. Notice the Y-axis has no meaning; it just places the different pieces separately. The right side is an imagined "matrix" as the range function. Multiplying the left side by it gives a one-dimensional "matrix" with four elements. That's the evaluation result of a range vector.)
To adapt this range vector to record batch, it should be represented by a column. This RFC proposes to use `DictionaryArray` from Arrow to represent a range vector, or `Matrix`. This is "misusing" `DictionaryArray` to ship some additional information about an array. Because the range vector slides over one series, we only need to know the `offset` and `length` of each slide to reconstruct the matrix from an array:
![matrix-from-array](matrix-from-array.png)
The length is not fixed; it depends on the input's timestamps. A PoC implementation of `Matrix` and `increase()` can be found in [this repo](https://github.com/waynexia/corroding-prometheus).
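The offset/length idea can be sketched without Arrow at all. The code below is a minimal, hypothetical stand-in: it uses plain `Vec`s instead of `DictionaryArray`, assumes timestamps are sorted, and assumes each window covers `(t - range, t]`. `naive_increase` deliberately ignores counter resets and extrapolation, so it is not the `increase()` from the PoC.

```rust
/// Simplified stand-in for the `Matrix` described above: one flat array of
/// values plus an (offset, length) pair per slide. Not the Arrow-backed type.
pub struct Matrix {
    values: Vec<f64>,
    /// (offset, length) of each slide within `values`.
    ranges: Vec<(usize, usize)>,
}

impl Matrix {
    /// Builds one slide per evaluation time `t`, covering samples whose
    /// timestamp `ts` satisfies `t - range < ts <= t` (an assumed convention).
    /// `timestamps` must be sorted ascending and aligned with `values`.
    pub fn from_timestamps(
        values: Vec<f64>,
        timestamps: &[i64],
        eval_times: &[i64],
        range: i64,
    ) -> Self {
        assert_eq!(values.len(), timestamps.len());
        assert!(range >= 0);
        let ranges = eval_times
            .iter()
            .map(|&t| {
                // First sample strictly after the window's left edge.
                let start = timestamps.partition_point(|&ts| ts <= t - range);
                // One past the last sample at or before `t`.
                let end = timestamps.partition_point(|&ts| ts <= t);
                (start, end - start)
            })
            .collect();
        Matrix { values, ranges }
    }

    /// Number of slides, i.e. rows of the imagined matrix.
    pub fn num_windows(&self) -> usize {
        self.ranges.len()
    }

    /// Borrows the i-th slide without copying the underlying array.
    pub fn window(&self, i: usize) -> &[f64] {
        let (offset, len) = self.ranges[i];
        &self.values[offset..offset + len]
    }
}

/// Last sample minus first sample per slide; `None` if fewer than two samples.
pub fn naive_increase(m: &Matrix) -> Vec<Option<f64>> {
    (0..m.num_windows())
        .map(|i| {
            let w = m.window(i);
            if w.len() < 2 {
                None
            } else {
                Some(w[w.len() - 1] - w[0])
            }
        })
        .collect()
}
```

Because every slide is just a view into the same array, overlapping windows share storage, and a range function only has to iterate over `(offset, length)` pairs.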
## Example
The logical plan of the PromQL query
```promql
# start: 2022-12-20T10:00:00
# end: 2022-12-21T10:00:00
# interval: 1m
# lookback: 30s
sum (rate(request_duration[5m])) by (idc)
```
looks like
<!-- title: 'PromAggregator: \naggr = sum, column = idc'
operator: prom
inputs:
- title: 'Matrix Manipulator: \ninterval = 1m, range = 5m, expr = div(increase(value), extrapolate_factor(timestamp))'
operator: prom
inputs:
- title: 'Series Normalize: \noffset = 0'
operator: prom
inputs:
- title: 'Filter: \ntimestamp > 2022-12-20T10:00:00 && timestamp < 2022-12-21T10:00:00'
operator: filter
inputs:
- title: 'Table Scan: \ntable = request_duration, timestamp > 2022-12-20T10:00:00 && timestamp < 2022-12-21T10:00:00'
operator: scan -->
![example](example.png)
# Drawbacks
Humans are error-prone. Rewriting from the ground up is harder than translating line by line and requires more attention to ensure correctness. And since the evaluator's architecture is different, it might be painful to catch up with PromQL's breaking updates (if any) in the future.
Misusing Arrow's `DictionaryArray` as `Matrix` is another drawback. This hack needs some `unsafe` function calls to bypass Arrow's checks. And although Arrow's API is stable, this still relies on undocumented behavior.
# Alternatives
There are a few alternatives we've considered:
- Wrap the existing PromQL's implementation via FFI, and import it to GreptimeDB.
- Translate its evaluator engine line-by-line, rather than rewrite one.
- Integrate the Prometheus server into GreptimeDB via RPC, making it a detached execution engine for PromQL.
The first and second options create a separate execution engine inside GreptimeDB. They may alleviate the pain of rewriting, but will have a negative impact on later evolution, such as resource management. The last option introduces another component to deploy, which makes the deployment architecture more complex.
All of them also involve more or less redundant data transportation, which costs performance and resources. The proposed built-in execution procedure is also easy to integrate with, and expose through, the SQL interface GreptimeDB currently provides. Some concepts in PromQL, like sliding windows (range vectors in PromQL), are very convenient and ergonomic for analyzing series data. This makes it not only a PromQL evaluator, but also an enhancement to our query system.

View File

@@ -1 +1 @@
nightly-2022-12-20
nightly-2022-07-14

View File

@@ -1,8 +1,9 @@
[package]
name = "api"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
common-base = { path = "../common/base" }

View File

@@ -17,7 +17,7 @@ message AdminResponse {
message AdminExpr {
ExprHeader header = 1;
oneof expr {
CreateTableExpr create_table = 2;
CreateExpr create = 2;
AlterExpr alter = 3;
CreateDatabaseExpr create_database = 4;
DropTableExpr drop_table = 5;
@@ -31,23 +31,24 @@ message AdminResult {
}
}
message CreateTableExpr {
string catalog_name = 1;
string schema_name = 2;
// TODO(hl): rename to CreateTableExpr
message CreateExpr {
optional string catalog_name = 1;
optional string schema_name = 2;
string table_name = 3;
string desc = 4;
optional string desc = 4;
repeated ColumnDef column_defs = 5;
string time_index = 6;
repeated string primary_keys = 7;
bool create_if_not_exists = 8;
map<string, string> table_options = 9;
TableId table_id = 10;
optional uint32 table_id = 10;
repeated uint32 region_ids = 11;
}
message AlterExpr {
string catalog_name = 1;
string schema_name = 2;
optional string catalog_name = 1;
optional string schema_name = 2;
string table_name = 3;
oneof kind {
AddColumns add_columns = 4;
@@ -61,11 +62,6 @@ message DropTableExpr {
string table_name = 3;
}
message CreateDatabaseExpr {
//TODO(hl): maybe rename to schema_name?
string database_name = 1;
}
message AddColumns {
repeated AddColumn add_columns = 1;
}
@@ -83,6 +79,7 @@ message DropColumn {
string name = 1;
}
message TableId {
uint32 id = 1;
message CreateDatabaseExpr {
//TODO(hl): maybe rename to schema_name?
string database_name = 1;
}

View File

@@ -59,7 +59,7 @@ message ColumnDef {
string name = 1;
ColumnDataType datatype = 2;
bool is_nullable = 3;
bytes default_constraint = 4;
optional bytes default_constraint = 4;
}
enum ColumnDataType {

View File

@@ -23,13 +23,12 @@ impl ColumnDef {
pub fn try_as_column_schema(&self) -> Result<ColumnSchema> {
let data_type = ColumnDataTypeWrapper::try_new(self.datatype)?;
let constraint = if self.default_constraint.is_empty() {
None
} else {
Some(
ColumnDefaultConstraint::try_from(self.default_constraint.as_slice())
let constraint = match &self.default_constraint {
None => None,
Some(v) => Some(
ColumnDefaultConstraint::try_from(&v[..])
.context(error::ConvertColumnDefaultConstraintSnafu { column: &self.name })?,
)
),
};
ColumnSchema::new(&self.name, data_type.into(), self.is_nullable)

View File

@@ -1,8 +1,9 @@
[package]
name = "catalog"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
api = { path = "../api" }
@@ -18,7 +19,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion.workspace = true
datafusion = "14.0.0"
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"

View File

@@ -33,38 +33,48 @@ const ALPHANUMERICS_NAME_PATTERN: &str = "[a-zA-Z_][a-zA-Z0-9_]*";
lazy_static! {
static ref CATALOG_KEY_PATTERN: Regex = Regex::new(&format!(
"^{CATALOG_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})$"
"^{}-({})$",
CATALOG_KEY_PREFIX, ALPHANUMERICS_NAME_PATTERN
))
.unwrap();
}
lazy_static! {
static ref SCHEMA_KEY_PATTERN: Regex = Regex::new(&format!(
"^{SCHEMA_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})$"
"^{}-({})-({})$",
SCHEMA_KEY_PREFIX, ALPHANUMERICS_NAME_PATTERN, ALPHANUMERICS_NAME_PATTERN
))
.unwrap();
}
lazy_static! {
static ref TABLE_GLOBAL_KEY_PATTERN: Regex = Regex::new(&format!(
"^{TABLE_GLOBAL_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})$"
"^{}-({})-({})-({})$",
TABLE_GLOBAL_KEY_PREFIX,
ALPHANUMERICS_NAME_PATTERN,
ALPHANUMERICS_NAME_PATTERN,
ALPHANUMERICS_NAME_PATTERN
))
.unwrap();
}
lazy_static! {
static ref TABLE_REGIONAL_KEY_PATTERN: Regex = Regex::new(&format!(
"^{TABLE_REGIONAL_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})-([0-9]+)$"
"^{}-({})-({})-({})-([0-9]+)$",
TABLE_REGIONAL_KEY_PREFIX,
ALPHANUMERICS_NAME_PATTERN,
ALPHANUMERICS_NAME_PATTERN,
ALPHANUMERICS_NAME_PATTERN
))
.unwrap();
}
pub fn build_catalog_prefix() -> String {
format!("{CATALOG_KEY_PREFIX}-")
format!("{}-", CATALOG_KEY_PREFIX)
}
pub fn build_schema_prefix(catalog_name: impl AsRef<str>) -> String {
format!("{SCHEMA_KEY_PREFIX}-{}-", catalog_name.as_ref())
format!("{}-{}-", SCHEMA_KEY_PREFIX, catalog_name.as_ref())
}
pub fn build_table_global_prefix(
@@ -72,7 +82,8 @@ pub fn build_table_global_prefix(
schema_name: impl AsRef<str>,
) -> String {
format!(
"{TABLE_GLOBAL_KEY_PREFIX}-{}-{}-",
"{}-{}-{}-",
TABLE_GLOBAL_KEY_PREFIX,
catalog_name.as_ref(),
schema_name.as_ref()
)
@@ -367,7 +378,7 @@ mod tests {
table_info,
};
let serialized = serde_json::to_string(&value).unwrap();
let deserialized = TableGlobalValue::parse(serialized).unwrap();
let deserialized = TableGlobalValue::parse(&serialized).unwrap();
assert_eq!(value, deserialized);
}
}

View File

@@ -157,7 +157,7 @@ pub struct RegisterSchemaRequest {
/// Formats table fully-qualified name
pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String {
format!("{catalog}.{schema}.{table}")
format!("{}.{}.{}", catalog, schema, table)
}
pub trait CatalogProviderFactory {
@@ -187,7 +187,8 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
.await
.with_context(|_| CreateTableSnafu {
table_info: format!(
"{catalog_name}.{schema_name}.{table_name}, id: {table_id}",
"{}.{}.{}, id: {}",
catalog_name, schema_name, table_name, table_id,
),
})?;
manager
@@ -199,7 +200,7 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
table: table.clone(),
})
.await?;
info!("Created and registered system table: {table_name}");
info!("Created and registered system table: {}", table_name);
table
};
if let Some(hook) = req.open_hook {

View File

@@ -338,7 +338,7 @@ impl CatalogManager for LocalCatalogManager {
let schema = catalog
.schema(schema_name)?
.with_context(|| SchemaNotFoundSnafu {
schema_info: format!("{catalog_name}.{schema_name}"),
schema_info: format!("{}.{}", catalog_name, schema_name),
})?;
{
@@ -452,7 +452,7 @@ impl CatalogManager for LocalCatalogManager {
let schema = catalog
.schema(schema_name)?
.with_context(|| SchemaNotFoundSnafu {
schema_info: format!("{catalog_name}.{schema_name}"),
schema_info: format!("{}.{}", catalog_name, schema_name),
})?;
schema.table(table_name)
}

View File

@@ -331,7 +331,10 @@ impl RemoteCatalogManager {
.open_table(&context, request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!("{catalog_name}.{schema_name}.{table_name}, id:{table_id}"),
table_info: format!(
"{}.{}.{}, id:{}",
catalog_name, schema_name, table_name, table_id
),
})? {
Some(table) => {
info!(
@@ -352,7 +355,7 @@ impl RemoteCatalogManager {
.clone()
.try_into()
.context(InvalidTableSchemaSnafu {
table_info: format!("{catalog_name}.{schema_name}.{table_name}"),
table_info: format!("{}.{}.{}", catalog_name, schema_name, table_name,),
schema: meta.schema.clone(),
})?;
let req = CreateTableRequest {
@@ -474,7 +477,7 @@ impl CatalogManager for RemoteCatalogManager {
let schema = catalog
.schema(schema_name)?
.with_context(|| SchemaNotFoundSnafu {
schema_info: format!("{catalog_name}.{schema_name}"),
schema_info: format!("{}.{}", catalog_name, schema_name),
})?;
schema.table(table_name)
}

View File

@@ -61,7 +61,7 @@ impl Table for SystemCatalogTable {
async fn scan(
&self,
_projection: Option<&Vec<usize>>,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
@@ -129,7 +129,7 @@ impl SystemCatalogTable {
let ctx = SessionContext::new();
let scan = self
.table
.scan(full_projection, &[], None)
.scan(&full_projection, &[], None)
.await
.context(error::SystemCatalogTableScanSnafu)?;
let stream = scan
@@ -197,7 +197,7 @@ pub fn build_table_insert_request(full_table_name: String, table_id: TableId) ->
}
pub fn build_schema_insert_request(catalog_name: String, schema_name: String) -> InsertRequest {
let full_schema_name = format!("{catalog_name}.{schema_name}");
let full_schema_name = format!("{}.{}", catalog_name, schema_name);
build_insert_request(
EntryType::Schema,
full_schema_name.as_bytes(),
@@ -367,7 +367,7 @@ pub struct TableEntryValue {
#[cfg(test)]
mod tests {
use log_store::NoopLogStore;
use log_store::fs::noop::NoopLogStore;
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
use object_store::ObjectStore;
@@ -390,7 +390,7 @@ mod tests {
if let Entry::Catalog(e) = entry {
assert_eq!("some_catalog", e.catalog_name);
} else {
panic!("Unexpected type: {entry:?}");
panic!("Unexpected type: {:?}", entry);
}
}
@@ -407,7 +407,7 @@ mod tests {
assert_eq!("some_catalog", e.catalog_name);
assert_eq!("some_schema", e.schema_name);
} else {
panic!("Unexpected type: {entry:?}");
panic!("Unexpected type: {:?}", entry);
}
}
@@ -426,7 +426,7 @@ mod tests {
assert_eq!("some_table", e.table_name);
assert_eq!(42, e.table_id);
} else {
panic!("Unexpected type: {entry:?}");
panic!("Unexpected type: {:?}", entry);
}
}

View File

@@ -77,7 +77,7 @@ impl Table for Tables {
async fn scan(
&self,
_projection: Option<&Vec<usize>>,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> table::error::Result<PhysicalPlanRef> {
@@ -370,7 +370,7 @@ mod tests {
.unwrap();
let tables = Tables::new(catalog_list, "test_engine".to_string());
let tables_stream = tables.scan(None, &[], None).await.unwrap();
let tables_stream = tables.scan(&None, &[], None).await.unwrap();
let session_ctx = SessionContext::new();
let mut tables_stream = tables_stream.execute(0, session_ctx.task_ctx()).unwrap();

View File

@@ -69,7 +69,8 @@ mod tests {
assert!(
err.to_string()
.contains("Table `greptime.public.test_table` already exists"),
"Actual error message: {err}",
"Actual error message: {}",
err
);
}

View File

@@ -189,10 +189,10 @@ impl TableEngine for MockTableEngine {
unimplemented!()
}
fn get_table(
fn get_table<'a>(
&self,
_ctx: &EngineContext,
table_ref: &TableReference,
table_ref: &'a TableReference,
) -> table::Result<Option<TableRef>> {
futures::executor::block_on(async {
Ok(self
@@ -204,7 +204,7 @@ impl TableEngine for MockTableEngine {
})
}
fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
fn table_exists<'a>(&self, _ctx: &EngineContext, table_ref: &'a TableReference) -> bool {
futures::executor::block_on(async {
self.tables
.read()

View File

@@ -1,8 +1,9 @@
[package]
name = "client"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
api = { path = "../api" }
@@ -14,7 +15,7 @@ common-grpc-expr = { path = "../common/grpc-expr" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-time = { path = "../common/time" }
datafusion.workspace = true
datafusion = "14.0.0"
datatypes = { path = "../datatypes" }
enum_dispatch = "0.3"
parking_lot = "0.12"

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId};
use api::v1::{ColumnDataType, ColumnDef, CreateExpr};
use client::admin::Admin;
use client::{Client, Database};
use prost_09::Message;
@@ -33,36 +33,36 @@ fn main() {
async fn run() {
let client = Client::with_urls(vec!["127.0.0.1:3001"]);
let create_table_expr = CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
let create_table_expr = CreateExpr {
catalog_name: Some("greptime".to_string()),
schema_name: Some("public".to_string()),
table_name: "test_logical_dist_exec".to_string(),
desc: "".to_string(),
desc: None,
column_defs: vec![
ColumnDef {
name: "timestamp".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "key".to_string(),
datatype: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "value".to_string(),
datatype: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: vec![],
default_constraint: None,
},
],
time_index: "timestamp".to_string(),
primary_keys: vec!["key".to_string()],
create_if_not_exists: false,
table_options: Default::default(),
table_id: Some(TableId { id: 1024 }),
table_id: Some(1024),
region_ids: vec![0],
};

View File

@@ -34,13 +34,13 @@ impl Admin {
}
}
pub async fn create(&self, expr: CreateTableExpr) -> Result<AdminResult> {
pub async fn create(&self, expr: CreateExpr) -> Result<AdminResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let expr = AdminExpr {
header: Some(header),
expr: Some(admin_expr::Expr::CreateTable(expr)),
expr: Some(admin_expr::Expr::Create(expr)),
};
self.do_request(expr).await
}

View File

@@ -1,9 +1,9 @@
[package]
name = "cmd"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
default-run = "greptime"
license = "Apache-2.0"
[[bin]]
name = "greptime"

View File

@@ -125,7 +125,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
}
if let Some(wal_dir) = cmd.wal_dir {
opts.wal.dir = wal_dir;
opts.wal_dir = wal_dir;
}
Ok(opts)
}
@@ -151,7 +151,7 @@ mod tests {
};
let options: DatanodeOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal.dir);
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir);
assert_eq!("127.0.0.1:4406".to_string(), options.mysql_addr);
assert_eq!(4, options.mysql_runtime_size);
let MetaClientOpts {

View File

@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use anymap::AnyMap;
use clap::Parser;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
@@ -22,7 +21,6 @@ use frontend::instance::Instance;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::Plugins;
use meta_client::MetaClientOpts;
use servers::auth::UserProviderRef;
use servers::http::HttpOptions;
@@ -88,21 +86,21 @@ pub struct StartCommand {
impl StartCommand {
async fn run(self) -> Result<()> {
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
let plugins = load_frontend_plugins(&self.user_provider)?;
let opts: FrontendOptions = self.try_into()?;
let mut instance = Instance::try_new_distributed(&opts)
.await
.context(error::StartFrontendSnafu)?;
instance.set_plugins(plugins.clone());
let mut frontend = Frontend::new(opts, instance, plugins);
let mut frontend = Frontend::new(
opts.clone(),
Instance::try_new_distributed(&opts)
.await
.context(error::StartFrontendSnafu)?,
plugins,
);
frontend.start().await.context(error::StartFrontendSnafu)
}
}
pub fn load_frontend_plugins(user_provider: &Option<String>) -> Result<Plugins> {
let mut plugins = Plugins::new();
pub fn load_frontend_plugins(user_provider: &Option<String>) -> Result<AnyMap> {
let mut plugins = AnyMap::new();
if let Some(provider) = user_provider {
let provider = auth::user_provider_from_option(provider).context(IllegalAuthConfigSnafu)?;

View File

@@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use anymap::AnyMap;
use clap::Parser;
use common_telemetry::info;
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig};
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig};
use datanode::instance::InstanceRef;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
@@ -26,7 +25,6 @@ use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prometheus::PrometheusOptions;
use frontend::Plugins;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
@@ -72,7 +70,7 @@ pub struct StandaloneOptions {
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub mode: Mode,
pub wal: WalConfig,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
pub enable_memory_catalog: bool,
}
@@ -88,7 +86,7 @@ impl Default for StandaloneOptions {
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
mode: Mode::Standalone,
wal: WalConfig::default(),
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),
enable_memory_catalog: false,
}
@@ -112,7 +110,7 @@ impl StandaloneOptions {
fn datanode_options(self) -> DatanodeOptions {
DatanodeOptions {
wal: self.wal,
wal_dir: self.wal_dir,
storage: self.storage,
enable_memory_catalog: self.enable_memory_catalog,
..Default::default()
@@ -152,7 +150,7 @@ impl StartCommand {
async fn run(self) -> Result<()> {
let enable_memory_catalog = self.enable_memory_catalog;
let config_file = self.config_file.clone();
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
let plugins = load_frontend_plugins(&self.user_provider)?;
let fe_opts = FrontendOptions::try_from(self)?;
let dn_opts: DatanodeOptions = {
let mut opts: StandaloneOptions = if let Some(path) = config_file {
@@ -189,12 +187,11 @@ impl StartCommand {
/// Build frontend instance in standalone mode
async fn build_frontend(
fe_opts: FrontendOptions,
plugins: Arc<Plugins>,
plugins: AnyMap,
datanode_instance: InstanceRef,
) -> Result<Frontend<FeInstance>> {
let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
frontend_instance.set_script_handler(datanode_instance);
frontend_instance.set_plugins(plugins.clone());
Ok(Frontend::new(fe_opts, frontend_instance, plugins))
}
@@ -224,7 +221,8 @@ impl TryFrom<StartCommand> for FrontendOptions {
if addr == datanode_grpc_addr {
return IllegalConfigSnafu {
msg: format!(
"gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
"gRPC listen address conflicts with datanode reserved gRPC addr: {}",
datanode_grpc_addr
),
}
.fail();

View File

@@ -1,8 +1,8 @@
[package]
name = "common-base"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
bitvec = "1.0"

View File

@@ -1,8 +1,8 @@
[package]
name = "common-catalog"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
async-trait = "0.1"

View File

@@ -1,8 +1,8 @@
[package]
name = "common-error"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
snafu = { version = "0.7", features = ["backtraces"] }

View File

@@ -131,7 +131,7 @@ mod tests {
assert!(ErrorCompat::backtrace(&err).is_some());
let msg = format!("{err:?}");
let msg = format!("{:?}", err);
assert!(msg.contains("\nBacktrace:\n"));
let fmt_msg = format!("{:?}", DebugFormat::new(&err));
assert_eq!(msg, fmt_msg);
@@ -151,7 +151,7 @@ mod tests {
assert!(err.as_any().downcast_ref::<MockError>().is_some());
assert!(err.source().is_some());
let msg = format!("{err:?}");
let msg = format!("{:?}", err);
assert!(msg.contains("\nBacktrace:\n"));
assert!(msg.contains("Caused by"));

View File

@@ -31,11 +31,11 @@ impl<'a, E: ErrorExt + ?Sized> fmt::Debug for DebugFormat<'a, E> {
write!(f, "{}.", self.0)?;
if let Some(source) = self.0.source() {
// Source error use debug format for more verbose info.
write!(f, " Caused by: {source:?}")?;
write!(f, " Caused by: {:?}", source)?;
}
if let Some(backtrace) = self.0.backtrace_opt() {
// Add a newline to separate causes and backtrace.
write!(f, "\nBacktrace:\n{backtrace}")?;
write!(f, "\nBacktrace:\n{}", backtrace)?;
}
Ok(())

View File

@@ -51,7 +51,6 @@ pub enum StatusCode {
TableNotFound = 4001,
TableColumnNotFound = 4002,
TableColumnExists = 4003,
DatabaseNotFound = 4004,
// ====== End of catalog related status code =======
// ====== Begin of storage related status code =====
@@ -87,7 +86,7 @@ impl StatusCode {
impl fmt::Display for StatusCode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// The current debug format is suitable to display.
write!(f, "{self:?}")
write!(f, "{:?}", self)
}
}
@@ -96,7 +95,7 @@ mod tests {
use super::*;
fn assert_status_code_display(code: StatusCode, msg: &str) {
let code_msg = format!("{code}");
let code_msg = format!("{}", code);
assert_eq!(msg, code_msg);
}

View File

@@ -1,8 +1,8 @@
[package]
name = "common-function-macro"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[lib]
proc-macro = true

View File

@@ -1,8 +1,8 @@
[package]
edition = "2021"
name = "common-function"
edition.workspace = true
version.workspace = true
license.workspace = true
version = "0.1.0"
license = "Apache-2.0"
[dependencies]
arc-swap = "1.0"
@@ -11,7 +11,7 @@ common-error = { path = "../error" }
common-function-macro = { path = "../function-macro" }
common-query = { path = "../query" }
common-time = { path = "../time" }
datafusion.workspace = true
datafusion-common = "14.0.0"
datatypes = { path = "../../datatypes" }
libc = "0.2"
num = "0.4"

View File

@@ -343,7 +343,7 @@ mod tests {
Arc::new(Int64Vector::from_vec(fp.clone())),
];
let vector = interp(&args).unwrap();
assert!(matches!(vector.get(0), Value::Float64(v) if v == x[0]));
assert!(matches!(vector.get(0), Value::Float64(v) if v==x[0] as f64));
// x=None output:Null
let input = vec![None, Some(0.0), Some(0.3)];

View File

@@ -127,7 +127,12 @@ mod tests {
assert_eq!(4, vec.len());
for i in 0..4 {
assert_eq!(i == 0 || i == 3, vec.get_data(i).unwrap(), "Failed at {i}",)
assert_eq!(
i == 0 || i == 3,
vec.get_data(i).unwrap(),
"failed at {}",
i
)
}
}
_ => unreachable!(),

View File

@@ -1,8 +1,8 @@
[package]
name = "common-grpc-expr"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
api = { path = "../../api" }

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::{AlterExpr, CreateTableExpr, DropColumns};
use api::v1::{AlterExpr, CreateExpr, DropColumns};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use snafu::{ensure, OptionExt, ResultExt};
@@ -29,16 +29,6 @@ use crate::error::{
/// Convert an [`AlterExpr`] to an optional [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
let catalog_name = if expr.catalog_name.is_empty() {
None
} else {
Some(expr.catalog_name)
};
let schema_name = if expr.schema_name.is_empty() {
None
} else {
Some(expr.schema_name)
};
match expr.kind {
Some(Kind::AddColumns(add_columns)) => {
let add_column_requests = add_columns
@@ -67,8 +57,8 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
};
let request = AlterTableRequest {
catalog_name,
schema_name,
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
alter_kind,
};
@@ -80,8 +70,8 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
};
let request = AlterTableRequest {
catalog_name,
schema_name,
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
alter_kind,
};
@@ -91,7 +81,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
}
}
pub fn create_table_schema(expr: &CreateTableExpr) -> Result<SchemaRef> {
pub fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
let column_schemas = expr
.column_defs
.iter()
@@ -106,7 +96,7 @@ pub fn create_table_schema(expr: &CreateTableExpr) -> Result<SchemaRef> {
.iter()
.any(|column| column.name == expr.time_index),
MissingTimestampColumnSnafu {
msg: format!("CreateExpr: {expr:?}")
msg: format!("CreateExpr: {:?}", expr)
}
);
@@ -129,10 +119,7 @@ pub fn create_table_schema(expr: &CreateTableExpr) -> Result<SchemaRef> {
))
}
pub fn create_expr_to_request(
table_id: TableId,
expr: CreateTableExpr,
) -> Result<CreateTableRequest> {
pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr)?;
let primary_key_indices = expr
.primary_keys
@@ -147,19 +134,12 @@ pub fn create_expr_to_request(
})
.collect::<Result<Vec<usize>>>()?;
let mut catalog_name = expr.catalog_name;
if catalog_name.is_empty() {
catalog_name = DEFAULT_CATALOG_NAME.to_string();
}
let mut schema_name = expr.schema_name;
if schema_name.is_empty() {
schema_name = DEFAULT_SCHEMA_NAME.to_string();
}
let desc = if expr.desc.is_empty() {
None
} else {
Some(expr.desc)
};
let catalog_name = expr
.catalog_name
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let schema_name = expr
.schema_name
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let region_ids = if expr.region_ids.is_empty() {
vec![0]
@@ -172,7 +152,7 @@ pub fn create_expr_to_request(
catalog_name,
schema_name,
table_name: expr.table_name,
desc,
desc: expr.desc,
schema,
region_numbers: region_ids,
primary_key_indices,
@@ -191,8 +171,8 @@ mod tests {
#[test]
fn test_alter_expr_to_request() {
let expr = AlterExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
catalog_name: None,
schema_name: None,
table_name: "monitor".to_string(),
kind: Some(Kind::AddColumns(AddColumns {
@@ -201,7 +181,7 @@ mod tests {
name: "mem_usage".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: false,
default_constraint: vec![],
default_constraint: None,
}),
is_key: false,
}],
@@ -228,8 +208,8 @@ mod tests {
#[test]
fn test_drop_column_expr() {
let expr = AlterExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
catalog_name: Some("test_catalog".to_string()),
schema_name: Some("test_schema".to_string()),
table_name: "monitor".to_string(),
kind: Some(Kind::DropColumns(DropColumns {

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::column::{SemanticType, Values};
use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateTableExpr};
use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateExpr};
use common_base::BitVec;
use common_time::timestamp::Timestamp;
use common_time::{Date, DateTime};
@@ -45,7 +45,7 @@ fn build_column_def(column_name: &str, datatype: i32, nullable: bool) -> ColumnD
name: column_name.to_string(),
datatype,
is_nullable: nullable,
default_constraint: vec![],
default_constraint: None,
}
}
@@ -154,7 +154,7 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve
collect_values!(values.i32_values, |v| ValueRef::from(*v))
}
ColumnDataType::Int64 => {
collect_values!(values.i64_values, |v| ValueRef::from(*v))
collect_values!(values.i64_values, |v| ValueRef::from(*v as i64))
}
ColumnDataType::Uint8 => {
collect_values!(values.u8_values, |v| ValueRef::from(*v as u8))
@@ -166,7 +166,7 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve
collect_values!(values.u32_values, |v| ValueRef::from(*v))
}
ColumnDataType::Uint64 => {
collect_values!(values.u64_values, |v| ValueRef::from(*v))
collect_values!(values.u64_values, |v| ValueRef::from(*v as u64))
}
ColumnDataType::Float32 => collect_values!(values.f32_values, |v| ValueRef::from(*v)),
ColumnDataType::Float64 => collect_values!(values.f64_values, |v| ValueRef::from(*v)),
@@ -214,7 +214,7 @@ pub fn build_create_expr_from_insertion(
table_id: Option<TableId>,
table_name: &str,
columns: &[Column],
) -> Result<CreateTableExpr> {
) -> Result<CreateExpr> {
let mut new_columns: HashSet<String> = HashSet::default();
let mut column_defs = Vec::default();
let mut primary_key_indices = Vec::default();
@@ -263,17 +263,17 @@ pub fn build_create_expr_from_insertion(
.map(|idx| columns[*idx].column_name.clone())
.collect::<Vec<_>>();
let expr = CreateTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
let expr = CreateExpr {
catalog_name: Some(catalog_name.to_string()),
schema_name: Some(schema_name.to_string()),
table_name: table_name.to_string(),
desc: "Created on insertion".to_string(),
desc: Some("Created on insertion".to_string()),
column_defs,
time_index: timestamp_field_name,
primary_keys,
create_if_not_exists: true,
table_options: Default::default(),
table_id: table_id.map(|id| api::v1::TableId { id }),
table_id,
region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend
};
@@ -516,9 +516,9 @@ mod tests {
build_create_expr_from_insertion("", "", table_id, table_name, &insert_batch.0)
.unwrap();
assert_eq!(table_id, create_expr.table_id.map(|x| x.id));
assert_eq!(table_id, create_expr.table_id);
assert_eq!(table_name, create_expr.table_name);
assert_eq!("Created on insertion".to_string(), create_expr.desc);
assert_eq!(Some("Created on insertion".to_string()), create_expr.desc);
assert_eq!(
vec![create_expr.column_defs[0].name.clone()],
create_expr.primary_keys
@@ -725,7 +725,7 @@ mod tests {
async fn scan(
&self,
_projection: Option<&Vec<usize>>,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {

@@ -1,3 +1,4 @@
#![feature(assert_matches)]
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

@@ -1,8 +1,8 @@
[package]
name = "common-grpc"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
api = { path = "../../api" }
@@ -13,7 +13,7 @@ common-query = { path = "../query" }
common-recordbatch = { path = "../recordbatch" }
common-runtime = { path = "../runtime" }
dashmap = "5.4"
datafusion.workspace = true
datafusion = "14.0.0"
datatypes = { path = "../../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }

@@ -26,7 +26,7 @@ async fn do_bench_channel_manager() {
let join = tokio::spawn(async move {
for _ in 0..10000 {
let idx = rand::random::<usize>() % 100;
let ret = m_clone.get(format!("{idx}"));
let ret = m_clone.get(format!("{}", idx));
assert!(ret.is_ok());
}
});
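
Many hunks in this comparison differ only in how `format!` arguments are spelled: captured identifiers (`{idx}`) on one side, positional arguments (`"{}", idx`) on the other. A minimal sketch of the equivalence follows; the function names are illustrative and do not come from the repository, and captured identifiers assume Rust 1.58 or later.

```rust
/// Builds a key using a captured identifier inside the format string.
fn key_inline(idx: usize) -> String {
    format!("{idx}")
}

/// Builds the same key using a positional argument.
fn key_positional(idx: usize) -> String {
    format!("{}", idx)
}

/// Debug formatting with a captured identifier.
fn debug_inline(value: &Option<u32>) -> String {
    format!("{value:?}")
}

/// Debug formatting with a positional argument.
fn debug_positional(value: &Option<u32>) -> String {
    format!("{:?}", value)
}
```

Both spellings expand to the same formatting call, so these hunks should not change runtime behaviour.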

@@ -120,7 +120,7 @@ impl ChannelManager {
fn build_endpoint(&self, addr: &str) -> Result<Endpoint> {
let mut endpoint =
Endpoint::new(format!("http://{addr}")).context(error::CreateChannelSnafu)?;
Endpoint::new(format!("http://{}", addr)).context(error::CreateChannelSnafu)?;
if let Some(dur) = self.config.timeout {
endpoint = endpoint.timeout(dur);

@@ -1,17 +1,17 @@
[package]
name = "common-query"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
async-trait = "0.1"
common-error = { path = "../error" }
common-recordbatch = { path = "../recordbatch" }
common-time = { path = "../time" }
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datafusion-expr = "14.0.0"
datatypes = { path = "../../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
statrs = "0.15"

@@ -161,7 +161,12 @@ mod tests {
assert_eq!(4, vec.len());
for i in 0..4 {
assert_eq!(i == 0 || i == 3, vec.get_data(i).unwrap(), "Failed at {i}")
assert_eq!(
i == 0 || i == 3,
vec.get_data(i).unwrap(),
"failed at {}",
i
)
}
}
_ => unreachable!(),

@@ -18,7 +18,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use datafusion_common::Result as DfResult;
use datafusion_expr::Accumulator as DfAccumulator;
use datafusion_expr::{Accumulator as DfAccumulator, AggregateState};
use datatypes::arrow::array::ArrayRef;
use datatypes::prelude::*;
use datatypes::vectors::{Helper as VectorHelper, VectorRef};
@@ -126,19 +126,24 @@ impl DfAccumulatorAdaptor {
}
impl DfAccumulator for DfAccumulatorAdaptor {
fn state(&self) -> DfResult<Vec<ScalarValue>> {
fn state(&self) -> DfResult<Vec<AggregateState>> {
let state_values = self.accumulator.state()?;
let state_types = self.creator.state_types()?;
if state_values.len() != state_types.len() {
return error::BadAccumulatorImplSnafu {
err_msg: format!("Accumulator {self:?} returned state values size do not match its state types size."),
err_msg: format!("Accumulator {:?} returned state values size do not match its state types size.", self),
}
.fail()?;
}
Ok(state_values
.into_iter()
.zip(state_types.iter())
.map(|(v, t)| v.try_to_scalar_value(t).context(error::ToScalarValueSnafu))
.map(|(v, t)| {
let scalar = v
.try_to_scalar_value(t)
.context(error::ToScalarValueSnafu)?;
Ok(AggregateState::Scalar(scalar))
})
.collect::<Result<Vec<_>>>()?)
}
@@ -170,9 +175,4 @@ impl DfAccumulator for DfAccumulatorAdaptor {
.map_err(Error::from)?;
Ok(scalar_value)
}
fn size(&self) -> usize {
// TODO(LFC): Implement new "size" method for Accumulator.
0
}
}

@@ -233,7 +233,7 @@ mod test {
async fn scan(
&self,
_ctx: &SessionState,
_projection: Option<&Vec<usize>>,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> DfResult<Arc<dyn DfPhysicalPlan>> {
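
The two `scan` signatures in this hunk take the projection as `Option<&Vec<usize>>` on one side and `&Option<Vec<usize>>` on the other. As a sketch of how a caller can bridge the two shapes (the helper names below are hypothetical, not part of the codebase), `Option::as_ref` converts the second form into the first without cloning:

```rust
/// Accepts the projection as an optional borrow.
fn count_by_option_ref(projection: Option<&Vec<usize>>) -> usize {
    projection.map_or(0, |p| p.len())
}

/// Accepts a borrow of an optional owned vector.
fn count_by_ref_option(projection: &Option<Vec<usize>>) -> usize {
    // `as_ref` turns `&Option<Vec<usize>>` into `Option<&Vec<usize>>`.
    count_by_option_ref(projection.as_ref())
}
```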

@@ -1,13 +1,13 @@
[package]
name = "common-recordbatch"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
common-error = { path = "../error" }
datafusion.workspace = true
datafusion-common.workspace = true
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datatypes = { path = "../../datatypes" }
futures = "0.3"
paste = "1.0"

@@ -121,8 +121,7 @@ impl Stream for RecordBatchStreamAdapter {
enum AsyncRecordBatchStreamAdapterState {
Uninit(FutureStream),
Ready(DfSendableRecordBatchStream),
Failed,
Inited(std::result::Result<DfSendableRecordBatchStream, DataFusionError>),
}
pub struct AsyncRecordBatchStreamAdapter {
@@ -152,26 +151,28 @@ impl Stream for AsyncRecordBatchStreamAdapter {
loop {
match &mut self.state {
AsyncRecordBatchStreamAdapterState::Uninit(stream_future) => {
match ready!(Pin::new(stream_future).poll(cx)) {
Ok(stream) => {
self.state = AsyncRecordBatchStreamAdapterState::Ready(stream);
continue;
}
Err(e) => {
self.state = AsyncRecordBatchStreamAdapterState::Failed;
return Poll::Ready(Some(
Err(e).context(error::InitRecordbatchStreamSnafu),
));
}
};
self.state = AsyncRecordBatchStreamAdapterState::Inited(ready!(Pin::new(
stream_future
)
.poll(cx)));
continue;
}
AsyncRecordBatchStreamAdapterState::Ready(stream) => {
return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)).map(|x| {
let df_record_batch = x.context(error::PollStreamSnafu)?;
RecordBatch::try_from_df_record_batch(self.schema(), df_record_batch)
}))
}
AsyncRecordBatchStreamAdapterState::Failed => return Poll::Ready(None),
AsyncRecordBatchStreamAdapterState::Inited(stream) => match stream {
Ok(stream) => {
return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)).map(|df| {
let df_record_batch = df.context(error::PollStreamSnafu)?;
RecordBatch::try_from_df_record_batch(self.schema(), df_record_batch)
}));
}
Err(e) => {
return Poll::Ready(Some(
error::CreateRecordBatchesSnafu {
reason: format!("Read error {:?} from stream", e),
}
.fail(),
))
}
},
}
}
}
@@ -182,104 +183,3 @@ impl Stream for AsyncRecordBatchStreamAdapter {
(0, None)
}
}
#[cfg(test)]
mod test {
use common_error::mock::MockError;
use common_error::prelude::{BoxedError, StatusCode};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::Int32Vector;
use super::*;
use crate::RecordBatches;
#[tokio::test]
async fn test_async_recordbatch_stream_adaptor() {
struct MaybeErrorRecordBatchStream {
items: Vec<Result<RecordBatch>>,
}
impl RecordBatchStream for MaybeErrorRecordBatchStream {
fn schema(&self) -> SchemaRef {
unimplemented!()
}
}
impl Stream for MaybeErrorRecordBatchStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(batch) = self.items.pop() {
Poll::Ready(Some(Ok(batch?)))
} else {
Poll::Ready(None)
}
}
}
fn new_future_stream(
maybe_recordbatches: Result<Vec<Result<RecordBatch>>>,
) -> FutureStream {
Box::pin(async move {
maybe_recordbatches
.map(|items| {
Box::pin(DfRecordBatchStreamAdapter::new(Box::pin(
MaybeErrorRecordBatchStream { items },
))) as _
})
.map_err(|e| DataFusionError::External(Box::new(e)))
})
}
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::int32_datatype(),
false,
)]));
let batch1 = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice(&[1])) as _],
)
.unwrap();
let batch2 = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice(&[2])) as _],
)
.unwrap();
let success_stream = new_future_stream(Ok(vec![Ok(batch1.clone()), Ok(batch2.clone())]));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), success_stream);
let collected = RecordBatches::try_collect(Box::pin(adapter)).await.unwrap();
assert_eq!(
collected,
RecordBatches::try_new(schema.clone(), vec![batch2.clone(), batch1.clone()]).unwrap()
);
let poll_err_stream = new_future_stream(Ok(vec![
Ok(batch1.clone()),
Err(error::Error::External {
source: BoxedError::new(MockError::new(StatusCode::Unknown)),
}),
]));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream);
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"Failed to poll stream, source: External error: External error, source: Unknown"
);
let failed_to_init_stream = new_future_stream(Err(error::Error::External {
source: BoxedError::new(MockError::new(StatusCode::Internal)),
}));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream);
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"Failed to init Recordbatch stream, source: External error: External error, source: Internal"
);
}
}
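
The adapter hunks above show two state layouts: `Uninit`/`Ready`/`Failed` on one side and `Uninit`/`Inited(Result<..>)` on the other. In the `Inited` layout as shown, the `Err` arm appears to return `Some(Err(..))` on every poll, whereas the `Failed` state reports the error once and then ends the stream. The sketch below models the `Ready`/`Failed` layout with a plain synchronous `Iterator` instead of an async `Stream`; the `Adapter` and `State` types here are simplified stand-ins invented for illustration, not the real `AsyncRecordBatchStreamAdapter`.

```rust
/// Hypothetical, synchronous stand-in for the adapter state.
enum State {
    /// Initialization has not run yet; the value is taken on first use.
    Uninit(Option<Result<Vec<i32>, String>>),
    /// Initialization succeeded; items are drained in order.
    Ready(std::vec::IntoIter<i32>),
    /// Initialization failed and the error was already reported.
    Failed,
}

struct Adapter {
    state: State,
}

impl Adapter {
    fn new(init: Result<Vec<i32>, String>) -> Self {
        Adapter {
            state: State::Uninit(Some(init)),
        }
    }
}

impl Iterator for Adapter {
    type Item = Result<i32, String>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match &mut self.state {
                State::Uninit(init) => match init.take() {
                    Some(Ok(items)) => {
                        // Switch state, then loop again to yield the first item.
                        self.state = State::Ready(items.into_iter());
                    }
                    Some(Err(e)) => {
                        // Report the error once, then terminate on later calls.
                        self.state = State::Failed;
                        return Some(Err(e));
                    }
                    None => return None,
                },
                State::Ready(items) => return items.next().map(Ok),
                State::Failed => return None,
            }
        }
    }
}
```

A consumer that collects until `None` terminates in both the success and the failure case with this layout.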

@@ -64,12 +64,6 @@ pub enum Error {
source: datatypes::arrow::error::ArrowError,
backtrace: Backtrace,
},
#[snafu(display("Failed to init Recordbatch stream, source: {}", source))]
InitRecordbatchStream {
source: datafusion_common::DataFusionError,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {
@@ -80,8 +74,7 @@ impl ErrorExt for Error {
Error::DataTypes { .. }
| Error::CreateRecordBatches { .. }
| Error::PollStream { .. }
| Error::Format { .. }
| Error::InitRecordbatchStream { .. } => StatusCode::Internal,
| Error::Format { .. } => StatusCode::Internal,
Error::External { source } => source.status_code(),

@@ -231,7 +231,8 @@ mod tests {
assert_eq!(
result.unwrap_err().to_string(),
format!(
"Failed to create RecordBatches, reason: expect RecordBatch schema equals {schema1:?}, actual: {schema2:?}",
"Failed to create RecordBatches, reason: expect RecordBatch schema equals {:?}, actual: {:?}",
schema1, schema2
)
);

@@ -227,7 +227,7 @@ mod tests {
let output = serde_json::to_string(&batch).unwrap();
assert_eq!(
r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
output
);
}

@@ -1,8 +1,8 @@
[package]
name = "common-runtime"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
common-error = { path = "../error" }

@@ -1,8 +1,8 @@
[package]
name = "substrait"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
bytes = "1.1"
@@ -10,8 +10,8 @@ catalog = { path = "../../catalog" }
common-catalog = { path = "../catalog" }
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
datafusion.workspace = true
datafusion-expr.workspace = true
datafusion = "14.0.0"
datafusion-expr = "14.0.0"
datatypes = { path = "../../datatypes" }
futures = "0.3"
prost = "0.9"

@@ -16,7 +16,6 @@ use std::collections::VecDeque;
use std::str::FromStr;
use datafusion::common::Column;
use datafusion_expr::expr::Sort;
use datafusion_expr::{expr_fn, lit, Between, BinaryExpr, BuiltinScalarFunction, Expr, Operator};
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt};
@@ -62,7 +61,7 @@ pub(crate) fn to_df_expr(
| RexType::Cast(_)
| RexType::Subquery(_)
| RexType::Enum(_) => UnsupportedExprSnafu {
name: format!("substrait expression {expr_rex_type:?}"),
name: format!("substrait expression {:?}", expr_rex_type),
}
.fail()?,
}
@@ -110,7 +109,7 @@ pub fn convert_scalar_function(
let fn_name = ctx
.find_scalar_fn(anchor)
.with_context(|| InvalidParametersSnafu {
reason: format!("Unregistered scalar function reference: {anchor}"),
reason: format!("Unregistered scalar function reference: {}", anchor),
})?;
// convenient util
@@ -332,19 +331,19 @@ pub fn convert_scalar_function(
// skip Cast and TryCast, is covered in substrait::Cast.
"sort" | "sort_des" => {
ensure_arg_len(1)?;
Expr::Sort(Sort {
Expr::Sort {
expr: Box::new(inputs.pop_front().unwrap()),
asc: false,
nulls_first: false,
})
}
}
"sort_asc" => {
ensure_arg_len(1)?;
Expr::Sort(Sort {
Expr::Sort {
expr: Box::new(inputs.pop_front().unwrap()),
asc: true,
nulls_first: false,
})
}
}
// those are datafusion built-in "scalar functions".
"abs"
@@ -436,7 +435,7 @@ pub fn convert_scalar_function(
// skip Wildcard, unimplemented.
// end other direct expr
_ => UnsupportedExprSnafu {
name: format!("scalar function {fn_name}"),
name: format!("scalar function {}", fn_name),
}
.fail()?,
};
@@ -538,11 +537,11 @@ pub fn expression_from_df_expr(
name: expr.to_string(),
}
.fail()?,
Expr::Sort(Sort {
Expr::Sort {
expr,
asc,
nulls_first: _,
}) => {
} => {
let expr = expression_from_df_expr(ctx, expr, schema)?;
let arguments = utils::expression_to_argument(vec![expr]);
let op_name = if *asc { "sort_asc" } else { "sort_des" };
@@ -578,7 +577,6 @@ pub fn expression_from_df_expr(
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(..)
| Expr::Placeholder { .. }
| Expr::QualifiedWildcard { .. } => todo!(),
Expr::GroupingSet(_) => UnsupportedExprSnafu {
name: expr.to_string(),
@@ -597,8 +595,8 @@ pub fn convert_column(column: &Column, schema: &Schema) -> Result<FieldReference
schema
.column_index_by_name(column_name)
.with_context(|| MissingFieldSnafu {
field: format!("{column:?}"),
plan: format!("schema: {schema:?}"),
field: format!("{:?}", column),
plan: format!("schema: {:?}", schema),
})?;
Ok(FieldReference {
@@ -648,8 +646,6 @@ mod utils {
Operator::BitwiseShiftRight => "bitwise_shift_right",
Operator::BitwiseShiftLeft => "bitwise_shift_left",
Operator::StringConcat => "string_concat",
Operator::ILike => "i_like",
Operator::NotILike => "not_i_like",
}
}

@@ -19,7 +19,7 @@ use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use common_telemetry::debug;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::common::{DFField, DFSchema};
use datafusion::common::ToDFSchema;
use datafusion::datasource::DefaultTableSource;
use datafusion::physical_plan::project_schema;
use datafusion_expr::{Filter, LogicalPlan, TableScan, TableSource};
@@ -236,7 +236,7 @@ impl DFLogicalSubstraitConvertor {
.map_err(BoxedError::new)
.context(InternalSnafu)?
.context(TableNotFoundSnafu {
name: format!("{catalog_name}.{schema_name}.{table_name}"),
name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
})?;
let adapter = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(table_ref),
@@ -262,26 +262,16 @@ impl DFLogicalSubstraitConvertor {
};
// Calculate the projected schema
let qualified = &format!("{catalog_name}.{schema_name}.{table_name}");
let projected_schema = Arc::new(
project_schema(&stored_schema, projection.as_ref())
.and_then(|x| {
DFSchema::new_with_metadata(
x.fields()
.iter()
.map(|f| DFField::from_qualified(qualified, f.clone()))
.collect(),
x.metadata().clone(),
)
})
.context(DFInternalSnafu)?,
);
let projected_schema = project_schema(&stored_schema, projection.as_ref())
.context(DFInternalSnafu)?
.to_dfschema_ref()
.context(DFInternalSnafu)?;
ctx.set_df_schema(projected_schema.clone());
// TODO(ruihang): Support limit(fetch)
Ok(LogicalPlan::TableScan(TableScan {
table_name: format!("{catalog_name}.{schema_name}.{table_name}"),
table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
source: adapter,
projection,
projected_schema,
@@ -395,10 +385,10 @@ impl DFLogicalSubstraitConvertor {
| LogicalPlan::Values(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_) => InvalidParametersSnafu {
| LogicalPlan::Extension(_) => InvalidParametersSnafu {
reason: format!(
"Trying to convert DDL/DML plan to substrait proto, plan: {plan:?}",
"Trying to convert DDL/DML plan to substrait proto, plan: {:?}",
plan
),
}
.fail()?,
@@ -572,7 +562,7 @@ mod test {
let proto = convertor.encode(plan.clone()).unwrap();
let tripped_plan = convertor.decode(proto, catalog).unwrap();
assert_eq!(format!("{plan:?}"), format!("{tripped_plan:?}"));
assert_eq!(format!("{:?}", plan), format!("{:?}", tripped_plan));
}
#[tokio::test]
@@ -606,7 +596,8 @@ mod test {
let table_scan_plan = LogicalPlan::TableScan(TableScan {
table_name: format!(
"{DEFAULT_CATALOG_NAME}.{DEFAULT_SCHEMA_NAME}.{DEFAULT_TABLE_NAME}",
"{}.{}.{}",
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_TABLE_NAME
),
source: adapter,
projection: Some(projection),

@@ -87,7 +87,7 @@ pub fn to_concrete_type(ty: &SType) -> Result<(ConcreteDataType, bool)> {
| Kind::List(_)
| Kind::Map(_)
| Kind::UserDefinedTypeReference(_) => UnsupportedSubstraitTypeSnafu {
ty: format!("{kind:?}"),
ty: format!("{:?}", kind),
}
.fail(),
}
@@ -154,7 +154,7 @@ pub(crate) fn scalar_value_as_literal_type(v: &ScalarValue) -> Result<LiteralTyp
// TODO(LFC): Implement other conversions: ScalarValue => LiteralType
_ => {
return error::UnsupportedExprSnafu {
name: format!("{v:?}"),
name: format!("{:?}", v),
}
.fail()
}
@@ -177,7 +177,7 @@ pub(crate) fn literal_type_to_scalar_value(t: LiteralType) -> Result<ScalarValue
// TODO(LFC): Implement other conversions: Kind => ScalarValue
_ => {
return error::UnsupportedSubstraitTypeSnafu {
ty: format!("{kind:?}"),
ty: format!("{:?}", kind),
}
.fail()
}
@@ -194,7 +194,7 @@ pub(crate) fn literal_type_to_scalar_value(t: LiteralType) -> Result<ScalarValue
// TODO(LFC): Implement other conversions: LiteralType => ScalarValue
_ => {
return error::UnsupportedSubstraitTypeSnafu {
ty: format!("{t:?}"),
ty: format!("{:?}", t),
}
.fail()
}

@@ -1,8 +1,8 @@
[package]
name = "common-telemetry"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[features]
console = ["console-subscriber"]

@@ -28,7 +28,7 @@ pub fn set_panic_hook() {
let default_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic| {
let backtrace = Backtrace::new();
let backtrace = format!("{backtrace:?}");
let backtrace = format!("{:?}", backtrace);
if let Some(location) = panic.location() {
tracing::error!(
message = %panic,

@@ -1,8 +1,8 @@
[package]
name = "common-time"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
chrono = "0.4"

@@ -1,8 +1,8 @@
[package]
name = "datanode"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[features]
default = ["python"]
@@ -25,7 +25,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion.workspace = true
datafusion = "14.0.0"
datatypes = { path = "../datatypes" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
@@ -57,5 +57,5 @@ tower-http = { version = "0.3", features = ["full"] }
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
client = { path = "../client" }
common-query = { path = "../common/query" }
datafusion-common.workspace = true
datafusion-common = "14.0.0"
tempdir = "0.3"

@@ -45,35 +45,6 @@ impl Default for ObjectStoreConfig {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalConfig {
// wal directory
pub dir: String,
// wal file size in bytes
pub file_size: usize,
// wal purge threshold in bytes
pub purge_threshold: usize,
// purge interval in seconds
pub purge_interval: u64,
// read batch size
pub read_batch_size: usize,
// whether to sync log file after every write
pub sync_write: bool,
}
impl Default for WalConfig {
fn default() -> Self {
Self {
dir: "/tmp/greptimedb/wal".to_string(),
file_size: 1024 * 1024 * 1024, // log file size 1G
purge_threshold: 1024 * 1024 * 1024 * 50, // purge threshold 50G
purge_interval: 600,
read_batch_size: 128,
sync_write: false,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatanodeOptions {
pub node_id: Option<u64>,
@@ -82,7 +53,7 @@ pub struct DatanodeOptions {
pub mysql_addr: String,
pub mysql_runtime_size: usize,
pub meta_client_opts: Option<MetaClientOpts>,
pub wal: WalConfig,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
pub enable_memory_catalog: bool,
pub mode: Mode,
@@ -97,7 +68,7 @@ impl Default for DatanodeOptions {
mysql_addr: "127.0.0.1:4406".to_string(),
mysql_runtime_size: 2,
meta_client_opts: None,
wal: WalConfig::default(),
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),
enable_memory_catalog: false,
mode: Mode::Standalone,

@@ -1,10 +1,10 @@
// Copyright 2023 Greptime Team
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -144,12 +144,18 @@ pub enum Error {
source: log_store::error::Error,
},
#[snafu(display("Failed to start log store gc task, source: {}", source))]
StartLogStore {
#[snafu(backtrace)]
source: log_store::error::Error,
},
#[snafu(display("Failed to open storage engine, source: {}", source))]
OpenStorageEngine { source: StorageError },
#[snafu(display("Failed to init backend, config: {:#?}, source: {}", config, source))]
InitBackend {
config: Box<ObjectStoreConfig>,
config: ObjectStoreConfig,
source: object_store::Error,
backtrace: Backtrace,
},
@@ -361,6 +367,7 @@ impl ErrorExt for Error {
Error::BumpTableId { source, .. } => source.status_code(),
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::StartLogStore { source, .. } => source.status_code(),
}
}

@@ -21,8 +21,8 @@ use catalog::remote::MetaKvBackend;
use catalog::CatalogManagerRef;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::logging::info;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use log_store::fs::config::LogConfig;
use log_store::fs::log::LocalFileLogStore;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use mito::config::EngineConfig as TableEngineConfig;
@@ -36,12 +36,13 @@ use servers::Mode;
use snafu::prelude::*;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use store_api::logstore::LogStore;
use table::table::TableIdProviderRef;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result,
NewCatalogSnafu, Result, StartLogStoreSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
@@ -51,7 +52,7 @@ mod grpc;
mod script;
mod sql;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
// An abstraction to read/write services.
pub struct Instance {
@@ -61,6 +62,7 @@ pub struct Instance {
pub(crate) script_executor: ScriptExecutor,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
pub(crate) logstore: Arc<LocalFileLogStore>,
}
pub type InstanceRef = Arc<Instance>;
@@ -68,7 +70,7 @@ pub type InstanceRef = Arc<Instance>;
impl Instance {
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_log_store(&opts.wal).await?);
let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
let meta_client = match opts.mode {
Mode::Standalone => None,
@@ -158,6 +160,7 @@ impl Instance {
script_executor,
heartbeat_task,
table_id_provider,
logstore,
})
}
@@ -166,6 +169,7 @@ impl Instance {
.start()
.await
.context(NewCatalogSnafu)?;
self.logstore.start().await.context(StartLogStoreSnafu)?;
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
@@ -229,7 +233,7 @@ pub(crate) async fn new_fs_object_store(data_dir: &str) -> Result<ObjectStore> {
.context(error::CreateDirSnafu { dir: &data_dir })?;
info!("The file storage directory is: {}", &data_dir);
let atomic_write_dir = format!("{data_dir}/.tmp/");
let atomic_write_dir = format!("{}/.tmp/", data_dir);
let accessor = FsBuilder::default()
.root(&data_dir)
@@ -271,23 +275,23 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul
Ok(meta_client)
}
pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result<RaftEngineLogStore> {
pub(crate) async fn create_local_file_log_store(
path: impl AsRef<str>,
) -> Result<LocalFileLogStore> {
let path = path.as_ref();
// create WAL directory
fs::create_dir_all(path::Path::new(&wal_config.dir)).context(error::CreateDirSnafu {
dir: &wal_config.dir,
})?;
info!("Creating logstore with config: {:?}", wal_config);
fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?;
info!("The WAL directory is: {}", path);
let log_config = LogConfig {
file_size: wal_config.file_size,
log_file_dir: wal_config.dir.clone(),
purge_interval: Duration::from_secs(wal_config.purge_interval),
purge_threshold: wal_config.purge_threshold,
read_batch_size: wal_config.read_batch_size,
sync_write: wal_config.sync_write,
log_file_dir: path.to_string(),
..Default::default()
};
let logstore = RaftEngineLogStore::try_new(log_config)
let log_store = LocalFileLogStore::open(&log_config)
.await
.context(OpenLogStoreSnafu)?;
Ok(logstore)
.context(error::OpenLogStoreSnafu)?;
Ok(log_store)
}
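
One side of this hunk builds the log config by setting only `log_file_dir` and filling the rest with `..Default::default()`. A minimal sketch of that struct-update pattern follows; the `LogConfig` below is a hypothetical stand-in whose field names echo the hunk, and its default values are placeholders chosen for the example, not the real `log_store` defaults.

```rust
use std::time::Duration;

/// Hypothetical config type; defaults are placeholders for this sketch.
#[derive(Debug, Clone, PartialEq)]
struct LogConfig {
    log_file_dir: String,
    file_size: usize,
    purge_interval: Duration,
    sync_write: bool,
}

impl Default for LogConfig {
    fn default() -> Self {
        LogConfig {
            log_file_dir: "/tmp/wal".to_string(),
            file_size: 1024,
            purge_interval: Duration::from_secs(600),
            sync_write: false,
        }
    }
}

/// Overrides only the directory; every other field keeps its default.
fn config_for_dir(path: &str) -> LogConfig {
    LogConfig {
        log_file_dir: path.to_string(),
        ..Default::default()
    }
}
```

The trade-off is that callers can no longer tune the remaining fields through this function; they always get whatever `Default` provides.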

@@ -117,7 +117,7 @@ impl Instance {
}
Some(select_expr::Expr::LogicalPlan(plan)) => self.execute_logical(plan).await,
_ => UnsupportedExprSnafu {
name: format!("{expr:?}"),
name: format!("{:?}", expr),
}
.fail(),
}
@@ -175,7 +175,7 @@ impl GrpcQueryHandler for Instance {
Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await,
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{other:?}"),
feat: format!("{:?}", other),
}
.fail();
}
@@ -188,9 +188,7 @@ impl GrpcQueryHandler for Instance {
impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result<AdminResult> {
let admin_resp = match expr.expr {
Some(admin_expr::Expr::CreateTable(create_expr)) => {
self.handle_create(create_expr).await
}
Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await,
Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await,
Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => {
self.execute_create_database(create_database_expr).await
@@ -200,7 +198,7 @@ impl GrpcAdminHandler for Instance {
}
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{other:?}"),
feat: format!("{:?}", other),
}
.fail();
}

@@ -33,11 +33,12 @@ use crate::metric;
use crate::sql::SqlRequest;
impl Instance {
pub async fn execute_stmt(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Output> {
pub async fn execute_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = self
.query_engine
.sql_to_statement(sql)
.context(ExecuteSqlSnafu)?;
match stmt {
Statement::Query(_) => {
let logical_plan = self
@@ -152,14 +153,6 @@ impl Instance {
}
}
}
pub async fn execute_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = self
.query_engine
.sql_to_statement(sql)
.context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}
}
// TODO(LFC): Refactor consideration: move this function to some helper mod,
@@ -187,7 +180,8 @@ fn table_idents_to_full_name(
)),
_ => error::InvalidSqlSnafu {
msg: format!(
"expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
"expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {}",
obj_name
),
}.fail(),
}
@@ -199,40 +193,15 @@ impl SqlQueryHandler for Instance {
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<servers::error::Result<Output>> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
// we assume sql string has only 1 statement in datanode
let result = self
.execute_sql(query, query_ctx)
.await
.map_err(|e| {
error!(e; "Instance failed to execute sql");
BoxedError::new(e)
})
.context(servers::error::ExecuteQuerySnafu { query });
vec![result]
}
async fn do_statement_query(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> servers::error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
self.execute_stmt(stmt, query_ctx)
self.execute_sql(query, query_ctx)
.await
.map_err(|e| {
error!(e; "Instance failed to execute sql");
BoxedError::new(e)
})
.context(servers::error::ExecuteStatementSnafu)
}
fn is_valid_schema(&self, catalog: &str, schema: &str) -> servers::error::Result<bool> {
self.catalog_manager
.schema(catalog, schema)
.map(|s| s.is_some())
.context(servers::error::CatalogSnafu)
.context(servers::error::ExecuteQuerySnafu { query })
}
}

@@ -29,7 +29,7 @@ use table::table::TableIdProvider;
use crate::datanode::DatanodeOptions;
use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
use crate::instance::{create_log_store, new_object_store, DefaultEngine, Instance};
use crate::instance::{create_local_file_log_store, new_object_store, DefaultEngine, Instance};
use crate::script::ScriptExecutor;
use crate::sql::SqlHandler;
@@ -41,7 +41,7 @@ impl Instance {
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_log_store(&opts.wal).await?);
let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
@@ -83,6 +83,7 @@ impl Instance {
script_executor,
table_id_provider: Some(Arc::new(LocalTableIdProvider::default())),
heartbeat_task: Some(heartbeat_task),
logstore,
})
}
}

@@ -40,7 +40,7 @@ impl Services {
pub async fn try_new(instance: InstanceRef, opts: &DatanodeOptions) -> Result<Self> {
let grpc_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.rpc_runtime_size)
.worker_threads(opts.rpc_runtime_size as usize)
.thread_name("grpc-io-handlers")
.build()
.context(RuntimeResourceSnafu)?,
@@ -54,7 +54,7 @@ impl Services {
Mode::Distributed => {
let mysql_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.mysql_runtime_size)
.worker_threads(opts.mysql_runtime_size as usize)
.thread_name("mysql-io-handlers")
.build()
.context(RuntimeResourceSnafu)?,

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::result::AdminResultBuilder;
use api::v1::{AdminResult, AlterExpr, CreateTableExpr, DropTableExpr};
use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
@@ -31,15 +31,15 @@ use crate::sql::SqlRequest;
impl Instance {
/// Handle gRPC create table requests.
pub(crate) async fn handle_create(&self, expr: CreateTableExpr) -> AdminResult {
pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult {
// Respect CreateExpr's table id and region ids if present, or allocate table id
// from local table id provider and set region id to 0.
let table_id = if let Some(table_id) = &expr.table_id {
let table_id = if let Some(table_id) = expr.table_id {
info!(
"Creating table {:?}.{:?}.{:?} with table id from frontend: {}",
expr.catalog_name, expr.schema_name, expr.table_name, table_id.id
expr.catalog_name, expr.schema_name, expr.table_name, table_id
);
table_id.id
table_id
} else {
match self.table_id_provider.as_ref() {
None => {
@@ -157,7 +157,7 @@ impl Instance {
mod tests {
use std::sync::Arc;
use api::v1::{ColumnDataType, ColumnDef, TableId};
use api::v1::{ColumnDataType, ColumnDef};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_grpc_expr::create_table_schema;
use datatypes::prelude::ConcreteDataType;
@@ -175,7 +175,7 @@ mod tests {
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
assert_eq!(request.table_name, "my-metrics");
assert_eq!(request.desc, Some("blabla little magic fairy".to_string()));
assert_eq!(request.desc, Some("blabla".to_string()));
assert_eq!(request.schema, expected_table_schema());
assert_eq!(request.primary_key_indices, vec![1, 0]);
assert!(request.create_if_not_exists);
@@ -202,7 +202,8 @@ mod tests {
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Missing timestamp column"),
"actual: {err_msg}",
"actual: {}",
err_msg
);
}
@@ -213,7 +214,7 @@ mod tests {
name: "a".to_string(),
datatype: 1024,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
};
let result = column_def.try_as_column_schema();
assert!(matches!(
@@ -225,7 +226,7 @@ mod tests {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
};
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
@@ -237,7 +238,7 @@ mod tests {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: default_constraint.clone().try_into().unwrap(),
default_constraint: Some(default_constraint.clone().try_into().unwrap()),
};
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
@@ -249,46 +250,44 @@ mod tests {
);
}
fn testing_create_expr() -> CreateTableExpr {
fn testing_create_expr() -> CreateExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float32 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
ColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
default_constraint: None,
},
];
CreateTableExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
CreateExpr {
catalog_name: None,
schema_name: None,
table_name: "my-metrics".to_string(),
desc: "blabla little magic fairy".to_string(),
desc: Some("blabla".to_string()),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: Default::default(),
table_id: Some(TableId {
id: MIN_USER_TABLE_ID,
}),
table_id: Some(MIN_USER_TABLE_ID),
region_ids: vec![0],
}
}

View File

@@ -96,7 +96,7 @@ impl SqlHandler {
result
}
pub(crate) fn get_table(&self, table_ref: &TableReference) -> Result<TableRef> {
pub(crate) fn get_table<'a>(&self, table_ref: &'a TableReference) -> Result<TableRef> {
self.table_engine
.get_table(&EngineContext::default(), table_ref)
.with_context(|_| GetTableSnafu {
@@ -125,7 +125,7 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use log_store::NoopLogStore;
use log_store::fs::noop::NoopLogStore;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::services::fs::Builder;
@@ -176,7 +176,7 @@ mod tests {
async fn scan(
&self,
_projection: Option<&Vec<usize>>,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {

View File

@@ -61,7 +61,7 @@ impl SqlHandler {
let alter_kind = match alter_table.alter_operation() {
AlterTableOperation::AddConstraint(table_constraint) => {
return error::InvalidSqlSnafu {
msg: format!("unsupported table constraint {table_constraint}"),
msg: format!("unsupported table constraint {}", table_constraint),
}
.fail()
}

View File

@@ -143,7 +143,7 @@ impl SqlHandler {
)?;
} else {
return error::InvalidSqlSnafu {
msg: format!("Cannot recognize named UNIQUE constraint: {name}"),
msg: format!("Cannot recognize named UNIQUE constraint: {}", name),
}
.fail();
}
@@ -158,7 +158,8 @@ impl SqlHandler {
} else {
return error::InvalidSqlSnafu {
msg: format!(
"Unrecognized non-primary unnamed UNIQUE constraint: {name:?}",
"Unrecognized non-primary unnamed UNIQUE constraint: {:?}",
name
),
}
.fail();
@@ -166,7 +167,7 @@ impl SqlHandler {
}
_ => {
return ConstraintNotSupportedSnafu {
constraint: format!("{c:?}"),
constraint: format!("{:?}", c),
}
.fail();
}
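
Note on the formatting hunks above: they switch between inlined format arguments (`{name}`, `{c:?}`) and positional ones (`{}`, `{:?}`). The sketch below assumes a toolchain that supports inlined format arguments (Rust 1.58 or later) and suggests that both spellings produce the same string. The helper names are hypothetical and do not appear in the diff.

```rust
/// Hypothetical helper: message built with an inlined format argument.
fn unique_constraint_msg_inline(name: &str) -> String {
    format!("Cannot recognize named UNIQUE constraint: {name}")
}

/// Hypothetical helper: the same message built with a positional argument.
fn unique_constraint_msg_positional(name: &str) -> String {
    format!("Cannot recognize named UNIQUE constraint: {}", name)
}

/// Debug formatting with an inlined argument.
fn debug_inline(name: &Option<String>) -> String {
    format!("{name:?}")
}

/// Debug formatting with a positional argument.
fn debug_positional(name: &Option<String>) -> String {
    format!("{:?}", name)
}
```

Only the source spelling differs, so either form should be safe wherever the compiler accepts it.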

View File

@@ -487,11 +487,12 @@ async fn test_insert_with_default_value_for_type(type_name: &str) {
let create_sql = format!(
r#"create table test_table(
host string,
ts {type_name} DEFAULT CURRENT_TIMESTAMP,
ts {} DEFAULT CURRENT_TIMESTAMP,
cpu double default 0,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#,
type_name
);
let output = execute_sql(&instance, &create_sql).await;
assert!(matches!(output, Output::AffectedRows(1)));

View File

@@ -28,7 +28,7 @@ use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;
use tempdir::TempDir;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{CreateTableSnafu, Result};
use crate::sql::SqlHandler;
@@ -40,13 +40,10 @@ pub struct TestGuard {
}
pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{name}")).unwrap();
let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap();
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{}", name)).unwrap();
let data_tmp_dir = TempDir::new(&format!("gt_data_{}", name)).unwrap();
let opts = DatanodeOptions {
wal: WalConfig {
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
..Default::default()
},
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},

View File

@@ -1,20 +1,20 @@
[package]
name = "datatypes"
version.workspace = true
edition.workspace = true
license.workspace = true
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[features]
default = []
test = []
[dependencies]
arrow.workspace = true
arrow-schema.workspace = true
arrow = { version = "26.0" }
arrow-schema = { version = "26.0", features = ["serde"] }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datafusion-common.workspace = true
datafusion-common = "14.0"
enum_dispatch = "0.3"
num = "0.4"
num-traits = "0.2"

View File

@@ -139,7 +139,7 @@ mod tests {
map.insert(false, 2);
let result = serde_json::to_string(&map).context(SerializeSnafu);
assert!(result.is_err(), "serialize result is: {result:?}");
assert!(result.is_err(), "serialize result is: {:?}", result);
let err = serde_json::to_string(&map)
.context(SerializeSnafu)
.err()

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(generic_associated_types)]
#![feature(assert_matches)]
pub mod arrow_array;
pub mod data_type;
pub mod error;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::BTreeMap;
use arrow::datatypes::Field;
use serde::{Deserialize, Serialize};
@@ -23,7 +23,7 @@ use crate::error::{self, Error, Result};
use crate::schema::constraint::ColumnDefaultConstraint;
use crate::vectors::VectorRef;
pub type Metadata = HashMap<String, String>;
pub type Metadata = BTreeMap<String, String>;
/// Key used to store whether the column is time index in arrow field's metadata.
const TIME_INDEX_KEY: &str = "greptime:time_index";
@@ -131,7 +131,7 @@ impl TryFrom<&Field> for ColumnSchema {
fn try_from(field: &Field) -> Result<ColumnSchema> {
let data_type = ConcreteDataType::try_from(field.data_type())?;
let mut metadata = field.metadata().clone();
let mut metadata = field.metadata().cloned().unwrap_or_default();
let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) {
Some(json) => {
Some(serde_json::from_str(&json).context(error::DeserializeSnafu { json })?)
@@ -176,7 +176,7 @@ impl TryFrom<&ColumnSchema> for Field {
column_schema.data_type.as_arrow_type(),
column_schema.is_nullable(),
)
.with_metadata(metadata))
.with_metadata(Some(metadata)))
}
}
@@ -215,7 +215,11 @@ mod tests {
assert!(field.is_nullable());
assert_eq!(
"{\"Value\":{\"Int32\":99}}",
field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap()
field
.metadata()
.unwrap()
.get(DEFAULT_CONSTRAINT_KEY)
.unwrap()
);
let new_column_schema = ColumnSchema::try_from(&field).unwrap();
@@ -237,8 +241,12 @@ mod tests {
.is_none());
let field = Field::try_from(&column_schema).unwrap();
assert_eq!("v1", field.metadata().get("k1").unwrap());
assert!(field.metadata().get(DEFAULT_CONSTRAINT_KEY).is_some());
assert_eq!("v1", field.metadata().unwrap().get("k1").unwrap());
assert!(field
.metadata()
.unwrap()
.get(DEFAULT_CONSTRAINT_KEY)
.is_some());
let new_column_schema = ColumnSchema::try_from(&field).unwrap();
assert_eq!(column_schema, new_column_schema);
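
Note on the metadata hunks above: `field.metadata()` is treated there as returning an optional map, which is copied out with `.cloned().unwrap_or_default()`. The sketch below reproduces that pattern using only the standard library. `FieldSketch` is a hypothetical stand-in, not the arrow `Field` type, and the assumption that the accessor returns `Option<&BTreeMap<String, String>>` is inferred from the diff rather than checked against the arrow API.

```rust
use std::collections::BTreeMap;

type Metadata = BTreeMap<String, String>;

/// Hypothetical stand-in for a field whose metadata may be absent.
struct FieldSketch {
    metadata: Option<Metadata>,
}

impl FieldSketch {
    /// Mirrors the accessor shape assumed from the diff.
    fn metadata(&self) -> Option<&Metadata> {
        self.metadata.as_ref()
    }
}

/// Copies the metadata out of the field, falling back to an empty map
/// when the field carries none.
fn owned_metadata(field: &FieldSketch) -> Metadata {
    field.metadata().cloned().unwrap_or_default()
}
```

Because `BTreeMap` iterates in key order, the copied map has a deterministic iteration order, which `HashMap` does not guarantee.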

View File

@@ -57,8 +57,8 @@ impl TryFrom<ColumnDefaultConstraint> for Vec<u8> {
impl Display for ColumnDefaultConstraint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ColumnDefaultConstraint::Function(expr) => write!(f, "{expr}"),
ColumnDefaultConstraint::Value(v) => write!(f, "{v}"),
ColumnDefaultConstraint::Function(expr) => write!(f, "{}", expr),
ColumnDefaultConstraint::Value(v) => write!(f, "{}", v),
}
}
}
@@ -172,7 +172,10 @@ fn create_current_timestamp_vector(
std::iter::repeat(util::current_time_millis()).take(num_rows),
))),
_ => error::DefaultValueTypeSnafu {
reason: format!("Not support to assign current timestamp to {data_type:?} type",),
reason: format!(
"Not support to assign current timestamp to {:?} type",
data_type
),
}
.fail(),
}
@@ -298,6 +301,6 @@ mod tests {
let err = constraint
.create_default_vector(&data_type, false, 4)
.unwrap_err();
assert!(matches!(err, Error::DefaultValueType { .. }), "{err:?}");
assert!(matches!(err, Error::DefaultValueType { .. }), "{:?}", err);
}
}

View File

@@ -83,7 +83,7 @@ impl LogicalPrimitiveType for DateType {
ValueRef::Null => Ok(None),
ValueRef::Date(v) => Ok(Some(v)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast value {other:?} to Date"),
msg: format!("Failed to cast value {:?} to Date", other,),
}
.fail(),
}

View File

@@ -84,7 +84,7 @@ impl LogicalPrimitiveType for DateTimeType {
ValueRef::Null => Ok(None),
ValueRef::DateTime(v) => Ok(Some(v)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast value {other:?} to DateTime"),
msg: format!("Failed to cast value {:?} to DateTime", other,),
}
.fail(),
}

View File

@@ -49,7 +49,7 @@ impl DataType for NullType {
}
fn create_mutable_vector(&self, _capacity: usize) -> Box<dyn MutableVector> {
Box::<NullVectorBuilder>::default()
Box::new(NullVectorBuilder::default())
}
fn is_timestamp_compatible(&self) -> bool {
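
Note on the `create_mutable_vector` hunk above: `Box::<T>::default()` and `Box::new(T::default())` are two spellings of the same construction for any `T: Default`. The sketch below uses a hypothetical `BuilderSketch` type in place of `NullVectorBuilder`, whose definition is not shown in this diff.

```rust
/// Hypothetical stand-in for a builder type that implements `Default`.
#[derive(Debug, Default, PartialEq)]
struct BuilderSketch {
    len: usize,
}

/// Builds the boxed default through `Box`'s own `Default` impl.
fn boxed_via_default() -> Box<BuilderSketch> {
    Box::<BuilderSketch>::default()
}

/// Builds the default value first, then boxes it.
fn boxed_via_new() -> Box<BuilderSketch> {
    Box::new(BuilderSketch::default())
}
```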

View File

@@ -70,31 +70,31 @@ impl Display for Value {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Value::Null => write!(f, "{}", self.data_type().name()),
Value::Boolean(v) => write!(f, "{v}"),
Value::UInt8(v) => write!(f, "{v}"),
Value::UInt16(v) => write!(f, "{v}"),
Value::UInt32(v) => write!(f, "{v}"),
Value::UInt64(v) => write!(f, "{v}"),
Value::Int8(v) => write!(f, "{v}"),
Value::Int16(v) => write!(f, "{v}"),
Value::Int32(v) => write!(f, "{v}"),
Value::Int64(v) => write!(f, "{v}"),
Value::Float32(v) => write!(f, "{v}"),
Value::Float64(v) => write!(f, "{v}"),
Value::Boolean(v) => write!(f, "{}", v),
Value::UInt8(v) => write!(f, "{}", v),
Value::UInt16(v) => write!(f, "{}", v),
Value::UInt32(v) => write!(f, "{}", v),
Value::UInt64(v) => write!(f, "{}", v),
Value::Int8(v) => write!(f, "{}", v),
Value::Int16(v) => write!(f, "{}", v),
Value::Int32(v) => write!(f, "{}", v),
Value::Int64(v) => write!(f, "{}", v),
Value::Float32(v) => write!(f, "{}", v),
Value::Float64(v) => write!(f, "{}", v),
Value::String(v) => write!(f, "{}", v.as_utf8()),
Value::Binary(v) => {
let hex = v
.iter()
.map(|b| format!("{b:02x}"))
.map(|b| format!("{:02x}", b))
.collect::<Vec<String>>()
.join("");
write!(f, "{hex}")
write!(f, "{}", hex)
}
Value::Date(v) => write!(f, "{v}"),
Value::DateTime(v) => write!(f, "{v}"),
Value::Date(v) => write!(f, "{}", v),
Value::DateTime(v) => write!(f, "{}", v),
Value::Timestamp(v) => write!(f, "{}", v.to_iso8601_string()),
Value::List(v) => {
let default = Box::<Vec<Value>>::default();
let default = Box::new(vec![]);
let items = v.items().as_ref().unwrap_or(&default);
let items = items
.iter()
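
Note on the `Value::Binary` arm in the hunk above: it renders each byte as two lowercase hex digits and concatenates the results. The sketch below isolates that step. The function name `to_hex` is hypothetical and the input is a plain byte slice rather than the crate's own binary type.

```rust
/// Renders a byte slice as lowercase hex, two digits per byte,
/// following the same map/collect/join shape as the diff.
fn to_hex(bytes: &[u8]) -> String {
    bytes
        .iter()
        .map(|b| format!("{:02x}", b))
        .collect::<Vec<String>>()
        .join("")
}
```

The `02` width with zero padding keeps every byte at exactly two characters, so the output length is always twice the input length.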
@@ -146,7 +146,7 @@ impl Value {
Value::Null => Ok(None),
Value::List(v) => Ok(Some(v)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast {other:?} to list value"),
msg: format!("Failed to cast {:?} to list value", other),
}
.fail(),
}
@@ -214,7 +214,8 @@ impl Value {
output_type_id == value_type_id || self.is_null(),
error::ToScalarValueSnafu {
reason: format!(
"expect value to return output_type {output_type_id:?}, actual: {value_type_id:?}",
"expect value to return output_type {:?}, actual: {:?}",
output_type_id, value_type_id,
),
}
);
@@ -543,15 +544,12 @@ impl TryFrom<ScalarValue> for Value {
.map(|x| Value::Timestamp(Timestamp::new(x, TimeUnit::Nanosecond)))
.unwrap_or(Value::Null),
ScalarValue::Decimal128(_, _, _)
| ScalarValue::Time64(_)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Dictionary(_, _)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_) => {
| ScalarValue::Dictionary(_, _) => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: v.get_datatype(),
}
@@ -1344,7 +1342,7 @@ mod tests {
);
assert_eq!(
Value::List(ListValue::new(
Some(Box::default()),
Some(Box::new(vec![])),
ConcreteDataType::timestamp_second_datatype(),
))
.to_string(),
@@ -1352,7 +1350,7 @@ mod tests {
);
assert_eq!(
Value::List(ListValue::new(
Some(Box::default()),
Some(Box::new(vec![])),
ConcreteDataType::timestamp_millisecond_datatype(),
))
.to_string(),
@@ -1360,7 +1358,7 @@ mod tests {
);
assert_eq!(
Value::List(ListValue::new(
Some(Box::default()),
Some(Box::new(vec![])),
ConcreteDataType::timestamp_microsecond_datatype(),
))
.to_string(),
@@ -1368,7 +1366,7 @@ mod tests {
);
assert_eq!(
Value::List(ListValue::new(
Some(Box::default()),
Some(Box::new(vec![])),
ConcreteDataType::timestamp_nanosecond_datatype(),
))
.to_string(),

View File

@@ -252,7 +252,7 @@ mod tests {
#[test]
fn test_serialize_binary_vector_to_json() {
let vector = BinaryVector::from(BinaryArray::from_iter_values([
let vector = BinaryVector::from(BinaryArray::from_iter_values(&[
vec![1, 2, 3],
vec![1, 2, 3],
]));
@@ -281,7 +281,7 @@ mod tests {
#[test]
fn test_from_arrow_array() {
let arrow_array = BinaryArray::from_iter_values([vec![1, 2, 3], vec![1, 2, 3]]);
let arrow_array = BinaryArray::from_iter_values(&[vec![1, 2, 3], vec![1, 2, 3]]);
let original = BinaryArray::from(arrow_array.data().clone());
let vector = BinaryVector::from(arrow_array);
assert_eq!(original, vector.array);

View File

@@ -296,7 +296,7 @@ mod tests {
let vec = BooleanVector::from(input.clone());
assert_eq!(4, vec.len());
for (i, v) in input.into_iter().enumerate() {
assert_eq!(Some(v), vec.get_data(i), "Failed at {i}")
assert_eq!(Some(v), vec.get_data(i), "failed at {}", i)
}
}
@@ -306,7 +306,7 @@ mod tests {
let vec = input.iter().collect::<BooleanVector>();
assert_eq!(4, vec.len());
for (i, v) in input.into_iter().enumerate() {
assert_eq!(v, vec.get_data(i), "Failed at {i}")
assert_eq!(v, vec.get_data(i), "failed at {}", i)
}
}
@@ -316,7 +316,7 @@ mod tests {
let vec = BooleanVector::from(input.clone());
assert_eq!(4, vec.len());
for (i, v) in input.into_iter().enumerate() {
assert_eq!(v, vec.get_data(i), "failed at {i}")
assert_eq!(v, vec.get_data(i), "failed at {}", i)
}
}

View File

@@ -203,7 +203,7 @@ mod tests {
let a = Int32Vector::from_slice(vec![1]);
let c = ConstantVector::new(Arc::new(a), 10);
let s = format!("{c:?}");
let s = format!("{:?}", c);
assert_eq!(s, "ConstantVector([Int32(1); 10])");
}

View File

@@ -37,7 +37,7 @@ mod tests {
#[test]
fn test_datetime_vector() {
let v = DateTimeVector::new(PrimitiveArray::from_slice([1, 2, 3]));
let v = DateTimeVector::new(PrimitiveArray::from_slice(&[1, 2, 3]));
assert_eq!(ConcreteDataType::datetime_datatype(), v.data_type());
assert_eq!(3, v.len());
assert_eq!("DateTimeVector", v.vector_type_name());

View File

@@ -195,17 +195,14 @@ impl Helper {
ConstantVector::new(Arc::new(TimestampNanosecondVector::from(vec![v])), length)
}
ScalarValue::Decimal128(_, _, _)
| ScalarValue::Time64(_)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Dictionary(_, _)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_) => {
| ScalarValue::Dictionary(_, _) => {
return error::ConversionSnafu {
from: format!("Unsupported scalar value: {value}"),
from: format!("Unsupported scalar value: {}", value),
}
.fail()
}

View File

@@ -157,7 +157,10 @@ impl From<ListArray> for ListVector {
fn from(array: ListArray) -> Self {
let item_type = ConcreteDataType::from_arrow_type(match array.data_type() {
ArrowDataType::List(field) => field.data_type(),
other => panic!("Try to create ListVector from an arrow array with type {other:?}"),
other => panic!(
"Try to create ListVector from an arrow array with type {:?}",
other
),
});
Self { array, item_type }
}

Some files were not shown because too many files have changed in this diff.