From f0db878bf5500d277d01093f8f0778d0d004043a Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Fri, 6 May 2022 17:34:29 +0800 Subject: [PATCH] feat: http server and cmd crate etc. (#15) * feat: adds cmd crate and http server * feat: impl sql http handler * feat: convert all arrow array types * feat: adds query test * feat: adds test for datanode * fix: format * feat: refactor state.rs * feat: adds collect test * fix: by code review * fix: style --- Cargo.lock | 657 +++++++++++++++++- Cargo.toml | 1 + src/cmd/Cargo.toml | 13 + src/cmd/src/bin/greptime.rs | 20 + src/cmd/src/lib.rs | 1 + src/cmd/src/opts.rs | 15 + src/common/recordbatch/Cargo.toml | 14 +- src/common/recordbatch/src/lib.rs | 1 + src/common/recordbatch/src/recordbatch.rs | 69 +- src/common/recordbatch/src/util.rs | 90 +++ src/datanode/Cargo.toml | 16 + src/datanode/src/datanode.rs | 33 + src/datanode/src/error.rs | 11 +- src/datanode/src/instance.rs | 73 ++ src/datanode/src/lib.rs | 22 +- src/datanode/src/rpc.rs | 10 - src/datanode/src/server.rs | 24 + src/datanode/src/server/grpc.rs | 1 + .../src/{ => server/grpc}/processors.rs | 0 src/datanode/src/server/http.rs | 122 ++++ src/datanode/src/server/http/handler.rs | 75 ++ src/datatypes/Cargo.toml | 6 +- src/datatypes/src/data_type.rs | 2 +- src/datatypes/src/lib.rs | 2 +- src/datatypes/src/schema.rs | 4 +- src/datatypes/src/types/binary_type.rs | 2 +- src/datatypes/src/types/primitive_traits.rs | 2 +- src/datatypes/src/types/primitive_type.rs | 2 +- src/datatypes/src/vectors.rs | 2 +- src/datatypes/src/vectors/binary.rs | 6 +- src/datatypes/src/vectors/primitive.rs | 4 +- src/query/Cargo.toml | 4 +- src/query/src/catalog.rs | 6 + src/query/src/catalog/memory.rs | 129 +++- src/query/src/catalog/schema.rs | 3 + src/query/src/error.rs | 20 +- src/query/src/lib.rs | 4 + src/query/src/plan.rs | 2 +- src/query/src/planner.rs | 71 +- src/query/src/query_engine.rs | 40 +- src/query/src/query_engine/datafusion.rs | 100 ++- src/query/src/query_engine/state.rs | 
111 ++- src/query/tests/query_engine_test.rs | 28 +- src/sql/src/dialect.rs | 2 + src/table/Cargo.toml | 2 +- src/table/src/table.rs | 6 +- src/table/src/table/numbers.rs | 1 + 47 files changed, 1648 insertions(+), 181 deletions(-) create mode 100644 src/cmd/Cargo.toml create mode 100644 src/cmd/src/bin/greptime.rs create mode 100644 src/cmd/src/lib.rs create mode 100644 src/cmd/src/opts.rs create mode 100644 src/common/recordbatch/src/util.rs create mode 100644 src/datanode/src/datanode.rs create mode 100644 src/datanode/src/instance.rs delete mode 100644 src/datanode/src/rpc.rs create mode 100644 src/datanode/src/server.rs create mode 100644 src/datanode/src/server/grpc.rs rename src/datanode/src/{ => server/grpc}/processors.rs (100%) create mode 100644 src/datanode/src/server/http.rs create mode 100644 src/datanode/src/server/http/handler.rs diff --git a/Cargo.lock b/Cargo.lock index c24be666b5..81af3bfe4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,12 +96,27 @@ dependencies = [ "parquet2", "regex", "serde", + "serde_derive", "serde_json", "simdutf8", "streaming-iterator", "strength_reduce", ] +[[package]] +name = "async-compression" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2bf394cfbbe876f0ac67b13b6ca819f9c9f2fb9ec67223cceb1555fbab1c31a" +dependencies = [ + "brotli", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -134,12 +149,80 @@ dependencies = [ "syn", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = 
"axum" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4af7447fc1214c1f3a1ace861d0216a6c8bb13965b64bbad9650f375b67689a" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa 1.0.1", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bdc19781b16e32f8a7200368a336fa4509d4b72ef15dd4e41df5290855ee1e6" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", +] + +[[package]] +name = "axum-macros" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63bcb0d395bc5dd286e61aada9fc48201eb70e232f006f9d6c330c9db2f256f5" +dependencies = [ + "heck 0.4.0", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "base64" version = "0.13.0" @@ -252,6 +335,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + [[package]] name = "cc" version = "1.0.73" @@ -281,6 +370,54 @@ dependencies = [ "winapi", ] +[[package]] +name = "clap" +version = "3.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db" +dependencies = [ + "atty", + "bitflags", + "clap_derive", + "clap_lex", + "indexmap", + "lazy_static", + "strsim", + "termcolor", + "textwrap", +] + +[[package]] +name = 
"clap_derive" +version = "3.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3aab4734e083b809aaf5794e14e756d1c798d2c69c7f7de7a09a2f5214993c1" +dependencies = [ + "heck 0.4.0", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669" +dependencies = [ + "os_str_bytes", +] + +[[package]] +name = "cmd" +version = "0.1.0" +dependencies = [ + "clap", + "datanode", + "tokio", +] + [[package]] name = "comfy-table" version = "5.0.1" @@ -312,7 +449,10 @@ dependencies = [ "datafusion-common", "datatypes", "futures", + "paste", + "serde", "snafu", + "tokio", ] [[package]] @@ -339,6 +479,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" +dependencies = [ + "cfg-if", + "lazy_static", +] + [[package]] name = "crunchy" version = "0.2.2" @@ -457,7 +617,19 @@ dependencies = [ name = "datanode" version = "0.1.0" dependencies = [ + "arrow2", + "axum", + "axum-macros", + "common-recordbatch", + "hyper", + "query", + "serde", + "serde_json", "snafu", + "table", + "tokio", + "tower", + "tower-http", ] [[package]] @@ -520,6 +692,22 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "form_urlencoded" +version = 
"1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +dependencies = [ + "matches", + "percent-encoding", +] + [[package]] name = "futures" version = "0.3.21" @@ -627,7 +815,26 @@ checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", +] + +[[package]] +name = "h2" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", ] [[package]] @@ -651,6 +858,20 @@ dependencies = [ "ahash", ] +[[package]] +name = "hdrhistogram" +version = "7.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31672b7011be2c4f7456c4ddbcb40e7e9a4a9fad8efe49a6ebaf5f307d0109c0" +dependencies = [ + "base64", + "byteorder", + "crossbeam-channel", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "heck" version = "0.3.3" @@ -660,6 +881,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -669,6 +896,70 @@ dependencies = [ "libc", ] +[[package]] +name = "http" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +dependencies = [ + "bytes", + "fnv", + "itoa 1.0.1", +] + +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + +[[package]] +name = "httparse" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa 1.0.1", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "indexmap" version = "1.8.1" @@ -698,6 +989,15 @@ dependencies = [ "futures-util", ] +[[package]] +name = "iri-string" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f0f7638c1e223529f1bfdc48c8b133b9e0b434094d1d28473161ee48b235f78" +dependencies = [ + "nom", +] + [[package]] name = "itertools" version = "0.10.3" @@ -851,6 +1151,18 @@ dependencies = [ "libc", ] +[[package]] +name = "matches" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" + +[[package]] +name = "matchit" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" + [[package]] name = "md-5" version = "0.10.1" @@ -866,6 +1178,28 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.5.1" @@ -875,6 +1209,29 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + [[package]] name = "multiversion" version = "0.6.1" @@ -895,6 +1252,25 @@ dependencies = [ "syn", ] +[[package]] +name = "nom" +version = "7.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "ntapi" +version = "0.3.7" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +dependencies = [ + "winapi", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -952,6 +1328,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "os_str_bytes" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" + [[package]] name = "parking_lot" version = "0.12.0" @@ -1012,6 +1394,32 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + +[[package]] +name = "pin-project" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.8" @@ -1039,6 +1447,30 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + 
+[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.37" @@ -1196,6 +1628,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa 1.0.1", + "ryu", + "serde", +] + [[package]] name = "sha2" version = "0.10.2" @@ -1207,6 +1651,15 @@ dependencies = [ "digest", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "simdutf8" version = "0.1.4" @@ -1241,7 +1694,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a7fe9b0669ef117c5cabc5549638528f36771f058ff977d7689deb517833a75" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro2", "quote", "syn", @@ -1253,6 +1706,16 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "sql" version = "0.1.0" @@ -1297,6 +1760,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3ff2f71c82567c565ba4b3009a9350a96a7269eaa4001ebedae926230bc2254" +[[package]] +name = "strsim" +version 
= "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "strum" version = "0.23.0" @@ -1309,7 +1778,7 @@ version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro2", "quote", "rustversion", @@ -1333,6 +1802,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "table" version = "0.1.0" @@ -1364,6 +1839,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" + [[package]] name = "time" version = "0.1.43" @@ -1376,14 +1866,22 @@ dependencies = [ [[package]] name = "tokio" -version = "1.17.0" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +checksum = "0f48b6d60512a392e34dbf7fd456249fd2de3c83669ab642e021903f4015185b" dependencies = [ + "bytes", + "libc", + "memchr", + "mio", "num_cpus", + "once_cell", "parking_lot", "pin-project-lite", + "signal-hook-registry", + "socket2", "tokio-macros", + "winapi", ] [[package]] @@ -1408,12 +1906,136 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" +dependencies = [ + "futures-core", + "futures-util", + "hdrhistogram", + "indexmap", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e980386f06883cf4d0578d6c9178c81f68b45d77d00f2c2c1bc034b3439c2c56" +dependencies = [ + "async-compression", + "base64", + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "httpdate", + "iri-string", + "mime", + "mime_guess", + "percent-encoding", + "pin-project-lite", + "tokio", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + +[[package]] +name = "tracing" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" +dependencies = [ + "cfg-if", + "log", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.21" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "typenum" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-segmentation" version = "1.9.0" @@ -1438,12 +2060,28 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] 
name = "winapi" version = "0.3.9" @@ -1460,6 +2098,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index e6de5cea81..daed9802d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "src/common/base", "src/common/query", "src/common/recordbatch", + "src/cmd", "src/datanode", "src/datatypes", "src/log-store", diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml new file mode 100644 index 0000000000..506feb02ee --- /dev/null +++ b/src/cmd/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "cmd" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "greptime" +path = "src/bin/greptime.rs" + +[dependencies] +clap = { version = "3.1", features = ["derive"] } +datanode = { path = "../datanode" } +tokio = { version = "1.18.0", features = ["full"] } diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs new file mode 100644 index 0000000000..4a9a0b6829 --- /dev/null +++ b/src/cmd/src/bin/greptime.rs @@ -0,0 +1,20 @@ +use clap::Parser; +use cmd::opts::{GrepTimeOpts, NodeType}; +use datanode::DataNode; + +async fn datanode_main(_opts: &GrepTimeOpts) { + let data_node = DataNode::new().unwrap(); + + if let Err(e) = data_node.start().await { + println!("Fail to start data node, error: {:?}", e); + } +} + +#[tokio::main] +async fn main() { + let opts = GrepTimeOpts::parse(); + + match opts.node_type { + NodeType::Data => datanode_main(&opts).await, + } +} diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs new file mode 100644 index 0000000000..9d1c99982e --- /dev/null +++ b/src/cmd/src/lib.rs @@ 
-0,0 +1 @@ +pub mod opts; diff --git a/src/cmd/src/opts.rs b/src/cmd/src/opts.rs new file mode 100644 index 0000000000..9fc3f80148 --- /dev/null +++ b/src/cmd/src/opts.rs @@ -0,0 +1,15 @@ +//! greptime commandline options +use clap::{ArgEnum, Parser}; + +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ArgEnum)] +pub enum NodeType { + /// Data node + Data, +} + +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +pub struct GrepTimeOpts { + #[clap(name = "type", short, long, arg_enum)] + pub node_type: NodeType, +} diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml index ace80dfbcd..d9e2570a45 100644 --- a/src/common/recordbatch/Cargo.toml +++ b/src/common/recordbatch/Cargo.toml @@ -6,11 +6,21 @@ edition = "2021" [dependencies.arrow] package = "arrow2" version="0.10" -features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"] +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] [dependencies] datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2"} datatypes = {path ="../../datatypes" } futures = "0.3" -snafu = "0.7.0" +paste = "1.0" +serde = "1.0" +snafu = "0.7" + +[dev-dependencies.arrow] +package = "arrow2" +version="0.10" +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] + +[dev-dependencies] +tokio = { version = "1.18", features = ["full"] } diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index d785487e13..f3ae0209ff 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -1,5 +1,6 @@ pub mod error; mod recordbatch; +pub mod util; use std::pin::Pin; diff --git 
a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 1f48247785..02ed0ef2da 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -1,10 +1,77 @@ use std::sync::Arc; +use arrow::array::{ + BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, + UInt16Array, UInt32Array, UInt64Array, UInt8Array, Utf8Array, +}; +use arrow::datatypes::DataType; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::schema::Schema; +use paste::paste; +use serde::ser::SerializeStruct; +use serde::{Serialize, Serializer}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct RecordBatch { pub schema: Arc, pub df_recordbatch: DfRecordBatch, } + +macro_rules! collect_columns { + ($array: ident, $columns: ident, $($data_type: expr), +) => { + paste! { + match $array.data_type() { + $(DataType::$data_type => { + if let Some(array) = $array.as_any().downcast_ref::<[<$data_type Array>]>() { + $columns.push(Column::$data_type(array.values().as_slice())); + } + })+, + DataType::Utf8 => { + if let Some(array) = $array.as_any().downcast_ref::>() { + $columns.push(Column::Utf8(array.values().as_slice())); + } + }, + _ => unimplemented!(), + } + } + }; +} + +#[derive(Serialize)] +enum Column<'a> { + Int64(&'a [i64]), + Int32(&'a [i32]), + Int16(&'a [i16]), + Int8(&'a [i8]), + UInt64(&'a [u64]), + UInt32(&'a [u32]), + UInt16(&'a [u16]), + UInt8(&'a [u8]), + Float64(&'a [f64]), + Float32(&'a [f32]), + Boolean((&'a [u8], usize, usize)), + Utf8(&'a [u8]), +} + +/// TODO(dennis): should be implemented in datatypes +impl Serialize for RecordBatch { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut s = serializer.serialize_struct("record", 2)?; + s.serialize_field("schema", &self.schema.arrow_schema())?; + + let df_columns = self.df_recordbatch.columns(); + let mut columns: Vec = 
Vec::with_capacity(df_columns.len()); + for array in df_columns { + collect_columns!( + array, columns, Int64, Int32, Int16, Int8, UInt64, UInt32, UInt16, UInt8, Float64, + Float32, Boolean + ); + } + s.serialize_field("columns", &columns)?; + + s.end() + } +} diff --git a/src/common/recordbatch/src/util.rs b/src/common/recordbatch/src/util.rs new file mode 100644 index 0000000000..f53309daa8 --- /dev/null +++ b/src/common/recordbatch/src/util.rs @@ -0,0 +1,90 @@ +use futures::TryStreamExt; + +use crate::{error::Result, RecordBatch, SendableRecordBatchStream}; + +pub async fn collect(stream: SendableRecordBatchStream) -> Result> { + stream.try_collect::>().await +} + +#[cfg(test)] +mod tests { + use std::mem; + use std::pin::Pin; + use std::sync::Arc; + + use arrow::array::UInt32Array; + use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use datafusion_common::field_util::SchemaExt; + use datafusion_common::record_batch::RecordBatch as DfRecordBatch; + use datatypes::schema::Schema; + use datatypes::schema::SchemaRef; + use futures::task::{Context, Poll}; + use futures::Stream; + + use super::*; + use crate::RecordBatchStream; + + struct MockRecordBatchStream { + batch: Option, + schema: SchemaRef, + } + + impl RecordBatchStream for MockRecordBatchStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + } + + impl Stream for MockRecordBatchStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let batch = mem::replace(&mut self.batch, None); + + if let Some(batch) = batch { + Poll::Ready(Some(Ok(batch))) + } else { + Poll::Ready(None) + } + } + } + + #[tokio::test] + async fn test_collect() { + let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "number", + DataType::UInt32, + false, + )])); + let schema = Arc::new(Schema::new(arrow_schema.clone())); + + let stream = MockRecordBatchStream { + schema: schema.clone(), + batch: None, + }; + + let batches = 
collect(Box::pin(stream)).await.unwrap(); + assert_eq!(0, batches.len()); + + let numbers: Vec = (0..10).collect(); + let df_batch = DfRecordBatch::try_new( + arrow_schema.clone(), + vec![Arc::new(UInt32Array::from_slice(&numbers))], + ) + .unwrap(); + + let batch = RecordBatch { + schema: schema.clone(), + df_recordbatch: df_batch, + }; + + let stream = MockRecordBatchStream { + schema: Arc::new(Schema::new(arrow_schema)), + batch: Some(batch.clone()), + }; + let batches = collect(Box::pin(stream)).await.unwrap(); + assert_eq!(1, batches.len()); + + assert_eq!(batch, batches[0]); + } +} diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index ec6162f968..ebf3b8aa02 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -6,4 +6,20 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +axum = "0.5" +axum-macros = "0.2" +common-recordbatch = {path = "../common/recordbatch" } +hyper = { version = "0.14", features = ["full"] } +query = { path = "../query" } +serde = "1.0" +serde_json = "1.0" snafu = "0.7" +table = { path = "../table" } +tokio = { version = "1.18", features = ["full"] } +tower = { version = "0.4", features = ["full"]} +tower-http = { version ="0.3", features = ["full"]} + +[dev-dependencies.arrow] +package = "arrow2" +version="0.10" +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs new file mode 100644 index 0000000000..454db221be --- /dev/null +++ b/src/datanode/src/datanode.rs @@ -0,0 +1,33 @@ +use std::sync::Arc; + +use query::catalog::memory; +use query::catalog::CatalogListRef; +use snafu::ResultExt; + +use crate::error::{QuerySnafu, Result}; +use crate::instance::{Instance, InstanceRef}; +use crate::server::Services; + +/// DataNode service. 
+pub struct DataNode { + services: Services, + _catalog_list: CatalogListRef, + _instance: InstanceRef, +} + +impl DataNode { + pub fn new() -> Result { + let catalog_list = memory::new_memory_catalog_list().context(QuerySnafu)?; + let instance = Arc::new(Instance::new(catalog_list.clone())); + + Ok(Self { + services: Services::new(instance.clone()), + _catalog_list: catalog_list, + _instance: instance, + }) + } + + pub async fn start(&self) -> Result<()> { + self.services.start().await + } +} diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index d7815b56db..04cf8ab34c 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,8 +1,15 @@ +use hyper::Error as HyperError; +use query::error::Error as QueryError; use snafu::Snafu; /// business error of datanode. #[derive(Debug, Snafu)] -#[snafu(display("DataNode error"))] -pub struct Error; +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Query error: {}", source))] + Query { source: QueryError }, + #[snafu(display("Http error: {}", source))] + Hyper { source: HyperError }, +} pub type Result = std::result::Result; diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs new file mode 100644 index 0000000000..6f04239c6d --- /dev/null +++ b/src/datanode/src/instance.rs @@ -0,0 +1,73 @@ +use std::sync::Arc; + +use query::catalog::CatalogListRef; +use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; +use snafu::ResultExt; + +use crate::error::{QuerySnafu, Result}; + +// An abstraction to read/write services. 
+pub struct Instance { + // Query service + query_engine: QueryEngineRef, + // Catalog list + _catalog_list: CatalogListRef, +} + +pub type InstanceRef = Arc; + +impl Instance { + pub fn new(catalog_list: CatalogListRef) -> Self { + let factory = QueryEngineFactory::new(catalog_list.clone()); + let query_engine = factory.query_engine().clone(); + Self { + query_engine, + _catalog_list: catalog_list, + } + } + + pub async fn execute_sql(&self, sql: &str) -> Result { + let logical_plan = self.query_engine.sql_to_plan(sql).context(QuerySnafu)?; + + self.query_engine + .execute(&logical_plan) + .await + .context(QuerySnafu) + } +} + +#[cfg(test)] +mod tests { + use arrow::array::UInt64Array; + use common_recordbatch::util; + use query::catalog::memory; + + use super::*; + + #[tokio::test] + async fn test_execute_sql() { + let catalog_list = memory::new_memory_catalog_list().unwrap(); + + let instance = Instance::new(catalog_list); + + let output = instance + .execute_sql("select sum(number) from numbers limit 20") + .await + .unwrap(); + + match output { + Output::RecordBatch(recordbatch) => { + let numbers = util::collect(recordbatch).await.unwrap(); + let columns = numbers[0].df_recordbatch.columns(); + assert_eq!(1, columns.len()); + assert_eq!(columns[0].len(), 1); + + assert_eq!( + *columns[0].as_any().downcast_ref::().unwrap(), + UInt64Array::from_slice(&[4950]) + ); + } + _ => unreachable!(), + } + } +} diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index f12076ea06..29ae25b068 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -1,21 +1,7 @@ mod catalog; +pub mod datanode; mod error; -mod processors; -mod rpc; +mod instance; +mod server; -use crate::error::Result; -use crate::rpc::Services; - -/// DataNode service. -pub struct DataNode { - services: Services, -} - -impl DataNode { - /// Shutdown the datanode service gracefully. 
- pub async fn shutdown(&self) -> Result<()> { - self.services.shutdown().await?; - - unimplemented!() - } -} +pub use crate::datanode::DataNode; diff --git a/src/datanode/src/rpc.rs b/src/datanode/src/rpc.rs deleted file mode 100644 index ef06c5b171..0000000000 --- a/src/datanode/src/rpc.rs +++ /dev/null @@ -1,10 +0,0 @@ -use crate::error::Result; - -/// All rpc services. -pub struct Services {} - -impl Services { - pub async fn shutdown(&self) -> Result<()> { - unimplemented!() - } -} diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs new file mode 100644 index 0000000000..57104d5c9c --- /dev/null +++ b/src/datanode/src/server.rs @@ -0,0 +1,24 @@ +mod grpc; +mod http; + +use http::HttpServer; + +use crate::error::Result; +use crate::instance::InstanceRef; + +/// All rpc services. +pub struct Services { + http_server: HttpServer, +} + +impl Services { + pub fn new(instance: InstanceRef) -> Self { + Self { + http_server: HttpServer::new(instance), + } + } + + pub async fn start(&self) -> Result<()> { + self.http_server.start().await + } +} diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs new file mode 100644 index 0000000000..1e0df3b152 --- /dev/null +++ b/src/datanode/src/server/grpc.rs @@ -0,0 +1 @@ +mod processors; diff --git a/src/datanode/src/processors.rs b/src/datanode/src/server/grpc/processors.rs similarity index 100% rename from src/datanode/src/processors.rs rename to src/datanode/src/server/grpc/processors.rs diff --git a/src/datanode/src/server/http.rs b/src/datanode/src/server/http.rs new file mode 100644 index 0000000000..a1f22c57ef --- /dev/null +++ b/src/datanode/src/server/http.rs @@ -0,0 +1,122 @@ +mod handler; + +use std::net::SocketAddr; +use std::time::Duration; + +use axum::{ + error_handling::HandleErrorLayer, + response::IntoResponse, + response::{Json, Response}, + routing::get, + BoxError, Extension, Router, +}; +use common_recordbatch::{util, RecordBatch}; +use query::Output; +use 
serde::Serialize; +use snafu::ResultExt; +use tower::{timeout::TimeoutLayer, ServiceBuilder}; +use tower_http::trace::TraceLayer; + +use crate::error::{HyperSnafu, Result}; +use crate::server::InstanceRef; + +/// Http server +pub struct HttpServer { + instance: InstanceRef, +} + +#[derive(Serialize)] +pub enum JsonOutput { + AffectedRows(usize), + Rows(Vec), +} + +#[derive(Serialize)] +pub struct JsonResponse { + success: bool, + #[serde(skip_serializing_if = "Option::is_none")] + error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + output: Option, +} + +impl IntoResponse for JsonResponse { + fn into_response(self) -> Response { + Json(self).into_response() + } +} + +impl JsonResponse { + fn with_error(error: Option) -> Self { + JsonResponse { + success: false, + error, + output: None, + } + } + fn with_output(output: Option) -> Self { + JsonResponse { + success: true, + error: None, + output, + } + } + + /// Create a json response from query result + async fn from_output(output: Result) -> Self { + match output { + Ok(Output::AffectedRows(rows)) => { + Self::with_output(Some(JsonOutput::AffectedRows(rows))) + } + Ok(Output::RecordBatch(stream)) => match util::collect(stream).await { + Ok(rows) => Self::with_output(Some(JsonOutput::Rows(rows))), + Err(e) => Self::with_error(Some(format!("Recordbatch error: {}", e))), + }, + Err(e) => Self::with_error(Some(format!("Query engine output error: {}", e))), + } + } +} + +async fn shutdown_signal() { + // Wait for the CTRL+C signal + // It has an issue on chrome: https://github.com/sigp/lighthouse/issues/478 + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); +} + +impl HttpServer { + pub fn new(instance: InstanceRef) -> Self { + Self { instance } + } + + pub async fn start(&self) -> Result<()> { + let app = Router::new().route("/sql", get(handler::sql)).layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(handle_error)) + .layer(TraceLayer::new_for_http()) + 
.layer(Extension(self.instance.clone())) + // TODO configure timeout + .layer(TimeoutLayer::new(Duration::from_secs(30))), + ); + + let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); + // TODO(dennis): log + println!("Datanode HTTP server is listening on {}", addr); + let server = axum::Server::bind(&addr).serve(app.into_make_service()); + let graceful = server.with_graceful_shutdown(shutdown_signal()); + + graceful.await.context(HyperSnafu)?; + + Ok(()) + } +} + +/// handle error middleware +async fn handle_error(err: BoxError) -> Json { + Json(JsonResponse { + success: false, + error: Some(format!("Unhandled internal error: {}", err)), + output: None, + }) +} diff --git a/src/datanode/src/server/http/handler.rs b/src/datanode/src/server/http/handler.rs new file mode 100644 index 0000000000..5980e0a75a --- /dev/null +++ b/src/datanode/src/server/http/handler.rs @@ -0,0 +1,75 @@ +// http handlers + +use std::collections::HashMap; + +use axum::extract::{Extension, Query}; + +use crate::instance::InstanceRef; +use crate::server::http::JsonResponse; + +/// Handler to execute sql +#[axum_macros::debug_handler] +pub async fn sql( + Extension(instance): Extension, + Query(params): Query>, +) -> JsonResponse { + if let Some(sql) = params.get("sql") { + JsonResponse::from_output(instance.execute_sql(sql).await).await + } else { + JsonResponse::with_error(Some("sql parameter is required.".to_string())) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use query::catalog::memory; + + use super::*; + use crate::instance::Instance; + use crate::server::http::JsonOutput; + + fn create_params() -> Query> { + let mut map = HashMap::new(); + map.insert( + "sql".to_string(), + "select sum(number) from numbers limit 20".to_string(), + ); + Query(map) + } + + fn create_extension() -> Extension { + let catalog_list = memory::new_memory_catalog_list().unwrap(); + let instance = Arc::new(Instance::new(catalog_list)); + Extension(instance) + } + + #[tokio::test] + async fn 
test_sql_not_provided() { + let extension = create_extension(); + + let json = sql(extension, Query(HashMap::default())).await; + assert!(!json.success); + assert_eq!(Some("sql parameter is required.".to_string()), json.error); + assert!(json.output.is_none()); + } + + #[tokio::test] + async fn test_sql_output_rows() { + let query = create_params(); + let extension = create_extension(); + + let json = sql(extension, query).await; + assert!(json.success); + assert!(json.error.is_none()); + assert!(json.output.is_some()); + + match json.output.unwrap() { + JsonOutput::Rows(rows) => { + assert_eq!(1, rows.len()); + } + _ => unreachable!(), + } + } +} diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index c38c0205e8..e751d0f76e 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -3,8 +3,12 @@ name = "datatypes" version = "0.1.0" edition = "2021" +[dependencies.arrow] +package = "arrow2" +version="0.10" +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] + [dependencies] -arrow2 = "0.10" common-base = { path = "../common/base" } paste = "1.0" serde ={ version = "1.0.136", features = ["derive"] } diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 77174a43a3..211c03a40a 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use arrow2::datatypes::DataType as ArrowDataType; +use arrow::datatypes::DataType as ArrowDataType; use crate::type_id::LogicalTypeId; use crate::value::Value; diff --git a/src/datatypes/src/lib.rs b/src/datatypes/src/lib.rs index 645b6d75d1..29e20efea5 100644 --- a/src/datatypes/src/lib.rs +++ b/src/datatypes/src/lib.rs @@ -8,7 +8,7 @@ mod types; pub mod value; pub mod vectors; -use arrow2::array::{BinaryArray, MutableBinaryArray}; +use arrow::array::{BinaryArray, MutableBinaryArray}; pub type LargeBinaryArray = BinaryArray; pub type MutableLargeBinaryArray 
= MutableBinaryArray; diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 61e2179c99..d0556a426a 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use arrow2::datatypes::Schema as ArrowSchema; +use arrow::datatypes::Schema as ArrowSchema; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct Schema { arrow_schema: Arc, } diff --git a/src/datatypes/src/types/binary_type.rs b/src/datatypes/src/types/binary_type.rs index 18f5c86813..d099f98ee1 100644 --- a/src/datatypes/src/types/binary_type.rs +++ b/src/datatypes/src/types/binary_type.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use arrow2::datatypes::DataType as ArrowDataType; +use arrow::datatypes::DataType as ArrowDataType; use common_base::bytes::StringBytes; use crate::data_type::{DataType, DataTypeRef}; diff --git a/src/datatypes/src/types/primitive_traits.rs b/src/datatypes/src/types/primitive_traits.rs index 6402c7a727..9fce0efe88 100644 --- a/src/datatypes/src/types/primitive_traits.rs +++ b/src/datatypes/src/types/primitive_traits.rs @@ -1,4 +1,4 @@ -use arrow2::types::NativeType; +use arrow::types::NativeType; use crate::value::Value; diff --git a/src/datatypes/src/types/primitive_type.rs b/src/datatypes/src/types/primitive_type.rs index 4bba31e857..e3b8a0d83c 100644 --- a/src/datatypes/src/types/primitive_type.rs +++ b/src/datatypes/src/types/primitive_type.rs @@ -1,7 +1,7 @@ use std::marker::PhantomData; use std::sync::Arc; -use arrow2::datatypes::DataType as ArrowDataType; +use arrow::datatypes::DataType as ArrowDataType; use crate::data_type::{DataType, DataTypeRef}; use crate::type_id::LogicalTypeId; diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index 1ea36c32a2..efdfc66591 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -4,7 +4,7 @@ pub mod primitive; use std::any::Any; use std::sync::Arc; -use arrow2::array::ArrayRef; +use 
arrow::array::ArrayRef; pub use binary::*; pub use primitive::*; diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs index 60b49dd827..b37e8754fb 100644 --- a/src/datatypes/src/vectors/binary.rs +++ b/src/datatypes/src/vectors/binary.rs @@ -1,9 +1,9 @@ use std::any::Any; use std::sync::Arc; -use arrow2::array::ArrayRef; -use arrow2::array::BinaryValueIter; -use arrow2::bitmap::utils::ZipValidity; +use arrow::array::ArrayRef; +use arrow::array::BinaryValueIter; +use arrow::bitmap::utils::ZipValidity; use crate::data_type::DataTypeRef; use crate::scalars::{ScalarVector, ScalarVectorBuilder}; diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 95d1266843..a5243c56af 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -2,8 +2,8 @@ use std::any::Any; use std::slice::Iter; use std::sync::Arc; -use arrow2::array::{ArrayRef, MutablePrimitiveArray, PrimitiveArray}; -use arrow2::bitmap::utils::ZipValidity; +use arrow::array::{ArrayRef, MutablePrimitiveArray, PrimitiveArray}; +use arrow::bitmap::utils::ZipValidity; use crate::data_type::DataTypeRef; use crate::scalars::{ScalarVector, ScalarVectorBuilder}; diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 9bac7f20f3..131af06ab0 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies.arrow] package = "arrow2" version="0.10" -features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"] +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] [dependencies] async-trait = "0.1" @@ -21,5 +21,5 @@ tokio = "1.0" sql = { path = "../sql" } [dev-dependencies] -tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } +tokio = { version = "1.0", features = ["full"] } tokio-stream = "0.1" diff --git 
a/src/query/src/catalog.rs b/src/query/src/catalog.rs index b6c802ad07..5c0a845ab6 100644 --- a/src/query/src/catalog.rs +++ b/src/query/src/catalog.rs @@ -38,3 +38,9 @@ pub trait CatalogProvider: Sync + Send { /// Retrieves a specific schema from the catalog by name, provided it exists. fn schema(&self, name: &str) -> Option>; } + +pub type CatalogListRef = Arc; +pub type CatalogProviderRef = Arc; + +pub const DEFAULT_CATALOG_NAME: &str = "greptime"; +pub const DEFAULT_SCHEMA_NAME: &str = "public"; diff --git a/src/query/src/catalog/memory.rs b/src/query/src/catalog/memory.rs index cd75224b3b..13418ecb67 100644 --- a/src/query/src/catalog/memory.rs +++ b/src/query/src/catalog/memory.rs @@ -3,14 +3,21 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::RwLock; +use table::table::numbers::NumbersTable; +use table::TableRef; + use crate::catalog::schema::SchemaProvider; -use crate::catalog::{CatalogList, CatalogProvider}; +use crate::catalog::{ + CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, +}; +use crate::error::{ExecutionSnafu, Result}; /// Simple in-memory list of catalogs #[derive(Default)] pub struct MemoryCatalogList { - /// Collection of catalogs containing schemas and ultimately TableProviders - pub catalogs: RwLock>>, + /// Collection of catalogs containing schemas and ultimately Tables + pub catalogs: RwLock>, } impl CatalogList for MemoryCatalogList { @@ -21,8 +28,8 @@ impl CatalogList for MemoryCatalogList { fn register_catalog( &self, name: String, - catalog: Arc, - ) -> Option> { + catalog: CatalogProviderRef, + ) -> Option { let mut catalogs = self.catalogs.write().unwrap(); catalogs.insert(name, catalog) } @@ -32,7 +39,7 @@ impl CatalogList for MemoryCatalogList { catalogs.keys().map(|s| s.to_string()).collect() } - fn catalog(&self, name: &str) -> Option> { + fn catalog(&self, name: &str) -> Option { let catalogs = self.catalogs.read().unwrap(); 
catalogs.get(name).cloned() } @@ -82,3 +89,113 @@ impl CatalogProvider for MemoryCatalogProvider { schemas.get(name).cloned() } } + +/// Simple in-memory implementation of a schema. +pub struct MemorySchemaProvider { + tables: RwLock>, +} + +impl MemorySchemaProvider { + /// Instantiates a new MemorySchemaProvider with an empty collection of tables. + pub fn new() -> Self { + Self { + tables: RwLock::new(HashMap::new()), + } + } +} + +impl Default for MemorySchemaProvider { + fn default() -> Self { + Self::new() + } +} + +impl SchemaProvider for MemorySchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + let tables = self.tables.read().unwrap(); + tables.keys().cloned().collect() + } + + fn table(&self, name: &str) -> Option { + let tables = self.tables.read().unwrap(); + tables.get(name).cloned() + } + + fn register_table(&self, name: String, table: TableRef) -> Result> { + if self.table_exist(name.as_str()) { + return ExecutionSnafu { + message: format!("The table {} already exists", name), + } + .fail(); + } + let mut tables = self.tables.write().unwrap(); + Ok(tables.insert(name, table)) + } + + fn deregister_table(&self, name: &str) -> Result> { + let mut tables = self.tables.write().unwrap(); + Ok(tables.remove(name)) + } + + fn table_exist(&self, name: &str) -> bool { + let tables = self.tables.read().unwrap(); + tables.contains_key(name) + } +} + +/// Create a memory catalog list contains a numbers table for test +pub fn new_memory_catalog_list() -> Result { + let schema_provider = Arc::new(MemorySchemaProvider::new()); + let catalog_provider = Arc::new(MemoryCatalogProvider::new()); + let catalog_list = Arc::new(MemoryCatalogList::default()); + + // Add numbers table for test + let table = Arc::new(NumbersTable::default()); + schema_provider.register_table("numbers".to_string(), table)?; + catalog_provider.register_schema(DEFAULT_SCHEMA_NAME, schema_provider); + 
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); + + Ok(catalog_list) +} + +#[cfg(test)] +mod tests { + use table::table::numbers::NumbersTable; + + use super::*; + + #[test] + fn test_new_memory_catalog_list() { + let catalog_list = new_memory_catalog_list().unwrap(); + + let catalog_provider = catalog_list.catalog(DEFAULT_CATALOG_NAME).unwrap(); + let schema_provider = catalog_provider.schema(DEFAULT_SCHEMA_NAME).unwrap(); + + let table = schema_provider.table("numbers"); + assert!(table.is_some()); + + assert!(schema_provider.table("not_exists").is_none()); + } + + #[tokio::test] + async fn test_mem_provider() { + let provider = MemorySchemaProvider::new(); + let table_name = "numbers"; + assert!(!provider.table_exist(table_name)); + assert!(provider.deregister_table(table_name).unwrap().is_none()); + let test_table = NumbersTable::default(); + // register table successfully + assert!(provider + .register_table(table_name.to_string(), Arc::new(test_table)) + .unwrap() + .is_none()); + assert!(provider.table_exist(table_name)); + let other_table = NumbersTable::default(); + let result = provider.register_table(table_name.to_string(), Arc::new(other_table)); + assert!(result.is_err()); + } +} diff --git a/src/query/src/catalog/schema.rs b/src/query/src/catalog/schema.rs index ec168d878d..bbdfaad3b1 100644 --- a/src/query/src/catalog/schema.rs +++ b/src/query/src/catalog/schema.rs @@ -1,4 +1,5 @@ use std::any::Any; +use std::sync::Arc; use table::TableRef; @@ -33,3 +34,5 @@ pub trait SchemaProvider: Sync + Send { /// Otherwise, return true. 
fn table_exist(&self, name: &str) -> bool; } + +pub type SchemaProviderRef = Arc; diff --git a/src/query/src/error.rs b/src/query/src/error.rs index e25b224dc6..3d240d1ac4 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -1,6 +1,7 @@ use common_recordbatch::error::Error as RecordBatchError; use datafusion::error::DataFusionError; use snafu::Snafu; +use sql::errors::ParserError; /// business error of query engine #[derive(Debug, Snafu)] @@ -12,6 +13,15 @@ pub enum Error { PhysicalPlanDowncast, #[snafu(display("RecordBatch error: {}", source))] RecordBatch { source: RecordBatchError }, + #[snafu(display("Execution error: {}", message))] + Execution { message: String }, + #[snafu(display("Cannot parse SQL: {}, source: {}", sql, source))] + ParseSql { sql: String, source: ParserError }, + #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))] + Planner { + sql: String, + source: DataFusionError, + }, } pub type Result = std::result::Result; @@ -21,13 +31,3 @@ impl From for DataFusionError { DataFusionError::External(Box::new(e)) } } - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum PlannerError { - #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))] - DfPlan { - sql: String, - source: DataFusionError, - }, -} diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index a9fc6cee1e..c065b8857e 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -8,3 +8,7 @@ pub mod physical_planner; pub mod plan; pub mod planner; pub mod query_engine; + +pub use crate::query_engine::{ + Output, QueryContext, QueryEngine, QueryEngineFactory, QueryEngineRef, +}; diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs index 25c3418ec1..9a1b98a945 100644 --- a/src/query/src/plan.rs +++ b/src/query/src/plan.rs @@ -16,7 +16,7 @@ use crate::executor::Runtime; /// an output relation (table) with a (potentially) different /// schema. 
A plan represents a dataflow tree where data flows /// from leaves up to the root to produce the query result. -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum LogicalPlan { DfPlan(DfLogicalPlan), } diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index e276fe3da7..ee2645bb3a 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -1,23 +1,31 @@ +use std::sync::Arc; + +use arrow::datatypes::DataType; +use datafusion::catalog::TableReference; +use datafusion::datasource::TableProvider; +use datafusion::physical_plan::udaf::AggregateUDF; +use datafusion::physical_plan::udf::ScalarUDF; use datafusion::sql::planner::{ContextProvider, SqlToRel}; use snafu::ResultExt; use sql::statements::query::Query; use sql::statements::statement::Statement; +use table::table::adapter::DfTableProviderAdapter; -use crate::error; -use crate::error::PlannerError; -use crate::plan::LogicalPlan; +use crate::{ + catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}, + error::{PlannerSnafu, Result}, + plan::LogicalPlan, +}; -pub trait Planner { +pub trait Planner: Send + Sync { fn statement_to_plan(&self, statement: Statement) -> Result; } -type Result = std::result::Result; - pub struct DfPlanner<'a, S: ContextProvider> { sql_to_rel: SqlToRel<'a, S>, } -impl<'a, S: ContextProvider> DfPlanner<'a, S> { +impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> { /// Creates a DataFusion planner instance pub fn new(schema_provider: &'a S) -> Self { let rel = SqlToRel::new(schema_provider); @@ -31,7 +39,7 @@ impl<'a, S: ContextProvider> DfPlanner<'a, S> { let result = self .sql_to_rel .query_to_plan(query.inner) - .context(error::DfPlanSnafu { sql })?; + .context(PlannerSnafu { sql })?; Ok(LogicalPlan::DfPlan(result)) } @@ -39,7 +47,7 @@ impl<'a, S: ContextProvider> DfPlanner<'a, S> { impl<'a, S> Planner for DfPlanner<'a, S> where - S: ContextProvider, + S: ContextProvider + Send + Sync, { /// Converts statement to logical plan using 
datafusion planner fn statement_to_plan(&self, statement: Statement) -> Result { @@ -54,3 +62,48 @@ where } } } + +pub(crate) struct DfContextProviderAdapter<'a> { + catalog_list: &'a CatalogListRef, +} + +impl<'a> DfContextProviderAdapter<'a> { + pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self { + Self { catalog_list } + } +} + +impl<'a> ContextProvider for DfContextProviderAdapter<'a> { + fn get_table_provider(&self, name: TableReference) -> Option> { + let (catalog, schema, table) = match name { + TableReference::Bare { table } => (DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table), + TableReference::Partial { schema, table } => (DEFAULT_CATALOG_NAME, schema, table), + TableReference::Full { + catalog, + schema, + table, + } => (catalog, schema, table), + }; + + self.catalog_list + .catalog(catalog) + .and_then(|catalog_provider| catalog_provider.schema(schema)) + .and_then(|schema_provider| schema_provider.table(table)) + .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) + } + + fn get_function_meta(&self, _name: &str) -> Option> { + // TODO(dennis) + None + } + + fn get_aggregate_meta(&self, _name: &str) -> Option> { + // TODO(dennis) + None + } + + fn get_variable_type(&self, _variable_names: &[String]) -> Option { + // TODO(dennis) + None + } +} diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 6746af567d..ac274ef563 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -1,22 +1,28 @@ +mod context; +mod datafusion; +mod state; + use std::sync::Arc; use common_recordbatch::SendableRecordBatchStream; +pub use context::QueryContext; use crate::catalog::CatalogList; use crate::error::Result; use crate::plan::LogicalPlan; - -mod context; -mod datafusion; -mod state; -pub use context::QueryContext; - use crate::query_engine::datafusion::DatafusionQueryEngine; +/// Sql output +pub enum Output { + AffectedRows(usize), + RecordBatch(SendableRecordBatchStream), +} + 
#[async_trait::async_trait] -pub trait QueryEngine { +pub trait QueryEngine: Send + Sync { fn name(&self) -> &str; - async fn execute(&self, plan: &LogicalPlan) -> Result; + fn sql_to_plan(&self, sql: &str) -> Result; + async fn execute(&self, plan: &LogicalPlan) -> Result; } pub struct QueryEngineFactory { @@ -36,3 +42,21 @@ impl QueryEngineFactory { &self.query_engine } } + +pub type QueryEngineRef = Arc; + +#[cfg(test)] +mod tests { + use super::*; + use crate::catalog::memory; + + #[test] + fn test_query_engine_factory() { + let catalog_list = memory::new_memory_catalog_list().unwrap(); + let factory = QueryEngineFactory::new(catalog_list); + + let engine = factory.query_engine(); + + assert_eq!("datafusion", engine.name()); + } +} diff --git a/src/query/src/query_engine/datafusion.rs b/src/query/src/query_engine/datafusion.rs index 2b223f0c03..043585a89d 100644 --- a/src/query/src/query_engine/datafusion.rs +++ b/src/query/src/query_engine/datafusion.rs @@ -1,30 +1,33 @@ +mod adapter; + use std::sync::Arc; use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; use snafu::{OptionExt, ResultExt}; +use sql::{dialect::GenericDialect, parser::ParserContext}; use super::{context::QueryContext, state::QueryEngineState}; use crate::{ - catalog::CatalogList, - error::{self, Result}, + catalog::CatalogListRef, + error::{self, ParseSqlSnafu, Result}, executor::QueryExecutor, logical_optimizer::LogicalOptimizer, physical_optimizer::PhysicalOptimizer, physical_planner::PhysicalPlanner, plan::{LogicalPlan, PhysicalPlan}, + planner::{DfContextProviderAdapter, DfPlanner, Planner}, query_engine::datafusion::adapter::PhysicalPlanAdapter, - query_engine::QueryEngine, + query_engine::{Output, QueryEngine}, }; -mod adapter; pub(crate) struct DatafusionQueryEngine { state: QueryEngineState, } impl DatafusionQueryEngine { - pub fn new(catalog_list: Arc) -> Self { + pub fn new(catalog_list: CatalogListRef) -> Self { Self { - state: 
QueryEngineState::new(catalog_list), + state: QueryEngineState::new(catalog_list.clone()), } } } @@ -34,13 +37,28 @@ impl QueryEngine for DatafusionQueryEngine { fn name(&self) -> &str { "datafusion" } - async fn execute(&self, plan: &LogicalPlan) -> Result { + + fn sql_to_plan(&self, sql: &str) -> Result { + let context_provider = DfContextProviderAdapter::new(self.state.catalog_list()); + let planner = DfPlanner::new(&context_provider); + let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {}) + .with_context(|_| ParseSqlSnafu { + sql: sql.to_string(), + })?; + // TODO(dennis): supports multi statement in one sql? + assert!(1 == statement.len()); + planner.statement_to_plan(statement.remove(0)) + } + + async fn execute(&self, plan: &LogicalPlan) -> Result { let mut ctx = QueryContext::new(self.state.clone()); let logical_plan = self.optimize_logical_plan(&mut ctx, plan)?; let physical_plan = self.create_physical_plan(&mut ctx, &logical_plan).await?; let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?; - Ok(self.execute_stream(&ctx, &physical_plan).await?) 
+ Ok(Output::RecordBatch( + self.execute_stream(&ctx, &physical_plan).await?, + )) } } @@ -130,3 +148,69 @@ impl QueryExecutor for DatafusionQueryEngine { } } } + +#[cfg(test)] +mod tests { + use arrow::array::UInt64Array; + use common_recordbatch::util; + use datafusion::field_util::FieldExt; + use datafusion::field_util::SchemaExt; + + use crate::catalog::memory; + use crate::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; + + fn create_test_engine() -> QueryEngineRef { + let catalog_list = memory::new_memory_catalog_list().unwrap(); + let factory = QueryEngineFactory::new(catalog_list); + factory.query_engine().clone() + } + + #[test] + fn test_sql_to_plan() { + let engine = create_test_engine(); + let sql = "select sum(number) from numbers limit 20"; + + let plan = engine.sql_to_plan(sql).unwrap(); + + println!("{:?}", plan); + assert_eq!( + format!("{:?}", plan), + r#"DfPlan(Limit: 20 + Projection: #SUM(numbers.number) + Aggregate: groupBy=[[]], aggr=[[SUM(#numbers.number)]] + TableScan: numbers projection=None)"# + ); + } + + #[tokio::test] + async fn test_execute() { + let engine = create_test_engine(); + let sql = "select sum(number) from numbers limit 20"; + + let plan = engine.sql_to_plan(sql).unwrap(); + let output = engine.execute(&plan).await.unwrap(); + + match output { + Output::RecordBatch(recordbatch) => { + let numbers = util::collect(recordbatch).await.unwrap(); + assert_eq!(1, numbers.len()); + assert_eq!(numbers[0].df_recordbatch.num_columns(), 1); + assert_eq!(1, numbers[0].schema.arrow_schema().fields().len()); + assert_eq!( + "SUM(numbers.number)", + numbers[0].schema.arrow_schema().field(0).name() + ); + + let columns = numbers[0].df_recordbatch.columns(); + assert_eq!(1, columns.len()); + assert_eq!(columns[0].len(), 1); + + assert_eq!( + *columns[0].as_any().downcast_ref::().unwrap(), + UInt64Array::from_slice(&[4950]) + ); + } + _ => unreachable!(), + } + } +} diff --git a/src/query/src/query_engine/state.rs 
b/src/query/src/query_engine/state.rs index 88fa896ae4..d9f8ca19f9 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -16,18 +16,18 @@ use table::{ Table, }; -use crate::catalog::{schema::SchemaProvider, CatalogList, CatalogProvider}; +use crate::catalog::{ + schema::SchemaProvider, CatalogListRef, CatalogProvider, DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, +}; use crate::error::{self, Result}; use crate::executor::Runtime; -const DEFAULT_CATALOG_NAME: &str = "greptime"; -const DEFAULT_SCHEMA_NAME: &str = "public"; - /// Query engine global state #[derive(Clone)] pub struct QueryEngineState { df_context: ExecutionContext, - catalog_list: Arc, + catalog_list: CatalogListRef, } impl fmt::Debug for QueryEngineState { @@ -38,7 +38,7 @@ impl fmt::Debug for QueryEngineState { } impl QueryEngineState { - pub(crate) fn new(catalog_list: Arc) -> Self { + pub(crate) fn new(catalog_list: CatalogListRef) -> Self { let config = ExecutionConfig::new() .with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); let df_context = ExecutionContext::with_config(config); @@ -54,6 +54,11 @@ impl QueryEngineState { } } + #[inline] + pub(crate) fn catalog_list(&self) -> &CatalogListRef { + &self.catalog_list + } + #[inline] pub(crate) fn df_context(&self) -> &ExecutionContext { &self.df_context @@ -63,19 +68,12 @@ impl QueryEngineState { pub(crate) fn runtime(&self) -> Runtime { self.df_context.runtime_env().into() } - - #[allow(dead_code)] - pub(crate) fn schema(&self, schema_name: &str) -> Option> { - self.catalog_list - .catalog(DEFAULT_CATALOG_NAME) - .and_then(|c| c.schema(schema_name)) - } } /// Adapters between datafusion and greptime query engine. 
struct DfCatalogListAdapter { runtime: Arc, - catalog_list: Arc, + catalog_list: CatalogListRef, } impl DfCatalogList for DfCatalogListAdapter { @@ -92,13 +90,14 @@ impl DfCatalogList for DfCatalogListAdapter { df_cataglog_provider: catalog, runtime: self.runtime.clone(), }); - match self.catalog_list.register_catalog(name, catalog_adapter) { - Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter { - catalog_provider, - runtime: self.runtime.clone(), - })), - None => None, - } + self.catalog_list + .register_catalog(name, catalog_adapter) + .map(|catalog_provider| { + Arc::new(DfCatalogProviderAdapter { + catalog_provider, + runtime: self.runtime.clone(), + }) as _ + }) } fn catalog_names(&self) -> Vec { @@ -106,13 +105,12 @@ impl DfCatalogList for DfCatalogListAdapter { } fn catalog(&self, name: &str) -> Option> { - match self.catalog_list.catalog(name) { - Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter { + self.catalog_list.catalog(name).map(|catalog_provider| { + Arc::new(DfCatalogProviderAdapter { catalog_provider, runtime: self.runtime.clone(), - })), - None => None, - } + }) as _ + }) } } @@ -132,13 +130,14 @@ impl CatalogProvider for CatalogProviderAdapter { } fn schema(&self, name: &str) -> Option> { - match self.df_cataglog_provider.schema(name) { - Some(df_schema_provider) => Some(Arc::new(SchemaProviderAdapter { - df_schema_provider, - runtime: self.runtime.clone(), - })), - None => None, - } + self.df_cataglog_provider + .schema(name) + .map(|df_schema_provider| { + Arc::new(SchemaProviderAdapter { + df_schema_provider, + runtime: self.runtime.clone(), + }) as _ + }) } } @@ -158,13 +157,12 @@ impl DfCatalogProvider for DfCatalogProviderAdapter { } fn schema(&self, name: &str) -> Option> { - match self.catalog_provider.schema(name) { - Some(schema_provider) => Some(Arc::new(DfSchemaProviderAdapter { + self.catalog_provider.schema(name).map(|schema_provider| { + Arc::new(DfSchemaProviderAdapter { schema_provider, runtime: 
self.runtime.clone(), - })), - None => None, - } + }) as _ + }) } } @@ -184,10 +182,9 @@ impl DfSchemaProvider for DfSchemaProviderAdapter { } fn table(&self, name: &str) -> Option> { - match self.schema_provider.table(name) { - Some(table) => Some(Arc::new(DfTableProviderAdapter::new(table))), - None => None, - } + self.schema_provider + .table(name) + .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) } fn register_table( @@ -233,13 +230,9 @@ impl SchemaProvider for SchemaProviderAdapter { } fn table(&self, name: &str) -> Option> { - match self.df_schema_provider.table(name) { - Some(table_provider) => Some(Arc::new(TableAdapter::new( - table_provider, - self.runtime.clone(), - ))), - None => None, - } + self.df_schema_provider.table(name).map(|table_provider| { + Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _ + }) } fn register_table( @@ -248,31 +241,19 @@ impl SchemaProvider for SchemaProviderAdapter { table: Arc, ) -> Result>> { let table_provider = Arc::new(DfTableProviderAdapter::new(table)); - match self + Ok(self .df_schema_provider .register_table(name, table_provider) .context(error::DatafusionSnafu)? - { - Some(table) => Ok(Some(Arc::new(TableAdapter::new( - table, - self.runtime.clone(), - )))), - None => Ok(None), - } + .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))) } fn deregister_table(&self, name: &str) -> Result>> { - match self + Ok(self .df_schema_provider .deregister_table(name) .context(error::DatafusionSnafu)? 
- { - Some(table) => Ok(Some(Arc::new(TableAdapter::new( - table, - self.runtime.clone(), - )))), - None => Ok(None), - } + .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)) } fn table_exist(&self, name: &str) -> bool { diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs index e03abc2206..5032d4dd5e 100644 --- a/src/query/tests/query_engine_test.rs +++ b/src/query/tests/query_engine_test.rs @@ -1,22 +1,20 @@ use std::sync::Arc; use arrow::array::UInt32Array; -use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use common_recordbatch::util; use datafusion::field_util::FieldExt; use datafusion::field_util::SchemaExt; use datafusion::logical_plan::LogicalPlanBuilder; -use futures_util::stream::TryStreamExt; -use query::catalog::memory::MemoryCatalogList; -use query::error::{RecordBatchSnafu, Result}; +use query::catalog::memory; +use query::error::Result; use query::plan::LogicalPlan; -use query::query_engine::QueryEngineFactory; -use snafu::ResultExt; +use query::query_engine::{Output, QueryEngineFactory}; use table::table::adapter::DfTableProviderAdapter; use table::table::numbers::NumbersTable; #[tokio::test] async fn test_datafusion_query_engine() -> Result<()> { - let catalog_list = Arc::new(MemoryCatalogList::default()); + let catalog_list = memory::new_memory_catalog_list()?; let factory = QueryEngineFactory::new(catalog_list); let engine = factory.query_engine(); @@ -32,9 +30,14 @@ async fn test_datafusion_query_engine() -> Result<()> { .unwrap(), ); - let ret = engine.execute(&plan).await; + let output = engine.execute(&plan).await?; - let numbers = collect(ret.unwrap()).await.unwrap(); + let recordbatch = match output { + Output::RecordBatch(recordbatch) => recordbatch, + _ => unreachable!(), + }; + + let numbers = util::collect(recordbatch).await.unwrap(); assert_eq!(1, numbers.len()); assert_eq!(numbers[0].df_recordbatch.num_columns(), 1); @@ -52,10 +55,3 @@ async fn
test_datafusion_query_engine() -> Result<()> { Ok(()) } - -pub async fn collect(stream: SendableRecordBatchStream) -> Result> { - stream - .try_collect::>() - .await - .context(RecordBatchSnafu) -} diff --git a/src/sql/src/dialect.rs b/src/sql/src/dialect.rs index 7bfbbf52b9..20e141877c 100644 --- a/src/sql/src/dialect.rs +++ b/src/sql/src/dialect.rs @@ -1 +1,3 @@ // todo(hl) wrap sqlparser dialects + +pub use sqlparser::dialect::{Dialect, GenericDialect}; diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 2036b54271..505f41b57c 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies.arrow] package = "arrow2" version="0.10" -features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"] +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] [dependencies] async-trait = "0.1" diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 2a039e8ba8..ac92cbc389 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -1,3 +1,6 @@ +pub mod adapter; +pub mod numbers; + use std::any::Any; use std::collections::HashMap; use std::sync::Arc; @@ -10,9 +13,6 @@ use datatypes::schema::{Schema, SchemaRef}; use crate::error::Result; -pub mod adapter; -pub mod numbers; - pub type TableId = u64; pub type TableVersion = u64; diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index c3f0c668fe..62a5857852 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -16,6 +16,7 @@ use crate::error::Result; use crate::table::{Expr, Table}; /// numbers table for test +#[derive(Debug, Clone, Eq, PartialEq)] pub struct NumbersTable { schema: SchemaRef, }