diff --git a/Cargo.lock b/Cargo.lock index c8f6deddc3..c9201010d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -281,6 +281,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" +[[package]] +name = "ascii-canvas" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8824ecca2e851cec16968d54a01dd372ef8f95b244fb84b84e70128be347c3c6" +dependencies = [ + "term 0.7.0", +] + [[package]] name = "async-channel" version = "1.8.0" @@ -383,6 +392,15 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b88d82667eca772c4aa12f0f1348b3ae643424c8876448f3f7bd5787032e234c" +dependencies = [ + "autocfg", +] + [[package]] name = "atomic_float" version = "0.1.0" @@ -609,6 +627,21 @@ dependencies = [ "shlex", ] +[[package]] +name = "bit-set" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "1.3.2" @@ -1227,7 +1260,6 @@ dependencies = [ "serde", "serde_json", "snafu", - "table", "tempdir", "tokio", ] @@ -2089,6 +2121,12 @@ dependencies = [ "syn", ] +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "digest" version = "0.10.6" @@ -2158,6 +2196,18 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" +[[package]] +name = "dns-lookup" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872" +dependencies = [ + "cfg-if", + "libc", + "socket2", + "winapi", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -2182,6 +2232,15 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" +[[package]] +name = "ena" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7402b94a93c24e742487327a7cd839dc9d36fec9de9fb25b09f2dae459f36c3" +dependencies = [ + "log", +] + [[package]] name = "encode_unicode" version = "0.3.6" @@ -2409,6 +2468,7 @@ dependencies = [ "sql", "sqlparser 0.15.0", "store-api", + "substrait 0.1.0", "table", "tempdir", "tokio", @@ -3085,11 +3145,46 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "keccak" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3afef3b6eff9ce9d8ff9b3601125eec7f0c8cbac7abd14f355d053fa56c98768" +dependencies = [ + "cpufeatures", +] + +[[package]] +name = "lalrpop" +version = "0.19.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b30455341b0e18f276fa64540aff54deafb54c589de6aca68659c63dd2d5d823" +dependencies = [ + "ascii-canvas", + "atty", + "bit-set", + "diff", + "ena", + "itertools", + "lalrpop-util", + "petgraph", + "pico-args", + 
"regex", + "regex-syntax", + "string_cache", + "term 0.7.0", + "tiny-keccak", + "unicode-xid", +] + [[package]] name = "lalrpop-util" version = "0.19.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bcf796c978e9b4d983414f4caedc9273aa33ee214c5b887bd55fde84c85d2dc4" +dependencies = [ + "regex", +] [[package]] name = "lazy_static" @@ -3306,6 +3401,16 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "mac_address" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b238e3235c8382b7653c6408ed1b08dd379bdb9fdf990fb0bbae3db2cc0ae963" +dependencies = [ + "nix 0.23.2", + "winapi", +] + [[package]] name = "mach" version = "0.3.2" @@ -3366,6 +3471,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memmap2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b182332558b18d807c4ce1ca8ca983b34c3ee32765e47b3f0f69b90355cc1dc" +dependencies = [ + "libc", +] + [[package]] name = "memoffset" version = "0.6.5" @@ -3413,6 +3527,7 @@ version = "0.1.0" dependencies = [ "api", "async-trait", + "catalog", "common-base", "common-catalog", "common-error", @@ -3602,6 +3717,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "mt19937" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12ca7f22ed370d5991a9caec16a83187e865bc8a532f889670337d5a5689e3a1" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "multimap" version = "0.8.3" @@ -3630,9 +3754,9 @@ dependencies = [ [[package]] name = "mysql_async" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8fbd756177cfa8248baa7c5f555b9446349822bb94810c22336ec7597a72652" +checksum = "52d8156a1f6a19224593c556c8aac642cf8070abd53d563405da92879dcf341b" dependencies = [ "bytes", "crossbeam", @@ -3648,6 +3772,7 @@ dependencies = [ "pem", "percent-encoding", "pin-project", + "priority-queue", "rustls", "rustls-pemfile", "serde", @@ -3730,6 +3855,12 @@ dependencies = [ "syn", ] +[[package]] +name = "new_debug_unreachable" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" + [[package]] name = "nibble_vec" version = "0.1.0" @@ -3752,6 +3883,18 @@ dependencies = [ "memoffset 0.6.5", ] +[[package]] +name = "nix" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa52e972a9a719cecb6864fb88568781eb706bac2cd1d4f04a648542dbf78069" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "memoffset 0.6.5", +] + [[package]] name = "nom" version = "7.1.1" @@ -3912,9 +4055,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56ce10a205d9f610ae3532943039c34c145930065ce0c4284134c897fe6073b1" +checksum = "0014545954c5023b5fb8260415e54467cde434db6c824c9028a4b329f1b28e48" dependencies = [ "async-trait", "bytes", @@ -4101,6 +4244,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "page_size" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"eebde548fbbf1ea81a99b128872779c437752fb99f217c45245e1a61dcd9edcd" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "parking" version = "2.0.0" @@ -4359,6 +4512,12 @@ dependencies = [ "uncased", ] +[[package]] +name = "pico-args" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db8bcd96cb740d03149cbad5518db9fd87126a10ab519c011893b1754134c468" + [[package]] name = "pin-project" version = "1.0.12" @@ -4500,6 +4659,12 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "precomputed-hash" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" + [[package]] name = "prettydiff" version = "0.6.1" @@ -4531,10 +4696,20 @@ dependencies = [ "csv", "encode_unicode", "lazy_static", - "term", + "term 0.5.2", "unicode-width", ] +[[package]] +name = "priority-queue" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7685ca4cc0b3ad748c22ce6803e23b55b9206ef7715b965ebeaf41639238fdc" +dependencies = [ + "autocfg", + "indexmap", +] + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -4733,6 +4908,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "puruspe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b7e158a385023d209d6d5f2585c4b468f6dcb3dd5aca9b75c4f1678c05bb375" + [[package]] name = "quanta" version = "0.10.1" @@ -5024,9 +5205,9 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.6.7" +version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d544686c7cb5a31f085ae9f5c4e40a30dcba72d0b1d77656329dc4f90433cd6" +checksum = "605c196ac197a563c908b62470d96968ce4880fdcb64e296f55880fd21c8ad65" dependencies = [ "anyhow", "backon", @@ -5228,9 +5409,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.36.4" +version = "0.36.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb93e85278e08bb5788653183213d3a60fc242b10cb9be96586f5a73dcb67c23" +checksum = "a3807b5d10909833d3e9acd1eb5fb988f79376ff10fce42937de71a449c4c588" dependencies = [ "bitflags", "errno", @@ -5276,32 +5457,33 @@ dependencies = [ [[package]] name = "rustpython-ast" version = "0.1.0" -source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" +source = "git+https://github.com/discord9/RustPython?rev=183e8dab#183e8dabe0027e31630368e36c6be83b5f9cb3f8" dependencies = [ "num-bigint", "rustpython-common", + "rustpython-compiler-core", ] [[package]] -name = "rustpython-bytecode" +name = "rustpython-codegen" version = "0.1.2" -source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" +source = "git+https://github.com/discord9/RustPython?rev=183e8dab#183e8dabe0027e31630368e36c6be83b5f9cb3f8" dependencies = [ - "bincode 1.3.3", - "bitflags", - "bstr", + "ahash 0.7.6", + "indexmap", "itertools", - "lz4_flex", - "num-bigint", + "log", "num-complex", - "serde", - "static_assertions", + "num-traits", + "rustpython-ast", + "rustpython-compiler-core", + "thiserror", ] [[package]] name = "rustpython-common" version = "0.0.0" -source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" +source = 
"git+https://github.com/discord9/RustPython?rev=183e8dab#183e8dabe0027e31630368e36c6be83b5f9cb3f8" dependencies = [ "ascii", "cfg-if", @@ -5324,9 +5506,9 @@ dependencies = [ [[package]] name = "rustpython-compiler" version = "0.1.2" -source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" +source = "git+https://github.com/discord9/RustPython?rev=183e8dab#183e8dabe0027e31630368e36c6be83b5f9cb3f8" dependencies = [ - "rustpython-bytecode", + "rustpython-codegen", "rustpython-compiler-core", "rustpython-parser", "thiserror", @@ -5335,22 +5517,24 @@ dependencies = [ [[package]] name = "rustpython-compiler-core" version = "0.1.2" -source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" +source = "git+https://github.com/discord9/RustPython?rev=183e8dab#183e8dabe0027e31630368e36c6be83b5f9cb3f8" dependencies = [ - "ahash 0.7.6", - "indexmap", + "bincode 1.3.3", + "bitflags", + "bstr", "itertools", - "log", + "lz4_flex", + "num-bigint", "num-complex", - "num-traits", - "rustpython-ast", - "rustpython-bytecode", + "serde", + "static_assertions", + "thiserror", ] [[package]] name = "rustpython-derive" version = "0.1.2" -source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" +source = "git+https://github.com/discord9/RustPython?rev=183e8dab#183e8dabe0027e31630368e36c6be83b5f9cb3f8" dependencies = [ "indexmap", "itertools", @@ -5358,8 +5542,9 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "rustpython-bytecode", + "rustpython-codegen", "rustpython-compiler", + "rustpython-compiler-core", "rustpython-doc", "syn", "syn-ext", @@ -5377,9 +5562,12 @@ dependencies = [ [[package]] name = "rustpython-parser" version = "0.1.2" -source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" +source = "git+https://github.com/discord9/RustPython?rev=183e8dab#183e8dabe0027e31630368e36c6be83b5f9cb3f8" dependencies = [ "ahash 0.7.6", + "anyhow", + "itertools", + "lalrpop", "lalrpop-util", "log", "num-bigint", @@ -5387,6 +5575,8 @@ dependencies = [ "phf 0.10.1", "phf_codegen 0.10.0", "rustpython-ast", + "rustpython-compiler-core", + "thiserror", "tiny-keccak", "unic-emoji-char", "unic-ucd-ident", @@ -5396,16 +5586,81 @@ dependencies = [ [[package]] name = "rustpython-pylib" version = "0.1.0" -source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" +source = "git+https://github.com/discord9/RustPython?rev=183e8dab#183e8dabe0027e31630368e36c6be83b5f9cb3f8" dependencies = [ - "rustpython-bytecode", + "glob", + "rustpython-compiler-core", "rustpython-derive", ] +[[package]] +name = "rustpython-stdlib" +version = "0.1.2" +source = "git+https://github.com/discord9/RustPython?rev=183e8dab#183e8dabe0027e31630368e36c6be83b5f9cb3f8" +dependencies = [ + "adler32", + "ahash 0.7.6", + "ascii", + "base64", + "blake2", + "cfg-if", + "crc32fast", + "crossbeam-utils", + "csv-core", + "digest", + "dns-lookup", + "flate2", + "gethostname", + "hex", + "itertools", + "lexical-parse-float", + "libc", + "mac_address", + "md-5", + "memchr", + "memmap2", + "mt19937", + "nix 0.24.3", + "num-bigint", + "num-complex", + "num-integer", + "num-traits", + "num_enum", + "once_cell", + "page_size", + "parking_lot", + "paste", + "puruspe", + "rand 0.8.5", + "rand_core 0.6.4", + "rustpython-common", + "rustpython-derive", + "rustpython-vm", + "schannel", + "sha-1", + "sha2", + "sha3", + 
"socket2", + "system-configuration", + "termios", + "unic-char-property", + "unic-normal", + "unic-ucd-age", + "unic-ucd-bidi", + "unic-ucd-category", + "unic-ucd-ident", + "unicode-casing", + "unicode_names2", + "uuid", + "widestring", + "winapi", + "xml-rs", +] + [[package]] name = "rustpython-vm" version = "0.1.2" -source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" +source = "git+https://github.com/discord9/RustPython?rev=183e8dab#183e8dabe0027e31630368e36c6be83b5f9cb3f8" dependencies = [ "adler32", "ahash 0.7.6", @@ -5420,6 +5675,7 @@ dependencies = [ "exitcode", "flate2", "getrandom 0.2.8", + "glob", "half 1.8.2", "hex", "hexf-parse", @@ -5430,7 +5686,7 @@ dependencies = [ "log", "memchr", "memoffset 0.6.5", - "nix", + "nix 0.24.3", "num-bigint", "num-complex", "num-integer", @@ -5446,13 +5702,12 @@ dependencies = [ "result-like", "rustc_version", "rustpython-ast", - "rustpython-bytecode", + "rustpython-codegen", "rustpython-common", "rustpython-compiler", "rustpython-compiler-core", "rustpython-derive", "rustpython-parser", - "rustpython-pylib", "rustyline", "schannel", "serde", @@ -5473,6 +5728,7 @@ dependencies = [ "which", "widestring", "winapi", + "windows", "winreg", ] @@ -5484,9 +5740,9 @@ checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8" [[package]] name = "rustyline" -version = "9.1.2" +version = "10.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db7826789c0e25614b03e5a54a0717a86f9ff6e6e5247f92b369472869320039" +checksum = "1d1cd5ae51d3f7bf65d7969d579d502168ef578f289452bd8ccc91de28fda20e" dependencies = [ "bitflags", "cfg-if", @@ -5496,10 +5752,9 @@ dependencies = [ "libc", "log", "memchr", - "nix", + "nix 0.24.3", "radix_trie", "scopeguard", - "smallvec", "unicode-segmentation", "unicode-width", "utf8parse", @@ -5663,10 +5918,12 @@ dependencies = [ "query", "ron", "rustpython-ast", - "rustpython-bytecode", + "rustpython-codegen", "rustpython-compiler", "rustpython-compiler-core", "rustpython-parser", + "rustpython-pylib", + "rustpython-stdlib", "rustpython-vm", "serde", "session", @@ -5907,6 +6164,16 @@ dependencies = [ "digest", ] +[[package]] +name = "sha3" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdf0c33fae925bdc080598b84bc15c55e7b9a4a43b3c704da051f977469691c9" +dependencies = [ + "digest", + "keccak", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -6137,12 +6404,13 @@ dependencies = [ [[package]] name = "sre-engine" -version = "0.1.2" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5872399287c284fed4bc773cb7f6041623ac88213774f5e11e89e2131681fc1" +checksum = "a490c5c46c35dba9a6f5e7ee8e4d67e775eb2d2da0f115750b8d10e1c1ac2d28" dependencies = [ "bitflags", "num_enum", + "optional", ] [[package]] @@ -6249,6 +6517,19 @@ dependencies = [ "num-traits", ] +[[package]] +name = "string_cache" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213494b7a2b503146286049378ce02b482200519accc31872ee8be91fa820a08" +dependencies = [ + "new_debug_unreachable", + "once_cell", + "parking_lot", + "phf_shared 0.10.0", + "precomputed-hash", +] + [[package]] name = "stringprep" version = "0.1.2" @@ -6388,12 +6669,34 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" +[[package]] +name = 
"system-configuration" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d75182f12f490e953596550b65ee31bda7c8e043d9386174b353bda50838c3fd" +dependencies = [ + "bitflags", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "table" version = "0.1.0" dependencies = [ "async-trait", "chrono", + "common-catalog", "common-error", "common-query", "common-recordbatch", @@ -6461,6 +6764,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "term" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" +dependencies = [ + "dirs-next", + "rustversion", + "winapi", +] + [[package]] name = "termcolor" version = "1.1.3" @@ -6480,6 +6794,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "termios" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "411c5bf740737c7918b8b1fe232dca4dc9f8e754b8ad5e20966814001ed0ac6b" +dependencies = [ + "libc", +] + [[package]] name = "tests-integration" version = "0.1.0" @@ -6685,9 +7008,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.22.0" +version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3" +checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" dependencies = [ "autocfg", "bytes", @@ -6701,7 +7024,7 @@ dependencies = [ "socket2", "tokio-macros", "tracing", - "winapi", + "windows-sys 0.42.0", ] [[package]] @@ -7087,9 +7410,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" +checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "ucd-trie" @@ -7147,6 +7470,26 @@ dependencies = [ "unic-ucd-version", ] +[[package]] +name = "unic-normal" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f09d64d33589a94628bc2aeb037f35c2e25f3f049c7348b5aa5580b48e6bba62" +dependencies = [ + "unic-ucd-normal", +] + +[[package]] +name = "unic-ucd-age" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8cfdfe71af46b871dc6af2c24fcd360e2f3392ee4c5111877f2947f311671c" +dependencies = [ + "unic-char-property", + "unic-char-range", + "unic-ucd-version", +] + [[package]] name = "unic-ucd-bidi" version = "0.9.0" @@ -7170,6 +7513,15 @@ dependencies = [ "unic-ucd-version", ] +[[package]] +name = "unic-ucd-hangul" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb1dc690e19010e1523edb9713224cba5ef55b54894fe33424439ec9a40c0054" +dependencies = [ + "unic-ucd-version", +] + [[package]] name = "unic-ucd-ident" version = "0.9.0" @@ -7181,6 +7533,18 @@ dependencies = [ "unic-ucd-version", ] +[[package]] +name = "unic-ucd-normal" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"86aed873b8202d22b13859dda5fe7c001d271412c31d411fd9b827e030569410" +dependencies = [ + "unic-char-property", + "unic-char-range", + "unic-ucd-hangul", + "unic-ucd-version", +] + [[package]] name = "unic-ucd-version" version = "0.9.0" @@ -7296,8 +7660,22 @@ version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" dependencies = [ + "atomic", "getrandom 0.2.8", + "rand 0.8.5", "serde", + "uuid-macro-internal", +] + +[[package]] +name = "uuid-macro-internal" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73bc89f2894593e665241e0052c3791999e6787b7c4831daa0a5c2e637e276d8" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -7533,6 +7911,19 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1c4bd0a50ac6020f65184721f758dba47bb9fbc2133df715ec74a237b26794a" +dependencies = [ + "windows_aarch64_msvc 0.39.0", + "windows_i686_gnu 0.39.0", + "windows_i686_msvc 0.39.0", + "windows_x86_64_gnu 0.39.0", + "windows_x86_64_msvc 0.39.0", +] + [[package]] name = "windows-sys" version = "0.36.1" @@ -7573,6 +7964,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +[[package]] +name = "windows_aarch64_msvc" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec7711666096bd4096ffa835238905bb33fb87267910e154b18b44eaabb340f2" + [[package]] name = "windows_aarch64_msvc" version = "0.42.0" @@ -7585,6 +7982,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +[[package]] +name = "windows_i686_gnu" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763fc57100a5f7042e3057e7e8d9bdd7860d330070251a73d003563a3bb49e1b" + [[package]] name = "windows_i686_gnu" version = "0.42.0" @@ -7597,6 +8000,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +[[package]] +name = "windows_i686_msvc" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bc7cbfe58828921e10a9f446fcaaf649204dcfe6c1ddd712c5eebae6bda1106" + [[package]] name = "windows_i686_msvc" version = "0.42.0" @@ -7609,6 +8018,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6868c165637d653ae1e8dc4d82c25d4f97dd6605eaa8d784b5c6e0ab2a252b65" + [[package]] name = "windows_x86_64_gnu" version = "0.42.0" @@ -7627,6 +8042,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "windows_x86_64_msvc" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"5e4d40883ae9cae962787ca76ba76390ffa29214667a111db9e0a1ad8377e809" + [[package]] name = "windows_x86_64_msvc" version = "0.42.0" @@ -7651,6 +8072,12 @@ dependencies = [ "tap", ] +[[package]] +name = "xml-rs" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" + [[package]] name = "zstd" version = "0.11.2+zstd.1.5.2" diff --git a/src/api/build.rs b/src/api/build.rs index f3ff5f6600..3a3008c481 100644 --- a/src/api/build.rs +++ b/src/api/build.rs @@ -21,7 +21,6 @@ fn main() { .compile( &[ "greptime/v1/select.proto", - "greptime/v1/physical_plan.proto", "greptime/v1/greptime.proto", "greptime/v1/meta/common.proto", "greptime/v1/meta/heartbeat.proto", diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 1cd6a6ee3e..6b65f6386e 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -29,15 +29,9 @@ message SelectExpr { oneof expr { string sql = 1; bytes logical_plan = 2; - PhysicalPlan physical_plan = 15; } } -message PhysicalPlan { - bytes original_ql = 1; - bytes plan = 2; -} - message InsertExpr { string schema_name = 1; string table_name = 2; diff --git a/src/api/greptime/v1/physical_plan.proto b/src/api/greptime/v1/physical_plan.proto deleted file mode 100644 index 58444a5af4..0000000000 --- a/src/api/greptime/v1/physical_plan.proto +++ /dev/null @@ -1,33 +0,0 @@ -syntax = "proto3"; - -package greptime.v1.codec; - -message PhysicalPlanNode { - oneof PhysicalPlanType { - ProjectionExecNode projection = 1; - MockInputExecNode mock = 99; - // TODO(fys): impl other physical plan node - } -} - -message ProjectionExecNode { - PhysicalPlanNode input = 1; - repeated PhysicalExprNode expr = 2; - repeated string expr_name = 3; -} - -message PhysicalExprNode { - oneof ExprType { - PhysicalColumn column = 1; - // TODO(fys): impl other physical expr node - } -} - -message PhysicalColumn { - string name = 1; - uint64 index = 2; -} - -message MockInputExecNode { - string name = 1; -} diff --git a/src/api/src/serde.rs b/src/api/src/serde.rs index 1523bfbcfe..18dd19b5fa 100644 --- a/src/api/src/serde.rs +++ b/src/api/src/serde.rs @@ -15,7 +15,7 @@ pub use prost::DecodeError; use prost::Message; -use crate::v1::codec::{PhysicalPlanNode, SelectResult}; +use crate::v1::codec::SelectResult; use crate::v1::meta::TableRouteValue; macro_rules! impl_convert_with_bytes { @@ -37,7 +37,6 @@ macro_rules! 
impl_convert_with_bytes { } impl_convert_with_bytes!(SelectResult); -impl_convert_with_bytes!(PhysicalPlanNode); impl_convert_with_bytes!(TableRouteValue); #[cfg(test)] diff --git a/src/common/catalog/src/helper.rs b/src/catalog/src/helper.rs similarity index 98% rename from src/common/catalog/src/helper.rs rename to src/catalog/src/helper.rs index dcfa08e8a7..2caf098865 100644 --- a/src/common/catalog/src/helper.rs +++ b/src/catalog/src/helper.rs @@ -15,18 +15,19 @@ use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use common_catalog::error::{ + DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu, +}; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize, Serializer}; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::{RawTableInfo, TableId, TableVersion}; -use crate::consts::{ - CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_GLOBAL_KEY_PREFIX, TABLE_REGIONAL_KEY_PREFIX, -}; -use crate::error::{ - DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu, -}; +const CATALOG_KEY_PREFIX: &str = "__c"; +const SCHEMA_KEY_PREFIX: &str = "__s"; +const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg"; +const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr"; const ALPHANUMERICS_NAME_PATTERN: &str = "[a-zA-Z_][a-zA-Z0-9_]*"; diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index fc7bb42b03..d71a0c6d5b 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -29,6 +29,7 @@ use crate::error::{CreateTableSnafu, Result}; pub use crate::schema::{SchemaProvider, SchemaProviderRef}; pub mod error; +pub mod helper; pub mod local; pub mod remote; pub mod schema; diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index ba7c09f6c0..c37acdc303 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -20,10 +20,6 @@ use std::sync::Arc; use arc_swap::ArcSwap; use async_stream::stream; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; -use common_catalog::{ - build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue, - SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, -}; use common_telemetry::{debug, info}; use futures::Stream; use futures_util::StreamExt; @@ -39,6 +35,10 @@ use crate::error::{ CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, InvalidTableSchemaSnafu, OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu, }; +use crate::helper::{ + build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue, + SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, +}; use crate::remote::{Kv, KvBackendRef}; use crate::{ handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index e5d8811e71..9903b8ff85 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -22,12 +22,12 @@ mod tests { use std::collections::HashSet; use std::sync::Arc; + use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use catalog::remote::{ KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider, }; use catalog::{CatalogList, CatalogManager, 
RegisterTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_catalog::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use datatypes::schema::Schema; use futures_util::StreamExt; use table::engine::{EngineContext, TableEngineRef}; diff --git a/src/client/examples/physical.rs b/src/client/examples/physical.rs deleted file mode 100644 index 4f60fc7c43..0000000000 --- a/src/client/examples/physical.rs +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use client::{Client, Database}; -use common_grpc::MockExecution; -use datafusion::physical_plan::expressions::Column; -use datafusion::physical_plan::projection::ProjectionExec; -use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr}; -use tracing::{event, Level}; - -fn main() { - tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish()) - .unwrap(); - - run(); -} - -#[tokio::main] -async fn run() { - let client = Client::with_urls(vec!["127.0.0.1:3001"]); - let db = Database::new("greptime", client); - - let physical = mock_physical_plan(); - let result = db.physical_plan(physical, None).await; - - event!(Level::INFO, "result: {:#?}", result); -} - -fn mock_physical_plan() -> Arc { - let id_expr = Arc::new(Column::new("id", 0)) as Arc; - let age_expr = Arc::new(Column::new("age", 2)) as Arc; - let expr = vec![(id_expr, "id".to_string()), (age_expr, "age".to_string())]; - - let input = - Arc::new(MockExecution::new("mock_input_exec".to_string())) as Arc; - let projection = ProjectionExec::try_new(expr, input).unwrap(); - Arc::new(projection) -} diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 3228a74cf8..54ab889bf5 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -18,22 +18,17 @@ use api::v1::codec::SelectResult as GrpcSelectResult; use api::v1::column::SemanticType; use api::v1::{ object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr, - MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan, - SelectExpr, + MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, SelectExpr, }; use common_error::status_code::StatusCode; -use common_grpc::{AsExecutionPlan, DefaultAsPlanImpl}; use common_grpc_expr::column_to_vector; use common_query::Output; use common_recordbatch::{RecordBatch, RecordBatches}; -use datafusion::physical_plan::ExecutionPlan; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::{ - ColumnToVectorSnafu, ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu, -}; +use crate::error::{ColumnToVectorSnafu, ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu}; use crate::{error, Client, Result}; pub const PROTOCOL_VERSION: u32 = 1; @@ -94,24 +89,6 @@ impl Database { self.do_select(select_expr).await } - pub async 
fn physical_plan(
-        &self,
-        physical: Arc<dyn ExecutionPlan>,
-        original_ql: Option<String>,
-    ) -> Result<ObjectResult> {
-        let plan = DefaultAsPlanImpl::try_from_physical_plan(physical.clone())
-            .context(EncodePhysicalSnafu { physical })?
-            .bytes;
-        let original_ql = original_ql.unwrap_or_default();
-        let select_expr = SelectExpr {
-            expr: Some(select_expr::Expr::PhysicalPlan(PhysicalPlan {
-                original_ql: original_ql.into_bytes(),
-                plan,
-            })),
-        };
-        self.do_select(select_expr).await
-    }
-
     pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<ObjectResult> {
         let select_expr = SelectExpr {
             expr: Some(select_expr::Expr::LogicalPlan(logical_plan)),
diff --git a/src/common/catalog/Cargo.toml b/src/common/catalog/Cargo.toml
index 5df337479c..b18c561caa 100644
--- a/src/common/catalog/Cargo.toml
+++ b/src/common/catalog/Cargo.toml
@@ -14,7 +14,6 @@ regex = "1.6"
 serde = "1.0"
 serde_json = "1.0"
 snafu = { version = "0.7", features = ["backtraces"] }
-table = { path = "../../table" }
 
 [dev-dependencies]
 chrono = "0.4"
diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs
index 775cddcb42..118c53930b 100644
--- a/src/common/catalog/src/consts.rs
+++ b/src/common/catalog/src/consts.rs
@@ -25,9 +25,3 @@ pub const MIN_USER_TABLE_ID: u32 = 1024;
 pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0;
 /// scripts table id
 pub const SCRIPTS_TABLE_ID: u32 = 1;
-
-pub(crate) const CATALOG_KEY_PREFIX: &str = "__c";
-pub(crate) const SCHEMA_KEY_PREFIX: &str = "__s";
-pub(crate) const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg";
-pub(crate) const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr";
-pub const TABLE_ID_KEY_PREFIX: &str = "__tid";
diff --git a/src/common/catalog/src/lib.rs b/src/common/catalog/src/lib.rs
index 30e01900b3..841420c219 100644
--- a/src/common/catalog/src/lib.rs
+++ b/src/common/catalog/src/lib.rs
@@ -14,10 +14,3 @@
 
 pub mod consts;
 pub mod error;
-mod helper;
-
-pub use helper::{
-    build_catalog_prefix, build_schema_prefix, build_table_global_prefix,
-    build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey,
-    TableGlobalValue, TableRegionalKey, TableRegionalValue,
-};
diff --git a/src/common/function/src/scalars/math/mod.rs b/src/common/function/src/scalars/math/mod.rs
index db3d4a51bf..3934a7f3a2 100644
--- a/src/common/function/src/scalars/math/mod.rs
+++ b/src/common/function/src/scalars/math/mod.rs
@@ -13,10 +13,12 @@
 // limitations under the License.
 
 mod pow;
+mod rate;
 
 use std::sync::Arc;
 
 pub use pow::PowFunction;
+pub use rate::RateFunction;
 
 use crate::scalars::function_registry::FunctionRegistry;
 
@@ -25,5 +27,6 @@ pub(crate) struct MathFunction;
 impl MathFunction {
     pub fn register(registry: &FunctionRegistry) {
         registry.register(Arc::new(PowFunction::default()));
+        registry.register(Arc::new(RateFunction::default()))
     }
 }
diff --git a/src/common/function/src/scalars/math/rate.rs b/src/common/function/src/scalars/math/rate.rs
new file mode 100644
index 0000000000..628a19408a
--- /dev/null
+++ b/src/common/function/src/scalars/math/rate.rs
@@ -0,0 +1,116 @@
+// Copyright 2022 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt;
+
+use arrow::array::Array;
+use common_query::error::{FromArrowArraySnafu, Result, TypeCastSnafu};
+use common_query::prelude::{Signature, Volatility};
+use datatypes::arrow;
+use datatypes::prelude::*;
+use datatypes::vectors::{Helper, VectorRef};
+use snafu::ResultExt;
+
+use crate::scalars::function::{Function, FunctionContext};
+
+/// generates rates from a sequence of adjacent data points.
+#[derive(Clone, Debug, Default)]
+pub struct RateFunction;
+
+impl fmt::Display for RateFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "RATE")
+    }
+}
+
+impl Function for RateFunction {
+    fn name(&self) -> &str {
+        "prom_rate"
+    }
+
+    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
+        Ok(ConcreteDataType::float64_datatype())
+    }
+
+    fn signature(&self) -> Signature {
+        Signature::uniform(2, ConcreteDataType::numerics(), Volatility::Immutable)
+    }
+
+    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
+        let val = &columns[0].to_arrow_array();
+        let val_0 = val.slice(0, val.len() - 1);
+        let val_1 = val.slice(1, val.len() - 1);
+        let dv = arrow::compute::arithmetics::sub(&*val_1, &*val_0);
+        let ts = &columns[1].to_arrow_array();
+        let ts_0 = ts.slice(0, ts.len() - 1);
+        let ts_1 = ts.slice(1, ts.len() - 1);
+        let dt = arrow::compute::arithmetics::sub(&*ts_1, &*ts_0);
+        fn all_to_f64(array: &dyn Array) -> Result<Box<dyn Array>> {
+            Ok(arrow::compute::cast::cast(
+                array,
+                &arrow::datatypes::DataType::Float64,
+                arrow::compute::cast::CastOptions {
+                    wrapped: true,
+                    partial: true,
+                },
+            )
+            .context(TypeCastSnafu {
+                typ: arrow::datatypes::DataType::Float64,
+            })?)
+        }
+        let dv = all_to_f64(&*dv)?;
+        let dt = all_to_f64(&*dt)?;
+        let rate = arrow::compute::arithmetics::div(&*dv, &*dt);
+        let v = Helper::try_into_vector(&rate).context(FromArrowArraySnafu)?;
+        Ok(v)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::array::Float64Array;
+    use common_query::prelude::TypeSignature;
+    use datatypes::vectors::{Float32Vector, Int64Vector};
+
+    use super::*;
+    #[test]
+    fn test_rate_function() {
+        let rate = RateFunction::default();
+        assert_eq!("prom_rate", rate.name());
+        assert_eq!(
+            ConcreteDataType::float64_datatype(),
+            rate.return_type(&[]).unwrap()
+        );
+        assert!(matches!(rate.signature(),
+            Signature {
+                type_signature: TypeSignature::Uniform(2, valid_types),
+                volatility: Volatility::Immutable
+            } if valid_types == ConcreteDataType::numerics()
+        ));
+        let values = vec![1.0, 3.0, 6.0];
+        let ts = vec![0, 1, 2];
+
+        let args: Vec<VectorRef> = vec![
+            Arc::new(Float32Vector::from_vec(values)),
+            Arc::new(Int64Vector::from_vec(ts)),
+        ];
+        let vector = rate.eval(FunctionContext::default(), &args).unwrap();
+        let arr = vector.to_arrow_array();
+        let expect = Arc::new(Float64Array::from_vec(vec![2.0, 3.0]));
+        let res = arrow::compute::comparison::eq(&*arr, &*expect);
+        res.iter().for_each(|x| assert!(matches!(x, Some(true))));
+    }
+}
diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs
index bfc326e597..05c1b37d56 100644
--- a/src/common/grpc/src/error.rs
+++ b/src/common/grpc/src/error.rs
@@ -14,9 +14,7 @@
 
 use std::any::Any;
 
-use api::DecodeError;
 use common_error::prelude::{ErrorExt, StatusCode};
-use datafusion::error::DataFusionError;
 use snafu::{Backtrace, ErrorCompat, Snafu};
 
 pub type Result<T> = std::result::Result<T, Error>;
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
 pub enum Error {
-    #[snafu(display("Unexpected empty physical plan type: {}", name))]
-    EmptyPhysicalPlan { name: String, backtrace: Backtrace },
-
-    #[snafu(display("Unexpected empty physical expr: {}", name))]
-    EmptyPhysicalExpr { name: String, backtrace: Backtrace },
-
-    #[snafu(display("Unsupported datafusion execution plan: {}", name))]
-    UnsupportedDfPlan { name: String, backtrace: Backtrace },
-
-    #[snafu(display("Unsupported datafusion physical expr: {}", name))]
-    UnsupportedDfExpr { name: String, backtrace: Backtrace },
-
     #[snafu(display("Missing required field in protobuf, field: {}", field))]
     MissingField { field: String, backtrace: Backtrace },
 
-    #[snafu(display("Failed to new datafusion projection exec, source: {}", source))]
-    NewProjection {
-        source: DataFusionError,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("Failed to decode physical plan node, source: {}", source))]
-    DecodePhysicalPlanNode {
-        source: DecodeError,
-        backtrace: Backtrace,
-    },
-
     #[snafu(display(
         "Write type mismatch, column name: {}, expected: {}, actual: {}",
         column_name,
         expected,
         actual
@@ -89,17 +63,8 @@ pub enum Error {
 impl ErrorExt for Error {
     fn status_code(&self) -> StatusCode {
         match self {
-            Error::EmptyPhysicalPlan { .. }
-            | Error::EmptyPhysicalExpr { .. }
-            | Error::MissingField { .. }
-            | Error::TypeMismatch { .. } => StatusCode::InvalidArguments,
-            Error::UnsupportedDfPlan { .. } | Error::UnsupportedDfExpr { .. } => {
-                StatusCode::Unsupported
-            }
-            Error::NewProjection { .. }
-            | Error::DecodePhysicalPlanNode { .. }
-            | Error::CreateChannel { .. }
-            | Error::Conversion { .. } => StatusCode::Internal,
+            Error::MissingField { .. } | Error::TypeMismatch { .. } => StatusCode::InvalidArguments,
+            Error::CreateChannel { .. } | Error::Conversion { ..
} => StatusCode::Internal, Error::CollectRecordBatches { source } => source.status_code(), Error::ColumnDataType { source } => source.status_code(), } @@ -126,50 +91,6 @@ mod tests { None } - #[test] - fn test_empty_physical_plan_error() { - let e = throw_none_option() - .context(EmptyPhysicalPlanSnafu { name: "test" }) - .err() - .unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::InvalidArguments); - } - - #[test] - fn test_empty_physical_expr_error() { - let e = throw_none_option() - .context(EmptyPhysicalExprSnafu { name: "test" }) - .err() - .unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::InvalidArguments); - } - - #[test] - fn test_unsupported_df_plan_error() { - let e = throw_none_option() - .context(UnsupportedDfPlanSnafu { name: "test" }) - .err() - .unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::Unsupported); - } - - #[test] - fn test_unsupported_df_expr_error() { - let e = throw_none_option() - .context(UnsupportedDfExprSnafu { name: "test" }) - .err() - .unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::Unsupported); - } - #[test] fn test_missing_field_error() { let e = throw_none_option() @@ -181,33 +102,6 @@ mod tests { assert_eq!(e.status_code(), StatusCode::InvalidArguments); } - #[test] - fn test_new_projection_error() { - fn throw_df_error() -> StdResult { - Err(DataFusionError::NotImplemented("".to_string())) - } - - let e = throw_df_error().context(NewProjectionSnafu).err().unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::Internal); - } - - #[test] - fn test_decode_physical_plan_node_error() { - fn throw_decode_error() -> StdResult { - Err(DecodeError::new("test")) - } - - let e = throw_decode_error() - .context(DecodePhysicalPlanNodeSnafu) - .err() - .unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::Internal); - } - #[test] fn test_type_mismatch_error() { let e = throw_none_option() diff --git a/src/common/grpc/src/lib.rs b/src/common/grpc/src/lib.rs index a3d95e0805..9ea0b06cae 100644 --- a/src/common/grpc/src/lib.rs +++ b/src/common/grpc/src/lib.rs @@ -14,10 +14,7 @@ pub mod channel_manager; pub mod error; -pub mod physical; pub mod select; pub mod writer; pub use error::Error; -pub use physical::plan::{DefaultAsPlanImpl, MockExecution}; -pub use physical::AsExecutionPlan; diff --git a/src/common/grpc/src/physical.rs b/src/common/grpc/src/physical.rs deleted file mode 100644 index 40ce20bef6..0000000000 --- a/src/common/grpc/src/physical.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -mod expr; -pub mod plan; - -use std::result::Result; -use std::sync::Arc; - -use datafusion::physical_plan::ExecutionPlan; - -pub type ExecutionPlanRef = Arc; - -pub trait AsExecutionPlan { - type Error: std::error::Error; - - fn try_into_physical_plan(&self) -> Result; - - fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result - where - Self: Sized; -} diff --git a/src/common/grpc/src/physical/expr.rs b/src/common/grpc/src/physical/expr.rs deleted file mode 100644 index f0186d192e..0000000000 --- a/src/common/grpc/src/physical/expr.rs +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::result::Result; -use std::sync::Arc; - -use api::v1::codec; -use datafusion::physical_plan::expressions::Column as DfColumn; -use datafusion::physical_plan::PhysicalExpr as DfPhysicalExpr; -use snafu::OptionExt; - -use crate::error::{EmptyPhysicalExprSnafu, Error, UnsupportedDfExprSnafu}; - -// grpc -> datafusion (physical expr) -pub(crate) fn parse_grpc_physical_expr( - proto: &codec::PhysicalExprNode, -) -> Result, Error> { - let expr_type = proto.expr_type.as_ref().context(EmptyPhysicalExprSnafu { - name: format!("{:?}", proto), - })?; - - // TODO(fys): impl other physical expr - let pexpr: Arc = match expr_type { - codec::physical_expr_node::ExprType::Column(c) => { - let pcol = DfColumn::new(&c.name, c.index as usize); - Arc::new(pcol) - } - }; - Ok(pexpr) -} - -// datafusion -> grpc (physical expr) -pub(crate) fn parse_df_physical_expr( - df_expr: Arc, -) -> Result { - let expr = df_expr.as_any(); - - // TODO(fys): impl other physical expr - if let Some(expr) = expr.downcast_ref::() { - Ok(codec::PhysicalExprNode { - expr_type: Some(codec::physical_expr_node::ExprType::Column( - codec::PhysicalColumn { - name: expr.name().to_string(), - index: expr.index() as u64, - }, - )), - }) - } else { - UnsupportedDfExprSnafu { - name: df_expr.to_string(), - } - .fail()? 
- } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use api::v1::codec::physical_expr_node::ExprType::Column; - use api::v1::codec::{PhysicalColumn, PhysicalExprNode}; - use datafusion::physical_plan::expressions::Column as DfColumn; - use datafusion::physical_plan::PhysicalExpr; - - use crate::physical::expr::{parse_df_physical_expr, parse_grpc_physical_expr}; - - #[test] - fn test_column_convert() { - // mock df_column_expr - let df_column = DfColumn::new("name", 11); - let df_column_clone = df_column.clone(); - let df_expr = Arc::new(df_column) as Arc; - - // mock grpc_column_expr - let grpc_expr = PhysicalExprNode { - expr_type: Some(Column(PhysicalColumn { - name: "name".to_owned(), - index: 11, - })), - }; - - let result = parse_df_physical_expr(df_expr).unwrap(); - assert_eq!(grpc_expr, result); - - let result = parse_grpc_physical_expr(&grpc_expr).unwrap(); - let df_column = result.as_any().downcast_ref::().unwrap(); - assert_eq!(df_column_clone, df_column.to_owned()); - } -} diff --git a/src/common/grpc/src/physical/plan.rs b/src/common/grpc/src/physical/plan.rs deleted file mode 100644 index 798f31b452..0000000000 --- a/src/common/grpc/src/physical/plan.rs +++ /dev/null @@ -1,280 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::ops::Deref; -use std::result::Result; -use std::sync::Arc; - -use api::v1::codec::physical_plan_node::PhysicalPlanType; -use api::v1::codec::{MockInputExecNode, PhysicalPlanNode, ProjectionExecNode}; -use async_trait::async_trait; -use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::field_util::SchemaExt; -use datafusion::physical_plan::memory::MemoryStream; -use datafusion::physical_plan::projection::ProjectionExec; -use datafusion::physical_plan::{ - ExecutionPlan, PhysicalExpr, SendableRecordBatchStream, Statistics, -}; -use datafusion::record_batch::RecordBatch; -use datatypes::arrow::array::{PrimitiveArray, Utf8Array}; -use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use snafu::{OptionExt, ResultExt}; - -use crate::error::{ - DecodePhysicalPlanNodeSnafu, EmptyPhysicalPlanSnafu, Error, MissingFieldSnafu, - NewProjectionSnafu, UnsupportedDfPlanSnafu, -}; -use crate::physical::{expr, AsExecutionPlan, ExecutionPlanRef}; - -pub struct DefaultAsPlanImpl { - pub bytes: Vec, -} - -impl AsExecutionPlan for DefaultAsPlanImpl { - type Error = Error; - - // Vec -> PhysicalPlanNode -> ExecutionPlanRef - fn try_into_physical_plan(&self) -> Result { - let physicalplan_node: PhysicalPlanNode = self - .bytes - .deref() - .try_into() - .context(DecodePhysicalPlanNodeSnafu)?; - physicalplan_node.try_into_physical_plan() - } - - // ExecutionPlanRef -> PhysicalPlanNode -> Vec - fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result - where - Self: Sized, - { - let bytes: Vec = PhysicalPlanNode::try_from_physical_plan(plan)?.into(); - Ok(DefaultAsPlanImpl { bytes }) - } -} - -impl AsExecutionPlan for PhysicalPlanNode { - type Error = Error; - - fn try_into_physical_plan(&self) -> Result { - let plan = self - .physical_plan_type - .as_ref() - .context(EmptyPhysicalPlanSnafu { - name: format!("{:?}", self), - })?; - - // TODO(fys): impl other physical plan type - match plan { - PhysicalPlanType::Projection(projection) => { - let input = if let Some(input) = &projection.input { - input.as_ref().try_into_physical_plan()? - } else { - MissingFieldSnafu { field: "input" }.fail()? - }; - let exprs = projection - .expr - .iter() - .zip(projection.expr_name.iter()) - .map(|(expr, name)| { - Ok((expr::parse_grpc_physical_expr(expr)?, name.to_string())) - }) - .collect::, String)>, Error>>()?; - - let projection = - ProjectionExec::try_new(exprs, input).context(NewProjectionSnafu)?; - - Ok(Arc::new(projection)) - } - PhysicalPlanType::Mock(mock) => Ok(Arc::new(MockExecution { - name: mock.name.to_string(), - })), - } - } - - fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result - where - Self: Sized, - { - let plan = plan.as_any(); - - if let Some(exec) = plan.downcast_ref::() { - let input = PhysicalPlanNode::try_from_physical_plan(exec.input().to_owned())?; - - let expr = exec - .expr() - .iter() - .map(|expr| expr::parse_df_physical_expr(expr.0.clone())) - .collect::, Error>>()?; - - let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect(); - - Ok(PhysicalPlanNode { - physical_plan_type: Some(PhysicalPlanType::Projection(Box::new( - ProjectionExecNode { - input: Some(Box::new(input)), - expr, - expr_name, - }, - ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { - Ok(PhysicalPlanNode { - physical_plan_type: Some(PhysicalPlanType::Mock(MockInputExecNode { - name: exec.name.clone(), - })), - }) - } else { - UnsupportedDfPlanSnafu { - name: format!("{:?}", plan), - } - .fail()? 
- } - } -} - -// TODO(fys): use "test" feature to enable it -#[derive(Debug)] -pub struct MockExecution { - name: String, -} - -impl MockExecution { - pub fn new(name: String) -> Self { - Self { name } - } -} - -#[async_trait] -impl ExecutionPlan for MockExecution { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn schema(&self) -> SchemaRef { - let field1 = Field::new("id", DataType::UInt32, false); - let field2 = Field::new("name", DataType::Utf8, false); - let field3 = Field::new("age", DataType::UInt32, false); - Arc::new(Schema::new(vec![field1, field2, field3])) - } - - fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { - unimplemented!() - } - - fn output_ordering( - &self, - ) -> Option<&[datafusion::physical_plan::expressions::PhysicalSortExpr]> { - unimplemented!() - } - - fn children(&self) -> Vec { - unimplemented!() - } - - fn with_new_children( - &self, - _children: Vec, - ) -> datafusion::error::Result { - unimplemented!() - } - - async fn execute( - &self, - _partition: usize, - _runtime: Arc, - ) -> datafusion::error::Result { - let id_array = Arc::new(PrimitiveArray::from_slice([1u32, 2, 3, 4, 5])); - let name_array = Arc::new(Utf8Array::::from_slice([ - "zhangsan", "lisi", "wangwu", "Tony", "Mike", - ])); - let age_array = Arc::new(PrimitiveArray::from_slice([25u32, 28, 27, 35, 25])); - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt32, false), - Field::new("name", DataType::Utf8, false), - Field::new("age", DataType::UInt32, false), - ])); - let record_batch = - RecordBatch::try_new(schema, vec![id_array, name_array, age_array]).unwrap(); - let data: Vec = vec![record_batch]; - let projection = Some(vec![0, 1, 2]); - let stream = MemoryStream::try_new(data, self.schema(), projection).unwrap(); - Ok(Box::pin(stream)) - } - - fn statistics(&self) -> Statistics { - todo!() - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use api::v1::codec::PhysicalPlanNode; - use datafusion::physical_plan::expressions::Column; - use datafusion::physical_plan::projection::ProjectionExec; - - use crate::physical::plan::{DefaultAsPlanImpl, MockExecution}; - use crate::physical::{AsExecutionPlan, ExecutionPlanRef}; - - #[test] - fn test_convert_df_projection_with_bytes() { - let projection_exec = mock_df_projection(); - - let bytes = DefaultAsPlanImpl::try_from_physical_plan(projection_exec).unwrap(); - let exec = bytes.try_into_physical_plan().unwrap(); - - verify_df_projection(exec); - } - - #[test] - fn test_convert_df_with_grpc_projection() { - let projection_exec = mock_df_projection(); - - let projection_node = PhysicalPlanNode::try_from_physical_plan(projection_exec).unwrap(); - let exec = projection_node.try_into_physical_plan().unwrap(); - - verify_df_projection(exec); - } - - fn mock_df_projection() -> Arc { - let mock_input = Arc::new(MockExecution { - name: "mock_input".to_string(), - }); - let column1 = Arc::new(Column::new("id", 0)); - let column2 = Arc::new(Column::new("name", 1)); - Arc::new( - ProjectionExec::try_new( - vec![(column1, "id".to_string()), (column2, "name".to_string())], - mock_input, - ) - .unwrap(), - ) - } - - fn verify_df_projection(exec: ExecutionPlanRef) { - let projection_exec = exec.as_any().downcast_ref::().unwrap(); - let mock_input = projection_exec - .input() - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("mock_input", mock_input.name); - assert_eq!(2, projection_exec.expr().len()); - assert_eq!("id", projection_exec.expr()[0].1); - assert_eq!("name", 
projection_exec.expr()[1].1); - } -} diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index 31b42f6ebf..e736214e49 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -14,8 +14,10 @@ use std::any::Any; +use arrow::error::ArrowError; use common_error::prelude::*; use datafusion_common::DataFusionError; +use datatypes::arrow; use datatypes::arrow::datatypes::DataType as ArrowDatatype; use datatypes::error::Error as DataTypeError; use datatypes::prelude::ConcreteDataType; @@ -24,8 +26,14 @@ use statrs::StatsError; common_error::define_opaque_error!(Error); #[derive(Debug, Snafu)] -#[snafu(visibility(pub(crate)))] -pub(crate) enum InnerError { +#[snafu(visibility(pub))] +pub enum InnerError { + #[snafu(display("Fail to cast array to {:?}, source: {}", typ, source))] + TypeCast { + source: ArrowError, + typ: arrow::datatypes::DataType, + }, + #[snafu(display("Fail to execute function, source: {}", source))] ExecuteFunction { source: DataFusionError, @@ -51,6 +59,12 @@ pub(crate) enum InnerError { source: DataTypeError, }, + #[snafu(display("Fail to cast arrow array into vector: {}", source))] + FromArrowArray { + #[snafu(backtrace)] + source: DataTypeError, + }, + #[snafu(display("Fail to cast arrow array into vector: {:?}, {}", data_type, source))] IntoVector { #[snafu(backtrace)] @@ -145,13 +159,16 @@ impl ErrorExt for InnerError { InnerError::InvalidInputs { source, .. } | InnerError::IntoVector { source, .. } | InnerError::FromScalarValue { source } - | InnerError::ConvertArrowSchema { source } => source.status_code(), + | InnerError::ConvertArrowSchema { source } + | InnerError::FromArrowArray { source } => source.status_code(), InnerError::ExecuteRepeatedly { .. } | InnerError::GeneralDataFusion { .. } | InnerError::DataFusionExecutionPlan { .. } => StatusCode::Unexpected, - InnerError::UnsupportedInputDataType { .. } => StatusCode::InvalidArguments, + InnerError::UnsupportedInputDataType { .. } | InnerError::TypeCast { .. } => { + StatusCode::InvalidArguments + } InnerError::ConvertDfRecordBatchStream { source, .. 
} => source.status_code(), InnerError::ExecutePhysicalPlan { source } => source.status_code(), diff --git a/src/common/substrait/src/context.rs b/src/common/substrait/src/context.rs index 893546ea48..b017e9cc9a 100644 --- a/src/common/substrait/src/context.rs +++ b/src/common/substrait/src/context.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use datafusion::logical_plan::DFSchemaRef; use substrait_proto::protobuf::extensions::simple_extension_declaration::{ ExtensionFunction, MappingType, }; @@ -23,6 +24,7 @@ use substrait_proto::protobuf::extensions::SimpleExtensionDeclaration; pub struct ConvertorContext { scalar_fn_names: HashMap, scalar_fn_map: HashMap, + df_schema: Option, } impl ConvertorContext { @@ -63,4 +65,13 @@ impl ConvertorContext { } result } + + pub(crate) fn set_df_schema(&mut self, schema: DFSchemaRef) { + debug_assert!(self.df_schema.is_none()); + self.df_schema.get_or_insert(schema); + } + + pub(crate) fn df_schema(&self) -> Option<&DFSchemaRef> { + self.df_schema.as_ref() + } } diff --git a/src/common/substrait/src/df_expr.rs b/src/common/substrait/src/df_expr.rs index 8267fa9cc1..d924e7b085 100644 --- a/src/common/substrait/src/df_expr.rs +++ b/src/common/substrait/src/df_expr.rs @@ -16,7 +16,7 @@ use std::collections::VecDeque; use std::str::FromStr; use datafusion::logical_plan::{Column, Expr}; -use datafusion_expr::{expr_fn, BuiltinScalarFunction, Operator}; +use datafusion_expr::{expr_fn, lit, BuiltinScalarFunction, Operator}; use datatypes::schema::Schema; use snafu::{ensure, OptionExt}; use substrait_proto::protobuf::expression::field_reference::ReferenceType as FieldReferenceType; @@ -24,7 +24,7 @@ use substrait_proto::protobuf::expression::reference_segment::{ ReferenceType as SegReferenceType, StructField, }; use substrait_proto::protobuf::expression::{ - FieldReference, ReferenceSegment, RexType, ScalarFunction, + FieldReference, Literal, ReferenceSegment, RexType, ScalarFunction, }; use substrait_proto::protobuf::function_argument::ArgType; use substrait_proto::protobuf::Expression; @@ -33,15 +33,24 @@ use crate::context::ConvertorContext; use crate::error::{ EmptyExprSnafu, InvalidParametersSnafu, MissingFieldSnafu, Result, UnsupportedExprSnafu, }; +use crate::types::{literal_type_to_scalar_value, scalar_value_as_literal_type}; /// Convert substrait's `Expression` to DataFusion's `Expr`. -pub fn to_df_expr(ctx: &ConvertorContext, expression: Expression, schema: &Schema) -> Result { +pub(crate) fn to_df_expr( + ctx: &ConvertorContext, + expression: Expression, + schema: &Schema, +) -> Result { let expr_rex_type = expression.rex_type.context(EmptyExprSnafu)?; match expr_rex_type { - RexType::Literal(_) => UnsupportedExprSnafu { - name: "substrait Literal expression", + RexType::Literal(l) => { + let t = l.literal_type.context(MissingFieldSnafu { + field: "LiteralType", + plan: "Literal", + })?; + let v = literal_type_to_scalar_value(t)?; + Ok(lit(v)) } - .fail()?, RexType::Selection(selection) => convert_selection_rex(*selection, schema), RexType::ScalarFunction(scalar_fn) => convert_scalar_function(ctx, scalar_fn, schema), RexType::WindowFunction(_) @@ -453,10 +462,21 @@ pub fn expression_from_df_expr( } } // Don't merge them with other unsupported expr arms to preserve the ordering. - Expr::ScalarVariable(..) | Expr::Literal(..) => UnsupportedExprSnafu { + Expr::ScalarVariable(..) 
=> UnsupportedExprSnafu { name: expr.to_string(), } .fail()?, + Expr::Literal(v) => { + let t = scalar_value_as_literal_type(v)?; + let l = Literal { + nullable: true, + type_variation_reference: 0, + literal_type: Some(t), + }; + Expression { + rex_type: Some(RexType::Literal(l)), + } + } Expr::BinaryExpr { left, op, right } => { let left = expression_from_df_expr(ctx, left, schema)?; let right = expression_from_df_expr(ctx, right, schema)?; diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs index 8d53ef1b08..81909cf38d 100644 --- a/src/common/substrait/src/df_logical.rs +++ b/src/common/substrait/src/df_logical.rs @@ -18,7 +18,9 @@ use bytes::{Buf, Bytes, BytesMut}; use catalog::CatalogManagerRef; use common_error::prelude::BoxedError; use common_telemetry::debug; +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::datasource::TableProvider; +use datafusion::logical_plan::plan::Filter; use datafusion::logical_plan::{LogicalPlan, TableScan, ToDFSchema}; use datafusion::physical_plan::project_schema; use prost::Message; @@ -29,31 +31,33 @@ use substrait_proto::protobuf::extensions::simple_extension_declaration::Mapping use substrait_proto::protobuf::plan_rel::RelType as PlanRelType; use substrait_proto::protobuf::read_rel::{NamedTable, ReadType}; use substrait_proto::protobuf::rel::RelType; -use substrait_proto::protobuf::{Plan, PlanRel, ReadRel, Rel}; +use substrait_proto::protobuf::{FilterRel, Plan, PlanRel, ReadRel, Rel}; use table::table::adapter::DfTableProviderAdapter; use crate::context::ConvertorContext; use crate::df_expr::{expression_from_df_expr, to_df_expr}; use crate::error::{ - DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, Error, InternalSnafu, + self, DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, Error, InternalSnafu, InvalidParametersSnafu, MissingFieldSnafu, SchemaNotMatchSnafu, TableNotFoundSnafu, UnknownPlanSnafu, UnsupportedExprSnafu, UnsupportedPlanSnafu, }; use crate::schema::{from_schema, to_schema}; use crate::SubstraitPlan; -pub struct DFLogicalSubstraitConvertor { - catalog_manager: CatalogManagerRef, -} +pub struct DFLogicalSubstraitConvertor; impl SubstraitPlan for DFLogicalSubstraitConvertor { type Error = Error; type Plan = LogicalPlan; - fn decode(&self, message: B) -> Result { + fn decode( + &self, + message: B, + catalog_manager: CatalogManagerRef, + ) -> Result { let plan = Plan::decode(message).context(DecodeRelSnafu)?; - self.convert_plan(plan) + self.convert_plan(plan, catalog_manager) } fn encode(&self, plan: Self::Plan) -> Result { @@ -67,13 +71,11 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor { } impl DFLogicalSubstraitConvertor { - pub fn new(catalog_manager: CatalogManagerRef) -> Self { - Self { catalog_manager } - } -} - -impl DFLogicalSubstraitConvertor { - pub fn convert_plan(&self, mut plan: Plan) -> Result { + fn convert_plan( + &self, + mut plan: Plan, + catalog_manager: CatalogManagerRef, + ) -> Result { // prepare convertor context let mut ctx = ConvertorContext::default(); for simple_ext in plan.extensions { @@ -99,15 +101,51 @@ impl DFLogicalSubstraitConvertor { } .fail()? 
}; + + self.rel_to_logical_plan(&mut ctx, Box::new(rel), catalog_manager) + } + + fn rel_to_logical_plan( + &self, + ctx: &mut ConvertorContext, + rel: Box, + catalog_manager: CatalogManagerRef, + ) -> Result { let rel_type = rel.rel_type.context(EmptyPlanSnafu)?; // build logical plan let logical_plan = match rel_type { - RelType::Read(read_rel) => self.convert_read_rel(&mut ctx, read_rel), - RelType::Filter(_filter_rel) => UnsupportedPlanSnafu { - name: "Filter Relation", + RelType::Read(read_rel) => self.convert_read_rel(ctx, read_rel, catalog_manager)?, + RelType::Filter(filter) => { + let FilterRel { + common: _, + input, + condition, + advanced_extension: _, + } = *filter; + + let input = input.context(MissingFieldSnafu { + field: "input", + plan: "Filter", + })?; + let input = Arc::new(self.rel_to_logical_plan(ctx, input, catalog_manager)?); + + let condition = condition.context(MissingFieldSnafu { + field: "condition", + plan: "Filter", + })?; + + let schema = ctx.df_schema().context(InvalidParametersSnafu { + reason: "the underlying TableScan plan should have included a table schema", + })?; + let schema = schema + .clone() + .try_into() + .context(error::ConvertDfSchemaSnafu)?; + let predicate = to_df_expr(ctx, *condition, &schema)?; + + LogicalPlan::Filter(Filter { predicate, input }) } - .fail()?, RelType::Fetch(_fetch_rel) => UnsupportedPlanSnafu { name: "Fetch Relation", } @@ -148,7 +186,7 @@ impl DFLogicalSubstraitConvertor { name: "Cross Relation", } .fail()?, - }?; + }; Ok(logical_plan) } @@ -157,6 +195,7 @@ impl DFLogicalSubstraitConvertor { &self, ctx: &mut ConvertorContext, read_rel: Box, + catalog_manager: CatalogManagerRef, ) -> Result { // Extract the catalog, schema and table name from NamedTable. Assume the first three are those names. let read_type = read_rel.read_type.context(MissingFieldSnafu { @@ -192,8 +231,7 @@ impl DFLogicalSubstraitConvertor { .map(|mask_expr| self.convert_mask_expression(mask_expr)); // Get table handle from catalog manager - let table_ref = self - .catalog_manager + let table_ref = catalog_manager .table(&catalog_name, &schema_name, &table_name) .map_err(BoxedError::new) .context(InternalSnafu)? 
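Note on the reworked `SubstraitPlan` API above: `DFLogicalSubstraitConvertor` is now a stateless unit struct, and the catalog manager is supplied per `decode` call instead of being stored in the convertor. A minimal round-trip sketch, illustrative only and not part of the patch; it assumes a `plan: LogicalPlan` and a `catalog_manager: CatalogManagerRef` are already in scope, mirroring the `logical_plan_round_trip` test further down:

    use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

    // Encode the DataFusion logical plan into Substrait protobuf bytes.
    let convertor = DFLogicalSubstraitConvertor;
    let bytes = convertor.encode(plan.clone()).unwrap();
    // Decode it back; table references are resolved through the catalog manager
    // passed in at decode time.
    let decoded = convertor.decode(bytes, catalog_manager.clone()).unwrap();
    assert_eq!(format!("{:?}", plan), format!("{:?}", decoded));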
@@ -207,7 +245,7 @@ impl DFLogicalSubstraitConvertor { let retrieved_schema = to_schema(read_rel.base_schema.unwrap_or_default())?; let retrieved_arrow_schema = retrieved_schema.arrow_schema(); ensure!( - stored_schema.fields == retrieved_arrow_schema.fields, + same_schema_without_metadata(&stored_schema, retrieved_arrow_schema), SchemaNotMatchSnafu { substrait_schema: retrieved_arrow_schema.clone(), storage_schema: stored_schema @@ -227,9 +265,11 @@ impl DFLogicalSubstraitConvertor { .to_dfschema_ref() .context(DFInternalSnafu)?; - // TODO(ruihang): Support filters and limit + ctx.set_df_schema(projected_schema.clone()); + + // TODO(ruihang): Support limit Ok(LogicalPlan::TableScan(TableScan { - table_name, + table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name), source: adapter, projection, projected_schema, @@ -250,20 +290,42 @@ impl DFLogicalSubstraitConvertor { } impl DFLogicalSubstraitConvertor { - pub fn convert_df_plan(&self, plan: LogicalPlan) -> Result { - let mut ctx = ConvertorContext::default(); - - // TODO(ruihang): extract this translation logic into a separated function - // convert PlanRel - let rel = match plan { + fn logical_plan_to_rel( + &self, + ctx: &mut ConvertorContext, + plan: Arc, + ) -> Result { + Ok(match &*plan { LogicalPlan::Projection(_) => UnsupportedPlanSnafu { name: "DataFusion Logical Projection", } .fail()?, - LogicalPlan::Filter(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Filter", + LogicalPlan::Filter(filter) => { + let input = Some(Box::new( + self.logical_plan_to_rel(ctx, filter.input.clone())?, + )); + + let schema = plan + .schema() + .clone() + .try_into() + .context(error::ConvertDfSchemaSnafu)?; + let condition = Some(Box::new(expression_from_df_expr( + ctx, + &filter.predicate, + &schema, + )?)); + + let rel = FilterRel { + common: None, + input, + condition, + advanced_extension: None, + }; + Rel { + rel_type: Some(RelType::Filter(Box::new(rel))), + } } - .fail()?, LogicalPlan::Window(_) => UnsupportedPlanSnafu { name: "DataFusion Logical Window", } @@ -293,7 +355,7 @@ impl DFLogicalSubstraitConvertor { } .fail()?, LogicalPlan::TableScan(table_scan) => { - let read_rel = self.convert_table_scan_plan(&mut ctx, table_scan)?; + let read_rel = self.convert_table_scan_plan(ctx, table_scan)?; Rel { rel_type: Some(RelType::Read(Box::new(read_rel))), } @@ -319,7 +381,13 @@ impl DFLogicalSubstraitConvertor { ), } .fail()?, - }; + }) + } + + fn convert_df_plan(&self, plan: LogicalPlan) -> Result { + let mut ctx = ConvertorContext::default(); + + let rel = self.logical_plan_to_rel(&mut ctx, Arc::new(plan))?; // convert extension let extensions = ctx.generate_function_extension(); @@ -341,7 +409,7 @@ impl DFLogicalSubstraitConvertor { pub fn convert_table_scan_plan( &self, ctx: &mut ConvertorContext, - table_scan: TableScan, + table_scan: &TableScan, ) -> Result { let provider = table_scan .source @@ -363,7 +431,8 @@ impl DFLogicalSubstraitConvertor { // assemble projection let projection = table_scan .projection - .map(|proj| self.convert_schema_projection(&proj)); + .as_ref() + .map(|x| self.convert_schema_projection(x)); // assemble base (unprojected) schema using Table's schema. 
let base_schema = from_schema(&provider.table().schema())?; @@ -371,7 +440,8 @@ impl DFLogicalSubstraitConvertor { // make conjunction over a list of filters and convert the result to substrait let filter = if let Some(conjunction) = table_scan .filters - .into_iter() + .iter() + .cloned() .reduce(|accum, expr| accum.and(expr)) { Some(Box::new(expression_from_df_expr( @@ -412,6 +482,13 @@ impl DFLogicalSubstraitConvertor { } } +fn same_schema_without_metadata(lhs: &ArrowSchemaRef, rhs: &ArrowSchemaRef) -> bool { + lhs.fields.len() == rhs.fields.len() + && lhs.fields.iter().zip(rhs.fields.iter()).all(|(x, y)| { + x.name == y.name && x.data_type == y.data_type && x.is_nullable == y.is_nullable + }) +} + #[cfg(test)] mod test { use catalog::local::{LocalCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; @@ -463,10 +540,10 @@ mod test { } async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) { - let convertor = DFLogicalSubstraitConvertor::new(catalog); + let convertor = DFLogicalSubstraitConvertor; let proto = convertor.encode(plan.clone()).unwrap(); - let tripped_plan = convertor.decode(proto).unwrap(); + let tripped_plan = convertor.decode(proto, catalog).unwrap(); assert_eq!(format!("{:?}", plan), format!("{:?}", tripped_plan)); } @@ -488,6 +565,7 @@ mod test { .await .unwrap(); let adapter = Arc::new(DfTableProviderAdapter::new(table_ref)); + let projection = vec![1, 3, 5]; let df_schema = adapter.schema().to_dfschema().unwrap(); let projected_fields = projection @@ -498,7 +576,10 @@ mod test { Arc::new(DFSchema::new_with_metadata(projected_fields, Default::default()).unwrap()); let table_scan_plan = LogicalPlan::TableScan(TableScan { - table_name: DEFAULT_TABLE_NAME.to_string(), + table_name: format!( + "{}.{}.{}", + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_TABLE_NAME + ), source: adapter, projection: Some(projection), projected_schema, diff --git a/src/common/substrait/src/error.rs b/src/common/substrait/src/error.rs index c33b3679fb..4455e9231c 100644 --- a/src/common/substrait/src/error.rs +++ b/src/common/substrait/src/error.rs @@ -99,6 +99,12 @@ pub enum Error { storage_schema: datafusion::arrow::datatypes::SchemaRef, backtrace: Backtrace, }, + + #[snafu(display("Failed to convert DataFusion schema, source: {}", source))] + ConvertDfSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -120,6 +126,7 @@ impl ErrorExt for Error { | Error::TableNotFound { .. } | Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments, Error::DFInternal { .. } | Error::Internal { .. } => StatusCode::Internal, + Error::ConvertDfSchema { source } => source.status_code(), } } diff --git a/src/common/substrait/src/lib.rs b/src/common/substrait/src/lib.rs index c318799a3b..04c5e82771 100644 --- a/src/common/substrait/src/lib.rs +++ b/src/common/substrait/src/lib.rs @@ -22,6 +22,7 @@ mod schema; mod types; use bytes::{Buf, Bytes}; +use catalog::CatalogManagerRef; pub use crate::df_logical::DFLogicalSubstraitConvertor; @@ -30,7 +31,11 @@ pub trait SubstraitPlan { type Plan; - fn decode(&self, message: B) -> Result; + fn decode( + &self, + message: B, + catalog_manager: CatalogManagerRef, + ) -> Result; fn encode(&self, plan: Self::Plan) -> Result; } diff --git a/src/common/substrait/src/types.rs b/src/common/substrait/src/types.rs index fd4cc34fbe..d1033c7a3e 100644 --- a/src/common/substrait/src/types.rs +++ b/src/common/substrait/src/types.rs @@ -18,11 +18,13 @@ //! 
Current we only have variations on integer types. Variation 0 (system preferred) are the same with base types, which //! are signed integer (i.e. I8 -> [i8]), and Variation 1 stands for unsigned integer (i.e. I8 -> [u8]). +use datafusion::scalar::ScalarValue; use datatypes::prelude::ConcreteDataType; +use substrait_proto::protobuf::expression::literal::LiteralType; use substrait_proto::protobuf::r#type::{self as s_type, Kind, Nullability}; -use substrait_proto::protobuf::Type as SType; +use substrait_proto::protobuf::{Type as SType, Type}; -use crate::error::{Result, UnsupportedConcreteTypeSnafu, UnsupportedSubstraitTypeSnafu}; +use crate::error::{self, Result, UnsupportedConcreteTypeSnafu, UnsupportedSubstraitTypeSnafu}; macro_rules! substrait_kind { ($desc:ident, $concrete_ty:ident) => {{ @@ -134,3 +136,67 @@ pub fn from_concrete_type(ty: ConcreteDataType, nullability: Option) -> Re Ok(SType { kind }) } + +pub(crate) fn scalar_value_as_literal_type(v: &ScalarValue) -> Result { + Ok(if v.is_null() { + LiteralType::Null(Type { kind: None }) + } else { + match v { + ScalarValue::Boolean(Some(v)) => LiteralType::Boolean(*v), + ScalarValue::Float32(Some(v)) => LiteralType::Fp32(*v), + ScalarValue::Float64(Some(v)) => LiteralType::Fp64(*v), + ScalarValue::Int8(Some(v)) => LiteralType::I8(*v as i32), + ScalarValue::Int16(Some(v)) => LiteralType::I16(*v as i32), + ScalarValue::Int32(Some(v)) => LiteralType::I32(*v), + ScalarValue::Int64(Some(v)) => LiteralType::I64(*v), + ScalarValue::LargeUtf8(Some(v)) => LiteralType::String(v.clone()), + ScalarValue::LargeBinary(Some(v)) => LiteralType::Binary(v.clone()), + // TODO(LFC): Implement other conversions: ScalarValue => LiteralType + _ => { + return error::UnsupportedExprSnafu { + name: format!("{:?}", v), + } + .fail() + } + } + }) +} + +pub(crate) fn literal_type_to_scalar_value(t: LiteralType) -> Result { + Ok(match t { + LiteralType::Null(Type { kind: Some(kind) }) => match kind { + Kind::Bool(_) => ScalarValue::Boolean(None), + Kind::I8(_) => ScalarValue::Int8(None), + Kind::I16(_) => ScalarValue::Int16(None), + Kind::I32(_) => ScalarValue::Int32(None), + Kind::I64(_) => ScalarValue::Int64(None), + Kind::Fp32(_) => ScalarValue::Float32(None), + Kind::Fp64(_) => ScalarValue::Float64(None), + Kind::String(_) => ScalarValue::LargeUtf8(None), + Kind::Binary(_) => ScalarValue::LargeBinary(None), + // TODO(LFC): Implement other conversions: Kind => ScalarValue + _ => { + return error::UnsupportedSubstraitTypeSnafu { + ty: format!("{:?}", kind), + } + .fail() + } + }, + LiteralType::Boolean(v) => ScalarValue::Boolean(Some(v)), + LiteralType::I8(v) => ScalarValue::Int8(Some(v as i8)), + LiteralType::I16(v) => ScalarValue::Int16(Some(v as i16)), + LiteralType::I32(v) => ScalarValue::Int32(Some(v)), + LiteralType::I64(v) => ScalarValue::Int64(Some(v)), + LiteralType::Fp32(v) => ScalarValue::Float32(Some(v)), + LiteralType::Fp64(v) => ScalarValue::Float64(Some(v)), + LiteralType::String(v) => ScalarValue::LargeUtf8(Some(v)), + LiteralType::Binary(v) => ScalarValue::LargeBinary(Some(v)), + // TODO(LFC): Implement other conversions: LiteralType => ScalarValue + _ => { + return error::UnsupportedSubstraitTypeSnafu { + ty: format!("{:?}", t), + } + .fail() + } + }) +} diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 27cd13e12e..caaf51b42d 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -45,7 +45,6 @@ use crate::error::{ }; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; -use 
crate::server::grpc::plan::PhysicalPlanner; use crate::sql::SqlHandler; mod grpc; @@ -59,7 +58,6 @@ pub struct Instance { pub(crate) query_engine: QueryEngineRef, pub(crate) sql_handler: SqlHandler, pub(crate) catalog_manager: CatalogManagerRef, - pub(crate) physical_planner: PhysicalPlanner, pub(crate) script_executor: ScriptExecutor, pub(crate) table_id_provider: Option, #[allow(unused)] @@ -159,7 +157,6 @@ impl Instance { query_engine.clone(), ), catalog_manager, - physical_planner: PhysicalPlanner::new(query_engine), script_executor, meta_client, heartbeat_task, diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index ddc03a6436..3f6d531633 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -39,7 +39,6 @@ use crate::error::{ UnsupportedExprSnafu, }; use crate::instance::Instance; -use crate::server::grpc::plan::PhysicalPlanner; impl Instance { pub async fn execute_grpc_insert( @@ -117,11 +116,6 @@ impl Instance { self.execute_sql(&sql, Arc::new(QueryContext::new())).await } Some(select_expr::Expr::LogicalPlan(plan)) => self.execute_logical(plan).await, - Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => { - self.physical_planner - .execute(PhysicalPlanner::parse(plan)?, original_ql) - .await - } _ => UnsupportedExprSnafu { name: format!("{:?}", expr), } @@ -151,9 +145,8 @@ impl Instance { } async fn execute_logical(&self, plan_bytes: Vec) -> Result { - let logical_plan_converter = DFLogicalSubstraitConvertor::new(self.catalog_manager.clone()); - let logical_plan = logical_plan_converter - .decode(plan_bytes.as_slice()) + let logical_plan = DFLogicalSubstraitConvertor + .decode(plan_bytes.as_slice(), self.catalog_manager.clone()) .context(DecodeLogicalPlanSnafu)?; self.query_engine diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 73b758cc13..8282169025 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -31,7 +31,6 @@ use crate::error::Result; use crate::heartbeat::HeartbeatTask; use crate::instance::{create_local_file_log_store, new_object_store, DefaultEngine, Instance}; use crate::script::ScriptExecutor; -use crate::server::grpc::plan::PhysicalPlanner; use crate::sql::SqlHandler; impl Instance { @@ -63,7 +62,6 @@ impl Instance { catalog_manager.clone(), query_engine.clone(), ); - let physical_planner = PhysicalPlanner::new(query_engine.clone()); let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()) .await .unwrap(); @@ -79,7 +77,6 @@ impl Instance { query_engine, sql_handler, catalog_manager, - physical_planner, script_executor, meta_client, heartbeat_task, @@ -133,7 +130,6 @@ impl Instance { query_engine.clone(), ), catalog_manager, - physical_planner: PhysicalPlanner::new(query_engine), script_executor, table_id_provider: Some(Arc::new(LocalTableIdProvider::default())), meta_client: Some(meta_client), diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 1a75f4c571..234c443040 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -12,5 +12,302 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-mod ddl; -pub(crate) mod plan; +use std::sync::Arc; + +use api::result::AdminResultBuilder; +use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr}; +use common_error::prelude::{ErrorExt, StatusCode}; +use common_grpc_expr::{alter_expr_to_request, create_expr_to_request}; +use common_query::Output; +use common_telemetry::{error, info}; +use futures::TryFutureExt; +use session::context::QueryContext; +use snafu::prelude::*; +use table::requests::DropTableRequest; + +use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu}; +use crate::instance::Instance; +use crate::sql::SqlRequest; + +impl Instance { + /// Handle gRPC create table requests. + pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult { + // Respect CreateExpr's table id and region ids if present, or allocate table id + // from local table id provider and set region id to 0. + let table_id = if let Some(table_id) = expr.table_id { + info!( + "Creating table {:?}.{:?}.{:?} with table id from frontend: {}", + expr.catalog_name, expr.schema_name, expr.table_name, table_id + ); + table_id + } else { + match self.table_id_provider.as_ref() { + None => { + return AdminResultBuilder::default() + .status_code(StatusCode::Internal as u32) + .err_msg("Table id provider absent in standalone mode".to_string()) + .build(); + } + Some(table_id_provider) => { + match table_id_provider + .next_table_id() + .await + .context(BumpTableIdSnafu) + { + Ok(table_id) => { + info!( + "Creating table {:?}.{:?}.{:?} with table id from catalog manager: {}", + &expr.catalog_name, &expr.schema_name, expr.table_name, table_id + ); + table_id + } + Err(e) => { + error!(e;"Failed to create table id when creating table: {:?}.{:?}.{:?}", &expr.catalog_name, &expr.schema_name, expr.table_name); + return AdminResultBuilder::default() + .status_code(e.status_code() as u32) + .err_msg(e.to_string()) + .build(); + } + } + } + } + }; + + let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu); + let result = futures::future::ready(request) + .and_then(|request| { + self.sql_handler().execute( + SqlRequest::CreateTable(request), + Arc::new(QueryContext::new()), + ) + }) + .await; + match result { + Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0) + .build(), + // Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug. 
+ Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), + Err(err) => AdminResultBuilder::default() + .status_code(err.status_code() as u32) + .err_msg(err.to_string()) + .build(), + } + } + + pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult { + let request = match alter_expr_to_request(expr) + .context(AlterExprToRequestSnafu) + .transpose() + { + None => { + return AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(0, 0) + .build() + } + Some(req) => req, + }; + + let result = futures::future::ready(request) + .and_then(|request| { + self.sql_handler() + .execute(SqlRequest::Alter(request), Arc::new(QueryContext::new())) + }) + .await; + match result { + Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0) + .build(), + Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), + Err(err) => AdminResultBuilder::default() + .status_code(err.status_code() as u32) + .err_msg(err.to_string()) + .build(), + } + } + + pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> AdminResult { + let req = DropTableRequest { + catalog_name: expr.catalog_name, + schema_name: expr.schema_name, + table_name: expr.table_name, + }; + let result = self + .sql_handler() + .execute(SqlRequest::DropTable(req), Arc::new(QueryContext::new())) + .await; + match result { + Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as _, 0) + .build(), + Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), + Err(err) => AdminResultBuilder::default() + .status_code(err.status_code() as u32) + .err_msg(err.to_string()) + .build(), + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::{ColumnDataType, ColumnDef}; + use common_catalog::consts::MIN_USER_TABLE_ID; + use common_grpc_expr::create_table_schema; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef}; + use datatypes::value::Value; + + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_expr_to_request() { + common_telemetry::init_default_ut_logging(); + let expr = testing_create_expr(); + let request = create_expr_to_request(1024, expr).unwrap(); + assert_eq!(request.id, MIN_USER_TABLE_ID); + assert_eq!(request.catalog_name, "greptime".to_string()); + assert_eq!(request.schema_name, "public".to_string()); + assert_eq!(request.table_name, "my-metrics"); + assert_eq!(request.desc, Some("blabla".to_string())); + assert_eq!(request.schema, expected_table_schema()); + assert_eq!(request.primary_key_indices, vec![1, 0]); + assert!(request.create_if_not_exists); + + let mut expr = testing_create_expr(); + expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()]; + let result = create_expr_to_request(1025, expr); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"), + "{}", + err_msg + ); + } + + #[test] + fn test_create_table_schema() { + let mut expr = testing_create_expr(); + let schema = create_table_schema(&expr).unwrap(); + assert_eq!(schema, expected_table_schema()); + + expr.time_index = "not-exist-column".to_string(); + let result = create_table_schema(&expr); + let err_msg = result.unwrap_err().to_string(); + assert!( + 
err_msg.contains("Missing timestamp column"), + "actual: {}", + err_msg + ); + } + + #[test] + + fn test_create_column_schema() { + let column_def = ColumnDef { + name: "a".to_string(), + datatype: 1024, + is_nullable: true, + default_constraint: None, + }; + let result = column_def.try_as_column_schema(); + assert!(matches!( + result.unwrap_err(), + api::error::Error::UnknownColumnDataType { .. } + )); + + let column_def = ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: None, + }; + let column_schema = column_def.try_as_column_schema().unwrap(); + assert_eq!(column_schema.name, "a"); + assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); + assert!(column_schema.is_nullable()); + + let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value")); + let column_def = ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: Some(default_constraint.clone().try_into().unwrap()), + }; + let column_schema = column_def.try_as_column_schema().unwrap(); + assert_eq!(column_schema.name, "a"); + assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); + assert!(column_schema.is_nullable()); + assert_eq!( + default_constraint, + *column_schema.default_constraint().unwrap() + ); + } + + fn testing_create_expr() -> CreateExpr { + let column_defs = vec![ + ColumnDef { + name: "host".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: false, + default_constraint: None, + }, + ColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::Timestamp as i32, + is_nullable: false, + default_constraint: None, + }, + ColumnDef { + name: "cpu".to_string(), + datatype: ColumnDataType::Float32 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "memory".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ]; + CreateExpr { + catalog_name: None, + schema_name: None, + table_name: "my-metrics".to_string(), + desc: Some("blabla".to_string()), + column_defs, + time_index: "ts".to_string(), + primary_keys: vec!["ts".to_string(), "host".to_string()], + create_if_not_exists: true, + table_options: Default::default(), + table_id: Some(MIN_USER_TABLE_ID), + region_ids: vec![0], + } + } + + fn expected_table_schema() -> SchemaRef { + let column_schemas = vec![ + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), + ]; + Arc::new( + SchemaBuilder::try_from(column_schemas) + .unwrap() + .build() + .unwrap(), + ) + } +} diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs deleted file mode 100644 index 34add925a3..0000000000 --- a/src/datanode/src/server/grpc/ddl.rs +++ /dev/null @@ -1,314 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use api::result::AdminResultBuilder; -use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr}; -use common_error::prelude::{ErrorExt, StatusCode}; -use common_grpc_expr::{alter_expr_to_request, create_expr_to_request}; -use common_query::Output; -use common_telemetry::{error, info}; -use futures::TryFutureExt; -use session::context::QueryContext; -use snafu::prelude::*; -use table::requests::DropTableRequest; - -use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu}; -use crate::instance::Instance; -use crate::sql::SqlRequest; - -impl Instance { - /// Handle gRPC create table requests. - pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult { - // Respect CreateExpr's table id and region ids if present, or allocate table id - // from local table id provider and set region id to 0. - let table_id = if let Some(table_id) = expr.table_id { - info!( - "Creating table {:?}.{:?}.{:?} with table id from frontend: {}", - expr.catalog_name, expr.schema_name, expr.table_name, table_id - ); - table_id - } else { - match self.table_id_provider.as_ref() { - None => { - return AdminResultBuilder::default() - .status_code(StatusCode::Internal as u32) - .err_msg("Table id provider absent in standalone mode".to_string()) - .build(); - } - Some(table_id_provider) => { - match table_id_provider - .next_table_id() - .await - .context(BumpTableIdSnafu) - { - Ok(table_id) => { - info!( - "Creating table {:?}.{:?}.{:?} with table id from catalog manager: {}", - &expr.catalog_name, &expr.schema_name, expr.table_name, table_id - ); - table_id - } - Err(e) => { - error!(e;"Failed to create table id when creating table: {:?}.{:?}.{:?}", &expr.catalog_name, &expr.schema_name, expr.table_name); - return AdminResultBuilder::default() - .status_code(e.status_code() as u32) - .err_msg(e.to_string()) - .build(); - } - } - } - } - }; - - let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu); - let result = futures::future::ready(request) - .and_then(|request| { - self.sql_handler().execute( - SqlRequest::CreateTable(request), - Arc::new(QueryContext::new()), - ) - }) - .await; - match result { - Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - // Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug. 
- Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), - Err(err) => AdminResultBuilder::default() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } - } - - pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult { - let request = match alter_expr_to_request(expr) - .context(AlterExprToRequestSnafu) - .transpose() - { - None => { - return AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(0, 0) - .build() - } - Some(req) => req, - }; - - let result = futures::future::ready(request) - .and_then(|request| { - self.sql_handler() - .execute(SqlRequest::Alter(request), Arc::new(QueryContext::new())) - }) - .await; - match result { - Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), - Err(err) => AdminResultBuilder::default() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } - } - - pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> AdminResult { - let req = DropTableRequest { - catalog_name: expr.catalog_name, - schema_name: expr.schema_name, - table_name: expr.table_name, - }; - let result = self - .sql_handler() - .execute(SqlRequest::DropTable(req), Arc::new(QueryContext::new())) - .await; - match result { - Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as _, 0) - .build(), - Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), - Err(err) => AdminResultBuilder::default() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use api::v1::{ColumnDataType, ColumnDef}; - use common_catalog::consts::MIN_USER_TABLE_ID; - use common_grpc_expr::create_table_schema; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef}; - use datatypes::value::Value; - - use super::*; - - #[tokio::test(flavor = "multi_thread")] - async fn test_create_expr_to_request() { - common_telemetry::init_default_ut_logging(); - let expr = testing_create_expr(); - let request = create_expr_to_request(1024, expr).unwrap(); - assert_eq!(request.id, MIN_USER_TABLE_ID); - assert_eq!(request.catalog_name, "greptime".to_string()); - assert_eq!(request.schema_name, "public".to_string()); - assert_eq!(request.table_name, "my-metrics"); - assert_eq!(request.desc, Some("blabla".to_string())); - assert_eq!(request.schema, expected_table_schema()); - assert_eq!(request.primary_key_indices, vec![1, 0]); - assert!(request.create_if_not_exists); - - let mut expr = testing_create_expr(); - expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()]; - let result = create_expr_to_request(1025, expr); - let err_msg = result.unwrap_err().to_string(); - assert!( - err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"), - "{}", - err_msg - ); - } - - #[test] - fn test_create_table_schema() { - let mut expr = testing_create_expr(); - let schema = create_table_schema(&expr).unwrap(); - assert_eq!(schema, expected_table_schema()); - - expr.time_index = "not-exist-column".to_string(); - let result = create_table_schema(&expr); - let err_msg = result.unwrap_err().to_string(); - assert!( - 
err_msg.contains("Missing timestamp column"), - "actual: {}", - err_msg - ); - } - - #[test] - - fn test_create_column_schema() { - let column_def = ColumnDef { - name: "a".to_string(), - datatype: 1024, - is_nullable: true, - default_constraint: None, - }; - let result = column_def.try_as_column_schema(); - assert!(matches!( - result.unwrap_err(), - api::error::Error::UnknownColumnDataType { .. } - )); - - let column_def = ColumnDef { - name: "a".to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: None, - }; - let column_schema = column_def.try_as_column_schema().unwrap(); - assert_eq!(column_schema.name, "a"); - assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); - assert!(column_schema.is_nullable()); - - let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value")); - let column_def = ColumnDef { - name: "a".to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: Some(default_constraint.clone().try_into().unwrap()), - }; - let column_schema = column_def.try_as_column_schema().unwrap(); - assert_eq!(column_schema.name, "a"); - assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); - assert!(column_schema.is_nullable()); - assert_eq!( - default_constraint, - *column_schema.default_constraint().unwrap() - ); - } - - fn testing_create_expr() -> CreateExpr { - let column_defs = vec![ - ColumnDef { - name: "host".to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: false, - default_constraint: None, - }, - ColumnDef { - name: "ts".to_string(), - datatype: ColumnDataType::Timestamp as i32, - is_nullable: false, - default_constraint: None, - }, - ColumnDef { - name: "cpu".to_string(), - datatype: ColumnDataType::Float32 as i32, - is_nullable: true, - default_constraint: None, - }, - ColumnDef { - name: "memory".to_string(), - datatype: ColumnDataType::Float64 as i32, - is_nullable: true, - default_constraint: None, - }, - ]; - CreateExpr { - catalog_name: None, - schema_name: None, - table_name: "my-metrics".to_string(), - desc: Some("blabla".to_string()), - column_defs, - time_index: "ts".to_string(), - primary_keys: vec!["ts".to_string(), "host".to_string()], - create_if_not_exists: true, - table_options: Default::default(), - table_id: Some(MIN_USER_TABLE_ID), - region_ids: vec![0], - } - } - - fn expected_table_schema() -> SchemaRef { - let column_schemas = vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ]; - Arc::new( - SchemaBuilder::try_from(column_schemas) - .unwrap() - .build() - .unwrap(), - ) - } -} diff --git a/src/datanode/src/server/grpc/plan.rs b/src/datanode/src/server/grpc/plan.rs deleted file mode 100644 index 5c228852f2..0000000000 --- a/src/datanode/src/server/grpc/plan.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use common_grpc::{AsExecutionPlan, DefaultAsPlanImpl}; -use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef}; -use common_query::Output; -use datatypes::schema::Schema; -use query::QueryEngineRef; -use snafu::ResultExt; - -use crate::error::{ConvertSchemaSnafu, ExecutePhysicalPlanSnafu, IntoPhysicalPlanSnafu, Result}; - -pub struct PhysicalPlanner { - query_engine: QueryEngineRef, -} - -impl PhysicalPlanner { - pub fn new(query_engine: QueryEngineRef) -> Self { - Self { query_engine } - } - - pub fn parse(bytes: Vec) -> Result { - let physical_plan = DefaultAsPlanImpl { bytes } - .try_into_physical_plan() - .context(IntoPhysicalPlanSnafu)?; - - let schema: Arc = Arc::new( - physical_plan - .schema() - .try_into() - .context(ConvertSchemaSnafu)?, - ); - Ok(Arc::new(PhysicalPlanAdapter::new(schema, physical_plan))) - } - - pub async fn execute(&self, plan: PhysicalPlanRef, _original_ql: Vec) -> Result { - self.query_engine - .execute_physical(&plan) - .await - .context(ExecutePhysicalPlanSnafu) - } -} diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 328fe0de24..4952e36cc0 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::sync::Arc; use arrow::datatypes::{Field, Schema as ArrowSchema}; +use datafusion_common::DFSchemaRef; use snafu::{ensure, ResultExt}; use crate::data_type::DataType; @@ -312,6 +313,15 @@ impl TryFrom for Schema { } } +impl TryFrom for Schema { + type Error = Error; + + fn try_from(value: DFSchemaRef) -> Result { + let s: ArrowSchema = value.as_ref().into(); + s.try_into() + } +} + fn try_parse_version(metadata: &HashMap, key: &str) -> Result { if let Some(value) = metadata.get(key) { let version = value diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index ac95ae3b6d..cd6f298bd0 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -42,6 +42,7 @@ snafu = { version = "0.7", features = ["backtraces"] } sql = { path = "../sql" } sqlparser = "0.15" store-api = { path = "../store-api" } +substrait = { path = "../common/substrait" } table = { path = "../table" } tokio = { version = "1.18", features = ["full"] } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 86356db08c..aea667367f 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -17,13 +17,16 @@ use std::collections::HashSet; use std::sync::Arc; use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu}; +use catalog::helper::{ + build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey, + TableGlobalKey, TableGlobalValue, +}; use catalog::remote::{Kv, KvBackendRef}; use catalog::{ CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef, }; -use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue}; use futures::StreamExt; use meta_client::rpc::TableName; use 
snafu::prelude::*; @@ -130,7 +133,7 @@ impl CatalogList for FrontendCatalogManager { let backend = self.backend.clone(); let res = std::thread::spawn(|| { common_runtime::block_on_read(async move { - let key = common_catalog::build_catalog_prefix(); + let key = build_catalog_prefix(); let mut iter = backend.range(key.as_bytes()); let mut res = HashSet::new(); @@ -180,7 +183,7 @@ impl CatalogProvider for FrontendCatalogProvider { let catalog_name = self.catalog_name.clone(); let res = std::thread::spawn(|| { common_runtime::block_on_read(async move { - let key = common_catalog::build_schema_prefix(&catalog_name); + let key = build_schema_prefix(&catalog_name); let mut iter = backend.range(key.as_bytes()); let mut res = HashSet::new(); @@ -242,7 +245,7 @@ impl SchemaProvider for FrontendSchemaProvider { std::thread::spawn(|| { common_runtime::block_on_read(async move { - let key = common_catalog::build_table_global_prefix(catalog_name, schema_name); + let key = build_table_global_prefix(catalog_name, schema_name); let mut iter = backend.range(key.as_bytes()); let mut res = HashSet::new(); diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 9b6275c7bf..823ce693ce 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -445,6 +445,12 @@ pub enum Error { #[snafu(display("Table already exists: `{}`", table))] TableAlreadyExist { table: String, backtrace: Backtrace }, + + #[snafu(display("Failed to encode Substrait logical plan, source: {}", source))] + EncodeSubstraitLogicalPlan { + #[snafu(backtrace)] + source: substrait::error::Error, + }, } pub type Result = std::result::Result; @@ -536,6 +542,7 @@ impl ErrorExt for Error { Error::AlterExprToRequest { source, .. } => source.status_code(), Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable, Error::TableAlreadyExist { .. 
} => StatusCode::TableAlreadyExists, + Error::EncodeSubstraitLogicalPlan { source } => source.status_code(), } } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index a96f817035..d32e12ee24 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -17,11 +17,11 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::{AlterExpr, CreateDatabaseExpr, CreateExpr}; +use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue}; use catalog::CatalogList; use chrono::DateTime; use client::admin::{admin_result_to_output, Admin}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_catalog::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue}; use common_query::Output; use common_telemetry::{debug, error, info}; use datatypes::prelude::ConcreteDataType; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 8f97ba12f7..36d229a245 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -818,9 +818,9 @@ mod test { async fn new_dist_table() -> DistTable { let column_schemas = vec![ - ColumnSchema::new("ts", ConcreteDataType::uint64_datatype(), false), - ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new("row_id", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false), + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), false), ]; let schema = Arc::new(Schema::new(column_schemas.clone())); diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs index 1919dc0fb6..14ea9a6a93 100644 --- a/src/frontend/src/table/scan.rs +++ b/src/frontend/src/table/scan.rs @@ -16,17 +16,14 @@ use std::fmt::Formatter; use std::sync::Arc; use api::v1::InsertExpr; -use client::{Database, ObjectResult, Select}; +use client::{Database, ObjectResult}; use common_query::prelude::Expr; use common_query::Output; use common_recordbatch::{util, RecordBatches}; -use datafusion::logical_plan::{LogicalPlan as DfLogicPlan, LogicalPlanBuilder}; -use datafusion_expr::Expr as DfExpr; -use datatypes::prelude::*; -use datatypes::schema::SchemaRef; +use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder}; use meta_client::rpc::TableName; -use query::plan::LogicalPlan; use snafu::ResultExt; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::adapter::DfTableProviderAdapter; use table::TableRef; @@ -56,12 +53,13 @@ impl DatanodeInstance { pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result { let logical_plan = self.build_logical_plan(&plan)?; - // TODO(LFC): Directly pass in logical plan to GRPC interface when our substrait codec supports filter. 
- let sql = to_sql(logical_plan)?; + let substrait_plan = DFLogicalSubstraitConvertor + .encode(logical_plan) + .context(error::EncodeSubstraitLogicalPlanSnafu)?; let output = self .db - .select(Select::Sql(sql)) + .logical_plan(substrait_plan.to_vec()) .await .and_then(Output::try_from) .context(error::SelectSnafu)?; @@ -94,14 +92,25 @@ impl DatanodeInstance { ) .context(error::BuildDfLogicalPlanSnafu)?; + if let Some(filter) = table_scan + .filters + .iter() + .map(|x| x.df_expr()) + .cloned() + .reduce(|accum, expr| accum.and(expr)) + { + builder = builder + .filter(filter) + .context(error::BuildDfLogicalPlanSnafu)?; + } + if let Some(limit) = table_scan.limit { builder = builder .limit(limit) .context(error::BuildDfLogicalPlanSnafu)?; } - let plan = builder.build().context(error::BuildDfLogicalPlanSnafu)?; - Ok(LogicalPlan::DfPlan(plan)) + builder.build().context(error::BuildDfLogicalPlanSnafu) } } @@ -112,79 +121,3 @@ pub(crate) struct TableScanPlan { pub filters: Vec, pub limit: Option, } - -fn to_sql(plan: LogicalPlan) -> Result { - let LogicalPlan::DfPlan(plan) = plan; - let table_scan = match plan { - DfLogicPlan::TableScan(table_scan) => table_scan, - _ => unreachable!("unknown plan: {:?}", plan), - }; - - let schema: SchemaRef = Arc::new( - table_scan - .source - .schema() - .try_into() - .context(error::ConvertArrowSchemaSnafu)?, - ); - let projection = table_scan - .projection - .map(|x| { - x.iter() - .map(|i| schema.column_name_by_index(*i).to_string()) - .collect::>() - }) - .unwrap_or_else(|| { - schema - .column_schemas() - .iter() - .map(|x| x.name.clone()) - .collect::>() - }) - .join(", "); - - let mut sql = format!("select {} from {}", projection, &table_scan.table_name); - - let filters = table_scan - .filters - .iter() - .map(expr_to_sql) - .collect::>>()? - .join(" AND "); - if !filters.is_empty() { - sql.push_str(" where "); - sql.push_str(&filters); - } - - if let Some(limit) = table_scan.limit { - sql.push_str(" limit "); - sql.push_str(&limit.to_string()); - } - Ok(sql) -} - -fn expr_to_sql(expr: &DfExpr) -> Result { - Ok(match expr { - DfExpr::BinaryExpr { - ref left, - ref right, - ref op, - } => format!( - "{} {} {}", - expr_to_sql(left.as_ref())?, - op, - expr_to_sql(right.as_ref())? 
- ), - DfExpr::Column(c) => c.name.clone(), - DfExpr::Literal(sv) => { - let v: Value = Value::try_from(sv.clone()) - .with_context(|_| error::ConvertScalarValueSnafu { value: sv.clone() })?; - if matches!(v.data_type(), ConcreteDataType::String(_)) { - format!("'{}'", sv) - } else { - format!("{}", sv) - } - } - _ => unimplemented!("not implemented for {:?}", expr), - }) -} diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 738ca359fa..333cbac4d9 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -10,6 +10,7 @@ mock = [] [dependencies] api = { path = "../api" } async-trait = "0.1" +catalog = { path = "../catalog" } common-base = { path = "../common/base" } common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index 71a24acbd6..b7e215fec9 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -15,7 +15,7 @@ use std::str::FromStr; use api::v1::meta::TableName; -use common_catalog::TableGlobalKey; +use catalog::helper::TableGlobalKey; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index ba924e61d2..0c502be094 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -16,7 +16,7 @@ use api::v1::meta::{ router_server, CreateRequest, Error, PeerDict, PutRequest, RangeRequest, Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse, Table, TableRoute, TableRouteValue, }; -use common_catalog::{TableGlobalKey, TableGlobalValue}; +use catalog::helper::{TableGlobalKey, TableGlobalValue}; use common_telemetry::warn; use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml index ff69720053..ed99a7b778 100644 --- a/src/script/Cargo.toml +++ b/src/script/Cargo.toml @@ -14,8 +14,10 @@ python = [ "dep:rustpython-parser", "dep:rustpython-compiler", "dep:rustpython-compiler-core", - "dep:rustpython-bytecode", + "dep:rustpython-codegen", "dep:rustpython-ast", + "dep:rustpython-pylib", + "dep:rustpython-stdlib", "dep:paste", ] @@ -39,13 +41,18 @@ futures = "0.3" futures-util = "0.3" paste = { version = "1.0", optional = true } query = { path = "../query" } -rustpython-ast = { git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d" } -rustpython-bytecode = { git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d" } -rustpython-compiler = { git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d" } -rustpython-compiler-core = { git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d" } -rustpython-parser = { git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d" } -rustpython-vm = { git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d", features = [ +# TODO(discord9): This is a forked and tweaked version of RustPython, please update it to newest original RustPython After Update toolchain to 1.65 +rustpython-ast = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" } +rustpython-compiler = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" } +rustpython-compiler-core = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" } +rustpython-codegen = { git = 
"https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" } +rustpython-parser = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" } +rustpython-vm = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab", features = [ "default", + "codegen", +] } +rustpython-stdlib = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab" } +rustpython-pylib = { git = "https://github.com/discord9/RustPython", optional = true, rev = "183e8dab", features = [ "freeze-stdlib", ] } session = { path = "../session" } diff --git a/src/script/src/python/builtins/test.rs b/src/script/src/python/builtins/test.rs index 43ad8d4f5d..8fdeb9ad94 100644 --- a/src/script/src/python/builtins/test.rs +++ b/src/script/src/python/builtins/test.rs @@ -340,7 +340,6 @@ fn run_builtin_fn_testcases() { let testcases: Vec = from_ron_string(&buf).expect("Fail to convert to testcases"); let cached_vm = rustpython_vm::Interpreter::with_init(Default::default(), |vm| { vm.add_native_module("greptime", Box::new(greptime_builtin::make_module)); - // this can be in `.enter()` closure, but for clearity, put it in the `with_init()` PyVector::make_class(&vm.ctx); }); for (idx, case) in testcases.into_iter().enumerate() { @@ -358,7 +357,7 @@ fn run_builtin_fn_testcases() { let code_obj = vm .compile( &case.script, - rustpython_vm::compile::Mode::BlockExpr, + rustpython_compiler_core::Mode::BlockExpr, "".to_owned(), ) .map_err(|err| vm.new_syntax_error(&err)) @@ -466,7 +465,7 @@ fn test_vm() { r#" from udf_builtins import * sin(values)"#, - rustpython_vm::compile::Mode::BlockExpr, + rustpython_compiler_core::Mode::BlockExpr, "".to_owned(), ) .map_err(|err| vm.new_syntax_error(&err)) diff --git a/src/script/src/python/coprocessor.rs b/src/script/src/python/coprocessor.rs index bb32494dbf..3bc5c39f2a 100644 --- a/src/script/src/python/coprocessor.rs +++ b/src/script/src/python/coprocessor.rs @@ -16,7 +16,7 @@ pub mod compile; pub mod parse; use std::cell::RefCell; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::result::Result as StdResult; use std::sync::Arc; @@ -29,7 +29,7 @@ use datatypes::arrow::compute::cast::CastOptions; use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datatypes::schema::Schema; use datatypes::vectors::{BooleanVector, Helper, StringVector, Vector, VectorRef}; -use rustpython_bytecode::CodeObject; +use rustpython_compiler_core::CodeObject; use rustpython_vm as vm; use rustpython_vm::class::PyClassImpl; use rustpython_vm::AsObject; @@ -430,7 +430,24 @@ pub(crate) fn init_interpreter() -> Arc { INTERPRETER.with(|i| { i.borrow_mut() .get_or_insert_with(|| { + // we limit stdlib imports for safety reason, i.e `fcntl` is not allowed here + let native_module_allow_list = HashSet::from([ + "array", "cmath", "gc", "hashlib", "_json", "_random", "math", + ]); let interpreter = Arc::new(vm::Interpreter::with_init(Default::default(), |vm| { + // not using full stdlib to prevent security issue, instead filter out a few simple util module + vm.add_native_modules( + rustpython_stdlib::get_module_inits() + .into_iter() + .filter(|(k, _)| native_module_allow_list.contains(k.as_ref())), + ); + + // We are freezing the stdlib to include the standard library inside the binary. 
+ // so according to this issue: + // https://github.com/RustPython/RustPython/issues/4292 + // add this line for stdlib, so rustpython can found stdlib's python part in bytecode format + vm.add_frozen(rustpython_pylib::frozen_stdlib()); + // add our own custom datatype and module PyVector::make_class(&vm.ctx); vm.add_native_module("greptime", Box::new(greptime_builtin::make_module)); })); diff --git a/src/script/src/python/coprocessor/compile.rs b/src/script/src/python/coprocessor/compile.rs index f1321d1a0e..8b8a10d228 100644 --- a/src/script/src/python/coprocessor/compile.rs +++ b/src/script/src/python/coprocessor/compile.rs @@ -13,9 +13,9 @@ // limitations under the License. //! compile script to code object - -use rustpython_bytecode::CodeObject; -use rustpython_compiler_core::compile as python_compile; +use rustpython_codegen::compile::compile_top; +use rustpython_compiler::{CompileOpts, Mode}; +use rustpython_compiler_core::CodeObject; use rustpython_parser::ast::{Located, Location}; use rustpython_parser::{ast, parser}; use snafu::ResultExt; @@ -73,7 +73,8 @@ fn gen_call(name: &str, deco_args: &DecoratorArgs, loc: &Location) -> ast::Stmt< /// strip type annotation pub fn compile_script(name: &str, deco_args: &DecoratorArgs, script: &str) -> Result { // note that it's important to use `parser::Mode::Interactive` so the ast can be compile to return a result instead of return None in eval mode - let mut top = parser::parse(script, parser::Mode::Interactive).context(PyParseSnafu)?; + let mut top = + parser::parse(script, parser::Mode::Interactive, "").context(PyParseSnafu)?; // erase decorator if let ast::Mod::Interactive { body } = &mut top { let stmts = body; @@ -122,11 +123,11 @@ pub fn compile_script(name: &str, deco_args: &DecoratorArgs, script: &str) -> Re ); } // use `compile::Mode::BlockExpr` so it return the result of statement - python_compile::compile_top( + compile_top( &top, "".to_owned(), - python_compile::Mode::BlockExpr, - python_compile::CompileOpts { optimize: 0 }, + Mode::BlockExpr, + CompileOpts { optimize: 0 }, ) .context(PyCompileSnafu) } diff --git a/src/script/src/python/coprocessor/parse.rs b/src/script/src/python/coprocessor/parse.rs index 50bb0e3264..324b5f7fc8 100644 --- a/src/script/src/python/coprocessor/parse.rs +++ b/src/script/src/python/coprocessor/parse.rs @@ -99,7 +99,7 @@ fn try_into_datatype(ty: &str, loc: &Location) -> Result> { "_" => Ok(None), // note the different between "_" and _ _ => fail_parse_error!( - format!("Unknown datatype: {ty} at {}", loc), + format!("Unknown datatype: {ty} at {:?}", loc), Some(loc.to_owned()) ), } @@ -427,7 +427,7 @@ fn get_return_annotations(rets: &ast::Expr<()>) -> Result Result { - let python_ast = parser::parse_program(script).context(PyParseSnafu)?; + let python_ast = parser::parse_program(script, "").context(PyParseSnafu)?; let mut coprocessor = None; diff --git a/src/script/src/python/error.rs b/src/script/src/python/error.rs index 57499befaf..9a77984149 100644 --- a/src/script/src/python/error.rs +++ b/src/script/src/python/error.rs @@ -18,7 +18,7 @@ use datafusion::error::DataFusionError; use datatypes::arrow::error::ArrowError; use datatypes::error::Error as DataTypeError; use query::error::Error as QueryError; -use rustpython_compiler_core::error::CompileError as CoreCompileError; +use rustpython_codegen::error::CodegenError; use rustpython_parser::ast::Location; use rustpython_parser::error::ParseError; pub use snafu::ensure; @@ -54,7 +54,7 @@ pub enum Error { #[snafu(display("Failed to compile script, 
source: {}", source))] PyCompile { backtrace: Backtrace, - source: CoreCompileError, + source: CodegenError, }, /// rustpython problem, using python virtual machines' backtrace instead @@ -76,7 +76,7 @@ pub enum Error { /// errors in coprocessors' parse check for types and etc. #[snafu(display("Coprocessor error: {} {}.", reason, if let Some(loc) = loc{ - format!("at {loc}") + format!("at {loc:?}") }else{ "".into() }))] diff --git a/src/script/src/python/test.rs b/src/script/src/python/test.rs index 043dcba898..5790ce281c 100644 --- a/src/script/src/python/test.rs +++ b/src/script/src/python/test.rs @@ -192,7 +192,7 @@ fn test_type_anno() { def a(cpu, mem: vector[f64])->(vector[f64|None], vector[f64], vector[_], vector[ _ | None]): return cpu + mem, cpu - mem, cpu * mem, cpu / mem "#; - let pyast = parser::parse(python_source, parser::Mode::Interactive).unwrap(); + let pyast = parser::parse(python_source, parser::Mode::Interactive, "").unwrap(); let copr = parse_and_compile_copr(python_source); dbg!(copr); } diff --git a/src/script/src/python/testcases.ron b/src/script/src/python/testcases.ron index 91d736070e..8e2415429a 100644 --- a/src/script/src/python/testcases.ron +++ b/src/script/src/python/testcases.ron @@ -462,5 +462,72 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64], vector[f64]): predicate: ParseIsErr( reason: "Expect a function definition, but found a" ) - ) + ), + ( + // constant column(int) + name: "test_import_stdlib", + code: r#" +@copr(args=["cpu", "mem"], returns=["perf", "what"]) +def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], + vector[f32]): + # test if using allow list for stdlib damage unrelated module + from collections import deque + import math + math.ceil(0.2) + import string + return cpu + mem, 1 +"#, + predicate: ExecIsOk( + fields: [ + ( + datatype: Some(Float64), + is_nullable: true + ), + ( + datatype: Some(Float32), + is_nullable: false + ), + ], + columns: [ + ( + ty: Float64, + len: 4 + ), + ( + ty: Float32, + len: 4 + ) + ] + ) + ), + ( + // constant column(int) + name: "test_neg_import_stdlib", + code: r#" +@copr(args=["cpu", "mem"], returns=["perf", "what"]) +def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], + vector[f32]): + # test if module not in allow list can't be imported + import fcntl + return cpu + mem, 1 +"#, + predicate: ExecIsErr( + reason: "No module named 'fcntl'" + ) + ), + ( + // constant column(int) + name: "test_neg_import_depend_stdlib", + code: r#" +@copr(args=["cpu", "mem"], returns=["perf", "what"]) +def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], + vector[f32]): + # test if module not in allow list can't be imported + import mailbox + return cpu + mem, 1 +"#, + predicate: ExecIsErr( + reason: "ModuleNotFoundError: No module named" + ) + ), ] diff --git a/src/script/src/python/vector.rs b/src/script/src/python/vector.rs index 4a432df602..951ad2f953 100644 --- a/src/script/src/python/vector.rs +++ b/src/script/src/python/vector.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::mem::ManuallyDrop; use std::ops::Deref; use std::sync::Arc; @@ -35,7 +36,7 @@ use rustpython_vm::protocol::{PyMappingMethods, PySequenceMethods}; use rustpython_vm::sliceable::{SaturatedSlice, SequenceIndex, SequenceIndexOp}; use rustpython_vm::types::{AsMapping, AsSequence, Comparable, PyComparisonOp}; use rustpython_vm::{ - pyclass, pyimpl, AsObject, PyObject, PyObjectRef, PyPayload, PyRef, PyResult, VirtualMachine, + pyclass, AsObject, PyObject, PyObjectRef, PyPayload, PyRef, PyResult, VirtualMachine, }; use crate::python::utils::{is_instance, PyVectorRef}; @@ -179,7 +180,7 @@ impl AsRef for PyVector { } /// PyVector type wraps a greptime vector, impl multiply/div/add/sub opeerators etc. -#[pyimpl(with(AsMapping, AsSequence, Comparable))] +#[pyclass(with(AsMapping, AsSequence, Comparable))] impl PyVector { pub(crate) fn new( iterable: OptionalArg, @@ -1012,9 +1013,14 @@ impl Comparable for PyVector { let ret = ret.into_pyobject(vm); Ok(Either::A(ret)) } else { + // Safety: we are manually drop this ref, so no problem here + let r = unsafe { + let ptr = std::ptr::NonNull::from(zelf); + ManuallyDrop::new(PyObjectRef::from_raw(ptr.as_ptr())) + }; Err(vm.new_type_error(format!( - "unexpected payload {} for {}", - zelf, + "unexpected payload {:?} for {}", + r, op.method_name(&vm.ctx).as_str() ))) } @@ -1134,6 +1140,7 @@ pub mod tests { .as_object() .set_item("a", vm.new_pyobj(a), vm) .expect("failed"); + scope .locals .as_object() @@ -1151,7 +1158,7 @@ pub mod tests { let code_obj = vm .compile( script, - rustpython_vm::compile::Mode::BlockExpr, + rustpython_compiler_core::Mode::BlockExpr, "".to_owned(), ) .map_err(|err| vm.new_syntax_error(&err))?; diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs index 56f7c8a886..deffdebadd 100644 --- a/src/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -149,7 +149,7 @@ async fn test_server_prefer_secure_client_secure() -> Result<()> { Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_server_require_secure_client_secure() -> Result<()> { let server_tls = Arc::new(TlsOption { mode: servers::tls::TlsMode::Require, diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 23aa51e732..d80d5df293 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -7,6 +7,7 @@ license = "Apache-2.0" [dependencies] async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index ea77e39199..52c6deb691 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use chrono::{DateTime, Utc}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; pub use datatypes::error::{Error as ConvertError, Result as ConvertResult}; use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef}; use derive_builder::Builder; @@ -333,9 +334,9 @@ pub struct TableInfo { /// Comment of the table. 
     #[builder(default, setter(into))]
     pub desc: Option<String>,
-    #[builder(default, setter(into))]
+    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
     pub catalog_name: String,
-    #[builder(default, setter(into))]
+    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
     pub schema_name: String,
     pub meta: TableMeta,
     #[builder(default = "TableType::Base")]
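
Note on the final `metadata.rs` hunk: with `derive_builder`, a `default = "EXPR"` attribute evaluates the quoted expression whenever the field is never set on the builder, so an unset catalog or schema name now falls back to the default names instead of an empty string. The following is a minimal sketch of that behavior, assuming `derive_builder` as a dependency and using a simplified stand-in struct and constants rather than the actual `TableInfo`/`common_catalog::consts` definitions:

```rust
use derive_builder::Builder;

// Hypothetical default names for illustration; the real values live in `common_catalog::consts`.
const DEFAULT_CATALOG_NAME: &str = "greptime";
const DEFAULT_SCHEMA_NAME: &str = "public";

#[derive(Builder)]
struct TableIdent {
    // Falls back to the default catalog when the caller never sets it.
    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
    catalog_name: String,
    // Same fallback for the schema name.
    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
    schema_name: String,
    #[builder(setter(into))]
    table_name: String,
}

fn main() {
    // Only the table name is provided; catalog and schema come from the defaults.
    let ident = TableIdentBuilder::default()
        .table_name("metrics")
        .build()
        .unwrap();
    assert_eq!(ident.catalog_name, DEFAULT_CATALOG_NAME);
    assert_eq!(ident.schema_name, DEFAULT_SCHEMA_NAME);
}
```

The same pattern applies to the changed `TableInfo` fields: omitting `.catalog_name(...)` or `.schema_name(...)` on the generated builder now yields the default catalog and schema names rather than `String::default()`.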
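
Similarly, the frontend hunk near the top of this diff replaces the SQL round trip with a substrait-encoded plan and folds all pushed-down table-scan filters into a single conjunction, adding one `filter` node only when at least one filter exists. A rough sketch of that fold, using a toy expression type instead of DataFusion's `Expr` (the `and` helper here is hypothetical and only mirrors the shape of `Expr::and`):

```rust
// A stand-in for DataFusion's `Expr`, to keep the sketch self-contained.
#[derive(Debug, Clone)]
enum Expr {
    Column(String),
    Literal(i64),
    Gt(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

impl Expr {
    // Combine two predicates into a conjunction, analogous to `Expr::and`.
    fn and(self, other: Expr) -> Expr {
        Expr::And(Box::new(self), Box::new(other))
    }
}

fn main() {
    let filters = vec![
        Expr::Gt(Box::new(Expr::Column("cpu".into())), Box::new(Expr::Literal(80))),
        Expr::Gt(Box::new(Expr::Column("mem".into())), Box::new(Expr::Literal(90))),
    ];

    // Same shape as the frontend change: reduce all pushed-down filters into
    // one predicate, and only emit a filter node when the list is non-empty.
    if let Some(combined) = filters.iter().cloned().reduce(|accum, expr| accum.and(expr)) {
        println!("filter node predicate: {:?}", combined);
    } else {
        println!("no filters pushed down; skip the filter node");
    }
}
```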