From 3a2f794f6c812c9d6adffdf253c093630fc16b4e Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Tue, 26 Apr 2022 15:17:32 +0800 Subject: [PATCH] feat: query engine impl on datafusion (#10) * feat: query engine impl on datafusion * feat: adds physical_optimizer, physical_planner and executor * feat: impl adpaters between datafuion and greptime query engine core APIs. * feat: impl PhysicalPlanAdapter and ExecutionPlanAdapter * feat: rename table datafusion mod to adapter * fix: clippy warning * fix: conflicts with develop branch * feat: add database mod * fix: CR comment * fix: by CR comments * fix: conflicts with develop branch * fix: by CR comments --- Cargo.lock | 1286 ++++++++++++++++- Cargo.toml | 2 + src/common/query/Cargo.toml | 7 + src/common/query/src/lib.rs | 1 + src/common/query/src/logical_plan/expr.rs | 18 + src/common/query/src/logical_plan/mod.rs | 3 + src/common/recordbatch/Cargo.toml | 16 + src/common/recordbatch/src/error.rs | 10 + src/common/recordbatch/src/lib.rs | 15 + src/common/recordbatch/src/recordbatch.rs | 10 + src/datatypes/Cargo.toml | 3 +- src/datatypes/src/lib.rs | 2 +- src/datatypes/src/schema.rs | 24 + src/query/Cargo.toml | 8 + src/query/src/catalog.rs | 39 + src/query/src/catalog/schema.rs | 35 + src/query/src/database.rs | 1 + src/query/src/error.rs | 20 + src/query/src/executor.rs | 8 + src/query/src/lib.rs | 5 + src/query/src/logical_optimizer.rs | 10 + src/query/src/physical_optimizer.rs | 10 + src/query/src/physical_planner.rs | 16 + src/query/src/plan.rs | 44 + src/query/src/query_engine.rs | 36 + src/query/src/query_engine/context.rs | 3 + src/query/src/query_engine/datafusion.rs | 126 ++ .../src/query_engine/datafusion/adapter.rs | 171 +++ src/query/src/query_engine/state.rs | 225 +++ src/table/Cargo.toml | 14 +- src/table/src/error.rs | 18 + src/table/src/lib.rs | 6 +- src/table/src/table.rs | 104 ++ src/table/src/table/adapter.rs | 308 ++++ src/table/src/table/memory.rs | 1 + 35 files changed, 2597 insertions(+), 8 deletions(-) create mode 100644 src/common/query/Cargo.toml create mode 100644 src/common/query/src/lib.rs create mode 100644 src/common/query/src/logical_plan/expr.rs create mode 100644 src/common/query/src/logical_plan/mod.rs create mode 100644 src/common/recordbatch/Cargo.toml create mode 100644 src/common/recordbatch/src/error.rs create mode 100644 src/common/recordbatch/src/lib.rs create mode 100644 src/common/recordbatch/src/recordbatch.rs create mode 100644 src/query/src/catalog.rs create mode 100644 src/query/src/catalog/schema.rs create mode 100644 src/query/src/database.rs create mode 100644 src/query/src/error.rs create mode 100644 src/query/src/plan.rs create mode 100644 src/query/src/query_engine.rs create mode 100644 src/query/src/query_engine/context.rs create mode 100644 src/query/src/query_engine/datafusion.rs create mode 100644 src/query/src/query_engine/datafusion/adapter.rs create mode 100644 src/query/src/query_engine/state.rs create mode 100644 src/table/src/error.rs create mode 100644 src/table/src/table.rs create mode 100644 src/table/src/table/adapter.rs create mode 100644 src/table/src/table/memory.rs diff --git a/Cargo.lock b/Cargo.lock index cfea96319f..fdfaef032c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,18 +2,125 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + +[[package]] +name = "alloc-no-stdlib" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ef4730490ad1c4eae5c4325b2a95f521d023e5c885853ff7aca0a6a1631db3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "697ed7edc0f1711de49ce108c541623a0af97c6c60b2f6e2b65229847ac843c2" +dependencies = [ + "alloc-no-stdlib", +] + +[[package]] +name = "array-init-cursor" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf7d0a018de4f6aa429b9d33d69edf69072b1c5b1cb8d3e4a5f7ef898fc3eb76" + +[[package]] +name = "arrayref" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" + +[[package]] +name = "arrayvec" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" + +[[package]] +name = "arrow-format" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2333f8ccf0d597ba779863c57a0b61f635721187fb2fdeabae92691d7d582fe5" +dependencies = [ + "planus", + "serde", +] + [[package]] name = "arrow2" version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e387b20dd573a96f36b173d9027483898f944d696521afd74e2caa3c813d86e" dependencies = [ + "ahash", + "arrow-format", + "base64", "bytemuck", "chrono", + "csv", + "csv-core", "either", + "fallible-streaming-iterator", + "futures", "hash_hasher", + "indexmap", + "itertools", + "lexical-core", + "multiversion", "num-traits", + "parquet2", + "regex", + "serde", + "serde_json", "simdutf8", + "streaming-iterator", + "strength_reduce", +] + +[[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -33,6 +140,92 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bitpacking" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8c7d2ac73c167c06af4a5f37e6e59d84148d57ccbe4480b76f0273eefea82d7" +dependencies = [ + "crunchy", +] + +[[package]] +name = "blake2" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9cf849ee05b2ee5fba5e36f97ff8ec2533916700fc0758d40d92136a42f3388" +dependencies = [ + "digest", +] + +[[package]] +name = "blake3" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a08e53fc5a564bb15bfe6fae56bd71522205f1f91893f9c0116edad6496c183f" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", + "digest", +] + +[[package]] +name = "block-buffer" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324" +dependencies = [ + "generic-array", +] + +[[package]] +name = "brotli" +version = "3.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ad2d4653bf5ca36ae797b1f4bb4dbddb60ce49ca4aed8a2ce4829f60425b80" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + +[[package]] +name = "bstr" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata", + "serde", +] + [[package]] name = "bytemuck" version = "1.9.1" @@ -53,6 +246,21 @@ dependencies = [ "syn", ] +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +dependencies = [ + "jobserver", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -65,14 +273,186 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" dependencies = [ + "libc", "num-integer", "num-traits", + "serde", + "time", + "winapi", +] + +[[package]] +name = "comfy-table" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b103d85ca6e209388771bfb7aa6b68a7aeec4afbf6f0a0264bfbf50360e5212e" +dependencies = [ + "strum", + "strum_macros", + "unicode-width", ] [[package]] name = "common-base" version = "0.1.0" +[[package]] +name = "common-query" +version = "0.1.0" +dependencies = [ + "datafusion", +] + +[[package]] +name = "common-recordbatch" +version = "0.1.0" +dependencies = [ + "arrow2", + "datafusion", + "datafusion-common", + "datatypes", + "futures", + "snafu", +] + +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + +[[package]] +name = "cpufeatures" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b" +dependencies = [ + "libc", +] + +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + +[[package]] +name = "crypto-common" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "csv" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" +dependencies = [ + "bstr", + "csv-core", + "itoa 0.4.8", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + +[[package]] +name = "datafusion" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=arrow2#744b2626081db95a254fc882820fc7812f95aa51" +dependencies = [ + "ahash", + "arrow2", + "async-trait", + "chrono", + "comfy-table", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr", + "futures", + "hashbrown 0.12.0", + "lazy_static", + "log", + "num_cpus", + "ordered-float 2.10.0", + "parking_lot", + "parquet2", + "paste", + "pin-project-lite", + "rand", + "smallvec", + "sqlparser 0.15.0", + "tempfile", + "tokio", + "tokio-stream", +] + +[[package]] +name = "datafusion-common" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=arrow2#744b2626081db95a254fc882820fc7812f95aa51" +dependencies = [ + "arrow2", + "ordered-float 2.10.0", + "parquet2", + "sqlparser 0.15.0", +] + +[[package]] +name = "datafusion-expr" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=arrow2#744b2626081db95a254fc882820fc7812f95aa51" +dependencies = [ + "ahash", + "arrow2", + "datafusion-common", + "sqlparser 0.15.0", +] + +[[package]] +name = "datafusion-physical-expr" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=arrow2#744b2626081db95a254fc882820fc7812f95aa51" +dependencies = [ + "ahash", + "arrow2", + "blake2", + "blake3", + "chrono", + "datafusion-common", + "datafusion-expr", + "hashbrown 0.12.0", + "lazy_static", + "md-5", + "ordered-float 2.10.0", + "paste", + "rand", + "regex", + "sha2", + "unicode-segmentation", +] + [[package]] name = "datanode" version = "0.1.0" @@ -87,6 +467,18 @@ dependencies = [ "arrow2", "common-base", "paste", + "serde", +] + +[[package]] +name = "digest" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" +dependencies = [ + "block-buffer", + "crypto-common", + "subtle", ] [[package]] @@ -101,12 +493,164 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + +[[package]] +name = "fastrand" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf" +dependencies = [ + "instant", +] + +[[package]] +name = "flate2" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39522e96686d38f4bc984b9198e3a0613264abaebaff2c5c918bfa6b6da09af" +dependencies = [ + "cfg-if", + "crc32fast", + "libc", + "miniz_oxide", +] + +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" + +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" + +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" + +[[package]] +name = "futures-task" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" + +[[package]] +name = "futures-util" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "generic-array" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd48d33ec7f05fbfa152300fdad764757cbded343c1aa1cff2fbaf4134851803" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "hash_hasher" version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74721d007512d0cb3338cd20f0654ac913920061a4c4d0d8708edb3f2a698c0c" +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" + +[[package]] +name = "hashbrown" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c21d40587b92fa6a6c6e3c1bdbf87d75511db5672f9c93175574b3a00df1758" +dependencies = [ + "ahash", +] + [[package]] name = "heck" version = "0.3.3" @@ -116,6 +660,160 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "indexmap" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" +dependencies = [ + "autocfg", + "hashbrown 0.11.2", +] + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "integer-encoding" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e85a1509a128c855368e135cffcde7eac17d8e1083f41e2b98c58bc1a5074be" +dependencies = [ + "async-trait", + "futures-util", +] + +[[package]] +name = "itertools" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3" +dependencies = [ + "either", +] + +[[package]] +name = "itoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" + +[[package]] +name = "itoa" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" + +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "lexical-core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92912c4af2e7d9075be3e5e3122c4d7263855fa6cce34fbece4dd08e5884624d" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f518eed87c3be6debe6d26b855c97358d8a11bf05acec137e5f53080f5ad2dd8" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afc852ec67c6538bbb2b9911116a385b24510e879a69ab516e6a151b15a79168" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c72a9d52c5c4e62fa2cdc2cb6c694a39ae1382d9c2a17a466f18e272a0930eb1" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a89ec1d062e481210c309b672f73a0567b7855f21e7d2fae636df44d12e97f9" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "094060bd2a7c2ff3a16d5304a6ae82727cb3cc9d1c70f813cc73f744c319337e" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "libc" +version = "0.2.124" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50" + +[[package]] +name = "lock_api" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.16" @@ -133,6 +831,70 @@ version = "0.1.0" name = "logical-plans" version = "0.1.0" +[[package]] +name = "lz4" +version = "1.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4edcb94251b1c375c459e5abe9fb0168c1c826c3370172684844f8f3f8d1a885" +dependencies = [ + "libc", + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7be8908e2ed6f31c02db8a9fa962f03e36c53fbfde437363eae3306b85d7e17" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "md-5" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658646b21e0b72f7866c7038ab086d3d5e1cd6271f060fd37defb241949d0582" +dependencies = [ + "digest", +] + +[[package]] +name = "memchr" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" + +[[package]] +name = "miniz_oxide" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2b29bd4bc3f33391105ebee3589c19197c4271e3e5a9ec9bfe8127eeff8f082" +dependencies = [ + "adler", +] + +[[package]] +name = "multiversion" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "025c962a3dd3cc5e0e520aa9c612201d127dcdf28616974961a649dca64f5373" +dependencies = [ + "multiversion-macros", +] + +[[package]] +name = "multiversion-macros" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8a3e2bde382ebf960c1f3e79689fa5941625fe9bf694a1cb64af3e85faff3af" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -152,16 +914,131 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object-store" version = "0.1.0" +[[package]] +name = "once_cell" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" + +[[package]] +name = "ordered-float" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" +dependencies = [ + "num-traits", +] + +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "995f667a6c822200b0433ac218e05582f0e2efa1b922a3fd2fbaadc5f87bab37" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + +[[package]] +name = "parquet-format-async-temp" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03abc2f9c83fe9ceec83f47c76cc071bfd56caba33794340330f35623ab1f544" +dependencies = [ + "async-trait", + "byteorder", + "futures", + "integer-encoding", + "ordered-float 1.1.1", +] + +[[package]] +name = "parquet2" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b085f9e78e4842865151b693f6d94bdf7b280af66daa6e3587adeb3106a07e9" +dependencies = [ + "async-stream", + "bitpacking", + "brotli", + "flate2", + "futures", + "lz4", + "parquet-format-async-temp", + "snap", + "streaming-decompression", + "zstd", +] + [[package]] name = "paste" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" +[[package]] +name = "pin-project-lite" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "planus" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffebaf174d6cad46a5f0f1bb1c45c6eb509571688bcb18dfab217f3c9f9b151" +dependencies = [ + "array-init-cursor", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" + [[package]] name = "proc-macro2" version = "1.0.37" @@ -174,6 +1051,16 @@ dependencies = [ [[package]] name = "query" version = "0.1.0" +dependencies = [ + "async-trait", + "common-recordbatch", + "datafusion", + "datatypes", + "futures", + "snafu", + "table", + "tokio", +] [[package]] name = "quote" @@ -184,12 +1071,156 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + +[[package]] +name = "redox_syscall" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex" +version = "1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + +[[package]] +name = "rustversion" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2cc38e8fa666e2de3c4aba7edeb5ffc5246c1c2ed0e3d17e560aeeba736b23f" + +[[package]] +name = "ryu" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "serde" +version = "1.0.136" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.136" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +dependencies = [ + "indexmap", + "itoa 1.0.1", + "ryu", + "serde", +] + +[[package]] +name = "sha2" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "simdutf8" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +[[package]] +name = "slab" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" + +[[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + [[package]] name = "snafu" version = "0.7.0" @@ -212,13 +1243,28 @@ dependencies = [ "syn", ] +[[package]] +name = "snap" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" + [[package]] name = "sql" version = "0.1.0" dependencies = [ "query", "snafu", - "sqlparser", + "sqlparser 0.16.0", +] + +[[package]] +name = "sqlparser" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adbbea2526ad0d02ad9414a07c396078a5b944bbf9ca4fbab8f01bb4cb579081" +dependencies = [ + "log", ] [[package]] @@ -230,6 +1276,58 @@ dependencies = [ "log", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + +[[package]] +name = "streaming-decompression" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bc687acd5dc742c4a7094f2927a8614a68e4743ef682e7a2f9f0f711656cc92" +dependencies = [ + "fallible-streaming-iterator", +] + +[[package]] +name = "streaming-iterator" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "303235c177994a476226b80d076bd333b7b560fb05bd242a10609d11b07f81f5" + +[[package]] +name = "strength_reduce" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3ff2f71c82567c565ba4b3009a9350a96a7269eaa4001ebedae926230bc2254" + +[[package]] +name = "strum" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb" + +[[package]] +name = "strum_macros" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + [[package]] name = "syn" version = "1.0.91" @@ -245,17 +1343,203 @@ dependencies = [ name = "table" version = "0.1.0" dependencies = [ + "arrow2", "async-trait", + "chrono", + "common-query", + "common-recordbatch", + "datafusion", + "datafusion-common", + "datatypes", + "futures", + "serde", + "snafu", ] +[[package]] +name = "tempfile" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" +dependencies = [ + "cfg-if", + "fastrand", + "libc", + "redox_syscall", + "remove_dir_all", + "winapi", +] + +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "tokio" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +dependencies = [ + "num_cpus", + "parking_lot", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "typenum" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" + [[package]] name = "unicode-segmentation" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" +[[package]] +name = "unicode-width" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" + [[package]] name = "unicode-xid" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" + +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5acdd78cb4ba54c0045ac14f62d8f94a03d10047904ae2a40afa1e99d8f70825" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d" + +[[package]] +name = "windows_i686_gnu" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed" + +[[package]] +name = "windows_i686_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" + +[[package]] +name = "zstd" +version = "0.10.0+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b1365becbe415f3f0fcd024e2f7b45bacfb5bdd055f0dc113571394114e7bdd" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "4.1.4+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f7cd17c9af1a4d6c24beb1cc54b17e2ef7b593dc92f19e9d9acad8b182bbaee" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "1.6.3+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8" +dependencies = [ + "cc", + "libc", +] diff --git a/Cargo.toml b/Cargo.toml index deff6444b8..e6de5cea81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,8 @@ [workspace] members = [ "src/common/base", + "src/common/query", + "src/common/recordbatch", "src/datanode", "src/datatypes", "src/log-store", diff --git a/src/common/query/Cargo.toml b/src/common/query/Cargo.toml new file mode 100644 index 0000000000..29cfebdd72 --- /dev/null +++ b/src/common/query/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "common-query" +version = "0.1.0" +edition = "2021" + +[dependencies] +datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} \ No newline at end of file diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs new file mode 100644 index 0000000000..e06461473c --- /dev/null +++ b/src/common/query/src/lib.rs @@ -0,0 +1 @@ +pub mod logical_plan; diff --git a/src/common/query/src/logical_plan/expr.rs b/src/common/query/src/logical_plan/expr.rs new file mode 100644 index 0000000000..1a55d1d350 --- /dev/null +++ b/src/common/query/src/logical_plan/expr.rs @@ -0,0 +1,18 @@ +use datafusion::logical_plan::Expr as DfExpr; + +/// Central struct of query API. +/// Represent logical expressions such as `A + 1`, or `CAST(c1 AS int)`. +#[derive(Clone, PartialEq, Hash)] +pub struct Expr { + df_expr: DfExpr, +} + +impl Expr { + pub fn new(df_expr: DfExpr) -> Self { + Self { df_expr } + } + + pub fn df_expr(&self) -> &DfExpr { + &self.df_expr + } +} diff --git a/src/common/query/src/logical_plan/mod.rs b/src/common/query/src/logical_plan/mod.rs new file mode 100644 index 0000000000..726fb9eb84 --- /dev/null +++ b/src/common/query/src/logical_plan/mod.rs @@ -0,0 +1,3 @@ +mod expr; + +pub use self::expr::Expr; diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml new file mode 100644 index 0000000000..ace80dfbcd --- /dev/null +++ b/src/common/recordbatch/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "common-recordbatch" +version = "0.1.0" +edition = "2021" + +[dependencies.arrow] +package = "arrow2" +version="0.10" +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"] + +[dependencies] +datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} +datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2"} +datatypes = {path ="../../datatypes" } +futures = "0.3" +snafu = "0.7.0" diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs new file mode 100644 index 0000000000..e5666f485b --- /dev/null +++ b/src/common/recordbatch/src/error.rs @@ -0,0 +1,10 @@ +use arrow::error::ArrowError; +use snafu::Snafu; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Arrow error: {}", source))] + Arrow { source: ArrowError }, +} +pub type Result = std::result::Result; diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs new file mode 100644 index 0000000000..85e75ed4e3 --- /dev/null +++ b/src/common/recordbatch/src/lib.rs @@ -0,0 +1,15 @@ +pub mod error; +mod recordbatch; + +use std::pin::Pin; + +use datatypes::schema::SchemaRef; +use error::Result; +use futures::Stream; +pub use recordbatch::RecordBatch; + +pub trait RecordBatchStream: Stream> { + fn schema(&self) -> SchemaRef; +} + +pub type SendableRecordBatchStream = Pin>; diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs new file mode 100644 index 0000000000..1f48247785 --- /dev/null +++ b/src/common/recordbatch/src/recordbatch.rs @@ -0,0 +1,10 @@ +use std::sync::Arc; + +use datafusion_common::record_batch::RecordBatch as DfRecordBatch; +use datatypes::schema::Schema; + +#[derive(Clone, Debug)] +pub struct RecordBatch { + pub schema: Arc, + pub df_recordbatch: DfRecordBatch, +} diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index 7365737cc3..c38c0205e8 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -3,9 +3,8 @@ name = "datatypes" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] arrow2 = "0.10" common-base = { path = "../common/base" } paste = "1.0" +serde ={ version = "1.0.136", features = ["derive"] } diff --git a/src/datatypes/src/lib.rs b/src/datatypes/src/lib.rs index 7f3e4ebb8e..645b6d75d1 100644 --- a/src/datatypes/src/lib.rs +++ b/src/datatypes/src/lib.rs @@ -3,7 +3,6 @@ mod data_type; pub mod prelude; mod scalars; -mod schema; pub mod type_id; mod types; pub mod value; @@ -13,3 +12,4 @@ use arrow2::array::{BinaryArray, MutableBinaryArray}; pub type LargeBinaryArray = BinaryArray; pub type MutableLargeBinaryArray = MutableBinaryArray; +pub mod schema; diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 8b13789179..61e2179c99 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -1 +1,25 @@ +use std::sync::Arc; +use arrow2::datatypes::Schema as ArrowSchema; + +#[derive(Debug, Clone)] +pub struct Schema { + arrow_schema: Arc, +} + +impl Schema { + pub fn new(arrow_schema: Arc) -> Self { + Self { arrow_schema } + } + pub fn arrow_schema(&self) -> &Arc { + &self.arrow_schema + } +} + +pub type SchemaRef = Arc; + +impl From> for Schema { + fn from(s: Arc) -> Schema { + Schema::new(s) + } +} diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 9cd531027d..3757bbb7a0 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -6,3 +6,11 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1" +common-recordbatch = {path = "../common/recordbatch" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} +datatypes = {path = "../datatypes" } +futures = "0.3" +snafu = "0.7.0" +table = { path = "../table" } +tokio = "1.0" diff --git a/src/query/src/catalog.rs b/src/query/src/catalog.rs new file mode 100644 index 0000000000..4eaf4b5dbd --- /dev/null +++ b/src/query/src/catalog.rs @@ -0,0 +1,39 @@ +pub mod schema; +use std::any::Any; +use std::sync::Arc; + +use crate::catalog::schema::SchemaProvider; + +/// Represent a list of named catalogs +pub trait CatalogList: Sync + Send { + /// Returns the catalog list as [`Any`](std::any::Any) + /// so that it can be downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Adds a new catalog to this catalog list + /// If a catalog of the same name existed before, it is replaced in the list and returned. + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option>; + + /// Retrieves the list of available catalog names + fn catalog_names(&self) -> Vec; + + /// Retrieves a specific catalog by name, provided it exists. + fn catalog(&self, name: &str) -> Option>; +} + +/// Represents a catalog, comprising a number of named schemas. +pub trait CatalogProvider: Sync + Send { + /// Returns the catalog provider as [`Any`](std::any::Any) + /// so that it can be downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Retrieves the list of available schema names in this catalog. + fn schema_names(&self) -> Vec; + + /// Retrieves a specific schema from the catalog by name, provided it exists. + fn schema(&self, name: &str) -> Option>; +} diff --git a/src/query/src/catalog/schema.rs b/src/query/src/catalog/schema.rs new file mode 100644 index 0000000000..ec168d878d --- /dev/null +++ b/src/query/src/catalog/schema.rs @@ -0,0 +1,35 @@ +use std::any::Any; + +use table::TableRef; + +use crate::error::Result; + +/// Represents a schema, comprising a number of named tables. +pub trait SchemaProvider: Sync + Send { + /// Returns the schema provider as [`Any`](std::any::Any) + /// so that it can be downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Retrieves the list of available table names in this schema. + fn table_names(&self) -> Vec; + + /// Retrieves a specific table from the schema by name, provided it exists. + fn table(&self, name: &str) -> Option; + + /// If supported by the implementation, adds a new table to this schema. + /// If a table of the same name existed before, it returns "Table already exists" error. + fn register_table(&self, _name: String, _table: TableRef) -> Result> { + todo!(); + } + + /// If supported by the implementation, removes an existing table from this schema and returns it. + /// If no table of that name exists, returns Ok(None). + fn deregister_table(&self, _name: &str) -> Result> { + todo!(); + } + + /// If supported by the implementation, checks the table exist in the schema provider or not. + /// If no matched table in the schema provider, return false. + /// Otherwise, return true. + fn table_exist(&self, name: &str) -> bool; +} diff --git a/src/query/src/database.rs b/src/query/src/database.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/src/query/src/database.rs @@ -0,0 +1 @@ + diff --git a/src/query/src/error.rs b/src/query/src/error.rs new file mode 100644 index 0000000000..9dd8d7506e --- /dev/null +++ b/src/query/src/error.rs @@ -0,0 +1,20 @@ +use datafusion::error::DataFusionError; +use snafu::Snafu; + +/// business error of query engine +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Datafusion query engine error: {}", source))] + Datafusion { source: DataFusionError }, + #[snafu(display("PhysicalPlan downcast_ref failed"))] + PhysicalPlanDowncast, +} + +pub type Result = std::result::Result; + +impl From for DataFusionError { + fn from(e: Error) -> DataFusionError { + DataFusionError::External(Box::new(e)) + } +} diff --git a/src/query/src/executor.rs b/src/query/src/executor.rs index 8b13789179..af472a419f 100644 --- a/src/query/src/executor.rs +++ b/src/query/src/executor.rs @@ -1 +1,9 @@ +use std::sync::Arc; +use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext}; + +/// Executor to run [ExecutionPlan]. +#[async_trait::async_trait] +pub trait QueryExecutor { + async fn execute_stream(&self, ctx: &QueryContext, plan: &Arc) -> Result<()>; +} diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 7af252fa23..7f37f417d8 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -1,4 +1,9 @@ +pub mod catalog; +pub mod database; +pub mod error; pub mod executor; pub mod logical_optimizer; pub mod physical_optimizer; pub mod physical_planner; +mod plan; +pub mod query_engine; diff --git a/src/query/src/logical_optimizer.rs b/src/query/src/logical_optimizer.rs index 8b13789179..8b4414fd95 100644 --- a/src/query/src/logical_optimizer.rs +++ b/src/query/src/logical_optimizer.rs @@ -1 +1,11 @@ +use crate::error::Result; +use crate::plan::LogicalPlan; +use crate::query_engine::QueryContext; +pub trait LogicalOptimizer { + fn optimize_logical_plan( + &self, + ctx: &mut QueryContext, + plan: &LogicalPlan, + ) -> Result; +} diff --git a/src/query/src/physical_optimizer.rs b/src/query/src/physical_optimizer.rs index 8b13789179..bf7813a988 100644 --- a/src/query/src/physical_optimizer.rs +++ b/src/query/src/physical_optimizer.rs @@ -1 +1,11 @@ +use std::sync::Arc; +use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext}; + +pub trait PhysicalOptimizer { + fn optimize_physical_plan( + &self, + ctx: &mut QueryContext, + plan: Arc, + ) -> Result>; +} diff --git a/src/query/src/physical_planner.rs b/src/query/src/physical_planner.rs index 8b13789179..967038afd0 100644 --- a/src/query/src/physical_planner.rs +++ b/src/query/src/physical_planner.rs @@ -1 +1,17 @@ +use std::sync::Arc; +use crate::error::Result; +use crate::plan::{LogicalPlan, PhysicalPlan}; +use crate::query_engine::QueryContext; + +/// Physical query planner that converts a `LogicalPlan` to an +/// `ExecutionPlan` suitable for execution. +#[async_trait::async_trait] +pub trait PhysicalPlanner { + /// Create a physical plan from a logical plan + async fn create_physical_plan( + &self, + ctx: &mut QueryContext, + logical_plan: &LogicalPlan, + ) -> Result>; +} diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs new file mode 100644 index 0000000000..c259a6886e --- /dev/null +++ b/src/query/src/plan.rs @@ -0,0 +1,44 @@ +use std::any::Any; +use std::sync::Arc; + +use common_recordbatch::SendableRecordBatchStream; +use datafusion::logical_plan::LogicalPlan as DfLogicalPlan; +use datatypes::schema::SchemaRef; + +use crate::error::Result; + +/// A LogicalPlan represents the different types of relational +/// operators (such as Projection, Filter, etc) and can be created by +/// the SQL query planner. +/// +/// A LogicalPlan represents transforming an input relation (table) to +/// an output relation (table) with a (potentially) different +/// schema. A plan represents a dataflow tree where data flows +/// from leaves up to the root to produce the query result. +#[derive(Clone)] +pub enum LogicalPlan { + DfPlan(DfLogicalPlan), +} + +#[async_trait::async_trait] +pub trait PhysicalPlan: Send + Sync + Any { + /// Get the schema for this execution plan + fn schema(&self) -> SchemaRef; + + /// Get a list of child execution plans that provide the input for this plan. The returned list + /// will be empty for leaf nodes, will contain a single value for unary nodes, or two + /// values for binary nodes (such as joins). + fn children(&self) -> Vec>; + + /// Returns a new plan where all children were replaced by new plans. + /// The size of `children` must be equal to the size of `ExecutionPlan::children()`. + fn with_new_children( + &self, + children: Vec>, + ) -> Result>; + + /// creates an iterator + async fn execute(&self, partition: usize) -> Result; + + fn as_any(&self) -> &dyn Any; +} diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs new file mode 100644 index 0000000000..76ae018c06 --- /dev/null +++ b/src/query/src/query_engine.rs @@ -0,0 +1,36 @@ +use std::sync::Arc; + +use crate::catalog::CatalogList; +use crate::error::Result; +use crate::plan::LogicalPlan; + +mod context; +mod datafusion; +mod state; +pub use context::QueryContext; + +use crate::query_engine::datafusion::DatafusionQueryEngine; + +#[async_trait::async_trait] +pub trait QueryEngine { + fn name(&self) -> &str; + async fn execute(&self, plan: &LogicalPlan) -> Result<()>; +} + +pub struct QueryEngineFactory { + query_engine: Arc, +} + +impl QueryEngineFactory { + pub fn new(catalog_list: Arc) -> Self { + Self { + query_engine: Arc::new(DatafusionQueryEngine::new(catalog_list)), + } + } +} + +impl QueryEngineFactory { + pub fn query_engine(&self) -> &Arc { + &self.query_engine + } +} diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs new file mode 100644 index 0000000000..97bf384a59 --- /dev/null +++ b/src/query/src/query_engine/context.rs @@ -0,0 +1,3 @@ +/// Query engine execution context +#[derive(Default, Debug)] +pub struct QueryContext; diff --git a/src/query/src/query_engine/datafusion.rs b/src/query/src/query_engine/datafusion.rs new file mode 100644 index 0000000000..5f16b21de0 --- /dev/null +++ b/src/query/src/query_engine/datafusion.rs @@ -0,0 +1,126 @@ +use std::sync::Arc; + +use snafu::{OptionExt, ResultExt}; + +use super::{context::QueryContext, state::QueryEngineState}; +use crate::{ + catalog::CatalogList, + error::{self, Result}, + executor::QueryExecutor, + logical_optimizer::LogicalOptimizer, + physical_optimizer::PhysicalOptimizer, + physical_planner::PhysicalPlanner, + plan::{LogicalPlan, PhysicalPlan}, + query_engine::datafusion::adapter::PhysicalPlanAdapter, + query_engine::QueryEngine, +}; +mod adapter; + +pub(crate) struct DatafusionQueryEngine { + state: QueryEngineState, +} + +impl DatafusionQueryEngine { + pub fn new(catalog_list: Arc) -> Self { + Self { + state: QueryEngineState::new(catalog_list), + } + } +} + +#[async_trait::async_trait] +impl QueryEngine for DatafusionQueryEngine { + fn name(&self) -> &str { + "datafusion" + } + async fn execute(&self, plan: &LogicalPlan) -> Result<()> { + let mut ctx = QueryContext::default(); + let logical_plan = self.optimize_logical_plan(&mut ctx, plan)?; + let physical_plan = self.create_physical_plan(&mut ctx, &logical_plan).await?; + let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?; + + Ok(self.execute_stream(&ctx, &physical_plan).await?) + } +} + +impl LogicalOptimizer for DatafusionQueryEngine { + fn optimize_logical_plan( + &self, + _ctx: &mut QueryContext, + plan: &LogicalPlan, + ) -> Result { + match plan { + LogicalPlan::DfPlan(df_plan) => { + let optimized_plan = self + .state + .df_context() + .optimize(df_plan) + .context(error::DatafusionSnafu)?; + + Ok(LogicalPlan::DfPlan(optimized_plan)) + } + } + } +} + +#[async_trait::async_trait] +impl PhysicalPlanner for DatafusionQueryEngine { + async fn create_physical_plan( + &self, + _ctx: &mut QueryContext, + logical_plan: &LogicalPlan, + ) -> Result> { + match logical_plan { + LogicalPlan::DfPlan(df_plan) => { + let physical_plan = self + .state + .df_context() + .create_physical_plan(df_plan) + .await + .context(error::DatafusionSnafu)?; + + Ok(Arc::new(PhysicalPlanAdapter::new( + Arc::new(physical_plan.schema().into()), + physical_plan, + ))) + } + } + } +} + +impl PhysicalOptimizer for DatafusionQueryEngine { + fn optimize_physical_plan( + &self, + _ctx: &mut QueryContext, + plan: Arc, + ) -> Result> { + let config = &self.state.df_context().state.lock().config; + let optimizers = &config.physical_optimizers; + + let mut new_plan = plan + .as_any() + .downcast_ref::() + .context(error::PhysicalPlanDowncastSnafu)? + .df_plan() + .clone(); + + for optimizer in optimizers { + new_plan = optimizer + .optimize(new_plan, config) + .context(error::DatafusionSnafu)?; + } + Ok(Arc::new(PhysicalPlanAdapter::new(plan.schema(), new_plan))) + } +} + +#[async_trait::async_trait] +impl QueryExecutor for DatafusionQueryEngine { + async fn execute_stream( + &self, + _ctx: &QueryContext, + _plan: &Arc, + ) -> Result<()> { + let _runtime = self.state.df_context().runtime_env(); + Ok(()) + } +} diff --git a/src/query/src/query_engine/datafusion/adapter.rs b/src/query/src/query_engine/datafusion/adapter.rs new file mode 100644 index 0000000000..c60f183d5d --- /dev/null +++ b/src/query/src/query_engine/datafusion/adapter.rs @@ -0,0 +1,171 @@ +use std::any::Any; +use std::fmt::{self, Debug}; +use std::sync::Arc; + +use common_recordbatch::SendableRecordBatchStream; +use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; +use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::{ + error::Result as DfResult, + physical_plan::{ + expressions::PhysicalSortExpr, ExecutionPlan, Partitioning, + SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics, + }, +}; +use datatypes::schema::SchemaRef; +use snafu::ResultExt; +use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter}; + +use crate::error::{self, Result}; +use crate::plan::PhysicalPlan; + +/// Datafusion ExecutionPlan -> greptime PhysicalPlan +pub struct PhysicalPlanAdapter { + plan: Arc, + schema: SchemaRef, +} + +impl PhysicalPlanAdapter { + pub fn new(schema: SchemaRef, plan: Arc) -> Self { + Self { schema, plan } + } + + #[inline] + pub fn df_plan(&self) -> &Arc { + &self.plan + } +} + +#[async_trait::async_trait] +impl PhysicalPlan for PhysicalPlanAdapter { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn children(&self) -> Vec> { + let mut plans: Vec> = vec![]; + for p in self.plan.children() { + let plan = PhysicalPlanAdapter::new(self.schema.clone(), p); + plans.push(Arc::new(plan)); + } + plans + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + let mut df_children: Vec> = Vec::with_capacity(children.len()); + + for plan in children { + let p = Arc::new(ExecutionPlanAdapter { + plan, + schema: self.schema.clone(), + }); + df_children.push(p); + } + + let plan = self + .plan + .with_new_children(df_children) + .context(error::DatafusionSnafu)?; + Ok(Arc::new(PhysicalPlanAdapter::new( + self.schema.clone(), + plan, + ))) + } + + async fn execute(&self, partition: usize) -> Result { + // FIXME(dennis) runtime + let runtime = RuntimeEnv::new(RuntimeConfig::default()).context(error::DatafusionSnafu)?; + let df_stream = self + .plan + .execute(partition, Arc::new(runtime)) + .await + .context(error::DatafusionSnafu)?; + + Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +/// Greptime PhysicalPlan -> datafusion ExecutionPlan. +struct ExecutionPlanAdapter { + plan: Arc, + schema: SchemaRef, +} + +impl Debug for ExecutionPlanAdapter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + //TODO(dennis) better debug info + write!(f, "ExecutionPlan(PlaceHolder)") + } +} + +unsafe impl Send for ExecutionPlanAdapter {} +unsafe impl Sync for ExecutionPlanAdapter {} + +#[async_trait::async_trait] +impl ExecutionPlan for ExecutionPlanAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> DfSchemaRef { + self.schema.arrow_schema().clone() + } + + fn output_partitioning(&self) -> Partitioning { + // FIXME(dennis) + Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + // FIXME(dennis) + None + } + + fn children(&self) -> Vec> { + // TODO(dennis) + vec![] + } + + fn with_new_children( + &self, + children: Vec>, + ) -> DfResult> { + let mut gt_children: Vec> = Vec::with_capacity(children.len()); + + for plan in children { + let p = Arc::new(PhysicalPlanAdapter::new(self.schema.clone(), plan)); + gt_children.push(p); + } + + match self.plan.with_new_children(gt_children) { + Ok(plan) => Ok(Arc::new(ExecutionPlanAdapter { + schema: self.schema.clone(), + plan, + })), + Err(e) => Err(e.into()), + } + } + + async fn execute( + &self, + partition: usize, + _runtime: Arc, + ) -> DfResult { + match self.plan.execute(partition).await { + Ok(stream) => Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream))), + Err(e) => Err(e.into()), + } + } + + fn statistics(&self) -> Statistics { + //TODO(dennis) + Statistics::default() + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs new file mode 100644 index 0000000000..d1f467f357 --- /dev/null +++ b/src/query/src/query_engine/state.rs @@ -0,0 +1,225 @@ +use std::any::Any; +use std::sync::Arc; + +use datafusion::catalog::{ + catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider}, + schema::SchemaProvider as DfSchemaProvider, +}; +use datafusion::datasource::TableProvider as DfTableProvider; +use datafusion::error::Result as DataFusionResult; +use datafusion::prelude::{ExecutionConfig, ExecutionContext}; +use snafu::ResultExt; +use table::{ + table::adapter::{DfTableProviderAdapter, TableAdapter}, + Table, +}; + +use crate::catalog::{schema::SchemaProvider, CatalogList, CatalogProvider}; +use crate::error::{self, Result}; + +/// Query engine global state +#[derive(Clone)] +pub struct QueryEngineState { + df_context: ExecutionContext, +} + +impl QueryEngineState { + pub(crate) fn new(catalog_list: Arc) -> Self { + let config = ExecutionConfig::new().with_default_catalog_and_schema("greptime", "public"); + let df_context = ExecutionContext::with_config(config); + + df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter { + catalog_list: catalog_list.clone(), + }); + + Self { df_context } + } + + #[inline] + pub(crate) fn df_context(&self) -> &ExecutionContext { + &self.df_context + } +} + +/// Adapters between datafusion and greptime query engine. +struct DfCatalogListAdapter { + catalog_list: Arc, +} + +impl DfCatalogList for DfCatalogListAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + let catalog_adapter = Arc::new(CatalogProviderAdapter { + df_cataglog_provider: catalog, + }); + match self.catalog_list.register_catalog(name, catalog_adapter) { + Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter { catalog_provider })), + None => None, + } + } + + fn catalog_names(&self) -> Vec { + self.catalog_list.catalog_names() + } + + fn catalog(&self, name: &str) -> Option> { + match self.catalog_list.catalog(name) { + Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter { catalog_provider })), + None => None, + } + } +} + +/// Datafusion's CatalogProvider -> greptime CatalogProvider +struct CatalogProviderAdapter { + df_cataglog_provider: Arc, +} + +impl CatalogProvider for CatalogProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.df_cataglog_provider.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + match self.df_cataglog_provider.schema(name) { + Some(df_schema_provider) => { + Some(Arc::new(SchemaProviderAdapter { df_schema_provider })) + } + None => None, + } + } +} + +///Greptime CatalogProvider -> datafusion's CatalogProvider +struct DfCatalogProviderAdapter { + catalog_provider: Arc, +} + +impl DfCatalogProvider for DfCatalogProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.catalog_provider.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + match self.catalog_provider.schema(name) { + Some(schema_provider) => Some(Arc::new(DfSchemaProviderAdapter { schema_provider })), + None => None, + } + } +} + +/// Greptime SchemaProvider -> datafusion SchemaProvider +struct DfSchemaProviderAdapter { + schema_provider: Arc, +} + +impl DfSchemaProvider for DfSchemaProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.schema_provider.table_names() + } + + fn table(&self, name: &str) -> Option> { + match self.schema_provider.table(name) { + Some(table) => Some(Arc::new(DfTableProviderAdapter::new(table))), + None => None, + } + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> DataFusionResult>> { + let table = Arc::new(TableAdapter::new(table)); + match self.schema_provider.register_table(name, table) { + Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), + Ok(None) => Ok(None), + Err(e) => Err(e.into()), + } + } + + fn deregister_table(&self, name: &str) -> DataFusionResult>> { + match self.schema_provider.deregister_table(name) { + Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), + Ok(None) => Ok(None), + Err(e) => Err(e.into()), + } + } + + fn table_exist(&self, name: &str) -> bool { + self.schema_provider.table_exist(name) + } +} + +/// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter +struct SchemaProviderAdapter { + df_schema_provider: Arc, +} + +impl SchemaProvider for SchemaProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + /// Retrieves the list of available table names in this schema. + fn table_names(&self) -> Vec { + self.df_schema_provider.table_names() + } + + fn table(&self, name: &str) -> Option> { + match self.df_schema_provider.table(name) { + Some(table_provider) => Some(Arc::new(TableAdapter::new(table_provider))), + None => None, + } + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> Result>> { + let table_provider = Arc::new(DfTableProviderAdapter::new(table)); + match self + .df_schema_provider + .register_table(name, table_provider) + .context(error::DatafusionSnafu)? + { + Some(table) => Ok(Some(Arc::new(TableAdapter::new(table)))), + None => Ok(None), + } + } + + fn deregister_table(&self, name: &str) -> Result>> { + match self + .df_schema_provider + .deregister_table(name) + .context(error::DatafusionSnafu)? + { + Some(table) => Ok(Some(Arc::new(TableAdapter::new(table)))), + None => Ok(None), + } + } + + fn table_exist(&self, name: &str) -> bool { + self.df_schema_provider.table_exist(name) + } +} diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6ee53b9861..2036b54271 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -3,7 +3,19 @@ name = "table" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[dependencies.arrow] +package = "arrow2" +version="0.10" +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"] [dependencies] async-trait = "0.1" +chrono = { version = "0.4", features = ["serde"] } +common-query = {path = "../common/query" } +common-recordbatch = {path = "../common/recordbatch" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} +datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2"} +datatypes = { path = "../datatypes" } +futures = "0.3" +serde = "1.0.136" +snafu = "0.7.0" diff --git a/src/table/src/error.rs b/src/table/src/error.rs new file mode 100644 index 0000000000..13248151ca --- /dev/null +++ b/src/table/src/error.rs @@ -0,0 +1,18 @@ +use datafusion::error::DataFusionError; +use snafu::Snafu; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Datafusion error: {}", source))] + Datafusion { source: DataFusionError }, + #[snafu(display("Not expected to run ExecutionPlan more than once."))] + ExecuteRepeatedly, +} +pub type Result = std::result::Result; + +impl From for DataFusionError { + fn from(e: Error) -> DataFusionError { + DataFusionError::External(Box::new(e)) + } +} diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs index 804f6c9a68..b36cf823c3 100644 --- a/src/table/src/lib.rs +++ b/src/table/src/lib.rs @@ -1,5 +1,5 @@ mod engine; +pub mod error; +pub mod table; -/// Table abstraction. -#[async_trait::async_trait] -pub trait Table: Send + Sync {} +pub use crate::table::{Table, TableRef}; diff --git a/src/table/src/table.rs b/src/table/src/table.rs new file mode 100644 index 0000000000..31ae9d2532 --- /dev/null +++ b/src/table/src/table.rs @@ -0,0 +1,104 @@ +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; + +use chrono::DateTime; +use chrono::Utc; +use common_query::logical_plan::Expr; +use common_recordbatch::SendableRecordBatchStream; +use datatypes::schema::{Schema, SchemaRef}; + +use crate::error::Result; + +pub mod adapter; +pub mod memory; + +pub type TableId = u64; +pub type TableVersion = u64; + +/// Indicates whether and how a filter expression can be handled by a +/// Table for table scans. +#[derive(Debug, Clone, PartialEq)] +pub enum TableProviderFilterPushDown { + /// The expression cannot be used by the provider. + Unsupported, + /// The expression can be used to help minimise the data retrieved, + /// but the provider cannot guarantee that all returned tuples + /// satisfy the filter. The Filter plan node containing this expression + /// will be preserved. + Inexact, + /// The provider guarantees that all returned data satisfies this + /// filter expression. The Filter plan node containing this expression + /// will be removed. + Exact, +} + +/// Indicates the type of this table for metadata/catalog purposes. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum TableType { + /// An ordinary physical table. + Base, + /// A non-materialised table that itself uses a query internally to provide data. + View, + /// A transient table. + Temporary, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)] +pub struct TableIdent { + pub table_id: TableId, + pub version: TableVersion, +} + +#[derive(Debug)] +pub struct TableInfo { + pub ident: TableIdent, + pub name: String, + pub desc: Option, + pub meta: TableMeta, +} + +#[derive(Clone, Debug)] +pub struct TableMeta { + pub schema: Arc, + pub engine: String, + pub engine_options: HashMap, + pub options: HashMap, + pub created_on: DateTime, +} + +/// Table abstraction. +#[async_trait::async_trait] +pub trait Table: Send + Sync { + /// Returns the table as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Get a reference to the schema for this table + fn schema(&self) -> SchemaRef; + + /// Get the type of this table for metadata/catalog purposes. + fn table_type(&self) -> TableType { + TableType::Base + } + + /// Scan the table and returns a SendableRecordBatchStream. + async fn scan( + &self, + projection: &Option>, + filters: &[Expr], + // limit can be used to reduce the amount scanned + // from the datasource as a performance optimization. + // If set, it contains the amount of rows needed by the `LogicalPlan`, + // The datasource should return *at least* this number of rows if available. + limit: Option, + ) -> Result; + + /// Tests whether the table provider can make use of a filter expression + /// to optimise data retrieval. + fn supports_filter_pushdown(&self, _filter: &Expr) -> Result { + Ok(TableProviderFilterPushDown::Unsupported) + } +} + +pub type TableRef = Arc; diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs new file mode 100644 index 0000000000..2a3145eea1 --- /dev/null +++ b/src/table/src/table/adapter.rs @@ -0,0 +1,308 @@ +use core::pin::Pin; +use core::task::{Context, Poll}; +use std::any::Any; +use std::fmt; +use std::fmt::Debug; +use std::mem; +use std::sync::{Arc, Mutex}; + +use arrow::error::{ArrowError, Result as ArrowResult}; +use common_query::logical_plan::Expr; +use common_recordbatch::error::{self as recordbatch_error, Result as RecordBatchResult}; +use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; +/// Datafusion table adpaters +use datafusion::datasource::{ + datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown, TableProvider, + TableType as DfTableType, +}; +use datafusion::error::{DataFusionError, Result as DfResult}; +use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::logical_plan::Expr as DfExpr; +use datafusion::physical_plan::{ + expressions::PhysicalSortExpr, ExecutionPlan, Partitioning, + RecordBatchStream as DfRecordBatchStream, + SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics, +}; +use datafusion_common::record_batch::RecordBatch as DfRecordBatch; +use datatypes::schema::SchemaRef as TableSchemaRef; +use datatypes::schema::{Schema, SchemaRef}; +use futures::Stream; +use snafu::prelude::*; + +use super::{Table, TableProviderFilterPushDown, TableRef, TableType}; +use crate::error::{self, Result}; + +/// Greptime SendableRecordBatchStream -> datafusion ExecutionPlan. +struct ExecutionPlanAdapter { + stream: Mutex>, + schema: SchemaRef, +} + +impl Debug for ExecutionPlanAdapter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + //TODO(dennis) better debug info + write!(f, "ExecutionPlan(PlaceHolder)") + } +} + +#[async_trait::async_trait] +impl ExecutionPlan for ExecutionPlanAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> DfSchemaRef { + self.schema.arrow_schema().clone() + } + + fn output_partitioning(&self) -> Partitioning { + // FIXME(dennis) + Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + // FIXME(dennis) + None + } + + fn children(&self) -> Vec> { + // TODO(dennis) + vec![] + } + + fn with_new_children( + &self, + _children: Vec>, + ) -> DfResult> { + // TODO(dennis) + todo!(); + } + + async fn execute( + &self, + _partition: usize, + _runtime: Arc, + ) -> DfResult { + let mut stream = self.stream.lock().unwrap(); + + if stream.is_some() { + let stream = mem::replace(&mut *stream, None); + Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream.unwrap()))) + } else { + error::ExecuteRepeatedlySnafu + .fail() + .map_err(|e| DataFusionError::External(Box::new(e))) + } + } + + fn statistics(&self) -> Statistics { + //TODO(dennis) + Statistics::default() + } +} + +/// Greptime Table -> datafusion TableProvider +pub struct DfTableProviderAdapter { + table: TableRef, +} + +impl DfTableProviderAdapter { + pub fn new(table: TableRef) -> Self { + Self { table } + } +} + +#[async_trait::async_trait] +impl TableProvider for DfTableProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> DfSchemaRef { + self.table.schema().arrow_schema().clone() + } + + fn table_type(&self) -> DfTableType { + match self.table.table_type() { + TableType::Base => DfTableType::Base, + TableType::View => DfTableType::View, + TableType::Temporary => DfTableType::Temporary, + } + } + + async fn scan( + &self, + projection: &Option>, + filters: &[DfExpr], + limit: Option, + ) -> DfResult> { + let filters: Vec = filters.iter().map(Clone::clone).map(Expr::new).collect(); + + match self.table.scan(projection, &filters, limit).await { + Ok(stream) => Ok(Arc::new(ExecutionPlanAdapter { + schema: stream.schema(), + stream: Mutex::new(Some(stream)), + })), + Err(e) => Err(e.into()), + } + } + + fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult { + match self + .table + .supports_filter_pushdown(&Expr::new(filter.clone())) + { + Ok(p) => match p { + TableProviderFilterPushDown::Unsupported => { + Ok(DfTableProviderFilterPushDown::Unsupported) + } + TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact), + TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact), + }, + Err(e) => Err(e.into()), + } + } +} + +/// Datafusion TableProvider -> greptime Table +pub struct TableAdapter { + table_provider: Arc, +} + +impl TableAdapter { + pub fn new(table_provider: Arc) -> Self { + Self { table_provider } + } +} + +#[async_trait::async_trait] +impl Table for TableAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> TableSchemaRef { + Arc::new(self.table_provider.schema().into()) + } + + fn table_type(&self) -> TableType { + match self.table_provider.table_type() { + DfTableType::Base => TableType::Base, + DfTableType::View => TableType::View, + DfTableType::Temporary => TableType::Temporary, + } + } + + async fn scan( + &self, + projection: &Option>, + filters: &[Expr], + limit: Option, + ) -> Result { + let filters: Vec = filters.iter().map(|e| e.df_expr().clone()).collect(); + + let execution_plan = self + .table_provider + .scan(projection, &filters, limit) + .await + .context(error::DatafusionSnafu)?; + + // FIXME(dennis) Partitioning and runtime + let runtime = RuntimeEnv::new(RuntimeConfig::default()).context(error::DatafusionSnafu)?; + let df_stream = execution_plan + .execute(0, Arc::new(runtime)) + .await + .context(error::DatafusionSnafu)?; + + Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream))) + } + + fn supports_filter_pushdown(&self, filter: &Expr) -> Result { + match self + .table_provider + .supports_filter_pushdown(filter.df_expr()) + .context(error::DatafusionSnafu)? + { + DfTableProviderFilterPushDown::Unsupported => { + Ok(TableProviderFilterPushDown::Unsupported) + } + DfTableProviderFilterPushDown::Inexact => Ok(TableProviderFilterPushDown::Inexact), + DfTableProviderFilterPushDown::Exact => Ok(TableProviderFilterPushDown::Exact), + } + } +} + +/// Greptime SendableRecordBatchStream -> datafusion RecordBatchStream +pub struct DfRecordBatchStreamAdapter { + stream: SendableRecordBatchStream, +} + +impl DfRecordBatchStreamAdapter { + pub fn new(stream: SendableRecordBatchStream) -> Self { + Self { stream } + } +} + +impl DfRecordBatchStream for DfRecordBatchStreamAdapter { + fn schema(&self) -> DfSchemaRef { + self.stream.schema().arrow_schema().clone() + } +} + +impl Stream for DfRecordBatchStreamAdapter { + type Item = ArrowResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(recordbatch)) => match recordbatch { + Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.df_recordbatch))), + Err(e) => Poll::Ready(Some(Err(ArrowError::External("".to_owned(), Box::new(e))))), + }, + Poll::Ready(None) => Poll::Ready(None), + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + +/// Datafusion SendableRecordBatchStream to greptime RecordBatchStream +pub struct RecordBatchStreamAdapter { + stream: DfSendableRecordBatchStream, +} + +impl RecordBatchStreamAdapter { + pub fn new(stream: DfSendableRecordBatchStream) -> Self { + Self { stream } + } +} + +impl RecordBatchStream for RecordBatchStreamAdapter { + fn schema(&self) -> SchemaRef { + Arc::new(Schema::new(self.stream.schema())) + } +} + +impl Stream for RecordBatchStreamAdapter { + type Item = RecordBatchResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(df_recordbatch)) => Poll::Ready(Some(Ok(RecordBatch { + schema: self.schema(), + df_recordbatch: df_recordbatch.context(recordbatch_error::ArrowSnafu)?, + }))), + Poll::Ready(None) => Poll::Ready(None), + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} diff --git a/src/table/src/table/memory.rs b/src/table/src/table/memory.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/src/table/src/table/memory.rs @@ -0,0 +1 @@ +