diff --git a/Cargo.lock b/Cargo.lock index ec30af55ae..ca588da53b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,7 +83,9 @@ checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" name = "api" version = "0.1.0" dependencies = [ + "datatypes", "prost 0.11.0", + "snafu", "tonic 0.8.0", "tonic-build", ] @@ -691,16 +693,16 @@ dependencies = [ [[package]] name = "clap" -version = "3.2.16" +version = "3.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3dbbb6653e7c55cc8595ad3e1f7be8f32aba4eb7ff7f0fd1163d4f3d137c0a9" +checksum = "47582c09be7c8b32c0ab3a6181825ababb713fde6fff20fc573a3870dd45c6a0" dependencies = [ "atty", "bitflags", "clap_derive", "clap_lex", "indexmap", - "once_cell", + "lazy_static", "strsim 0.10.0", "termcolor", "textwrap 0.15.0", @@ -708,9 +710,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "3.2.15" +version = "3.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba52acd3b0a5c33aeada5cdaa3267cdc7c594a98731d4268cdc1532f4264cb4" +checksum = "a3aab4734e083b809aaf5794e14e756d1c798d2c69c7f7de7a09a2f5214993c1" dependencies = [ "heck 0.4.0", "proc-macro-error", @@ -721,9 +723,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.2.4" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" +checksum = "a37c35f1112dad5e6e0b1adaff798507497a18fceeb30cceb3bae7d1427b9213" dependencies = [ "os_str_bytes", ] @@ -733,9 +735,17 @@ name = "client" version = "0.1.0" dependencies = [ "api", + "async-stream", + "catalog", + "common-base", "common-error", "common-grpc", + "common-recordbatch", + "common-time", "datafusion", + "datanode", + "datatypes", + "query", "snafu", "tokio", "tonic 0.8.0", @@ -767,10 +777,12 @@ dependencies = [ name = "cmd" version = "0.1.0" dependencies = [ - "clap 3.2.16", + "clap 3.1.17", "common-error", "common-telemetry", 
"datanode", + "frontend", + "futures", "serde", "snafu", "tempdir", @@ -1288,7 +1300,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "futures", - "hashbrown", + "hashbrown 0.12.1", "lazy_static", "log", "num_cpus", @@ -1339,7 +1351,7 @@ dependencies = [ "chrono", "datafusion-common", "datafusion-expr", - "hashbrown", + "hashbrown 0.12.1", "lazy_static", "md-5", "ordered-float 2.10.0", @@ -1678,6 +1690,38 @@ dependencies = [ "regex", ] +[[package]] +name = "frontend" +version = "0.1.0" +dependencies = [ + "api", + "arrow2", + "async-stream", + "async-trait", + "catalog", + "client", + "common-base", + "common-error", + "common-recordbatch", + "common-runtime", + "common-telemetry", + "common-time", + "datafusion", + "datafusion-common", + "datanode", + "datatypes", + "futures", + "query", + "serde", + "servers", + "snafu", + "sql", + "tempdir", + "tokio", + "tonic 0.8.0", + "tower", +] + [[package]] name = "frunk" version = "0.4.0" @@ -1934,6 +1978,12 @@ version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74721d007512d0cb3338cd20f0654ac913920061a4c4d0d8708edb3f2a698c0c" +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" + [[package]] name = "hashbrown" version = "0.12.1" @@ -2116,12 +2166,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.9.1" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.11.2", ] [[package]] @@ -2145,9 +2195,9 @@ dependencies = [ [[package]] name = "io-lifetimes" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"24c3f4eff5495aee4c0399d7b6a0dc2b6e81be84242ffbfcf253ebacccc1d0cb" +checksum = "1ea37f355c05dde75b84bba2d767906ad522e97cd9e2eef2be7a4ab7fb442c06" [[package]] name = "ipnet" @@ -2277,18 +2327,18 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lexical" -version = "6.1.1" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7aefb36fd43fef7003334742cbf77b243fcd36418a1d1bdd480d613a67968f6" +checksum = "ccd3e434c16f0164124ade12dcdee324fcc3dafb1cad0c7f1d8c2451a1aa6886" dependencies = [ "lexical-core", ] [[package]] name = "lexical-core" -version = "0.8.5" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +checksum = "92912c4af2e7d9075be3e5e3122c4d7263855fa6cce34fbece4dd08e5884624d" dependencies = [ "lexical-parse-float", "lexical-parse-integer", @@ -2299,9 +2349,9 @@ dependencies = [ [[package]] name = "lexical-parse-float" -version = "0.8.5" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +checksum = "f518eed87c3be6debe6d26b855c97358d8a11bf05acec137e5f53080f5ad2dd8" dependencies = [ "lexical-parse-integer", "lexical-util", @@ -2310,9 +2360,9 @@ dependencies = [ [[package]] name = "lexical-parse-integer" -version = "0.8.6" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +checksum = "afc852ec67c6538bbb2b9911116a385b24510e879a69ab516e6a151b15a79168" dependencies = [ "lexical-util", "static_assertions", @@ -2320,18 +2370,18 @@ dependencies = [ [[package]] name = "lexical-util" -version = "0.8.5" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +checksum = "c72a9d52c5c4e62fa2cdc2cb6c694a39ae1382d9c2a17a466f18e272a0930eb1" dependencies = [ "static_assertions", ] [[package]] name = "lexical-write-float" -version = "0.8.5" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +checksum = "8a89ec1d062e481210c309b672f73a0567b7855f21e7d2fae636df44d12e97f9" dependencies = [ "lexical-util", "lexical-write-integer", @@ -2340,9 +2390,9 @@ dependencies = [ [[package]] name = "lexical-write-integer" -version = "0.8.5" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +checksum = "094060bd2a7c2ff3a16d5304a6ae82727cb3cc9d1c70f813cc73f744c319337e" dependencies = [ "lexical-util", "static_assertions", @@ -2350,9 +2400,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.126" +version = "0.2.132" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" +checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5" [[package]] name = "libloading" @@ -2452,7 +2502,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" dependencies = [ - "hashbrown", + "hashbrown 0.12.1", ] [[package]] @@ -2477,9 +2527,9 @@ dependencies = [ [[package]] name = "lz4_flex" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74141c8af4bb8136dafb5705826bdd9dce823021db897c1129191804140ddf84" +checksum = "c038063f7a78126c539d666a0323a2032de5e7366012cd14a6eafc5ba290bbd6" dependencies = [ "twox-hash", ] @@ -2625,7 +2675,7 @@ checksum = 
"f7d24dc2dbae22bff6f1f9326ffce828c9f07ef9cc1e8002e5279f845432a30a" dependencies = [ "crossbeam-epoch", "crossbeam-utils", - "hashbrown", + "hashbrown 0.12.1", "metrics 0.20.1", "num_cpus", "parking_lot 0.12.0", @@ -3033,9 +3083,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.13.0" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" +checksum = "2f7254b99e31cad77da24b08ebf628882739a608578bb1bcdfc1f9c21260d7c0" [[package]] name = "oorandom" @@ -3219,7 +3269,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a" dependencies = [ "dlv-list", - "hashbrown", + "hashbrown 0.12.1", ] [[package]] @@ -3524,10 +3574,11 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "1.1.3" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e17d47ce914bf4de440332250b0edd23ce48c005f59fab39d3335866b114f11a" +checksum = "eda0fc3b0fb7c975631757e14d9049da17374063edb6ebbcbc54d880d4fe94e9" dependencies = [ + "once_cell", "thiserror", "toml", ] @@ -3826,9 +3877,9 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "10.4.0" +version = "10.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c49596760fce12ca21550ac21dc5a9617b2ea4b6e0aa7d8dab8ff2824fc2bba" +checksum = "738bc47119e3eeccc7e94c4a506901aea5e7b4944ecd0829cbebf4af04ceda12" dependencies = [ "bitflags", ] @@ -3894,9 +3945,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.6.0" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" dependencies = [ "aho-corasick", "memchr", @@ -3914,9 +3965,9 @@ dependencies = [ [[package]] 
name = "regex-syntax" -version = "0.6.27" +version = "0.6.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" [[package]] name = "remove_dir_all" @@ -3995,18 +4046,18 @@ dependencies = [ [[package]] name = "result-like" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f95d927de9fa384eaf3e5b10e86065dd0a8a272b61cede64ffe7e83d2827073c" +checksum = "7b80fe0296795a96913be20558326b797a187bb3986ce84ed82dee0fb7414428" dependencies = [ "result-like-derive", ] [[package]] name = "result-like-derive" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dac91550a14a4b4ec485260b40d83b25059130f564d7f598604e0c7b1a8b9e6" +checksum = "2a29c8a4ac7839f1dcb8b899263b501e0d6932f210300c8a0d271323727b35c1" dependencies = [ "pmutil", "proc-macro2", @@ -4091,9 +4142,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.35.7" +version = "0.35.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d51cc38aa10f6bbb377ed28197aa052aa4e2b762c22be9d3153d01822587e787" +checksum = "72c825b8aa8010eb9ee99b75f05e10180b9278d161583034d7574c9d617aeada" dependencies = [ "bitflags", "errno", @@ -4285,7 +4336,7 @@ dependencies = [ "sre-engine", "static_assertions", "strum 0.24.1", - "strum_macros 0.24.2", + "strum_macros 0.24.3", "thiserror", "thread_local", "timsort", @@ -4689,6 +4740,7 @@ name = "sql" version = "0.1.0" dependencies = [ "common-error", + "datatypes", "snafu", "sqlparser", "table-engine", @@ -4875,9 +4927,9 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.24.2" +version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4faebde00e8ff94316c01800f9054fd2ba77d30d9e922541913051d1d978918b" +checksum = 
"1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ "heck 0.4.0", "proc-macro2", @@ -5241,9 +5293,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.9" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" dependencies = [ "futures-core", "pin-project-lite", @@ -5494,11 +5546,11 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.29" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" dependencies = [ - "once_cell", + "lazy_static", "valuable", ] @@ -5569,7 +5621,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.4.6", + "rand 0.8.5", "static_assertions", ] @@ -5722,9 +5774,9 @@ checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" [[package]] name = "unicode_names2" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eec8e807a365e5c972debc47b8f06d361b37b94cfd18d48f7adc715fb86404dd" +checksum = "029df4cc8238cefc911704ff8fa210853a0f3bce2694d8f51181dd41ee0f3301" [[package]] name = "untrusted" diff --git a/Cargo.toml b/Cargo.toml index 8baa0c6bba..d7a537318c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "src/cmd", "src/datanode", "src/datatypes", + "src/frontend", "src/log-store", "src/logical-plans", "src/object-store", diff --git a/README.md b/README.md index 73eb5fdd92..ee5a97cfb4 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,33 @@ docker run -p 3000:3000 \ greptimedb ``` 
+### Start Frontend + +Frontend should connect to Datanode, so **Datanode must have been started** first! + +``` +// Connects to local Datanode at its default GRPC port: 3001 + +// Start Frontend with default options. +cargo run -- frontend start + +OR + +// Start Frontend with `mysql-addr` option. +cargo run -- frontend start --mysql-addr=0.0.0.0:9999 + +OR + +// Start Frontend with `log-dir` and `log-level` options. +cargo run -- --log-dir=logs --log-level=debug frontend start +``` + +Start Frontend with config file: + +``` +cargo run -- --log-dir=logs --log-level=debug frontend start -c ./config/frontend.example.toml +``` + ### SQL Operations 1. Connecting DB by [mysql client](https://dev.mysql.com/downloads/mysql/): diff --git a/config/frontend.example.toml b/config/frontend.example.toml new file mode 100644 index 0000000000..d8a9fb2b33 --- /dev/null +++ b/config/frontend.example.toml @@ -0,0 +1,4 @@ +http_addr = '0.0.0.0:4000' +grpc_addr = '0.0.0.0:4001' +mysql_addr = '0.0.0.0:4003' +mysql_runtime_size = 4 diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 5931652f80..a9264c1e5f 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -6,7 +6,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +datatypes = { path = "../datatypes" } prost = "0.11" +snafu = { version = "0.7", features = ["backtraces"] } tonic = "0.8" [build-dependencies] diff --git a/src/api/greptime/v1/column.proto b/src/api/greptime/v1/column.proto index 6454b9303b..76c100387b 100644 --- a/src/api/greptime/v1/column.proto +++ b/src/api/greptime/v1/column.proto @@ -29,6 +29,10 @@ message Column { repeated bool bool_values = 11; repeated bytes binary_values = 12; repeated string string_values = 13; + + repeated int32 date_values = 14; + repeated int64 datetime_values = 15; + repeated int64 ts_millis_values = 16; } // The array of non-null values in this column. 
// @@ -43,6 +47,9 @@ message Column { // Mask maps the positions of null values. // If a bit in null_mask is 1, it indicates that the column value at that position is null. bytes null_mask = 4; + + // Helpful in creating vector from column. + ColumnDataType datatype = 5; } message ColumnDef { diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index b9202d975d..1d98b4fc82 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -38,7 +38,23 @@ message PhysicalPlan { message InsertExpr { string table_name = 1; - repeated bytes values = 2; + + message Values { + repeated bytes values = 1; + } + + oneof expr { + Values values = 2; + + // TODO(LFC): Remove field "sql" in InsertExpr. + // When a Frontend instance receives an insertion SQL (`insert into ...`), it is expected to parse the SQL and + // assemble the values to insert to feed Datanode. In other words, inserting data through Datanode instance's GRPC + // interface shouldn't use SQL directly. + // Then why does the "sql" field exist here? It's because the Frontend needs table schema to create the values to insert, + // which is currently not obtainable anywhere. (Maybe the table schema is supposed to be fetched from Meta?) + // The "sql" field is meant to be removed in the future. 
+ string sql = 3; + } } // TODO(jiachun) diff --git a/src/api/src/error.rs b/src/api/src/error.rs new file mode 100644 index 0000000000..9b97352626 --- /dev/null +++ b/src/api/src/error.rs @@ -0,0 +1,18 @@ +use datatypes::prelude::ConcreteDataType; +use snafu::prelude::*; +use snafu::Backtrace; + +pub type Result = std::result::Result; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Unknown proto column datatype: {}", datatype))] + UnknownColumnDataType { datatype: i32, backtrace: Backtrace }, + + #[snafu(display("Failed to create column datatype from {:?}", from))] + IntoColumnDataType { + from: ConcreteDataType, + backtrace: Backtrace, + }, +} diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs new file mode 100644 index 0000000000..57a726c620 --- /dev/null +++ b/src/api/src/helper.rs @@ -0,0 +1,230 @@ +use datatypes::prelude::ConcreteDataType; +use snafu::prelude::*; + +use crate::error::{self, Result}; +use crate::v1::ColumnDataType; + +#[derive(Debug, PartialEq, Eq)] +pub struct ColumnDataTypeWrapper(ColumnDataType); + +impl ColumnDataTypeWrapper { + pub fn try_new(datatype: i32) -> Result { + let datatype = ColumnDataType::from_i32(datatype) + .context(error::UnknownColumnDataTypeSnafu { datatype })?; + Ok(Self(datatype)) + } + + pub fn datatype(&self) -> ColumnDataType { + self.0 + } +} + +impl From for ConcreteDataType { + fn from(datatype: ColumnDataTypeWrapper) -> Self { + match datatype.0 { + ColumnDataType::Boolean => ConcreteDataType::boolean_datatype(), + ColumnDataType::Int8 => ConcreteDataType::int8_datatype(), + ColumnDataType::Int16 => ConcreteDataType::int16_datatype(), + ColumnDataType::Int32 => ConcreteDataType::int32_datatype(), + ColumnDataType::Int64 => ConcreteDataType::int64_datatype(), + ColumnDataType::Uint8 => ConcreteDataType::uint8_datatype(), + ColumnDataType::Uint16 => ConcreteDataType::uint16_datatype(), + ColumnDataType::Uint32 => ConcreteDataType::uint32_datatype(), + 
ColumnDataType::Uint64 => ConcreteDataType::uint64_datatype(), + ColumnDataType::Float32 => ConcreteDataType::float32_datatype(), + ColumnDataType::Float64 => ConcreteDataType::float64_datatype(), + ColumnDataType::Binary => ConcreteDataType::binary_datatype(), + ColumnDataType::String => ConcreteDataType::string_datatype(), + ColumnDataType::Date => ConcreteDataType::date_datatype(), + ColumnDataType::Datetime => ConcreteDataType::datetime_datatype(), + ColumnDataType::Timestamp => ConcreteDataType::timestamp_millis_datatype(), + } + } +} + +impl TryFrom for ColumnDataTypeWrapper { + type Error = error::Error; + + fn try_from(datatype: ConcreteDataType) -> Result { + let datatype = ColumnDataTypeWrapper(match datatype { + ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, + ConcreteDataType::Int8(_) => ColumnDataType::Int8, + ConcreteDataType::Int16(_) => ColumnDataType::Int16, + ConcreteDataType::Int32(_) => ColumnDataType::Int32, + ConcreteDataType::Int64(_) => ColumnDataType::Int64, + ConcreteDataType::UInt8(_) => ColumnDataType::Uint8, + ConcreteDataType::UInt16(_) => ColumnDataType::Uint16, + ConcreteDataType::UInt32(_) => ColumnDataType::Uint32, + ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, + ConcreteDataType::Float32(_) => ColumnDataType::Float32, + ConcreteDataType::Float64(_) => ColumnDataType::Float64, + ConcreteDataType::Binary(_) => ColumnDataType::Binary, + ConcreteDataType::String(_) => ColumnDataType::String, + ConcreteDataType::Date(_) => ColumnDataType::Date, + ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, + ConcreteDataType::Timestamp(_) => ColumnDataType::Timestamp, + ConcreteDataType::Null(_) | ConcreteDataType::List(_) => { + return error::IntoColumnDataTypeSnafu { from: datatype }.fail() + } + }); + Ok(datatype) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_concrete_datatype_from_column_datatype() { + assert_eq!( + ConcreteDataType::boolean_datatype(), + 
ColumnDataTypeWrapper(ColumnDataType::Boolean).into() + ); + assert_eq!( + ConcreteDataType::int8_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Int8).into() + ); + assert_eq!( + ConcreteDataType::int16_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Int16).into() + ); + assert_eq!( + ConcreteDataType::int32_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Int32).into() + ); + assert_eq!( + ConcreteDataType::int64_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Int64).into() + ); + assert_eq!( + ConcreteDataType::uint8_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Uint8).into() + ); + assert_eq!( + ConcreteDataType::uint16_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Uint16).into() + ); + assert_eq!( + ConcreteDataType::uint32_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Uint32).into() + ); + assert_eq!( + ConcreteDataType::uint64_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Uint64).into() + ); + assert_eq!( + ConcreteDataType::float32_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Float32).into() + ); + assert_eq!( + ConcreteDataType::float64_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Float64).into() + ); + assert_eq!( + ConcreteDataType::binary_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Binary).into() + ); + assert_eq!( + ConcreteDataType::string_datatype(), + ColumnDataTypeWrapper(ColumnDataType::String).into() + ); + assert_eq!( + ConcreteDataType::date_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Date).into() + ); + assert_eq!( + ConcreteDataType::datetime_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Datetime).into() + ); + assert_eq!( + ConcreteDataType::timestamp_millis_datatype(), + ColumnDataTypeWrapper(ColumnDataType::Timestamp).into() + ); + } + + #[test] + fn test_column_datatype_from_concrete_datatype() { + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Boolean), + ConcreteDataType::boolean_datatype().try_into().unwrap() + ); + assert_eq!( + 
ColumnDataTypeWrapper(ColumnDataType::Int8), + ConcreteDataType::int8_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Int16), + ConcreteDataType::int16_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Int32), + ConcreteDataType::int32_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Int64), + ConcreteDataType::int64_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Uint8), + ConcreteDataType::uint8_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Uint16), + ConcreteDataType::uint16_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Uint32), + ConcreteDataType::uint32_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Uint64), + ConcreteDataType::uint64_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Float32), + ConcreteDataType::float32_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Float64), + ConcreteDataType::float64_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Binary), + ConcreteDataType::binary_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::String), + ConcreteDataType::string_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Date), + ConcreteDataType::date_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Datetime), + ConcreteDataType::datetime_datatype().try_into().unwrap() + ); + assert_eq!( + ColumnDataTypeWrapper(ColumnDataType::Timestamp), + ConcreteDataType::timestamp_millis_datatype() + .try_into() + .unwrap() + ); + + let result: Result = ConcreteDataType::null_datatype().try_into(); + assert!(result.is_err()); + assert_eq!( + 
result.unwrap_err().to_string(), + "Failed to create column datatype from Null(NullType)" + ); + + let result: Result = + ConcreteDataType::list_datatype(ConcreteDataType::boolean_datatype()).try_into(); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Failed to create column datatype from List(ListType { inner: Boolean(BooleanType) })" + ); + } +} diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs index ef132c15fe..51614463aa 100644 --- a/src/api/src/lib.rs +++ b/src/api/src/lib.rs @@ -1,3 +1,5 @@ +pub mod error; +pub mod helper; pub mod serde; pub mod v1; diff --git a/src/api/src/serde.rs b/src/api/src/serde.rs index 1f8e540ed1..9e884f4515 100644 --- a/src/api/src/serde.rs +++ b/src/api/src/serde.rs @@ -138,6 +138,7 @@ mod tests { semantic_type: SEMANTIC_TAG, values: Some(values), null_mask, + ..Default::default() }; InsertBatch { columns: vec![column], @@ -156,6 +157,7 @@ mod tests { semantic_type: SEMANTIC_TAG, values: Some(values), null_mask, + ..Default::default() }; SelectResult { columns: vec![column], diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 4b667110aa..041e60f03a 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -17,7 +17,7 @@ mod manager; pub mod memory; pub mod schema; mod system; -mod tables; +pub mod tables; /// Represent a list of named catalogs pub trait CatalogList: Sync + Send { diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index b5f328f710..419b472db2 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -7,13 +7,21 @@ edition = "2021" [dependencies] api = { path = "../api" } +async-stream = "0.3" +catalog = { path = "../catalog" } +common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } +common-recordbatch = { path = "../common/recordbatch" } +common-time = { path = "../common/time" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", 
features = ["simd"] } +datatypes = { path = "../datatypes" } +query = { path = "../query" } snafu = { version = "0.7", features = ["backtraces"] } tonic = "0.8" [dev-dependencies] +datanode = { path = "../datanode" } tokio = { version = "1.0", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/client/examples/insert.rs b/src/client/examples/insert.rs index 560bd1f08f..2b2e812a22 100644 --- a/src/client/examples/insert.rs +++ b/src/client/examples/insert.rs @@ -13,7 +13,13 @@ async fn run() { let client = Client::connect("http://127.0.0.1:3001").await.unwrap(); let db = Database::new("greptime", client); - db.insert("demo", insert_batches()).await.unwrap(); + let expr = InsertExpr { + table_name: "demo".to_string(), + expr: Some(insert_expr::Expr::Values(insert_expr::Values { + values: insert_batches(), + })), + }; + db.insert(expr).await.unwrap(); } fn insert_batches() -> Vec> { @@ -37,6 +43,7 @@ fn insert_batches() -> Vec> { semantic_type: SEMANTIC_TAG, values: Some(host_vals), null_mask: vec![0], + ..Default::default() }; let cpu_vals = column::Values { @@ -48,6 +55,7 @@ fn insert_batches() -> Vec> { semantic_type: SEMANTIC_FEILD, values: Some(cpu_vals), null_mask: vec![2], + ..Default::default() }; let mem_vals = column::Values { @@ -59,6 +67,7 @@ fn insert_batches() -> Vec> { semantic_type: SEMANTIC_FEILD, values: Some(mem_vals), null_mask: vec![4], + ..Default::default() }; let ts_vals = column::Values { @@ -70,6 +79,7 @@ fn insert_batches() -> Vec> { semantic_type: SEMANTIC_TS, values: Some(ts_vals), null_mask: vec![0], + ..Default::default() }; let insert_batch = InsertBatch { diff --git a/src/client/src/admin.rs b/src/client/src/admin.rs index fa4c3146b7..8608e692bb 100644 --- a/src/client/src/admin.rs +++ b/src/client/src/admin.rs @@ -1,4 +1,6 @@ use api::v1::*; +use common_error::prelude::StatusCode; +use query::Output; use snafu::prelude::*; use crate::database::PROTOCOL_VERSION; @@ 
-20,6 +22,10 @@ impl Admin { } } + pub async fn start(&mut self, url: impl Into) -> Result<()> { + self.client.start(url).await + } + pub async fn create(&self, expr: CreateExpr) -> Result { let header = ExprHeader { version: PROTOCOL_VERSION, @@ -28,8 +34,12 @@ impl Admin { header: Some(header), expr: Some(admin_expr::Expr::Create(expr)), }; - // `remove(0)` is safe because of `do_request`'s invariants. - Ok(self.do_request(vec![expr]).await?.remove(0)) + self.do_request(expr).await + } + + pub async fn do_request(&self, expr: AdminExpr) -> Result { + // `remove(0)` is safe because of `do_requests`'s invariants. + Ok(self.do_requests(vec![expr]).await?.remove(0)) } pub async fn alter(&self, expr: AlterExpr) -> Result { @@ -40,11 +50,11 @@ impl Admin { header: Some(header), expr: Some(admin_expr::Expr::Alter(expr)), }; - Ok(self.do_request(vec![expr]).await?.remove(0)) + Ok(self.do_requests(vec![expr]).await?.remove(0)) } /// Invariants: the lengths of input vec (`Vec`) and output vec (`Vec`) are equal. 
- async fn do_request(&self, exprs: Vec) -> Result> { + async fn do_requests(&self, exprs: Vec) -> Result> { let expr_count = exprs.len(); let req = AdminRequest { name: self.name.clone(), @@ -65,3 +75,32 @@ impl Admin { Ok(results) } } + +pub fn admin_result_to_output(admin_result: AdminResult) -> Result { + let header = admin_result.header.context(error::MissingHeaderSnafu)?; + if !StatusCode::is_success(header.code) { + return error::DatanodeSnafu { + code: header.code, + msg: header.err_msg, + } + .fail(); + } + + let result = admin_result.result.context(error::MissingResultSnafu { + name: "result".to_string(), + expected: 1_usize, + actual: 0_usize, + })?; + let output = match result { + admin_result::Result::Mutate(mutate) => { + if mutate.failure != 0 { + return error::MutateFailureSnafu { + failure: mutate.failure, + } + .fail(); + } + Output::AffectedRows(mutate.success as usize) + } + }; + Ok(output) +} diff --git a/src/client/src/client.rs b/src/client/src/client.rs index bd46a0add0..d9a55b92f8 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -5,18 +5,43 @@ use tonic::transport::Channel; use crate::error; use crate::Result; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct Client { - client: GreptimeClient, + client: Option>, } impl Client { + pub async fn start(&mut self, url: impl Into) -> Result<()> { + match self.client.as_ref() { + None => { + let url = url.into(); + let client = GreptimeClient::connect(url.clone()) + .await + .context(error::ConnectFailedSnafu { url })?; + self.client = Some(client); + Ok(()) + } + Some(_) => error::IllegalGrpcClientStateSnafu { + err_msg: "already started", + } + .fail(), + } + } + + pub fn with_client(client: GreptimeClient) -> Self { + Self { + client: Some(client), + } + } + pub async fn connect(url: impl Into) -> Result { let url = url.into(); let client = GreptimeClient::connect(url.clone()) .await .context(error::ConnectFailedSnafu { url })?; - Ok(Self { client }) + 
Ok(Self { + client: Some(client), + }) } pub async fn admin(&self, req: AdminRequest) -> Result { @@ -48,12 +73,18 @@ impl Client { } pub async fn batch(&self, req: BatchRequest) -> Result { - let res = self - .client - .clone() - .batch(req) - .await - .context(error::TonicStatusSnafu)?; - Ok(res.into_inner()) + if let Some(client) = self.client.as_ref() { + let res = client + .clone() + .batch(req) + .await + .context(error::TonicStatusSnafu)?; + Ok(res.into_inner()) + } else { + error::IllegalGrpcClientStateSnafu { + err_msg: "not started", + } + .fail() + } } } diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 647866e4ed..12ffd127be 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -1,15 +1,24 @@ use std::sync::Arc; +use api::helper::ColumnDataTypeWrapper; use api::v1::codec::SelectResult as GrpcSelectResult; use api::v1::{ - object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr, - MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan, - SelectExpr, + column::Values, object_expr, object_result, select_expr, Column, ColumnDataType, + DatabaseRequest, ExprHeader, InsertExpr, MutateResult as GrpcMutateResult, ObjectExpr, + ObjectResult as GrpcObjectResult, PhysicalPlan, SelectExpr, }; +use common_base::BitVec; use common_error::status_code::StatusCode; use common_grpc::AsExcutionPlan; use common_grpc::DefaultAsPlanImpl; +use common_recordbatch::{RecordBatch, RecordBatches}; +use common_time::date::Date; +use common_time::datetime::DateTime; +use common_time::timestamp::Timestamp; use datafusion::physical_plan::ExecutionPlan; +use datatypes::prelude::*; +use datatypes::schema::{ColumnSchema, Schema}; +use query::Output; use snafu::{ensure, OptionExt, ResultExt}; use crate::error; @@ -19,8 +28,6 @@ use crate::{ pub const PROTOCOL_VERSION: u32 = 1; -pub type Bytes = Vec; - #[derive(Clone, Debug)] pub struct Database { name: String, @@ -35,26 +42,23 @@ 
impl Database { } } + pub async fn start(&mut self, url: impl Into) -> Result<()> { + self.client.start(url).await + } + pub fn name(&self) -> &str { &self.name } - pub async fn insert(&self, table: impl Into, values: Vec) -> Result<()> { + pub async fn insert(&self, insert: InsertExpr) -> Result { let header = ExprHeader { version: PROTOCOL_VERSION, }; - let insert = InsertExpr { - table_name: table.into(), - values, - }; let expr = ObjectExpr { header: Some(header), expr: Some(object_expr::Expr::Insert(insert)), }; - - self.object(expr).await?; - - Ok(()) + self.object(expr).await?.try_into() } pub async fn select(&self, expr: Select) -> Result { @@ -100,7 +104,7 @@ impl Database { // TODO(jiachun) update/delete - async fn object(&self, expr: ObjectExpr) -> Result { + pub async fn object(&self, expr: ObjectExpr) -> Result { let res = self.objects(vec![expr]).await?.pop().unwrap(); Ok(res) } @@ -165,3 +169,234 @@ impl TryFrom for ObjectResult { pub enum Select { Sql(String), } + +impl TryFrom for Output { + type Error = error::Error; + + fn try_from(value: ObjectResult) -> Result { + let output = match value { + ObjectResult::Select(select) => { + let vectors = select + .columns + .iter() + .map(|column| column_to_vector(column, select.row_count)) + .collect::>>()?; + + let column_schemas = select + .columns + .iter() + .zip(vectors.iter()) + .map(|(column, vector)| { + let datatype = vector.data_type(); + // nullable or not, does not affect the output + ColumnSchema::new(&column.column_name, datatype, true) + }) + .collect::>(); + + let schema = Arc::new(Schema::new(column_schemas)); + let recordbatches = RecordBatch::new(schema, vectors) + .and_then(|batch| RecordBatches::try_new(batch.schema.clone(), vec![batch])) + .context(error::CreateRecordBatchesSnafu)?; + Output::RecordBatches(recordbatches) + } + ObjectResult::Mutate(mutate) => { + if mutate.failure != 0 { + return error::MutateFailureSnafu { + failure: mutate.failure, + } + .fail(); + } + 
Output::AffectedRows(mutate.success as usize) + } + }; + Ok(output) + } +} + +fn column_to_vector(column: &Column, rows: u32) -> Result { + let wrapper = + ColumnDataTypeWrapper::try_new(column.datatype).context(error::ColumnDataTypeSnafu)?; + let column_datatype = wrapper.datatype(); + + let rows = rows as usize; + let mut vector = VectorBuilder::with_capacity(wrapper.into(), rows); + + if let Some(values) = &column.values { + let values = collect_column_values(column_datatype, values); + let mut values_iter = values.into_iter(); + + let null_mask = BitVec::from_slice(&column.null_mask); + let mut nulls_iter = null_mask.iter().by_vals().fuse(); + + for i in 0..rows { + if let Some(true) = nulls_iter.next() { + vector.push_null(); + } else { + let value_ref = values_iter.next().context(error::InvalidColumnProtoSnafu { + err_msg: format!( + "value not found at position {} of column {}", + i, &column.column_name + ), + })?; + vector + .try_push_ref(value_ref) + .context(error::CreateVectorSnafu)?; + } + } + } else { + (0..rows).for_each(|_| vector.push_null()); + } + Ok(vector.finish()) +} + +fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Vec { + macro_rules! 
collect_values { + ($value: expr, $mapper: expr) => { + $value.iter().map($mapper).collect::>() + }; + } + + match column_datatype { + ColumnDataType::Boolean => collect_values!(values.bool_values, |v| ValueRef::from(*v)), + ColumnDataType::Int8 => collect_values!(values.i8_values, |v| ValueRef::from(*v as i8)), + ColumnDataType::Int16 => { + collect_values!(values.i16_values, |v| ValueRef::from(*v as i16)) + } + ColumnDataType::Int32 => { + collect_values!(values.i32_values, |v| ValueRef::from(*v)) + } + ColumnDataType::Int64 => { + collect_values!(values.i64_values, |v| ValueRef::from(*v as i64)) + } + ColumnDataType::Uint8 => { + collect_values!(values.u8_values, |v| ValueRef::from(*v as u8)) + } + ColumnDataType::Uint16 => { + collect_values!(values.u16_values, |v| ValueRef::from(*v as u16)) + } + ColumnDataType::Uint32 => { + collect_values!(values.u32_values, |v| ValueRef::from(*v)) + } + ColumnDataType::Uint64 => { + collect_values!(values.u64_values, |v| ValueRef::from(*v as u64)) + } + ColumnDataType::Float32 => collect_values!(values.f32_values, |v| ValueRef::from(*v)), + ColumnDataType::Float64 => collect_values!(values.f64_values, |v| ValueRef::from(*v)), + ColumnDataType::Binary => { + collect_values!(values.binary_values, |v| ValueRef::from(v.as_slice())) + } + ColumnDataType::String => { + collect_values!(values.string_values, |v| ValueRef::from(v.as_str())) + } + ColumnDataType::Date => { + collect_values!(values.date_values, |v| ValueRef::Date(Date::new(*v))) + } + ColumnDataType::Datetime => { + collect_values!(values.datetime_values, |v| ValueRef::DateTime( + DateTime::new(*v) + )) + } + ColumnDataType::Timestamp => { + collect_values!(values.ts_millis_values, |v| ValueRef::Timestamp( + Timestamp::from_millis(*v) + )) + } + } +} + +#[cfg(test)] +mod tests { + use datanode::server::grpc::select::{null_mask, values}; + use datatypes::vectors::{ + BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector, + Int16Vector, 
Int32Vector, Int64Vector, Int8Vector, StringVector, UInt16Vector, + UInt32Vector, UInt64Vector, UInt8Vector, + }; + + use super::*; + + #[test] + fn test_column_to_vector() { + let mut column = create_test_column(Arc::new(BooleanVector::from(vec![true]))); + column.datatype = -100; + let result = column_to_vector(&column, 1); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Column datatype error, source: Unknown proto column datatype: -100" + ); + + macro_rules! test_with_vector { + ($vector: expr) => { + let vector = Arc::new($vector); + let column = create_test_column(vector.clone()); + let result = column_to_vector(&column, vector.len() as u32).unwrap(); + assert_eq!(result, vector as VectorRef); + }; + } + + test_with_vector!(BooleanVector::from(vec![Some(true), None, Some(false)])); + test_with_vector!(Int8Vector::from(vec![Some(i8::MIN), None, Some(i8::MAX)])); + test_with_vector!(Int16Vector::from(vec![ + Some(i16::MIN), + None, + Some(i16::MAX) + ])); + test_with_vector!(Int32Vector::from(vec![ + Some(i32::MIN), + None, + Some(i32::MAX) + ])); + test_with_vector!(Int64Vector::from(vec![ + Some(i64::MIN), + None, + Some(i64::MAX) + ])); + test_with_vector!(UInt8Vector::from(vec![Some(u8::MIN), None, Some(u8::MAX)])); + test_with_vector!(UInt16Vector::from(vec![ + Some(u16::MIN), + None, + Some(u16::MAX) + ])); + test_with_vector!(UInt32Vector::from(vec![ + Some(u32::MIN), + None, + Some(u32::MAX) + ])); + test_with_vector!(UInt64Vector::from(vec![ + Some(u64::MIN), + None, + Some(u64::MAX) + ])); + test_with_vector!(Float32Vector::from(vec![ + Some(f32::MIN), + None, + Some(f32::MAX) + ])); + test_with_vector!(Float64Vector::from(vec![ + Some(f64::MIN), + None, + Some(f64::MAX) + ])); + test_with_vector!(BinaryVector::from(vec![ + Some(b"".to_vec()), + None, + Some(b"hello".to_vec()) + ])); + test_with_vector!(StringVector::from(vec![Some(""), None, Some("foo"),])); + test_with_vector!(DateVector::from(vec![Some(1), None, 
Some(3)])); + test_with_vector!(DateTimeVector::from(vec![Some(4), None, Some(6)])); + } + + fn create_test_column(vector: VectorRef) -> Column { + let wrapper: ColumnDataTypeWrapper = vector.data_type().try_into().unwrap(); + let array = vector.to_arrow_array(); + Column { + column_name: "test".to_string(), + semantic_type: 1, + values: Some(values(&[array.clone()]).unwrap()), + null_mask: null_mask(&vec![array], vector.len()), + datatype: wrapper.datatype() as i32, + } + } +} diff --git a/src/client/src/error.rs b/src/client/src/error.rs index e588ff844b..7528fbdf69 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::sync::Arc; use api::serde::DecodeError; @@ -42,6 +43,67 @@ pub enum Error { #[snafu(backtrace)] source: common_grpc::Error, }, + + #[snafu(display("Mutate result has failure {}", failure))] + MutateFailure { failure: u32, backtrace: Backtrace }, + + #[snafu(display("Invalid column proto: {}", err_msg))] + InvalidColumnProto { + err_msg: String, + backtrace: Backtrace, + }, + + #[snafu(display("Column datatype error, source: {}", source))] + ColumnDataType { + #[snafu(backtrace)] + source: api::error::Error, + }, + + #[snafu(display("Failed to create vector, source: {}", source))] + CreateVector { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Failed to create RecordBatches, source: {}", source))] + CreateRecordBatches { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Illegal GRPC client state: {}", err_msg))] + IllegalGrpcClientState { + err_msg: String, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::ConnectFailed { .. } + | Error::MissingResult { .. } + | Error::MissingHeader { .. } + | Error::TonicStatus { .. } + | Error::DecodeSelect { .. } + | Error::Datanode { .. 
} + | Error::EncodePhysical { .. } + | Error::MutateFailure { .. } + | Error::InvalidColumnProto { .. } + | Error::ColumnDataType { .. } => StatusCode::Internal, + Error::CreateVector { source } => source.status_code(), + Error::CreateRecordBatches { source } => source.status_code(), + Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 028f7e270a..a962b24148 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -12,6 +12,8 @@ clap = { version = "3.1", features = ["derive"] } common-error = { path = "../common/error" } common-telemetry = { path = "../common/telemetry", features = ["deadlock_detection"]} datanode = { path = "../datanode" } +frontend = { path = "../frontend" } +futures = "0.3" snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.18", features = ["full"] } toml = "0.5" diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index 54140306b7..3713f4ab4c 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -3,6 +3,7 @@ use std::fmt; use clap::Parser; use cmd::datanode; use cmd::error::Result; +use cmd::frontend; use common_telemetry::{self, logging::error, logging::info}; #[derive(Parser)] @@ -26,12 +27,15 @@ impl Command { enum SubCommand { #[clap(name = "datanode")] Datanode(datanode::Command), + #[clap(name = "frontend")] + Frontend(frontend::Command), } impl SubCommand { async fn run(self) -> Result<()> { match self { SubCommand::Datanode(cmd) => cmd.run().await, + SubCommand::Frontend(cmd) => cmd.run().await, } } } @@ -40,6 +44,7 @@ impl fmt::Display for SubCommand { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { SubCommand::Datanode(..) => write!(f, "greptime-datanode"), + SubCommand::Frontend(..) 
=> write!(f, "greptime-frontend"), } } } diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index cccbafc2f5..9247a8abc2 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -11,6 +11,12 @@ pub enum Error { source: datanode::error::Error, }, + #[snafu(display("Failed to start frontend, source: {}", source))] + StartFrontend { + #[snafu(backtrace)] + source: frontend::error::Error, + }, + #[snafu(display("Failed to read config file: {}, source: {}", path, source))] ReadConfig { source: std::io::Error, @@ -27,6 +33,7 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::StartDatanode { source } => source.status_code(), + Error::StartFrontend { source } => source.status_code(), Error::ReadConfig { .. } | Error::ParseConfig { .. } => StatusCode::InvalidArguments, } } diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs new file mode 100644 index 0000000000..b77294ce02 --- /dev/null +++ b/src/cmd/src/frontend.rs @@ -0,0 +1,97 @@ +use clap::Parser; +use frontend::frontend::{Frontend, FrontendOptions}; +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::toml_loader; + +#[derive(Parser)] +pub struct Command { + #[clap(subcommand)] + subcmd: SubCommand, +} + +impl Command { + pub async fn run(self) -> Result<()> { + self.subcmd.run().await + } +} + +#[derive(Parser)] +enum SubCommand { + Start(StartCommand), +} + +impl SubCommand { + async fn run(self) -> Result<()> { + match self { + SubCommand::Start(cmd) => cmd.run().await, + } + } +} + +#[derive(Debug, Parser)] +struct StartCommand { + #[clap(long)] + http_addr: Option, + #[clap(long)] + grpc_addr: Option, + #[clap(long)] + mysql_addr: Option, + #[clap(short, long)] + config_file: Option, +} + +impl StartCommand { + async fn run(self) -> Result<()> { + let opts = self.try_into()?; + let mut frontend = Frontend::new(opts); + frontend.start().await.context(error::StartFrontendSnafu) + } +} + +impl TryFrom for FrontendOptions { + type Error = 
error::Error; + + fn try_from(cmd: StartCommand) -> Result { + let mut opts: FrontendOptions = if let Some(path) = cmd.config_file { + toml_loader::from_file!(&path)? + } else { + FrontendOptions::default() + }; + + if let Some(addr) = cmd.http_addr { + opts.http_addr = Some(addr); + } + if let Some(addr) = cmd.grpc_addr { + opts.grpc_addr = Some(addr); + } + if let Some(addr) = cmd.mysql_addr { + opts.mysql_addr = Some(addr); + } + Ok(opts) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_try_from_start_command() { + let command = StartCommand { + http_addr: Some("127.0.0.1:1234".to_string()), + grpc_addr: None, + mysql_addr: Some("127.0.0.1:5678".to_string()), + config_file: None, + }; + + let opts: FrontendOptions = command.try_into().unwrap(); + assert_eq!(opts.http_addr, Some("127.0.0.1:1234".to_string())); + assert_eq!(opts.mysql_addr, Some("127.0.0.1:5678".to_string())); + + let default_opts = FrontendOptions::default(); + assert_eq!(opts.grpc_addr, default_opts.grpc_addr); + assert_eq!(opts.mysql_runtime_size, default_opts.mysql_runtime_size); + } +} diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 192811c2e1..0bffa92fd1 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -1,3 +1,4 @@ pub mod datanode; pub mod error; +pub mod frontend; mod toml_loader; diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index c4e5d41836..597e3b6f63 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -27,13 +27,21 @@ pub enum InnerError { #[snafu(backtrace)] source: BoxedError, }, + + #[snafu(display("Failed to create RecordBatches, reason: {}", reason))] + CreateRecordBatches { + reason: String, + backtrace: Backtrace, + }, } impl ErrorExt for InnerError { fn status_code(&self) -> StatusCode { match self { InnerError::NewDfRecordBatch { .. } => StatusCode::InvalidArguments, - InnerError::DataTypes { .. 
} => StatusCode::Internal, + InnerError::DataTypes { .. } | InnerError::CreateRecordBatches { .. } => { + StatusCode::Internal + } InnerError::External { source } => source.status_code(), } } diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index f3ae0209ff..e8d2918819 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -9,6 +9,7 @@ use error::Result; use futures::task::{Context, Poll}; use futures::Stream; pub use recordbatch::RecordBatch; +use snafu::ensure; pub trait RecordBatchStream: Stream> { fn schema(&self) -> SchemaRef; @@ -43,3 +44,76 @@ impl Stream for EmptyRecordBatchStream { Poll::Ready(None) } } + +#[derive(Debug)] +pub struct RecordBatches { + schema: SchemaRef, + batches: Vec, +} + +impl RecordBatches { + pub fn try_new(schema: SchemaRef, batches: Vec) -> Result { + for batch in batches.iter() { + ensure!( + batch.schema == schema, + error::CreateRecordBatchesSnafu { + reason: format!( + "expect RecordBatch schema equals {:?}, actual: {:?}", + schema, batch.schema + ) + } + ) + } + Ok(Self { schema, batches }) + } + + pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + pub fn to_vec(self) -> Vec { + self.batches + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datatypes::prelude::{ConcreteDataType, VectorRef}; + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::{BooleanVector, Int32Vector, StringVector}; + + use super::*; + + #[test] + fn test_recordbatches() { + let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false); + let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false); + let column_c = ColumnSchema::new("c", ConcreteDataType::boolean_datatype(), false); + + let va: VectorRef = Arc::new(Int32Vector::from_slice(&[1, 2])); + let vb: VectorRef = Arc::new(StringVector::from(vec!["hello", "world"])); + let vc: VectorRef = Arc::new(BooleanVector::from(vec![true, false])); + 
+ let schema1 = Arc::new(Schema::new(vec![column_a.clone(), column_b])); + let batch1 = RecordBatch::new(schema1.clone(), vec![va.clone(), vb]).unwrap(); + + let schema2 = Arc::new(Schema::new(vec![column_a, column_c])); + let batch2 = RecordBatch::new(schema2.clone(), vec![va, vc]).unwrap(); + + let result = RecordBatches::try_new(schema1.clone(), vec![batch1.clone(), batch2]); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + format!( + "Failed to create RecordBatches, reason: expect RecordBatch schema equals {:?}, actual: {:?}", + schema1, schema2 + ) + ); + + let batches = RecordBatches::try_new(schema1.clone(), vec![batch1.clone()]).unwrap(); + assert_eq!(schema1, batches.schema()); + assert_eq!(vec![batch1], batches.to_vec()); + } +} diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index 2a90ed6669..8d128319d5 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -74,6 +74,7 @@ pub fn init_global_logging( .with_target("datafusion", Level::WARN) .with_target("reqwest", Level::WARN) .with_target("sqlparser", Level::WARN) + .with_target("h2", Level::INFO) .with_default( directives .parse::() diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 9589db0dfd..b6b9109e91 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -159,12 +159,6 @@ pub enum Error { source: datatypes::error::Error, }, - #[snafu(display("SQL data type not supported yet: {:?}", t))] - SqlTypeNotSupported { - t: sql::ast::DataType, - backtrace: Backtrace, - }, - #[snafu(display("Specified timestamp key or primary key column not found: {}", name))] KeyColumnNotFound { name: String, backtrace: Backtrace }, @@ -189,8 +183,17 @@ pub enum Error { source: common_grpc::Error, }, - #[snafu(display("Invalid ColumnDef in protobuf msg: {}", msg))] - InvalidColumnDef { msg: String, backtrace: Backtrace }, + #[snafu(display("Column datatype error, source: {}", 
source))] + ColumnDataType { + #[snafu(backtrace)] + source: api::error::Error, + }, + + #[snafu(display("Failed to parse SQL, source: {}", source))] + ParseSql { + #[snafu(backtrace)] + source: sql::error::Error, + }, #[snafu(display("Failed to start script manager, source: {}", source))] StartScriptManager { @@ -220,12 +223,10 @@ impl ErrorExt for Error { | Error::IllegalInsertData { .. } | Error::DecodeInsert { .. } | Error::InvalidSql { .. } - | Error::SqlTypeNotSupported { .. } | Error::CreateSchema { .. } | Error::KeyColumnNotFound { .. } | Error::MissingField { .. } - | Error::ConstraintNotSupported { .. } - | Error::InvalidColumnDef { .. } => StatusCode::InvalidArguments, + | Error::ConstraintNotSupported { .. } => StatusCode::InvalidArguments, // TODO(yingwen): Further categorize http error. Error::StartServer { .. } | Error::ParseAddr { .. } @@ -235,7 +236,9 @@ impl ErrorExt for Error { | Error::InsertSystemCatalog { .. } | Error::Conversion { .. } | Error::IntoPhysicalPlan { .. } - | Error::UnsupportedExpr { .. } => StatusCode::Internal, + | Error::UnsupportedExpr { .. } + | Error::ColumnDataType { .. } => StatusCode::Internal, + Error::ParseSql { source } => source.status_code(), Error::InitBackend { .. 
} => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), Error::StartScriptManager { source } => source.status_code(), diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 48db6b74af..766bf48917 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,7 +1,7 @@ use std::{fs, path, sync::Arc}; use api::v1::{ - admin_expr, object_expr, select_expr, AdminExpr, AdminResult, InsertExpr, ObjectExpr, + admin_expr, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, ObjectExpr, ObjectResult, SelectExpr, }; use async_trait::async_trait; @@ -81,7 +81,11 @@ impl Instance { }) } - pub async fn execute_grpc_insert(&self, insert_expr: InsertExpr) -> Result { + pub async fn execute_grpc_insert( + &self, + table_name: &str, + values: insert_expr::Values, + ) -> Result { let schema_provider = self .catalog_manager .catalog(DEFAULT_CATALOG_NAME) @@ -89,12 +93,11 @@ impl Instance { .schema(DEFAULT_SCHEMA_NAME) .unwrap(); - let table_name = &insert_expr.table_name.clone(); let table = schema_provider .table(table_name) .context(TableNotFoundSnafu { table_name })?; - let insert = insertion_expr_to_request(insert_expr, table.clone())?; + let insert = insertion_expr_to_request(table_name, values, table.clone())?; let affected_rows = table .insert(insert) @@ -167,8 +170,8 @@ impl Instance { Ok(()) } - async fn handle_insert(&self, insert_expr: InsertExpr) -> ObjectResult { - match self.execute_grpc_insert(insert_expr).await { + async fn handle_insert(&self, table_name: &str, values: insert_expr::Values) -> ObjectResult { + match self.execute_grpc_insert(table_name, values).await { Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() .status_code(StatusCode::Success as u32) .mutate_result(rows as u32, 0) @@ -289,6 +292,7 @@ async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result servers::error::Result { @@ -315,7 +319,23 @@ impl SqlQueryHandler for Instance { impl 
GrpcQueryHandler for Instance { async fn do_query(&self, query: ObjectExpr) -> servers::error::Result { let object_resp = match query.expr { - Some(object_expr::Expr::Insert(insert_expr)) => self.handle_insert(insert_expr).await, + Some(object_expr::Expr::Insert(insert_expr)) => { + let table_name = &insert_expr.table_name; + let expr = insert_expr + .expr + .context(servers::error::InvalidQuerySnafu { + reason: "missing `expr` in `InsertExpr`", + })?; + match expr { + insert_expr::Expr::Values(values) => { + self.handle_insert(table_name, values).await + } + insert_expr::Expr::Sql(sql) => { + let output = self.execute_sql(&sql).await; + to_object_result(output).await + } + } + } Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await, other => { return servers::error::NotSupportedSnafu { diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 9b089c722b..051cd4e1e4 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -2,4 +2,4 @@ mod ddl; pub(crate) mod handler; pub(crate) mod insert; pub(crate) mod plan; -pub(crate) mod select; +pub mod select; diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 7b5a76dc45..498c446b96 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDataType, ColumnDef, CreateExpr}; +use api::helper::ColumnDataTypeWrapper; +use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDef, CreateExpr}; use common_error::prelude::{ErrorExt, StatusCode}; -use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use futures::TryFutureExt; use query::Output; @@ -26,7 +26,7 @@ impl Instance { .mutate_result(rows as u32, 0) .build(), // Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug. 
- Ok(Output::RecordBatch(_)) => unreachable!(), + Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), Err(err) => AdminResultBuilder::default() .status_code(err.status_code() as u32) .err_msg(err.to_string()) @@ -53,7 +53,7 @@ impl Instance { .status_code(StatusCode::Success as u32) .mutate_result(rows as u32, 0) .build(), - Ok(Output::RecordBatch(_)) => unreachable!(), + Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), Err(err) => AdminResultBuilder::default() .status_code(err.status_code() as u32) .err_msg(err.to_string()) @@ -140,30 +140,10 @@ fn create_table_schema(expr: &CreateExpr) -> Result { fn create_column_schema(column_def: &ColumnDef) -> Result { let data_type = - ColumnDataType::from_i32(column_def.data_type).context(error::InvalidColumnDefSnafu { - msg: format!("unknown ColumnDataType {}", column_def.data_type), - })?; - let data_type = match data_type { - ColumnDataType::Boolean => ConcreteDataType::boolean_datatype(), - ColumnDataType::Int8 => ConcreteDataType::int8_datatype(), - ColumnDataType::Int16 => ConcreteDataType::int16_datatype(), - ColumnDataType::Int32 => ConcreteDataType::int32_datatype(), - ColumnDataType::Int64 => ConcreteDataType::int64_datatype(), - ColumnDataType::Uint8 => ConcreteDataType::uint8_datatype(), - ColumnDataType::Uint16 => ConcreteDataType::uint16_datatype(), - ColumnDataType::Uint32 => ConcreteDataType::uint32_datatype(), - ColumnDataType::Uint64 => ConcreteDataType::uint64_datatype(), - ColumnDataType::Float32 => ConcreteDataType::float32_datatype(), - ColumnDataType::Float64 => ConcreteDataType::float64_datatype(), - ColumnDataType::Binary => ConcreteDataType::binary_datatype(), - ColumnDataType::String => ConcreteDataType::string_datatype(), - ColumnDataType::Date => ConcreteDataType::date_datatype(), - ColumnDataType::Datetime => ConcreteDataType::datetime_datatype(), - ColumnDataType::Timestamp => ConcreteDataType::timestamp_millis_datatype(), - }; + 
ColumnDataTypeWrapper::try_new(column_def.data_type).context(error::ColumnDataTypeSnafu)?; Ok(ColumnSchema { name: column_def.name.clone(), - data_type, + data_type: data_type.into(), is_nullable: column_def.is_nullable, }) } @@ -173,6 +153,7 @@ mod tests { use std::collections::HashMap; use catalog::MIN_USER_TABLE_ID; + use datatypes::prelude::ConcreteDataType; use super::*; use crate::tests::test_util; @@ -228,10 +209,10 @@ mod tests { }; let result = create_column_schema(&column_def); assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Invalid ColumnDef in protobuf msg: unknown ColumnDataType 1024")); + assert_eq!( + result.unwrap_err().to_string(), + "Column datatype error, source: Unknown proto column datatype: 1024" + ); let column_def = ColumnDef { name: "a".to_string(), diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 2bab5b6319..e819ac37c7 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -4,7 +4,7 @@ use std::{ sync::Arc, }; -use api::v1::{codec::InsertBatch, column::Values, Column, InsertExpr}; +use api::v1::{codec::InsertBatch, column::Values, insert_expr, Column}; use common_base::BitVec; use common_time::timestamp::Timestamp; use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder}; @@ -14,13 +14,13 @@ use table::{requests::InsertRequest, Table}; use crate::error::{ColumnNotFoundSnafu, DecodeInsertSnafu, IllegalInsertDataSnafu, Result}; pub fn insertion_expr_to_request( - insert: InsertExpr, + table_name: &str, + values: insert_expr::Values, table: Arc, ) -> Result { let schema = table.schema(); - let table_name = &insert.table_name; let mut columns_builders = HashMap::with_capacity(schema.column_schemas().len()); - let insert_batches = insert_batches(insert.values)?; + let insert_batches = insert_batches(values.values)?; for InsertBatch { columns, row_count } in insert_batches { for Column { 
@@ -182,7 +182,7 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec { .map(|v| Value::Date(v.into())) .collect(), ConcreteDataType::Timestamp(_) => values - .i64_values + .ts_millis_values .into_iter() .map(|v| Value::Timestamp(Timestamp::from_millis(v))) .collect(), @@ -202,7 +202,7 @@ mod tests { use api::v1::{ codec::InsertBatch, column::{self, Values}, - Column, InsertExpr, + insert_expr, Column, }; use common_base::BitVec; use common_query::prelude::Expr; @@ -219,13 +219,12 @@ mod tests { #[test] fn test_insertion_expr_to_request() { - let insert_expr = InsertExpr { - table_name: "demo".to_string(), - values: mock_insert_batches(), - }; let table: Arc = Arc::new(DemoTable {}); - let insert_req = insertion_expr_to_request(insert_expr, table).unwrap(); + let values = insert_expr::Values { + values: mock_insert_batches(), + }; + let insert_req = insertion_expr_to_request("demo", values, table).unwrap(); assert_eq!("demo", insert_req.table_name); @@ -329,6 +328,7 @@ mod tests { semantic_type: SEMANTIC_TAG, values: Some(host_vals), null_mask: vec![0], + ..Default::default() }; let cpu_vals = column::Values { @@ -340,6 +340,7 @@ mod tests { semantic_type: SEMANTIC_FEILD, values: Some(cpu_vals), null_mask: vec![2], + ..Default::default() }; let mem_vals = column::Values { @@ -351,6 +352,7 @@ mod tests { semantic_type: SEMANTIC_FEILD, values: Some(mem_vals), null_mask: vec![1], + ..Default::default() }; let ts_vals = column::Values { @@ -362,6 +364,7 @@ mod tests { semantic_type: SEMANTIC_TS, values: Some(ts_vals), null_mask: vec![0], + ..Default::default() }; let insert_batch = InsertBatch { diff --git a/src/datanode/src/server/grpc/select.rs b/src/datanode/src/server/grpc/select.rs index 85dba25933..9088524679 100644 --- a/src/datanode/src/server/grpc/select.rs +++ b/src/datanode/src/server/grpc/select.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use api::helper::ColumnDataTypeWrapper; use api::v1::{codec::SelectResult, column::Values, Column, 
ObjectResult}; use arrow::array::{Array, BooleanArray, PrimitiveArray}; use common_base::BitVec; @@ -8,9 +9,9 @@ use common_error::status_code::StatusCode; use common_recordbatch::{util, RecordBatch, SendableRecordBatchStream}; use datatypes::arrow_array::{BinaryArray, StringArray}; use query::Output; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; -use crate::error::{ConversionSnafu, Result}; +use crate::error::{self, ConversionSnafu, Result}; use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; pub async fn to_object_result(result: Result) -> ObjectResult { @@ -19,7 +20,8 @@ pub async fn to_object_result(result: Result) -> ObjectResult { .status_code(StatusCode::Success as u32) .mutate_result(rows as u32, 0) .build(), - Ok(Output::RecordBatch(stream)) => record_batchs(stream).await, + Ok(Output::Stream(stream)) => record_batchs(stream).await, + Ok(Output::RecordBatches(recordbatches)) => build_result(recordbatches.to_vec()).await, Err(err) => ObjectResultBuilder::new() .status_code(err.status_code() as u32) .err_msg(err.to_string()) @@ -28,15 +30,18 @@ pub async fn to_object_result(result: Result) -> ObjectResult { } async fn record_batchs(stream: SendableRecordBatchStream) -> ObjectResult { - let builder = ObjectResultBuilder::new(); match util::collect(stream).await { - Ok(record_batches) => match try_convert(record_batches) { - Ok(select_result) => builder - .status_code(StatusCode::Success as u32) - .select_result(select_result) - .build(), - Err(err) => build_err_result(&err), - }, + Ok(recordbatches) => build_result(recordbatches).await, + Err(err) => build_err_result(&err), + } +} + +async fn build_result(recordbatches: Vec) -> ObjectResult { + match try_convert(recordbatches) { + Ok(select_result) => ObjectResultBuilder::new() + .status_code(StatusCode::Success as u32) + .select_result(select_result) + .build(), Err(err) => build_err_result(&err), } } @@ -69,6 +74,9 @@ fn try_convert(record_batches: Vec) -> Result { 
column_name, values: Some(values(&arrays)?), null_mask: null_mask(&arrays, row_count), + datatype: ColumnDataTypeWrapper::try_from(schema.data_type.clone()) + .context(error::ColumnDataTypeSnafu)? + .datatype() as i32, ..Default::default() }; columns.push(column); @@ -80,7 +88,7 @@ fn try_convert(record_batches: Vec) -> Result { }) } -fn null_mask(arrays: &Vec>, row_count: usize) -> Vec { +pub fn null_mask(arrays: &Vec>, row_count: usize) -> Vec { let null_count: usize = arrays.iter().map(|a| a.null_count()).sum(); if null_count == 0 { @@ -123,7 +131,7 @@ macro_rules! convert_arrow_array_to_grpc_vals { } -fn values(arrays: &[Arc]) -> Result { +pub fn values(arrays: &[Arc]) -> Result { if arrays.is_empty() { return Ok(Values::default()); } @@ -153,10 +161,11 @@ fn values(arrays: &[Arc]) -> Result { (DataType::Utf8, StringArray, string_values, |x| {x.into()}), (DataType::LargeUtf8, StringArray, string_values, |x| {x.into()}), - (DataType::Date32, PrimitiveArray, i32_values, |x| {*x as i32}), - (DataType::Date64, PrimitiveArray, i64_values, |x| {*x as i64}), - (DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, _), PrimitiveArray, i64_values, |x| {*x} ) + (DataType::Date32, PrimitiveArray, date_values, |x| {*x as i32}), + (DataType::Date64, PrimitiveArray, datetime_values,|x| {*x as i64}), + + (DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, _), PrimitiveArray, ts_millis_values, |x| {*x}) ) } diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 4266b87f58..efd2db52a7 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -1,17 +1,13 @@ //! 
sql handler use catalog::CatalogManagerRef; -use datatypes::prelude::ConcreteDataType; -use datatypes::schema::ColumnSchema; -use datatypes::types::DateTimeType; use query::query_engine::Output; use snafu::{OptionExt, ResultExt}; -use sql::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName}; use table::engine::{EngineContext, TableEngineRef}; use table::requests::*; use table::TableRef; -use crate::error::{self, GetTableSnafu, Result, TableNotFoundSnafu}; +use crate::error::{GetTableSnafu, Result, TableNotFoundSnafu}; mod alter; mod create; @@ -58,77 +54,6 @@ impl SqlHandler { } } -/// Converts maybe fully-qualified table name (`..` or `
` when -/// catalog and schema are default) to tuple. -fn table_idents_to_full_name( - obj_name: &ObjectName, -) -> Result<(Option, Option, String)> { - match &obj_name.0[..] { - [table] => Ok((None, None, table.value.clone())), - [catalog, schema, table] => Ok(( - Some(catalog.value.clone()), - Some(schema.value.clone()), - table.value.clone(), - )), - _ => error::InvalidSqlSnafu { - msg: format!( - "expect table name to be ..
or
, actual: {}", - obj_name - ), - } - .fail(), - } -} - -fn column_def_to_schema(column_def: &ColumnDef) -> Result { - let is_nullable = column_def - .options - .iter() - .any(|o| matches!(o.option, ColumnOption::Null)); - Ok(ColumnSchema { - name: column_def.name.value.clone(), - data_type: sql_data_type_to_concrete_data_type(&column_def.data_type)?, - is_nullable, - }) -} - -fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result { - match data_type { - SqlDataType::BigInt(_) => Ok(ConcreteDataType::int64_datatype()), - SqlDataType::Int(_) => Ok(ConcreteDataType::int32_datatype()), - SqlDataType::SmallInt(_) => Ok(ConcreteDataType::int16_datatype()), - SqlDataType::Char(_) - | SqlDataType::Varchar(_) - | SqlDataType::Text - | SqlDataType::String => Ok(ConcreteDataType::string_datatype()), - SqlDataType::Float(_) => Ok(ConcreteDataType::float32_datatype()), - SqlDataType::Double => Ok(ConcreteDataType::float64_datatype()), - SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()), - SqlDataType::Date => Ok(ConcreteDataType::date_datatype()), - SqlDataType::Custom(obj_name) => match &obj_name.0[..] 
{ - [type_name] => { - if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) { - Ok(ConcreteDataType::datetime_datatype()) - } else { - error::SqlTypeNotSupportedSnafu { - t: data_type.clone(), - } - .fail() - } - } - _ => error::SqlTypeNotSupportedSnafu { - t: data_type.clone(), - } - .fail(), - }, - SqlDataType::Timestamp => Ok(ConcreteDataType::timestamp_millis_datatype()), - _ => error::SqlTypeNotSupportedSnafu { - t: data_type.clone(), - } - .fail(), - } -} - #[cfg(test)] mod tests { use std::any::Any; diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index ccdec5bd36..3d92fd2857 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -1,11 +1,12 @@ use query::query_engine::Output; use snafu::prelude::*; use sql::statements::alter::{AlterTable, AlterTableOperation}; +use sql::statements::{column_def_to_schema, table_idents_to_full_name}; use table::engine::EngineContext; use table::requests::{AlterKind, AlterTableRequest}; use crate::error::{self, Result}; -use crate::sql::{column_def_to_schema, table_idents_to_full_name, SqlHandler}; +use crate::sql::SqlHandler; impl SqlHandler { pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result { @@ -24,7 +25,7 @@ impl SqlHandler { pub(crate) fn alter_to_request(&self, alter_table: AlterTable) -> Result { let (catalog_name, schema_name, table_name) = - table_idents_to_full_name(alter_table.table_name())?; + table_idents_to_full_name(alter_table.table_name()).context(error::ParseSqlSnafu)?; let alter_kind = match alter_table.alter_operation() { AlterTableOperation::AddConstraint(table_constraint) => { @@ -34,7 +35,7 @@ impl SqlHandler { .fail() } AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumn { - new_column: column_def_to_schema(column_def)?, + new_column: column_def_to_schema(column_def).context(error::ParseSqlSnafu)?, }, }; Ok(AlterTableRequest { diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs 
index 8010abf913..6fc88ad26e 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -8,6 +8,7 @@ use query::query_engine::Output; use snafu::{OptionExt, ResultExt}; use sql::ast::TableConstraint; use sql::statements::create_table::CreateTable; +use sql::statements::{column_def_to_schema, table_idents_to_full_name}; use store_api::storage::consts::TIME_INDEX_NAME; use table::engine::EngineContext; use table::metadata::TableId; @@ -17,7 +18,7 @@ use crate::error::{ self, ConstraintNotSupportedSnafu, CreateSchemaSnafu, CreateTableSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu, Result, }; -use crate::sql::{column_def_to_schema, table_idents_to_full_name, SqlHandler}; +use crate::sql::SqlHandler; impl SqlHandler { pub(crate) async fn create(&self, req: CreateTableRequest) -> Result { @@ -61,7 +62,8 @@ impl SqlHandler { let mut ts_index = usize::MAX; let mut primary_keys = vec![]; - let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&stmt.name)?; + let (catalog_name, schema_name, table_name) = + table_idents_to_full_name(&stmt.name).context(error::ParseSqlSnafu)?; let col_map = stmt .columns @@ -129,7 +131,7 @@ impl SqlHandler { let columns_schemas: Vec<_> = stmt .columns .iter() - .map(column_def_to_schema) + .map(|column| column_def_to_schema(column).context(error::ParseSqlSnafu)) .collect::>>()?; let schema = Arc::new( @@ -159,15 +161,12 @@ mod tests { use std::assert_matches::assert_matches; use datatypes::prelude::ConcreteDataType; - use sql::ast::Ident; - use sql::ast::{DataType as SqlDataType, ObjectName}; use sql::dialect::GenericDialect; use sql::parser::ParserContext; use sql::statements::statement::Statement; use super::*; use crate::error::Error; - use crate::sql::sql_data_type_to_concrete_data_type; use crate::tests::test_util::create_mock_sql_handler; fn sql_to_statement(sql: &str) -> CreateTable { @@ -292,46 +291,4 @@ mod tests { .data_type ); } - - fn check_type(sql_type: SqlDataType, data_type: 
ConcreteDataType) { - assert_eq!( - data_type, - sql_data_type_to_concrete_data_type(&sql_type).unwrap() - ); - } - - #[test] - pub fn test_sql_data_type_to_concrete_data_type() { - check_type( - SqlDataType::BigInt(None), - ConcreteDataType::int64_datatype(), - ); - check_type(SqlDataType::Int(None), ConcreteDataType::int32_datatype()); - check_type( - SqlDataType::SmallInt(None), - ConcreteDataType::int16_datatype(), - ); - check_type(SqlDataType::Char(None), ConcreteDataType::string_datatype()); - check_type( - SqlDataType::Varchar(None), - ConcreteDataType::string_datatype(), - ); - check_type(SqlDataType::Text, ConcreteDataType::string_datatype()); - check_type(SqlDataType::String, ConcreteDataType::string_datatype()); - check_type( - SqlDataType::Float(None), - ConcreteDataType::float32_datatype(), - ); - check_type(SqlDataType::Double, ConcreteDataType::float64_datatype()); - check_type(SqlDataType::Boolean, ConcreteDataType::boolean_datatype()); - check_type(SqlDataType::Date, ConcreteDataType::date_datatype()); - check_type( - SqlDataType::Custom(ObjectName(vec![Ident::new("datetime")])), - ConcreteDataType::datetime_datatype(), - ); - check_type( - SqlDataType::Timestamp, - ConcreteDataType::timestamp_millis_datatype(), - ); - } } diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index bce0c7e831..b54f28bcdb 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -6,8 +6,8 @@ use std::time::Duration; use api::v1::ColumnDataType; use api::v1::{ - admin_result, alter_expr::Kind, codec::InsertBatch, column, AddColumn, AlterExpr, Column, - ColumnDef, CreateExpr, MutateResult, + admin_result, alter_expr::Kind, codec::InsertBatch, column, insert_expr, AddColumn, AlterExpr, + Column, ColumnDef, CreateExpr, InsertExpr, MutateResult, }; use client::admin::Admin; use client::{Client, Database, ObjectResult}; @@ -48,6 +48,7 @@ async fn test_insert_and_select() { .collect(), 
..Default::default() }), + datatype: 12, // string ..Default::default() }; let expected_cpu_col = Column { @@ -57,6 +58,7 @@ async fn test_insert_and_select() { ..Default::default() }), null_mask: vec![2], + datatype: 10, // float64 ..Default::default() }; let expected_mem_col = Column { @@ -66,14 +68,16 @@ async fn test_insert_and_select() { ..Default::default() }), null_mask: vec![4], + datatype: 10, // float64 ..Default::default() }; let expected_ts_col = Column { column_name: "ts".to_string(), values: Some(column::Values { - i64_values: vec![100, 101, 102, 103], + ts_millis_values: vec![100, 101, 102, 103], ..Default::default() }), + datatype: 15, // timestamp ..Default::default() }; @@ -117,7 +121,11 @@ async fn test_insert_and_select() { row_count: 4, } .into()]; - let result = db.insert("demo", values).await; + let expr = InsertExpr { + table_name: "demo".to_string(), + expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })), + }; + let result = db.insert(expr).await; assert!(result.is_ok()); // select diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 4e0efe528e..0715f17fff 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -40,7 +40,7 @@ async fn test_execute_query() { .await .unwrap(); match output { - Output::RecordBatch(recordbatch) => { + Output::Stream(recordbatch) => { let numbers = util::collect(recordbatch).await.unwrap(); let columns = numbers[0].df_recordbatch.columns(); assert_eq!(1, columns.len()); @@ -116,7 +116,7 @@ async fn test_alter_table() { let output = instance.execute_sql("select * from demo").await.unwrap(); match output { - Output::RecordBatch(stream) => { + Output::Stream(stream) => { let recordbatches = util::collect(stream).await.unwrap(); let recordbatch = recordbatches .into_iter() diff --git a/src/datatypes/src/vectors/builder.rs b/src/datatypes/src/vectors/builder.rs index ac3b1eb5ec..5e97203b00 100644 --- 
a/src/datatypes/src/vectors/builder.rs +++ b/src/datatypes/src/vectors/builder.rs @@ -5,6 +5,8 @@ use common_time::datetime::DateTime; use common_time::timestamp::Timestamp; use crate::data_type::ConcreteDataType; +use crate::error::{self, Result}; +use crate::prelude::ValueRef; use crate::scalars::ScalarVectorBuilder; use crate::value::Value; use crate::vectors::date::DateVectorBuilder; @@ -160,6 +162,37 @@ impl VectorBuilder { } } + pub fn try_push_ref(&mut self, value: ValueRef) -> Result<()> { + match &mut *self { + VectorBuilder::Null(b) => { + if !value.is_null() { + return error::CastTypeSnafu { + msg: "unable to accept non-null value in NullVectorBuilder", + } + .fail(); + } + *b += 1; + Ok(()) + } + VectorBuilder::Boolean(b) => b.push_value_ref(value), + VectorBuilder::UInt8(b) => b.push_value_ref(value), + VectorBuilder::UInt16(b) => b.push_value_ref(value), + VectorBuilder::UInt32(b) => b.push_value_ref(value), + VectorBuilder::UInt64(b) => b.push_value_ref(value), + VectorBuilder::Int8(b) => b.push_value_ref(value), + VectorBuilder::Int16(b) => b.push_value_ref(value), + VectorBuilder::Int32(b) => b.push_value_ref(value), + VectorBuilder::Int64(b) => b.push_value_ref(value), + VectorBuilder::Float32(b) => b.push_value_ref(value), + VectorBuilder::Float64(b) => b.push_value_ref(value), + VectorBuilder::String(b) => b.push_value_ref(value), + VectorBuilder::Binary(b) => b.push_value_ref(value), + VectorBuilder::Date(b) => b.push_value_ref(value), + VectorBuilder::DateTime(b) => b.push_value_ref(value), + VectorBuilder::Timestamp(b) => b.push_value_ref(value), + } + } + pub fn push_null(&mut self) { match self { VectorBuilder::Null(v) => *v += 1, @@ -223,19 +256,37 @@ mod tests { for i in 0..10 { builder.push(&Value::$Type(i)); } + for i in 10..20 { + builder.try_push_ref(ValueRef::$Type(i)).unwrap(); + } let vector = builder.finish(); - for i in 0..10 { + for i in 0..20 { assert_eq!(Value::$Type(i), vector.get(i as usize)); } let mut builder = 
VectorBuilder::new(ConcreteDataType::$datatype()); builder.push(&Value::Null); builder.push(&Value::$Type(100)); + builder.try_push_ref(ValueRef::Null).unwrap(); + builder.try_push_ref(ValueRef::$Type(101)).unwrap(); + + let result = builder.try_push_ref(ValueRef::Boolean(true)); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + format!( + "Failed to cast value Boolean(true) to primitive type {}", + stringify!($Type) + ), + ); + let vector = builder.finish(); assert!(vector.is_null(0)); assert_eq!(Value::$Type(100), vector.get(1)); + assert!(vector.is_null(2)); + assert_eq!(Value::$Type(101), vector.get(3)); }; } @@ -244,8 +295,19 @@ mod tests { let mut builder = VectorBuilder::new(ConcreteDataType::null_datatype()); assert_eq!(ConcreteDataType::null_datatype(), builder.data_type()); builder.push(&Value::Null); + + let result = builder.try_push_ref(ValueRef::Boolean(true)); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "unable to accept non-null value in NullVectorBuilder" + ); + + builder.try_push_ref(ValueRef::Null).unwrap(); + let vector = builder.finish(); assert!(vector.is_null(0)); + assert!(vector.is_null(1)); } #[test] @@ -267,13 +329,43 @@ mod tests { assert_eq!(data_type, builder.data_type()); builder.push(&Value::Float32(OrderedFloat(1.0))); + + let result = builder.try_push_ref(ValueRef::Boolean(true)); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Failed to cast value Boolean(true) to primitive type Float32" + ); + + builder + .try_push_ref(ValueRef::Float32(OrderedFloat(2.0))) + .unwrap(); + builder.try_push_ref(ValueRef::Null).unwrap(); + let vector = builder.finish(); assert_eq!(Value::Float32(OrderedFloat(1.0)), vector.get(0)); + assert_eq!(Value::Float32(OrderedFloat(2.0)), vector.get(1)); + assert_eq!(Value::Null, vector.get(2)); let mut builder = VectorBuilder::new(ConcreteDataType::float64_datatype()); 
builder.push(&Value::Float64(OrderedFloat(2.0))); + + let result = builder.try_push_ref(ValueRef::Boolean(true)); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Failed to cast value Boolean(true) to primitive type Float64" + ); + + builder + .try_push_ref(ValueRef::Float64(OrderedFloat(3.0))) + .unwrap(); + builder.try_push_ref(ValueRef::Null).unwrap(); + let vector = builder.finish(); assert_eq!(Value::Float64(OrderedFloat(2.0)), vector.get(0)); + assert_eq!(Value::Float64(OrderedFloat(3.0)), vector.get(1)); + assert_eq!(Value::Null, vector.get(2)); } #[test] @@ -283,8 +375,21 @@ mod tests { let mut builder = VectorBuilder::new(data_type.clone()); assert_eq!(data_type, builder.data_type()); builder.push(&Value::Binary(hello.into())); + + let result = builder.try_push_ref(ValueRef::Boolean(true)); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Failed to cast value ref Boolean(true) to Binary" + ); + + builder.try_push_ref(ValueRef::Binary(b"world")).unwrap(); + builder.try_push_ref(ValueRef::Null).unwrap(); + let vector = builder.finish(); assert_eq!(Value::Binary(hello.into()), vector.get(0)); + assert_eq!(ValueRef::Binary(b"world"), vector.get_ref(1)); + assert_eq!(Value::Null, vector.get(2)); } #[test] @@ -294,8 +399,21 @@ mod tests { let mut builder = VectorBuilder::new(data_type.clone()); assert_eq!(data_type, builder.data_type()); builder.push(&Value::String(hello.into())); + + let result = builder.try_push_ref(ValueRef::Boolean(true)); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Failed to cast value ref Boolean(true) to String" + ); + + builder.try_push_ref(ValueRef::String("world")).unwrap(); + builder.try_push_ref(ValueRef::Null).unwrap(); + let vector = builder.finish(); assert_eq!(Value::String(hello.into()), vector.get(0)); + assert_eq!(ValueRef::String("world"), vector.get_ref(1)); + assert_eq!(Value::Null, vector.get(2)); } #[test] @@ -304,10 +422,25 
@@ mod tests { assert_eq!(ConcreteDataType::date_datatype(), builder.data_type()); builder.push_null(); builder.push(&Value::Date(Date::new(123))); + + let result = builder.try_push_ref(ValueRef::Boolean(true)); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Failed to cast value ref Boolean(true) to Date" + ); + + builder + .try_push_ref(ValueRef::Date(Date::new(456))) + .unwrap(); + builder.try_push_ref(ValueRef::Null).unwrap(); + let v = builder.finish(); let v = v.as_any().downcast_ref::().unwrap(); assert_eq!(Value::Null, v.get(0)); assert_eq!(Value::Date(Date::new(123)), v.get(1)); + assert_eq!(ValueRef::Date(Date::new(456)), v.get_ref(2)); + assert_eq!(ValueRef::Null, v.get_ref(3)); assert_eq!( &arrow::datatypes::DataType::Date32, v.to_arrow_array().data_type() @@ -320,10 +453,25 @@ mod tests { assert_eq!(ConcreteDataType::datetime_datatype(), builder.data_type()); builder.push_null(); builder.push(&Value::DateTime(DateTime::new(123))); + + let result = builder.try_push_ref(ValueRef::Boolean(true)); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Failed to cast value ref Boolean(true) to DateTime" + ); + + builder + .try_push_ref(ValueRef::DateTime(DateTime::new(456))) + .unwrap(); + builder.try_push_ref(ValueRef::Null).unwrap(); + let v = builder.finish(); let v = v.as_any().downcast_ref::().unwrap(); assert_eq!(Value::Null, v.get(0)); assert_eq!(Value::DateTime(DateTime::new(123)), v.get(1)); + assert_eq!(ValueRef::DateTime(DateTime::new(456)), v.get_ref(2)); + assert_eq!(ValueRef::Null, v.get_ref(3)); assert_eq!( &arrow::datatypes::DataType::Date64, v.to_arrow_array().data_type() diff --git a/src/datatypes/src/vectors/list.rs b/src/datatypes/src/vectors/list.rs index 6349866481..d06a48f8fb 100644 --- a/src/datatypes/src/vectors/list.rs +++ b/src/datatypes/src/vectors/list.rs @@ -20,7 +20,7 @@ type ArrowListArray = ListArray; #[derive(Debug, Clone, PartialEq)] pub struct ListVector { array: 
ArrowListArray, - inner_data_type: ConcreteDataType, + inner_datatype: ConcreteDataType, } impl ListVector { @@ -31,7 +31,7 @@ impl ListVector { impl Vector for ListVector { fn data_type(&self) -> ConcreteDataType { - ConcreteDataType::List(ListType::new(self.inner_data_type.clone())) + ConcreteDataType::List(ListType::new(self.inner_datatype.clone())) } fn vector_type_name(&self) -> String { @@ -89,7 +89,7 @@ impl Vector for ListVector { .collect::>(); Value::List(ListValue::new( Some(Box::new(values)), - self.inner_data_type.clone(), + self.inner_datatype.clone(), )) } @@ -124,13 +124,13 @@ impl Serializable for ListVector { impl From for ListVector { fn from(array: ArrowListArray) -> Self { - let inner_data_type = ConcreteDataType::from_arrow_type(match array.data_type() { + let inner_datatype = ConcreteDataType::from_arrow_type(match array.data_type() { ArrowDataType::List(field) => &field.data_type, _ => unreachable!(), }); Self { array, - inner_data_type, + inner_datatype, } } } @@ -234,7 +234,7 @@ impl MutableVector for ListVectorBuilder { let vector = ListVector { array, - inner_data_type: self.inner_type.clone(), + inner_datatype: self.inner_type.clone(), }; Arc::new(vector) } @@ -286,7 +286,7 @@ mod tests { let list_vector = ListVector { array: arrow_array.clone(), - inner_data_type: ConcreteDataType::int32_datatype(), + inner_datatype: ConcreteDataType::int32_datatype(), }; assert_eq!( ConcreteDataType::List(ListType::new(ConcreteDataType::int32_datatype())), @@ -374,7 +374,7 @@ mod tests { let list_vector = ListVector::try_from_arrow_array(array_ref).unwrap(); assert_eq!( - "ListVector { array: ListArray[[1, 2, 3], None, [4, None, 6]], inner_data_type: UInt32(UInt32) }", + "ListVector { array: ListArray[[1, 2, 3], None, [4, None, 6]], inner_datatype: UInt32(UInt32) }", format!("{:?}", list_vector) ); } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml new file mode 100644 index 0000000000..bc21a75425 --- /dev/null +++ 
b/src/frontend/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "frontend" +version = "0.1.0" +edition = "2021" + +[dependencies.arrow] +package = "arrow2" +version = "0.10" +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] + +[dependencies] +api = { path = "../api" } +async-stream = "0.3" +async-trait = "0.1" +catalog = { path = "../catalog" } +client = { path = "../client" } +common-base = { path = "../common/base" } +common-error = { path = "../common/error" } +common-recordbatch = { path = "../common/recordbatch" } +common-runtime = { path = "../common/runtime" } +common-telemetry = { path = "../common/telemetry" } +common-time = { path = "../common/time" } +datatypes = { path = "../datatypes" } +query = { path = "../query" } +snafu = { version = "0.7", features = ["backtraces"] } +tokio = { version = "1.18", features = ["full"] } +serde = "1.0" +servers = { path = "../servers" } +sql = { path = "../sql" } + +[dev-dependencies] +datanode = { path = "../datanode" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} +datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } +futures = "0.3" +tonic = "0.8" +tempdir = "0.3" +tower = "0.4" diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs new file mode 100644 index 0000000000..470063a6c2 --- /dev/null +++ b/src/frontend/src/error.rs @@ -0,0 +1,81 @@ +use std::any::Any; + +use common_error::prelude::*; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Failed to connect Datanode at {}, source: {}", addr, source))] + ConnectDatanode { + addr: String, + #[snafu(backtrace)] + source: client::Error, + }, + + #[snafu(display("Runtime resource error, source: {}", source))] + RuntimeResource { + #[snafu(backtrace)] + source: common_runtime::error::Error, + }, + + #[snafu(display("Failed to start 
server, source: {}", source))] + StartServer { + #[snafu(backtrace)] + source: servers::error::Error, + }, + + #[snafu(display("Failed to parse address {}, source: {}", addr, source))] + ParseAddr { + addr: String, + source: std::net::AddrParseError, + }, + + #[snafu(display("Failed to parse SQL, source: {}", source))] + ParseSql { + #[snafu(backtrace)] + source: sql::error::Error, + }, + + #[snafu(display("Column datatype error, source: {}", source))] + ColumnDataType { + #[snafu(backtrace)] + source: api::error::Error, + }, + + #[snafu(display("Invalid SQL, error: {}", err_msg))] + InvalidSql { + err_msg: String, + backtrace: Backtrace, + }, + + #[snafu(display("Illegal Frontend state: {}", err_msg))] + IllegalFrontendState { + err_msg: String, + backtrace: Backtrace, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::ConnectDatanode { .. } | Error::ParseAddr { .. } | Error::InvalidSql { .. } => { + StatusCode::InvalidArguments + } + Error::RuntimeResource { source, .. } => source.status_code(), + Error::StartServer { source, .. } => source.status_code(), + Error::ParseSql { source } => source.status_code(), + Error::ColumnDataType { .. } => StatusCode::Internal, + Error::IllegalFrontendState { .. 
} => StatusCode::Unexpected, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs new file mode 100644 index 0000000000..4464062566 --- /dev/null +++ b/src/frontend/src/frontend.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use snafu::prelude::*; + +use crate::error::{self, Result}; +use crate::instance::Instance; +use crate::server::Services; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct FrontendOptions { + pub http_addr: Option, + pub grpc_addr: Option, + pub mysql_addr: Option, + pub mysql_runtime_size: u32, +} + +impl Default for FrontendOptions { + fn default() -> Self { + Self { + http_addr: Some("0.0.0.0:4000".to_string()), + grpc_addr: Some("0.0.0.0:4001".to_string()), + mysql_addr: Some("0.0.0.0:4002".to_string()), + mysql_runtime_size: 2, + } + } +} + +impl FrontendOptions { + // TODO(LFC) Get Datanode address from Meta. 
+ pub(crate) fn datanode_grpc_addr(&self) -> String { + "http://127.0.0.1:3001".to_string() + } +} + +pub struct Frontend { + opts: FrontendOptions, + instance: Option, +} + +impl Frontend { + pub fn new(opts: FrontendOptions) -> Self { + let instance = Instance::new(); + Self { + opts, + instance: Some(instance), + } + } + + pub async fn start(&mut self) -> Result<()> { + let mut instance = self + .instance + .take() + .context(error::IllegalFrontendStateSnafu { + err_msg: "Frontend instance not initialized", + })?; + instance.start(&self.opts).await?; + + let instance = Arc::new(instance); + Services::start(&self.opts, instance).await + } +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs new file mode 100644 index 0000000000..60aa822813 --- /dev/null +++ b/src/frontend/src/instance.rs @@ -0,0 +1,550 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use api::helper::ColumnDataTypeWrapper; +use api::v1::{ + insert_expr, AdminExpr, AdminResult, ColumnDataType, ColumnDef as GrpcColumnDef, CreateExpr, + InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult, +}; +use async_trait::async_trait; +use client::admin::{admin_result_to_output, Admin}; +use client::{Client, Database, Select}; +use common_error::prelude::BoxedError; +use datatypes::schema::ColumnSchema; +use query::Output; +use servers::error as server_error; +use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler}; +use snafu::prelude::*; +use sql::ast::{ColumnDef, TableConstraint}; +use sql::statements::create_table::{CreateTable, TIME_INDEX}; +use sql::statements::statement::Statement; +use sql::statements::{column_def_to_schema, table_idents_to_full_name}; +use sql::{dialect::GenericDialect, parser::ParserContext}; + +use crate::error::{self, Result}; +use crate::frontend::FrontendOptions; + +pub(crate) type InstanceRef = Arc; + +pub struct Instance { + db: Database, + admin: Admin, +} + +impl Instance { + pub(crate) fn new() -> Self { + let 
client = Client::default(); + let db = Database::new("greptime", client.clone()); + let admin = Admin::new("greptime", client); + Self { db, admin } + } + + pub(crate) async fn start(&mut self, opts: &FrontendOptions) -> Result<()> { + let addr = opts.datanode_grpc_addr(); + self.db + .start(addr.clone()) + .await + .context(error::ConnectDatanodeSnafu { addr: addr.clone() })?; + self.admin + .start(addr.clone()) + .await + .context(error::ConnectDatanodeSnafu { addr })?; + Ok(()) + } +} + +#[cfg(test)] +impl Instance { + pub fn with_client(client: Client) -> Self { + Self { + db: Database::new("greptime", client.clone()), + admin: Admin::new("greptime", client), + } + } +} + +#[async_trait] +impl SqlQueryHandler for Instance { + async fn do_query(&self, query: &str) -> server_error::Result { + let mut stmt = ParserContext::create_with_dialect(query, &GenericDialect {}) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query })?; + if stmt.len() != 1 { + // TODO(LFC): Support executing multiple SQLs, + // which seems to be a major change to our whole server framework? 
+ return server_error::NotSupportedSnafu { + feat: "Only one SQL is allowed to be executed at one time.", + } + .fail(); + } + let stmt = stmt.remove(0); + + match stmt { + Statement::Query(_) => self + .db + .select(Select::Sql(query.to_string())) + .await + .and_then(|object_result| object_result.try_into()), + Statement::Insert(insert) => { + let table_name = insert.table_name(); + let expr = InsertExpr { + table_name, + expr: Some(insert_expr::Expr::Sql(query.to_string())), + }; + self.db + .insert(expr) + .await + .and_then(|object_result| object_result.try_into()) + } + Statement::Create(create) => { + let expr = create_to_expr(create) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query })?; + self.admin + .create(expr) + .await + .and_then(admin_result_to_output) + } + // TODO(LFC): Support other SQL execution, + // update, delete, alter, explain, etc. + _ => return server_error::NotSupportedSnafu { feat: query }.fail(), + } + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }) + } + + async fn insert_script(&self, _name: &str, _script: &str) -> server_error::Result<()> { + server_error::NotSupportedSnafu { + feat: "Script execution in Frontend", + } + .fail() + } + + async fn execute_script(&self, _script: &str) -> server_error::Result { + server_error::NotSupportedSnafu { + feat: "Script execution in Frontend", + } + .fail() + } +} + +fn create_to_expr(create: CreateTable) -> Result { + let (catalog_name, schema_name, table_name) = + table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?; + + let expr = CreateExpr { + catalog_name, + schema_name, + table_name, + column_defs: columns_to_expr(&create.columns)?, + time_index: find_time_index(&create.constraints)?, + primary_keys: find_primary_keys(&create.constraints)?, + create_if_not_exists: create.if_not_exists, + // TODO(LFC): Fill in other table options. 
+ table_options: HashMap::from([("engine".to_string(), create.engine)]), + ..Default::default() + }; + Ok(expr) +} + +fn find_primary_keys(constraints: &[TableConstraint]) -> Result> { + let primary_keys = constraints + .iter() + .filter_map(|constraint| match constraint { + TableConstraint::Unique { + name: _, + columns, + is_primary: true, + } => Some(columns.iter().map(|ident| ident.value.clone())), + _ => None, + }) + .flatten() + .collect::>(); + Ok(primary_keys) +} + +fn find_time_index(constraints: &[TableConstraint]) -> Result { + let time_index = constraints + .iter() + .filter_map(|constraint| match constraint { + TableConstraint::Unique { + name: Some(name), + columns, + is_primary: false, + } => { + if name.value == TIME_INDEX { + Some(columns.iter().map(|ident| &ident.value)) + } else { + None + } + } + _ => None, + }) + .flatten() + .collect::>(); + ensure!( + time_index.len() == 1, + error::InvalidSqlSnafu { + err_msg: "must have one and only one TimeIndex columns", + } + ); + Ok(time_index.first().unwrap().to_string()) +} + +fn columns_to_expr(column_defs: &[ColumnDef]) -> Result> { + let column_schemas = column_defs + .iter() + .map(|c| column_def_to_schema(c).context(error::ParseSqlSnafu)) + .collect::>>()?; + + let column_datatypes = column_schemas + .iter() + .map(|c| { + ColumnDataTypeWrapper::try_from(c.data_type.clone()) + .map(|w| w.datatype()) + .context(error::ColumnDataTypeSnafu) + }) + .collect::>>()?; + + Ok(column_schemas + .iter() + .zip(column_datatypes.into_iter()) + .map(|(schema, datatype)| GrpcColumnDef { + name: schema.name.clone(), + data_type: datatype as i32, + is_nullable: schema.is_nullable, + }) + .collect::>()) +} + +#[async_trait] +impl GrpcQueryHandler for Instance { + async fn do_query(&self, query: ObjectExpr) -> server_error::Result { + self.db + .object(query.clone()) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { + query: format!("{:?}", query), + }) + } +} + 
+#[async_trait] +impl GrpcAdminHandler for Instance { + async fn exec_admin_request(&self, expr: AdminExpr) -> server_error::Result { + self.admin + .do_request(expr.clone()) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { + query: format!("{:?}", expr), + }) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use api::v1::codec::{InsertBatch, SelectResult}; + use api::v1::greptime_client::GreptimeClient; + use api::v1::{ + admin_expr, admin_result, column, object_expr, object_result, select_expr, Column, + ExprHeader, MutateResult, SelectExpr, + }; + use datafusion::arrow_print; + use datafusion_common::record_batch::RecordBatch as DfRecordBatch; + use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; + use datanode::instance::Instance as DatanodeInstance; + use servers::grpc::GrpcServer; + use tempdir::TempDir; + use tonic::transport::{Endpoint, Server}; + use tower::service_fn; + + use super::*; + + #[tokio::test] + async fn test_execute_sql() { + common_telemetry::init_default_ut_logging(); + + let datanode_instance = create_datanode_instance().await; + let frontend_instance = create_frontend_instance(datanode_instance).await; + + let sql = r#"CREATE TABLE demo( + host STRING, + ts TIMESTAMP, + cpu DOUBLE NULL, + memory DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(ts, host) + ) engine=mito with(regions=1);"#; + let output = SqlQueryHandler::do_query(&*frontend_instance, sql) + .await + .unwrap(); + match output { + Output::AffectedRows(rows) => assert_eq!(rows, 1), + _ => unreachable!(), + } + + let sql = r#"insert into demo(host, cpu, memory, ts) values + ('frontend.host1', 1.1, 100, 1000), + ('frontend.host2', null, null, 2000), + ('frontend.host3', 3.3, 300, 3000) + "#; + let output = SqlQueryHandler::do_query(&*frontend_instance, sql) + .await + .unwrap(); + match output { + Output::AffectedRows(rows) => assert_eq!(rows, 3), + _ => unreachable!(), + } + + let sql = "select * 
from demo"; + let output = SqlQueryHandler::do_query(&*frontend_instance, sql) + .await + .unwrap(); + match output { + Output::RecordBatches(recordbatches) => { + let recordbatches = recordbatches + .to_vec() + .into_iter() + .map(|r| r.df_recordbatch) + .collect::>(); + let pretty_print = arrow_print::write(&recordbatches); + let pretty_print = pretty_print.lines().collect::>(); + let expected = vec![ + "+----------------+---------------------+-----+--------+", + "| host | ts | cpu | memory |", + "+----------------+---------------------+-----+--------+", + "| frontend.host1 | 1970-01-01 00:00:01 | 1.1 | 100 |", + "| frontend.host2 | 1970-01-01 00:00:02 | | |", + "| frontend.host3 | 1970-01-01 00:00:03 | 3.3 | 300 |", + "+----------------+---------------------+-----+--------+", + ]; + assert_eq!(pretty_print, expected); + } + _ => unreachable!(), + }; + } + + #[tokio::test] + async fn test_execute_grpc() { + common_telemetry::init_default_ut_logging(); + + let datanode_instance = create_datanode_instance().await; + let frontend_instance = create_frontend_instance(datanode_instance).await; + + // testing data: + let expected_host_col = Column { + column_name: "host".to_string(), + values: Some(column::Values { + string_values: vec!["fe.host.a", "fe.host.b", "fe.host.c", "fe.host.d"] + .into_iter() + .map(|s| s.to_string()) + .collect(), + ..Default::default() + }), + datatype: 12, // string + ..Default::default() + }; + let expected_cpu_col = Column { + column_name: "cpu".to_string(), + values: Some(column::Values { + f64_values: vec![1.0, 3.0, 4.0], + ..Default::default() + }), + null_mask: vec![2], + datatype: 10, // float64 + ..Default::default() + }; + let expected_mem_col = Column { + column_name: "memory".to_string(), + values: Some(column::Values { + f64_values: vec![100.0, 200.0, 400.0], + ..Default::default() + }), + null_mask: vec![4], + datatype: 10, // float64 + ..Default::default() + }; + let expected_ts_col = Column { + column_name: "ts".to_string(), 
+ values: Some(column::Values { + ts_millis_values: vec![1000, 2000, 3000, 4000], + ..Default::default() + }), + datatype: 15, // timestamp + ..Default::default() + }; + + // create + let create_expr = create_expr(); + let admin_expr = AdminExpr { + header: Some(ExprHeader::default()), + expr: Some(admin_expr::Expr::Create(create_expr)), + }; + let result = GrpcAdminHandler::exec_admin_request(&*frontend_instance, admin_expr) + .await + .unwrap(); + assert_matches!( + result.result, + Some(admin_result::Result::Mutate(MutateResult { + success: 1, + failure: 0 + })) + ); + + // insert + let values = vec![InsertBatch { + columns: vec![ + expected_host_col.clone(), + expected_cpu_col.clone(), + expected_mem_col.clone(), + expected_ts_col.clone(), + ], + row_count: 4, + } + .into()]; + let insert_expr = InsertExpr { + table_name: "demo".to_string(), + expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })), + }; + let object_expr = ObjectExpr { + header: Some(ExprHeader::default()), + expr: Some(object_expr::Expr::Insert(insert_expr)), + }; + let result = GrpcQueryHandler::do_query(&*frontend_instance, object_expr) + .await + .unwrap(); + assert_matches!( + result.result, + Some(object_result::Result::Mutate(MutateResult { + success: 4, + failure: 0 + })) + ); + + // select + let object_expr = ObjectExpr { + header: Some(ExprHeader::default()), + expr: Some(object_expr::Expr::Select(SelectExpr { + expr: Some(select_expr::Expr::Sql("select * from demo".to_string())), + })), + }; + let result = GrpcQueryHandler::do_query(&*frontend_instance, object_expr) + .await + .unwrap(); + match result.result { + Some(object_result::Result::Select(select_result)) => { + let select_result: SelectResult = (*select_result.raw_data).try_into().unwrap(); + + assert_eq!(4, select_result.row_count); + let actual_columns = select_result.columns; + assert_eq!(4, actual_columns.len()); + + // Respect the order in create table schema + let expected_columns = vec![ + 
expected_host_col, + expected_cpu_col, + expected_mem_col, + expected_ts_col, + ]; + expected_columns + .iter() + .zip(actual_columns.iter()) + .for_each(|(x, y)| assert_eq!(x, y)); + } + _ => unreachable!(), + } + } + + async fn create_datanode_instance() -> Arc { + let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap(); + let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap(); + let opts = DatanodeOptions { + wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + storage: ObjectStoreConfig::File { + data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + }, + ..Default::default() + }; + + let instance = Arc::new(DatanodeInstance::new(&opts).await.unwrap()); + instance.start().await.unwrap(); + instance + } + + async fn create_frontend_instance(datanode_instance: Arc) -> Arc { + let (client, server) = tokio::io::duplex(1024); + + // create a mock datanode grpc service, see example here: + // https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs + let datanode_service = + GrpcServer::new(datanode_instance.clone(), datanode_instance).create_service(); + tokio::spawn(async move { + Server::builder() + .add_service(datanode_service) + .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .await + }); + + // Move client to an option so we can _move_ the inner value + // on the first attempt to connect. All other attempts will fail. 
+ let mut client = Some(client); + // "http://[::]:50051" is just a placeholder, does not actually connect to it, + // see https://github.com/hyperium/tonic/issues/727#issuecomment-881532934 + let channel = Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(service_fn(move |_| { + let client = client.take(); + + async move { + if let Some(client) = client { + Ok(client) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Client already taken", + )) + } + } + })) + .await + .unwrap(); + let client = Client::with_client(GreptimeClient::new(channel)); + Arc::new(Instance::with_client(client)) + } + + fn create_expr() -> CreateExpr { + let column_defs = vec![ + GrpcColumnDef { + name: "host".to_string(), + data_type: 12, // string + is_nullable: false, + }, + GrpcColumnDef { + name: "cpu".to_string(), + data_type: 10, // float64 + is_nullable: true, + }, + GrpcColumnDef { + name: "memory".to_string(), + data_type: 10, // float64 + is_nullable: true, + }, + GrpcColumnDef { + name: "ts".to_string(), + data_type: 15, // timestamp + is_nullable: true, + }, + ]; + CreateExpr { + table_name: "demo".to_string(), + column_defs, + time_index: "ts".to_string(), + primary_keys: vec!["ts".to_string(), "host".to_string()], + ..Default::default() + } + } +} diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs new file mode 100644 index 0000000000..47e168e09f --- /dev/null +++ b/src/frontend/src/lib.rs @@ -0,0 +1,6 @@ +#![feature(assert_matches)] + +pub mod error; +pub mod frontend; +pub mod instance; +mod server; diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs new file mode 100644 index 0000000000..19d10a792a --- /dev/null +++ b/src/frontend/src/server.rs @@ -0,0 +1,80 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use common_runtime::Builder as RuntimeBuilder; +use servers::grpc::GrpcServer; +use servers::http::HttpServer; +use servers::mysql::server::MysqlServer; +use servers::server::Server; +use 
snafu::ResultExt; +use tokio::try_join; + +use crate::error::{self, Result}; +use crate::frontend::FrontendOptions; +use crate::instance::InstanceRef; + +pub(crate) struct Services; + +impl Services { + pub(crate) async fn start(opts: &FrontendOptions, instance: InstanceRef) -> Result<()> { + let http_server_and_addr = if let Some(http_addr) = &opts.http_addr { + let http_addr = parse_addr(http_addr)?; + + let http_server = HttpServer::new(instance.clone()); + + Some((Box::new(http_server) as _, http_addr)) + } else { + None + }; + + let grpc_server_and_addr = if let Some(grpc_addr) = &opts.grpc_addr { + let grpc_addr = parse_addr(grpc_addr)?; + + let grpc_server = GrpcServer::new(instance.clone(), instance.clone()); + + Some((Box::new(grpc_server) as _, grpc_addr)) + } else { + None + }; + + let mysql_server_and_addr = if let Some(mysql_addr) = &opts.mysql_addr { + let mysql_addr = parse_addr(mysql_addr)?; + + let mysql_io_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(opts.mysql_runtime_size as usize) + .thread_name("mysql-io-handlers") + .build() + .context(error::RuntimeResourceSnafu)?, + ); + + let mysql_server = MysqlServer::create_server(instance.clone(), mysql_io_runtime); + + Some((mysql_server, mysql_addr)) + } else { + None + }; + + try_join!( + start_server(http_server_and_addr), + start_server(grpc_server_and_addr), + start_server(mysql_server_and_addr) + ) + .context(error::StartServerSnafu)?; + Ok(()) + } +} + +fn parse_addr(addr: &str) -> Result { + addr.parse().context(error::ParseAddrSnafu { addr }) +} + +async fn start_server( + server_and_addr: Option<(Box, SocketAddr)>, +) -> servers::error::Result> { + if let Some((mut server, addr)) = server_and_addr { + server.start(addr).await.map(Some) + } else { + Ok(None) + } +} diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 6e4248bed9..55454296b0 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -79,14 +79,14 @@ impl 
QueryEngine for DatafusionQueryEngine { let physical_plan = self.create_physical_plan(&mut ctx, &logical_plan).await?; let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?; - Ok(Output::RecordBatch( + Ok(Output::Stream( self.execute_stream(&ctx, &physical_plan).await?, )) } async fn execute_physical(&self, plan: &Arc) -> Result { let ctx = QueryContext::new(self.state.clone()); - Ok(Output::RecordBatch(self.execute_stream(&ctx, plan).await?)) + Ok(Output::Stream(self.execute_stream(&ctx, plan).await?)) } fn register_udf(&self, udf: ScalarUdf) { @@ -267,7 +267,7 @@ mod tests { let output = engine.execute(&plan).await.unwrap(); match output { - Output::RecordBatch(recordbatch) => { + Output::Stream(recordbatch) => { let numbers = util::collect(recordbatch).await.unwrap(); assert_eq!(1, numbers.len()); assert_eq!(numbers[0].df_recordbatch.num_columns(), 1); diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 56acb830f0..e9e2ae2b9e 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -7,7 +7,7 @@ use catalog::CatalogList; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY}; use common_query::prelude::ScalarUdf; -use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use sql::statements::statement::Statement; use crate::datafusion::DatafusionQueryEngine; @@ -19,7 +19,8 @@ pub use crate::query_engine::state::QueryEngineState; /// Sql output pub enum Output { AffectedRows(usize), - RecordBatch(SendableRecordBatchStream), + RecordBatches(RecordBatches), + Stream(SendableRecordBatchStream), } #[async_trait::async_trait] diff --git a/src/query/tests/argmax_test.rs b/src/query/tests/argmax_test.rs index 18bd990f6a..639176dbf6 100644 --- a/src/query/tests/argmax_test.rs +++ b/src/query/tests/argmax_test.rs @@ -86,7 +86,7 @@ async fn execute_argmax<'a>( let 
output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; util::collect(recordbatch_stream).await diff --git a/src/query/tests/argmin_test.rs b/src/query/tests/argmin_test.rs index f5d0368f91..c6f60cf1d5 100644 --- a/src/query/tests/argmin_test.rs +++ b/src/query/tests/argmin_test.rs @@ -87,7 +87,7 @@ async fn execute_argmin<'a>( let output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; util::collect(recordbatch_stream).await diff --git a/src/query/tests/function.rs b/src/query/tests/function.rs index 13b259e945..11353dfb9b 100644 --- a/src/query/tests/function.rs +++ b/src/query/tests/function.rs @@ -67,7 +67,7 @@ where let output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; let numbers = util::collect(recordbatch_stream).await.unwrap(); diff --git a/src/query/tests/mean_test.rs b/src/query/tests/mean_test.rs index 4c40c8caed..6942b0d5dc 100644 --- a/src/query/tests/mean_test.rs +++ b/src/query/tests/mean_test.rs @@ -80,7 +80,7 @@ async fn execute_mean<'a>( let output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; util::collect(recordbatch_stream).await diff --git a/src/query/tests/my_sum_udaf_example.rs b/src/query/tests/my_sum_udaf_example.rs index eb2144ae89..a97b46f8f8 100644 --- a/src/query/tests/my_sum_udaf_example.rs +++ b/src/query/tests/my_sum_udaf_example.rs @@ -222,7 +222,7 @@ where let output = engine.execute(&plan).await?; let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => 
unreachable!(), }; let recordbatch = util::collect(recordbatch_stream).await.unwrap(); diff --git a/src/query/tests/percentile_test.rs b/src/query/tests/percentile_test.rs index 7221be9ed1..a472ceea1a 100644 --- a/src/query/tests/percentile_test.rs +++ b/src/query/tests/percentile_test.rs @@ -48,7 +48,7 @@ async fn test_percentile_correctness() -> Result<()> { let output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; let record_batch = util::collect(recordbatch_stream).await.unwrap(); @@ -108,7 +108,7 @@ async fn execute_percentile<'a>( let output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; util::collect(recordbatch_stream).await diff --git a/src/query/tests/polyval_test.rs b/src/query/tests/polyval_test.rs index f7509938be..3c6b8463e4 100644 --- a/src/query/tests/polyval_test.rs +++ b/src/query/tests/polyval_test.rs @@ -83,7 +83,7 @@ async fn execute_polyval<'a>( let output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; util::collect(recordbatch_stream).await diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs index 6cfe698f09..fccfa8da86 100644 --- a/src/query/tests/query_engine_test.rs +++ b/src/query/tests/query_engine_test.rs @@ -63,7 +63,7 @@ async fn test_datafusion_query_engine() -> Result<()> { let output = engine.execute(&plan).await?; let recordbatch = match output { - Output::RecordBatch(recordbatch) => recordbatch, + Output::Stream(recordbatch) => recordbatch, _ => unreachable!(), }; @@ -121,7 +121,7 @@ async fn test_udf() -> Result<()> { let output = engine.execute(&plan).await?; let recordbatch = match output { - 
Output::RecordBatch(recordbatch) => recordbatch, + Output::Stream(recordbatch) => recordbatch, _ => unreachable!(), }; @@ -244,7 +244,7 @@ where let output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; let numbers = util::collect(recordbatch_stream).await.unwrap(); @@ -349,7 +349,7 @@ async fn execute_median<'a>( let output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; util::collect(recordbatch_stream).await diff --git a/src/query/tests/scipy_stats_norm_cdf_test.rs b/src/query/tests/scipy_stats_norm_cdf_test.rs index b5baf6c31f..4256777888 100644 --- a/src/query/tests/scipy_stats_norm_cdf_test.rs +++ b/src/query/tests/scipy_stats_norm_cdf_test.rs @@ -85,7 +85,7 @@ async fn execute_scipy_stats_norm_cdf<'a>( let output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; util::collect(recordbatch_stream).await diff --git a/src/query/tests/scipy_stats_norm_pdf.rs b/src/query/tests/scipy_stats_norm_pdf.rs index 989bb87356..217ba9fa2a 100644 --- a/src/query/tests/scipy_stats_norm_pdf.rs +++ b/src/query/tests/scipy_stats_norm_pdf.rs @@ -85,7 +85,7 @@ async fn execute_scipy_stats_norm_pdf<'a>( let output = engine.execute(&plan).await.unwrap(); let recordbatch_stream = match output { - Output::RecordBatch(batch) => batch, + Output::Stream(batch) => batch, _ => unreachable!(), }; util::collect(recordbatch_stream).await diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 2ef9d083ef..f170b6caea 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -88,9 +88,7 @@ impl Script for PyScript { let res = 
self.query_engine.execute(&plan).await?; let copr = self.copr.clone(); match res { - query::Output::RecordBatch(stream) => { - Ok(Output::RecordBatch(Box::pin(CoprStream { copr, stream }))) - } + Output::Stream(stream) => Ok(Output::Stream(Box::pin(CoprStream { copr, stream }))), _ => unreachable!(), } } else { @@ -178,7 +176,7 @@ def test(a, b, c): .unwrap(); let output = script.execute(EvalContext::default()).await.unwrap(); match output { - Output::RecordBatch(stream) => { + Output::Stream(stream) => { let numbers = util::collect(stream).await.unwrap(); assert_eq!(1, numbers.len()); @@ -209,7 +207,7 @@ def test(a): .unwrap(); let output = script.execute(EvalContext::default()).await.unwrap(); match output { - Output::RecordBatch(stream) => { + Output::Stream(stream) => { let numbers = util::collect(stream).await.unwrap(); assert_eq!(1, numbers.len()); diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 34ef239fbc..d85f265c48 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -143,7 +143,7 @@ impl ScriptsTable { .await .context(FindScriptSnafu { name })? { - Output::RecordBatch(stream) => stream, + Output::Stream(stream) => stream, _ => unreachable!(), }; let records = record_util::collect(stream) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 5e79c1aff7..da14cb60c3 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -65,6 +65,12 @@ pub enum Error { #[snafu(display("Not supported: {}", feat))] NotSupported { feat: String }, + + #[snafu(display("Invalid query: {}", reason))] + InvalidQuery { + reason: String, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -86,7 +92,7 @@ impl ErrorExt for Error { | ExecuteScript { source, .. } | ExecuteQuery { source, .. } => source.status_code(), - NotSupported { .. } => StatusCode::InvalidArguments, + NotSupported { .. } | InvalidQuery { .. 
} => StatusCode::InvalidArguments, } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 3b7964e43a..4c10e113d5 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -81,10 +81,13 @@ impl JsonResponse { Ok(Output::AffectedRows(rows)) => { Self::with_output(Some(JsonOutput::AffectedRows(rows))) } - Ok(Output::RecordBatch(stream)) => match util::collect(stream).await { + Ok(Output::Stream(stream)) => match util::collect(stream).await { Ok(rows) => Self::with_output(Some(JsonOutput::Rows(rows))), Err(e) => Self::with_error(Some(format!("Recordbatch error: {}", e))), }, + Ok(Output::RecordBatches(recordbatches)) => { + Self::with_output(Some(JsonOutput::Rows(recordbatches.to_vec()))) + } Err(e) => Self::with_error(Some(format!("Query engine output error: {}", e))), } } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index deb4509258..cc800ae133 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -34,7 +34,7 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> { })?; match output { Ok(output) => match output { - Output::RecordBatch(stream) => { + Output::Stream(stream) => { let schema = stream.schema().clone(); let recordbatches = util::collect(stream) .await @@ -45,6 +45,13 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> { }; Self::write_query_result(query_result, writer)? } + Output::RecordBatches(recordbatches) => { + let query_result = QueryResult { + schema: recordbatches.schema(), + recordbatches: recordbatches.to_vec(), + }; + Self::write_query_result(query_result, writer)? 
+ } Output::AffectedRows(rows) => Self::write_affected_rows(writer, rows)?, }, Err(error) => Self::write_query_error(error, writer)?, diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index e6bff657cc..bbff85bd48 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] common-error = { path = "../common/error" } +datatypes = { path = "../datatypes" } snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.15.0" table-engine = { path = "../table-engine" } diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 9498452f1d..ea46e8c6a7 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -4,6 +4,8 @@ use common_error::prelude::*; use sqlparser::parser::ParserError; use sqlparser::tokenizer::TokenizerError; +pub type Result = std::result::Result; + /// SQL parser errors. // Now the error in parser does not contains backtrace to avoid generating backtrace // every time the parser parses an invalid SQL. @@ -39,6 +41,15 @@ pub enum Error { sql ))] InvalidTimeIndex { sql: String, backtrace: Backtrace }, + + #[snafu(display("Invalid SQL, error: {}", msg))] + InvalidSql { msg: String, backtrace: Backtrace }, + + #[snafu(display("SQL data type not supported yet: {:?}", t))] + SqlTypeNotSupported { + t: crate::ast::DataType, + backtrace: Backtrace, + }, } impl ErrorExt for Error { @@ -47,9 +58,12 @@ impl ErrorExt for Error { match self { Unsupported { .. } => StatusCode::Unsupported, - Unexpected { .. } | Syntax { .. } | InvalidTimeIndex { .. } | Tokenizer { .. } => { - StatusCode::InvalidSyntax - } + Unexpected { .. } + | Syntax { .. } + | InvalidTimeIndex { .. } + | Tokenizer { .. } + | InvalidSql { .. } + | SqlTypeNotSupported { .. 
} => StatusCode::InvalidSyntax, } } @@ -68,7 +82,7 @@ mod tests { use super::*; - fn throw_sp_error() -> Result<(), ParserError> { + fn throw_sp_error() -> std::result::Result<(), ParserError> { Err(ParserError::ParserError("parser error".to_string())) } diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index c2ecb0a431..50fea70c40 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -5,13 +5,11 @@ use sqlparser::parser::Parser; use sqlparser::parser::ParserError; use sqlparser::tokenizer::{Token, Tokenizer}; -use crate::error::{self, Error, SyntaxSnafu, TokenizerSnafu}; +use crate::error::{self, Result, SyntaxSnafu, TokenizerSnafu}; use crate::statements::show_database::SqlShowDatabase; use crate::statements::show_kind::ShowKind; use crate::statements::statement::Statement; -pub type Result = std::result::Result; - /// GrepTime SQL parser context, a simple wrapper for Datafusion SQL parser. pub struct ParserContext<'a> { pub(crate) parser: Parser<'a>, diff --git a/src/sql/src/parsers/alter_parser.rs b/src/sql/src/parsers/alter_parser.rs index ec23c20388..283dc2eea3 100644 --- a/src/sql/src/parsers/alter_parser.rs +++ b/src/sql/src/parsers/alter_parser.rs @@ -2,9 +2,8 @@ use snafu::ResultExt; use sqlparser::keywords::Keyword; use sqlparser::parser::ParserError; -use crate::error; +use crate::error::{self, Result}; use crate::parser::ParserContext; -use crate::parser::Result; use crate::statements::alter::{AlterTable, AlterTableOperation}; use crate::statements::statement::Statement; diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 6bd893b96f..56daa3712e 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -5,10 +5,8 @@ use sqlparser::{dialect::keywords::Keyword, tokenizer::Token}; use table_engine::engine; use crate::ast::{ColumnDef, Ident, TableConstraint}; -use crate::error; -use crate::error::{InvalidTimeIndexSnafu, SyntaxSnafu}; +use 
crate::error::{self, InvalidTimeIndexSnafu, Result, SyntaxSnafu}; use crate::parser::ParserContext; -use crate::parser::Result; use crate::statements::create_table::{CreateTable, TIME_INDEX}; use crate::statements::statement::Statement; diff --git a/src/sql/src/parsers/insert_parser.rs b/src/sql/src/parsers/insert_parser.rs index 61fd33c456..6d6800d836 100644 --- a/src/sql/src/parsers/insert_parser.rs +++ b/src/sql/src/parsers/insert_parser.rs @@ -1,9 +1,8 @@ use snafu::ResultExt; use sqlparser::ast::Statement as SpStatement; -use crate::error; +use crate::error::{self, Result}; use crate::parser::ParserContext; -use crate::parser::Result; use crate::statements::insert::Insert; use crate::statements::statement::Statement; diff --git a/src/sql/src/parsers/query_parser.rs b/src/sql/src/parsers/query_parser.rs index 9f055fc51b..603d603272 100644 --- a/src/sql/src/parsers/query_parser.rs +++ b/src/sql/src/parsers/query_parser.rs @@ -1,8 +1,7 @@ use snafu::prelude::*; -use crate::error; +use crate::error::{self, Result}; use crate::parser::ParserContext; -use crate::parser::Result; use crate::statements::query::Query; use crate::statements::statement::Statement; diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index bdd8b1b317..72c06cc684 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -5,3 +5,129 @@ pub mod query; pub mod show_database; pub mod show_kind; pub mod statement; + +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use datatypes::types::DateTimeType; + +use crate::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName}; +use crate::error::{self, Result}; + +/// Converts maybe fully-qualified table name (`..
` or `
` when +/// catalog and schema are default) to tuple. +pub fn table_idents_to_full_name( + obj_name: &ObjectName, +) -> Result<(Option, Option, String)> { + match &obj_name.0[..] { + [table] => Ok((None, None, table.value.clone())), + [catalog, schema, table] => Ok(( + Some(catalog.value.clone()), + Some(schema.value.clone()), + table.value.clone(), + )), + _ => error::InvalidSqlSnafu { + msg: format!( + "expect table name to be ..
or
, actual: {}", + obj_name + ), + } + .fail(), + } +} + +pub fn column_def_to_schema(column_def: &ColumnDef) -> Result { + let is_nullable = column_def + .options + .iter() + .any(|o| matches!(o.option, ColumnOption::Null)); + Ok(ColumnSchema { + name: column_def.name.value.clone(), + data_type: sql_data_type_to_concrete_data_type(&column_def.data_type)?, + is_nullable, + }) +} + +fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result { + match data_type { + SqlDataType::BigInt(_) => Ok(ConcreteDataType::int64_datatype()), + SqlDataType::Int(_) => Ok(ConcreteDataType::int32_datatype()), + SqlDataType::SmallInt(_) => Ok(ConcreteDataType::int16_datatype()), + SqlDataType::Char(_) + | SqlDataType::Varchar(_) + | SqlDataType::Text + | SqlDataType::String => Ok(ConcreteDataType::string_datatype()), + SqlDataType::Float(_) => Ok(ConcreteDataType::float32_datatype()), + SqlDataType::Double => Ok(ConcreteDataType::float64_datatype()), + SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()), + SqlDataType::Date => Ok(ConcreteDataType::date_datatype()), + SqlDataType::Custom(obj_name) => match &obj_name.0[..] 
{ + [type_name] => { + if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) { + Ok(ConcreteDataType::datetime_datatype()) + } else { + error::SqlTypeNotSupportedSnafu { + t: data_type.clone(), + } + .fail() + } + } + _ => error::SqlTypeNotSupportedSnafu { + t: data_type.clone(), + } + .fail(), + }, + SqlDataType::Timestamp => Ok(ConcreteDataType::timestamp_millis_datatype()), + _ => error::SqlTypeNotSupportedSnafu { + t: data_type.clone(), + } + .fail(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ast::Ident; + + fn check_type(sql_type: SqlDataType, data_type: ConcreteDataType) { + assert_eq!( + data_type, + sql_data_type_to_concrete_data_type(&sql_type).unwrap() + ); + } + + #[test] + pub fn test_sql_data_type_to_concrete_data_type() { + check_type( + SqlDataType::BigInt(None), + ConcreteDataType::int64_datatype(), + ); + check_type(SqlDataType::Int(None), ConcreteDataType::int32_datatype()); + check_type( + SqlDataType::SmallInt(None), + ConcreteDataType::int16_datatype(), + ); + check_type(SqlDataType::Char(None), ConcreteDataType::string_datatype()); + check_type( + SqlDataType::Varchar(None), + ConcreteDataType::string_datatype(), + ); + check_type(SqlDataType::Text, ConcreteDataType::string_datatype()); + check_type(SqlDataType::String, ConcreteDataType::string_datatype()); + check_type( + SqlDataType::Float(None), + ConcreteDataType::float32_datatype(), + ); + check_type(SqlDataType::Double, ConcreteDataType::float64_datatype()); + check_type(SqlDataType::Boolean, ConcreteDataType::boolean_datatype()); + check_type(SqlDataType::Date, ConcreteDataType::date_datatype()); + check_type( + SqlDataType::Custom(ObjectName(vec![Ident::new("datetime")])), + ConcreteDataType::datetime_datatype(), + ); + check_type( + SqlDataType::Timestamp, + ConcreteDataType::timestamp_millis_datatype(), + ); + } +} diff --git a/test-util/Cargo.toml b/test-util/Cargo.toml index 49fe6ee31c..48fb447e48 100644 --- a/test-util/Cargo.toml +++ 
b/test-util/Cargo.toml @@ -11,9 +11,9 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc [dependencies] async-trait = "0.1" common-query = { path = "../src/common/query" } -common-recordbatch = {path = "../src/common/recordbatch" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} -datatypes = {path = "../src/datatypes" } +common-recordbatch = { path = "../src/common/recordbatch" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"] } +datatypes = { path = "../src/datatypes" } futures = "0.3" snafu = { version = "0.7", features = ["backtraces"] } table = { path = "../src/table" }