From a8f2e4468d928e775d62301f593c483809fe3e17 Mon Sep 17 00:00:00 2001 From: Niwaka <61189782+NiwakaDev@users.noreply.github.com> Date: Tue, 15 Aug 2023 17:22:46 +0900 Subject: [PATCH] feat: handle multiple grpc deletes (#2150) * feat: handle multiple grpc deletes * fix: make DistDeleter::grpc_delete return usize * fix: remove backtrace from MissingTimeIndexColumn * fix: avoid using unwrap in PartitionRuleManager::split_delete_request * fix: simplify MissingTimeIndexColumn --- Cargo.lock | 325 +++++++-------- Cargo.toml | 2 +- src/api/src/helper.rs | 2 +- src/client/src/database.rs | 6 +- src/common/grpc-expr/src/delete.rs | 19 +- src/datanode/src/instance/grpc.rs | 92 +++-- src/frontend/src/error.rs | 17 +- src/frontend/src/instance/distributed.rs | 35 +- .../src/instance/distributed/deleter.rs | 386 ++++++++++++++++++ .../src/instance/distributed/inserter.rs | 1 + src/frontend/src/instance/grpc.rs | 2 +- src/frontend/src/table.rs | 101 +---- src/frontend/src/table/delete.rs | 38 +- src/frontend/src/table/scan.rs | 5 - src/mito/src/engine/tests.rs | 7 +- src/mito/src/table.rs | 1 - src/partition/src/manager.rs | 17 +- src/partition/src/splitter.rs | 17 +- src/query/src/datafusion.rs | 6 + src/servers/tests/mod.rs | 2 +- src/table/src/requests.rs | 3 + tests-integration/src/grpc.rs | 66 +-- 22 files changed, 757 insertions(+), 393 deletions(-) create mode 100644 src/frontend/src/instance/distributed/deleter.rs diff --git a/Cargo.lock b/Cargo.lock index 77c54cd392..1fbceaba80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -64,9 +64,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +checksum = "86b8f9420f797f2d9e935edf629310eb938a0d839f984e25327f3c7eed22300c" dependencies = [ "memchr", ] @@ -183,9 +183,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" +checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c" dependencies = [ "anstyle", "windows-sys 0.48.0", @@ -469,7 +469,7 @@ version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bebcb57eef570b15afbcf2d07d813eb476fde9f6dd69c81004d6476c197e87e" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.0", "serde", ] @@ -589,9 +589,9 @@ dependencies = [ [[package]] name = "async-lock" -version = "2.7.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" dependencies = [ "event-listener", ] @@ -631,9 +631,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.72" +version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", @@ -712,9 +712,9 @@ dependencies = [ [[package]] name = "axum" -version = "0.6.19" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6a1de45611fdb535bfde7b7de4fd54f4fd2b17b1737c0a59b69bf9b92074b8c" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", @@ -849,7 +849,7 @@ name = "benchmarks" version = "0.3.2" dependencies = [ "arrow", - "clap 4.3.19", + "clap 4.3.21", "client", "indicatif", "itertools 0.10.5", @@ -903,7 +903,7 @@ version = "0.66.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2b84e06fc203107bfbad243f4aba2af864eb7db3b1cf46ea0a023b0b433d2a7" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.0", "cexpr", "clang-sys", "lazy_static", @@ -940,9 +940,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.3.3" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" [[package]] name = "bitvec" @@ -1072,7 +1072,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6798148dccfbff0fae41c7574d2fa8f1ef3492fba0face179de5d8d447d67b05" dependencies = [ "memchr", - "regex-automata 0.3.4", + "regex-automata 0.3.6", "serde", ] @@ -1204,7 +1204,7 @@ checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" dependencies = [ "camino", "cargo-platform", - "semver 1.0.18", + "semver", "serde", "serde_json", ] @@ -1256,7 +1256,7 @@ dependencies = [ "meta-client", "metrics", "mito", - "moka 0.11.2", + "moka 0.11.3", "object-store", "parking_lot 0.12.1", "regex", @@ -1272,11 +1272,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.79" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" dependencies = [ "jobserver", + "libc", ] [[package]] @@ -1446,9 +1447,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.19" +version = "4.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d" +checksum = "c27cdf28c0f604ba3f512b0c9a409f8de8513e4816705deb0498b627e7c3a3fd" dependencies = [ "clap_builder", "clap_derive 4.3.12", @@ -1457,9 +1458,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.19" +version = "4.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1" +checksum = "08a9f1ab5e9f01a9b81f202e8562eb9a10de70abf9eaeac1be465c28b75aa4aa" dependencies = [ "anstream", "anstyle", @@ -1530,7 +1531,7 @@ dependencies = [ "derive-new", "enum_dispatch", "futures-util", - "moka 0.9.8", + "moka 0.9.9", "parking_lot 0.12.1", "prost", "rand", @@ -2070,9 +2071,9 @@ checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" [[package]] name = "const-oid" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "795bc6e66a8e340f075fcf6227e417a2dc976b92b91f3cdc778bb858778b6747" +checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" [[package]] name = "const-random" @@ -2521,7 +2522,7 @@ dependencies = [ "lazy_static", "sqlparser 0.35.0", "strum 0.25.0", - "strum_macros 0.25.1", + "strum_macros 0.25.2", ] [[package]] @@ -2612,7 +2613,7 @@ dependencies = [ "object_store", "prost", "prost-types", - "substrait 0.12.3", + "substrait 0.12.4", "tokio", ] @@ -2731,20 +2732,20 @@ dependencies = [ [[package]] name = "der" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7ed52955ce76b1554f509074bb357d3fb8ac9b51288a65a3fd480d1dfba946" +checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" dependencies = [ - "const-oid 0.9.4", + "const-oid 0.9.5", "pem-rfc7468 0.7.0", "zeroize", ] [[package]] name = "deranged" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8810e7e2cf385b1e9b50d68264908ec367ba642c96d02edfe61c39e88e2a3c01" +checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929" [[package]] name = "derive-new" @@ -2832,7 +2833,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", - "const-oid 0.9.4", + "const-oid 0.9.5", "crypto-common", "subtle", ] @@ -3142,7 +3143,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5" dependencies = [ "cfg-if 1.0.0", - "rustix 0.38.4", + "rustix 0.38.8", "windows-sys 0.48.0", ] @@ -3175,13 +3176,13 @@ dependencies = [ [[package]] name = "filetime" -version = "0.2.21" +version = "0.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cbc844cecaee9d4443931972e1289c8ff485cb4cc2767cb03ca139ed6885153" +checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0" dependencies = [ "cfg-if 1.0.0", "libc", - "redox_syscall 0.2.16", + "redox_syscall 0.3.5", "windows-sys 0.48.0", ] @@ -3216,7 +3217,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" dependencies = [ "bitflags 1.3.2", - "rustc_version 0.4.0", + "rustc_version", ] [[package]] @@ -3296,7 +3297,7 @@ dependencies = [ "meter-macros", "metrics", "mito", - "moka 0.9.8", + "moka 0.9.9", "object-store", "openmetrics-parser", "opentelemetry-proto", @@ -3744,7 +3745,7 @@ dependencies = [ "bstr 1.6.0", "itoa", "thiserror", - "time 0.3.24", + "time 0.3.25", ] [[package]] @@ -4135,7 +4136,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9b68af55c050a010f202fcccb22d58f080f0a868#9b68af55c050a010f202fcccb22d58f080f0a868" +source = "git+https://github.com/NiwakaDev/greptime-proto.git?rev=ec402b6500f908a0acfab6c889225cd4dc2228a4#ec402b6500f908a0acfab6c889225cd4dc2228a4" dependencies = [ "prost", "serde", @@ -4374,9 +4375,9 @@ checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" [[package]] name = "httpdate" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "humantime" @@ -4427,7 +4428,7 @@ dependencies = [ "futures-util", "http", "hyper", - "rustls 0.21.5", + "rustls 0.21.6", "tokio", "tokio-rustls 0.24.1", ] @@ -4541,9 +4542,9 @@ dependencies = [ [[package]] name = "indicatif" -version = "0.17.5" +version = "0.17.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ff8cc23a7393a397ed1d7f56e6365cba772aba9f9912ab968b03043c395d057" +checksum = "0b297dc40733f23a0e52728a58fa9489a5b7638a324932de16b41adc3ef80730" dependencies = [ "console", "instant", @@ -4664,7 +4665,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi 0.3.2", - "rustix 0.38.4", + "rustix 0.38.8", "windows-sys 0.48.0", ] @@ -4976,9 +4977,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.19" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "log-store" @@ -5174,9 +5175,9 @@ checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" [[package]] name = "matchit" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67827e6ea8ee8a7c4a72227ef4fc08957040acffdb5f122733b24fa12daff41b" +checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" [[package]] name = "matrixmultiply" @@ -5563,9 +5564,9 @@ dependencies = [ [[package]] name = "moka" -version = "0.9.8" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ca9b167ed904bc89a2f64c4be5014615c26fd9c4ddd2042c6094744c7df11a" +checksum = "b28455ac4363046076054a7e9cfbd7f168019c29dba32a625f59fc0aeffaaea4" dependencies = [ "async-io", "async-lock", @@ -5577,7 +5578,7 @@ dependencies = [ "once_cell", "parking_lot 0.12.1", "quanta 0.11.1", - "rustc_version 0.4.0", + "rustc_version", "scheduled-thread-pool", "skeptic", "smallvec", @@ -5589,9 +5590,9 @@ dependencies = [ [[package]] name = "moka" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "206bf83f415b0579fd885fe0804eb828e727636657dc1bf73d80d2f1218e14a1" +checksum = "fa6e72583bf6830c956235bff0d5afec8cf2952f579ebad18ae7821a917d950f" dependencies = [ "async-io", "async-lock", @@ -5602,7 +5603,7 @@ dependencies = [ "once_cell", "parking_lot 0.12.1", "quanta 0.11.1", - "rustc_version 0.4.0", + "rustc_version", "scheduled-thread-pool", "skeptic", "smallvec", @@ -5665,7 +5666,7 @@ dependencies = [ "percent-encoding", "pin-project", "priority-queue", - "rustls 0.21.5", + "rustls 0.21.6", "rustls-pemfile", "serde", "serde_json", @@ -5689,7 +5690,7 @@ dependencies = [ "base64 0.21.2", "bigdecimal", "bindgen 0.66.1", - "bitflags 2.3.3", + "bitflags 2.4.0", "bitvec", "byteorder", "bytes", @@ -5715,7 +5716,7 @@ dependencies = [ "smallvec", "subprocess", "thiserror", - "time 0.3.24", + "time 0.3.25", "uuid", ] @@ -5868,9 +5869,9 @@ dependencies = [ [[package]] name = "num-complex" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d" +checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214" dependencies = [ "num-traits", ] @@ -6452,7 +6453,7 @@ dependencies = [ "datafusion-expr", "datatypes", "meta-client", - "moka 0.9.8", + "moka 0.9.9", "serde", "serde_json", "snafu", @@ -6523,9 +6524,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pest" -version = "2.7.1" +version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d2d1d55045829d65aad9d389139882ad623b33b904e7c9f1b10c5b8927298e5" +checksum = "1acb4a4365a13f749a93f1a094a7805e5cfa0955373a9de860d962eaa3a5fe5a" dependencies = [ "thiserror", "ucd-trie", @@ -6533,9 +6534,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.1" +version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f94bca7e7a599d89dea5dfa309e217e7906c3c007fb9c3299c40b10d6a315d3" +checksum = "666d00490d4ac815001da55838c500eafb0320019bbaa44444137c48b443a853" dependencies = [ "pest", "pest_generator", @@ -6543,9 +6544,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.1" +version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99d490fe7e8556575ff6911e45567ab95e71617f43781e5c05490dc8d75c965c" +checksum = "68ca01446f50dbda87c1786af8770d535423fa8a53aec03b8f4e3d7eb10e0929" dependencies = [ "pest", "pest_meta", @@ -6556,9 +6557,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.1" +version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2674c66ebb4b4d9036012091b537aae5878970d6999f81a265034d85b136b341" +checksum = "56af0a30af74d0445c0bf6d9d051c979b516a1a5af790d251daee76005420a48" dependencies = [ "once_cell", "pest", @@ -6596,7 +6597,7 @@ dependencies = [ "ring", "stringprep", "thiserror", - "time 0.3.24", + "time 0.3.25", "tokio", "tokio-rustls 0.24.1", "tokio-util", @@ -6653,18 +6654,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", @@ -6673,9 +6674,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" [[package]] name = "pin-utils" @@ -6700,7 +6701,7 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" dependencies = [ - "der 0.7.7", + "der 0.7.8", "pkcs8 0.10.2", "spki 0.7.2", ] @@ -6722,7 +6723,7 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ - "der 0.7.7", + "der 0.7.8", "spki 0.7.2", ] @@ -7185,9 +7186,9 @@ checksum = "3b7e158a385023d209d6d5f2585c4b468f6dcb3dd5aca9b75c4f1678c05bb375" [[package]] name = "pyo3" -version = "0.19.1" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb88ae05f306b4bfcde40ac4a51dc0b05936a9207a4b75b798c7729c4258a59" +checksum = "e681a6cfdc4adcc93b4d3cf993749a4552018ee0a9b65fc0ccfad74352c72a38" dependencies = [ "cfg-if 1.0.0", "indoc", @@ -7202,9 +7203,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.19.1" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "554db24f0b3c180a9c0b1268f91287ab3f17c162e15b54caaae5a6b3773396b0" +checksum = "076c73d0bc438f7a4ef6fdd0c3bb4732149136abd952b110ac93e4edb13a6ba5" dependencies = [ "once_cell", "target-lexicon", @@ -7212,9 +7213,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.19.1" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "922ede8759e8600ad4da3195ae41259654b9c55da4f7eec84a0ccc7d067a70a4" +checksum = "e53cee42e77ebe256066ba8aa77eff722b3bb91f3419177cf4cd0f304d3284d9" dependencies = [ "libc", "pyo3-build-config", @@ -7222,9 +7223,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.19.1" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a5caec6a1dd355964a841fcbeeb1b89fe4146c87295573f94228911af3cc5a2" +checksum = "dfeb4c99597e136528c6dd7d5e3de5434d1ceaf487436a3f03b2d56b6fc9efd1" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -7234,9 +7235,9 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.19.1" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0b78ccbb160db1556cdb6fd96c50334c5d4ec44dc5e0a968d0a1208fa0efa8b" +checksum = "947dc12175c254889edc0c02e399476c2f652b4b9ebd123aa655c224de259536" dependencies = [ "proc-macro2", "quote", @@ -7536,13 +7537,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.1" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ - "aho-corasick 1.0.2", + "aho-corasick 1.0.3", "memchr", - "regex-automata 0.3.4", + "regex-automata 0.3.6", "regex-syntax 0.7.4", ] @@ -7557,11 +7558,11 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" +checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ - "aho-corasick 1.0.2", + "aho-corasick 1.0.3", "memchr", "regex-syntax 0.7.4", ] @@ -7662,7 +7663,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.5", + "rustls 0.21.6", "rustls-native-certs", "rustls-pemfile", "serde", @@ -7847,7 +7848,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ab43bb47d23c1a631b4b680199a45255dce26fa9ab2fa902581f624ff13e6a8" dependencies = [ "byteorder", - "const-oid 0.9.4", + "const-oid 0.9.5", "digest", "num-bigint-dig", "num-integer", @@ -7871,7 +7872,7 @@ dependencies = [ "futures", "futures-timer", "rstest_macros", - "rustc_version 0.4.0", + "rustc_version", ] [[package]] @@ -7883,7 +7884,7 @@ dependencies = [ "cfg-if 1.0.0", "proc-macro2", "quote", - "rustc_version 0.4.0", + "rustc_version", "syn 1.0.109", "unicode-ident", ] @@ -7896,7 +7897,7 @@ checksum = "45f80dcc84beab3a327bbe161f77db25f336a1452428176787c8c79ac79d7073" dependencies = [ "quote", "rand", - "rustc_version 0.4.0", + "rustc_version", "syn 1.0.109", ] @@ -7983,22 +7984,13 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" -[[package]] -name = "rustc_version" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee" -dependencies = [ - "semver 0.11.0", -] - [[package]] name = "rustc_version" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver 1.0.18", + "semver", ] [[package]] @@ -8031,11 +8023,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.4" +version = "0.38.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" +checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.0", "errno 0.3.2", "libc", "linux-raw-sys 0.4.5", @@ -8056,13 +8048,13 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.5" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" +checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" dependencies = [ "log", "ring", - "rustls-webpki 0.101.2", + "rustls-webpki 0.101.3", "sct", ] @@ -8099,9 +8091,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.2" +version = "0.101.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" +checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0" dependencies = [ "ring", "untrusted", @@ -8359,7 +8351,7 @@ dependencies = [ "paste", "rand", "result-like", - "rustc_version 0.4.0", + "rustc_version", "rustpython-ast", "rustpython-codegen", "rustpython-common", @@ -8676,15 +8668,6 @@ dependencies = [ "libc", ] -[[package]] -name = "semver" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" -dependencies = [ - "semver-parser", -] - [[package]] name = "semver" version = "1.0.18" @@ -8694,15 +8677,6 @@ dependencies = [ "serde", ] -[[package]] -name = "semver-parser" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7" -dependencies = [ - "pest", -] - [[package]] name = "seq-macro" version = "0.3.5" @@ -8711,9 +8685,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.180" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed" +checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" dependencies = [ "serde_derive", ] @@ -8730,9 +8704,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.180" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036" +checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", @@ -8904,7 +8878,7 @@ dependencies = [ "rand", "regex", "rust-embed", - "rustls 0.21.5", + "rustls 0.21.6", "rustls-pemfile", "schemars", "script", @@ -9065,7 +9039,7 @@ dependencies = [ "num-bigint", "num-traits", "thiserror", - "time 0.3.24", + "time 0.3.25", ] [[package]] @@ -9205,7 +9179,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a" dependencies = [ "base64ct", - "der 0.7.7", + "der 0.7.8", ] [[package]] @@ -9619,7 +9593,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros 0.25.1", + "strum_macros 0.25.2", ] [[package]] @@ -9649,9 +9623,9 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.25.1" +version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232" +checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" dependencies = [ "heck 0.4.1", "proc-macro2", @@ -9691,7 +9665,7 @@ dependencies = [ "prost", "session", "snafu", - "substrait 0.12.3", + "substrait 0.12.4", "table", "tokio", ] @@ -9709,7 +9683,7 @@ dependencies = [ "prost-build", "prost-types", "schemars", - "semver 1.0.18", + "semver", "serde", "serde_json", "serde_yaml", @@ -9720,9 +9694,9 @@ dependencies = [ [[package]] name = "substrait" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ac1ce8315086b127ca0abf162c62279550942bb26ebf7946fe17fe114446472" +checksum = "658f6cbbd29a250869b87e1bb5a4b42db534cacfc1c03284f2536cd36b6c1617" dependencies = [ "git2", "heck 0.4.1", @@ -9731,7 +9705,7 @@ dependencies = [ "prost-build", "prost-types", "schemars", - "semver 1.0.18", + "semver", "serde", "serde_json", "serde_yaml", @@ -9915,14 +9889,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.7.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998" +checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651" dependencies = [ "cfg-if 1.0.0", "fastrand 2.0.0", "redox_syscall 0.3.5", - "rustix 0.38.4", + "rustix 0.38.8", "windows-sys 0.48.0", ] @@ -10156,9 +10130,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b79eabcd964882a646b3584543ccabeae7869e9ac32a46f6f22b7a5bd405308b" +checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea" dependencies = [ "deranged", "itoa", @@ -10226,11 +10200,10 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.29.1" +version = "1.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "40de3a2ba249dcb097e01be5e67a5ff53cf250397715a071a81543e8a832a920" dependencies = [ - "autocfg", "backtrace", "bytes", "libc", @@ -10239,7 +10212,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2 0.4.9", + "socket2 0.5.3", "tokio-macros", "tracing", "windows-sys 0.48.0", @@ -10298,7 +10271,7 @@ checksum = "dd5831152cb0d3f79ef5523b357319ba154795d64c7078b2daa95a803b54057f" dependencies = [ "futures", "ring", - "rustls 0.21.5", + "rustls 0.21.6", "tokio", "tokio-postgres", "tokio-rustls 0.24.1", @@ -10321,7 +10294,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.5", + "rustls 0.21.6", "tokio", ] @@ -10579,7 +10552,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" dependencies = [ "crossbeam-channel", - "time 0.3.24", + "time 0.3.25", "tracing-subscriber", ] @@ -10605,7 +10578,7 @@ dependencies = [ "log", "serde", "serde_json", - "time 0.3.24", + "time 0.3.25", "tracing", "tracing-core", "tracing-log", @@ -11142,7 +11115,7 @@ dependencies = [ "getset", "rustversion", "thiserror", - "time 0.3.24", + "time 0.3.25", ] [[package]] @@ -11153,12 +11126,12 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "vob" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbdb3eee5dd38a27129832bca4a3171888e699a6ac36de86547975466997986f" +checksum = "c058f4c41e71a043c67744cb76dcc1ae63ece328c1732a72489ccccc2dec23e6" dependencies = [ "num-traits", - "rustc_version 0.3.3", + "rustc_version", "serde", ] @@ -11591,9 +11564,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "winnow" -version = "0.5.2" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bd122eb777186e60c3fdf765a58ac76e41c582f1f535fbf3314434c6b58f3f7" +checksum = "5504cc7644f4b593cbc05c4a55bf9bd4e94b867c3c0bd440934174d50482427d" dependencies = [ "memchr", ] @@ -11625,7 +11598,7 @@ dependencies = [ "bcder", "bytes", "chrono", - "der 0.7.7", + "der 0.7.8", "hex", "pem 2.0.1", "ring", diff --git a/Cargo.toml b/Cargo.toml index 6cc91e9c2f..7e5f5268d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9b68af55c050a010f202fcccb22d58f080f0a868" } +greptime-proto = { git = "https://github.com/NiwakaDev/greptime-proto.git", rev = "ec402b6500f908a0acfab6c889225cd4dc2228a4" } itertools = "0.10" lazy_static = "1.4" once_cell = "1.18" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 98107438a6..c894a4c723 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -300,9 +300,9 @@ pub fn request_type(request: &Request) -> &'static str { Request::Inserts(_) => "inserts", Request::Query(query_req) => query_request_type(query_req), Request::Ddl(ddl_req) => ddl_request_type(ddl_req), - Request::Delete(_) => "delete", Request::RowInserts(_) => "row_inserts", Request::RowDelete(_) => "row_delete", + Request::Deletes(_) => "deletes", } } diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 80452a0cfd..e0c7022282 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -17,7 +17,7 @@ use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{ - AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequest, + AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequests, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest, RequestHeader, TruncateTableExpr, }; @@ -132,9 +132,9 @@ impl Database { Ok(stream_inserter) } - pub async fn delete(&self, request: DeleteRequest) -> Result { + pub async fn delete(&self, request: DeleteRequests) -> Result { let _timer = timer!(metrics::METRIC_GRPC_DELETE); - self.handle(Request::Delete(request)).await + self.handle(Request::Deletes(request)).await } async fn handle(&self, request: Request) -> Result { diff --git a/src/common/grpc-expr/src/delete.rs b/src/common/grpc-expr/src/delete.rs index cd228857ed..18a480e8a4 100644 --- a/src/common/grpc-expr/src/delete.rs +++ b/src/common/grpc-expr/src/delete.rs @@ -23,7 +23,11 @@ use table::requests::DeleteRequest; use crate::error::{ColumnDataTypeSnafu, IllegalDeleteRequestSnafu, Result}; use crate::insert::add_values_to_builder; -pub fn to_table_delete_request(request: GrpcDeleteRequest) -> Result { +pub fn to_table_delete_request( + catalog_name: &str, + schema_name: &str, + request: GrpcDeleteRequest, +) -> Result { let row_count = request.row_count as usize; let mut key_column_values = HashMap::with_capacity(request.key_columns.len()); @@ -52,7 +56,12 @@ pub fn to_table_delete_request(request: GrpcDeleteRequest) -> Result Result { - let catalog = ctx.current_catalog(); - let schema = ctx.current_schema(); - let table_name = &request.table_name.clone(); - let table_ref = TableReference::full(catalog, schema, table_name); + async fn handle_deletes( + &self, + request: DeleteRequests, + ctx: QueryContextRef, + ) -> Result { + let results = future::try_join_all(request.deletes.into_iter().map(|delete| { + let catalog_manager = self.catalog_manager.clone(); + let catalog = ctx.current_catalog().to_string(); + let schema = ctx.current_schema().to_string(); + common_runtime::spawn_write(async move { + let table_name = delete.table_name.clone(); + let table_ref = TableReference::full(&catalog, &schema, &table_name); + let table = catalog_manager + .table(&catalog, &schema, &table_name) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; - let table = self - .catalog_manager - .table(catalog, schema, table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: table_ref.to_string(), - })?; + let request = + common_grpc_expr::delete::to_table_delete_request(&catalog, &schema, delete) + .context(DeleteExprToRequestSnafu)?; - let request = common_grpc_expr::delete::to_table_delete_request(request) - .context(DeleteExprToRequestSnafu)?; - - let affected_rows = table.delete(request).await.with_context(|_| DeleteSnafu { - table_name: table_ref.to_string(), - })?; + table.delete(request).await.with_context(|_| DeleteSnafu { + table_name: table_ref.to_string(), + }) + }) + })) + .await + .context(JoinTaskSnafu)?; + let affected_rows = results.into_iter().sum::>()?; Ok(Output::AffectedRows(affected_rows)) } @@ -211,7 +222,7 @@ impl GrpcQueryHandler for Instance { async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { match request { Request::Inserts(requests) => self.handle_inserts(requests, &ctx).await, - Request::Delete(request) => self.handle_delete(request, ctx).await, + Request::Deletes(request) => self.handle_deletes(request, ctx).await, Request::Query(query_request) => { let query = query_request .query @@ -310,8 +321,8 @@ mod test { use api::v1::column::Values; use api::v1::{ alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateDatabaseExpr, CreateTableExpr, DropTableExpr, InsertRequest, InsertRequests, - QueryRequest, RenameTable, SemanticType, TableId, TruncateTableExpr, + CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr, InsertRequest, + InsertRequests, QueryRequest, RenameTable, SemanticType, TableId, TruncateTableExpr, }; use common_catalog::consts::MITO_ENGINE; use common_error::ext::ErrorExt; @@ -903,7 +914,7 @@ mod test { let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); assert!(matches!(output, Output::AffectedRows(3))); - let request = DeleteRequest { + let request1 = DeleteRequest { table_name: "demo".to_string(), region_number: 0, key_columns: vec![ @@ -928,13 +939,39 @@ mod test { ], row_count: 1, }; - - let request = Request::Delete(request); + let request2 = DeleteRequest { + table_name: "demo".to_string(), + region_number: 0, + key_columns: vec![ + Column { + column_name: "host".to_string(), + values: Some(Values { + string_values: vec!["host3".to_string()], + ..Default::default() + }), + datatype: ColumnDataType::String as i32, + ..Default::default() + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![1672201026000], + ..Default::default() + }), + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 1, + }; + let request = Request::Deletes(DeleteRequests { + deletes: vec![request1, request2], + }); let output = instance .do_query(request, QueryContext::arc()) .await .unwrap(); - assert!(matches!(output, Output::AffectedRows(1))); + assert!(matches!(output, Output::AffectedRows(2))); let output = exec_selection(instance, "SELECT ts, host, cpu FROM demo").await; let Output::Stream(stream) = output else { @@ -946,7 +983,6 @@ mod test { | ts | host | cpu | +---------------------+-------+------+ | 2022-12-28T04:17:05 | host1 | 66.6 | -| 2022-12-28T04:17:06 | host3 | 88.8 | +---------------------+-------+------+"; assert_eq!(recordbatches.pretty_print().unwrap(), expected); } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 1121a2a0a7..e5c4d03c1a 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -209,6 +209,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to split delete request, source: {}", source))] + SplitDelete { + source: partition::error::Error, + location: Location, + }, + #[snafu(display("Failed to create table info, source: {}", source))] CreateTableInfo { #[snafu(backtrace)] @@ -409,6 +415,12 @@ pub enum Error { source: table::error::Error, }, + #[snafu(display("Missing time index column: {}", source))] + MissingTimeIndexColumn { + location: Location, + source: table::error::Error, + }, + #[snafu(display("Failed to start script manager, source: {}", source))] StartScriptManager { #[snafu(backtrace)] @@ -644,6 +656,8 @@ impl ErrorExt for Error { source.status_code() } + Error::MissingTimeIndexColumn { source, .. } => source.status_code(), + Error::FindDatanode { .. } | Error::CreateTableRoute { .. } | Error::FindRegionRoute { .. } @@ -693,7 +707,8 @@ impl ErrorExt for Error { Error::DeserializePartition { source, .. } | Error::FindTablePartitionRule { source, .. } | Error::FindTableRoute { source, .. } - | Error::SplitInsert { source, .. } => source.status_code(), + | Error::SplitInsert { source, .. } + | Error::SplitDelete { source, .. } => source.status_code(), Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 99452fb9c7..67f8abd4af 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod deleter; pub(crate) mod inserter; use std::collections::HashMap; @@ -21,7 +22,7 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::{ - column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, + column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests, FlushTableExpr, InsertRequests, TruncateTableExpr, }; use async_trait::async_trait; @@ -56,7 +57,6 @@ use sql::statements::create::{PartitionEntry, Partitions}; use sql::statements::statement::Statement; use sql::statements::{self, sql_value_to_value}; use store_api::storage::RegionNumber; -use table::engine::TableReference; use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType}; use table::requests::{AlterTableRequest, TableOptions}; use table::TableRef; @@ -66,9 +66,10 @@ use crate::error::{ self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu, RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu, - TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu, + TableNotFoundSnafu, TableSnafu, UnrecognizedTableOptionSnafu, }; use crate::expr_factory; +use crate::instance::distributed::deleter::DistDeleter; use crate::instance::distributed::inserter::DistInserter; use crate::table::DistTable; @@ -626,27 +627,15 @@ impl DistInstance { async fn handle_dist_delete( &self, - request: DeleteRequest, + request: DeleteRequests, ctx: QueryContextRef, ) -> Result { - let catalog = ctx.current_catalog(); - let schema = ctx.current_schema(); - let table_name = &request.table_name; - let table_ref = TableReference::full(catalog, schema, table_name); - - let table = self - .catalog_manager - .table(catalog, schema, table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: table_ref.to_string(), - })?; - - let request = common_grpc_expr::delete::to_table_delete_request(request) - .context(ToTableDeleteRequestSnafu)?; - - let affected_rows = table.delete(request).await.context(TableSnafu)?; + let deleter = DistDeleter::new( + ctx.current_catalog().to_string(), + ctx.current_schema().to_string(), + self.catalog_manager(), + ); + let affected_rows = deleter.grpc_delete(request).await?; Ok(Output::AffectedRows(affected_rows)) } @@ -676,11 +665,11 @@ impl GrpcQueryHandler for DistInstance { async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { match request { Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await, - Request::Delete(request) => self.handle_dist_delete(request, ctx).await, Request::RowInserts(_) | Request::RowDelete(_) => NotSupportedSnafu { feat: "row insert/delete", } .fail(), + Request::Deletes(requests) => self.handle_dist_delete(requests, ctx).await, Request::Query(_) => { unreachable!("Query should have been handled directly in Frontend Instance!") } diff --git a/src/frontend/src/instance/distributed/deleter.rs b/src/frontend/src/instance/distributed/deleter.rs new file mode 100644 index 0000000000..3ebba9827b --- /dev/null +++ b/src/frontend/src/instance/distributed/deleter.rs @@ -0,0 +1,386 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::iter; +use std::sync::Arc; + +use api::v1::DeleteRequests; +use catalog::CatalogManager; +use client::Database; +use common_grpc_expr::delete::to_table_delete_request; +use common_meta::peer::Peer; +use common_meta::table_name::TableName; +use futures::future; +use snafu::{OptionExt, ResultExt}; +use table::requests::DeleteRequest; + +use crate::catalog::FrontendCatalogManager; +use crate::error::{ + CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, + MissingTimeIndexColumnSnafu, RequestDatanodeSnafu, Result, SplitDeleteSnafu, + TableNotFoundSnafu, ToTableDeleteRequestSnafu, +}; +use crate::table::delete::to_grpc_delete_request; + +/// A distributed deleter. It ingests GRPC [DeleteRequests] or table [DeleteRequest] (so it can be +/// used in protocol handlers or table deletion API). +/// +/// Table data partitioning and Datanode requests batching are handled inside. +/// +/// Note that the deleter is confined to a single catalog and schema. I.e., it cannot handle +/// multiple deletes requests with different catalog or schema (will throw "NotSupported" error). +/// This is because we currently do not have this kind of requirements. Let's keep it simple for now. +pub(crate) struct DistDeleter { + catalog: String, + schema: String, + catalog_manager: Arc, +} + +impl DistDeleter { + pub(crate) fn new( + catalog: String, + schema: String, + catalog_manager: Arc, + ) -> Self { + Self { + catalog, + schema, + catalog_manager, + } + } + + pub async fn grpc_delete(&self, requests: DeleteRequests) -> Result { + let deletes = requests + .deletes + .into_iter() + .map(|delete| { + to_table_delete_request(&self.catalog, &self.schema, delete) + .context(ToTableDeleteRequestSnafu) + }) + .collect::>>()?; + self.delete(deletes).await + } + + pub(crate) async fn delete(&self, requests: Vec) -> Result { + debug_assert!(requests + .iter() + .all(|x| x.catalog_name == self.catalog && x.schema_name == self.schema)); + let deletes = self.split_deletes(requests).await?; + self.request_datanodes(deletes).await + } + + async fn split_deletes( + &self, + requests: Vec, + ) -> Result> { + let partition_manager = self.catalog_manager.partition_manager(); + + let mut deletes = HashMap::new(); + + for request in requests { + let table_name = &request.table_name; + let table = self + .catalog_manager + .table(&self.catalog, &self.schema, table_name) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table_name.to_string(), + })?; + let table_info = table.table_info(); + let table_meta = &table_info.meta; + + let table_id = table_info.table_id(); + let table_name = &request.table_name; + let schema = table.schema(); + let time_index = &schema + .timestamp_column() + .with_context(|| table::error::MissingTimeIndexColumnSnafu { + table_name: table_name.to_string(), + }) + .context(MissingTimeIndexColumnSnafu)? + .name; + let primary_key_column_names = table_info + .meta + .row_key_column_names() + .chain(iter::once(time_index)) + .collect::>(); + let table_name = request.table_name.clone(); + let split = partition_manager + .split_delete_request(table_id, request, primary_key_column_names) + .await + .context(SplitDeleteSnafu)?; + let table_route = partition_manager + .find_table_route(table_id) + .await + .with_context(|_| FindTableRouteSnafu { + table_name: table_name.to_string(), + })?; + + for (region_number, delete) in split { + let datanode = + table_route + .find_region_leader(region_number) + .context(FindDatanodeSnafu { + region: region_number, + })?; + let table_name = TableName::new(&self.catalog, &self.schema, &table_name); + let delete = + to_grpc_delete_request(table_meta, &table_name, region_number, delete)?; + deletes + .entry(datanode.clone()) + .or_insert_with(|| DeleteRequests { deletes: vec![] }) + .deletes + .push(delete); + } + } + Ok(deletes) + } + + async fn request_datanodes(&self, deletes: HashMap) -> Result { + let results = future::try_join_all(deletes.into_iter().map(|(peer, deletes)| { + let datanode_clients = self.catalog_manager.datanode_clients(); + let catalog = self.catalog.clone(); + let schema = self.schema.clone(); + + common_runtime::spawn_write(async move { + let client = datanode_clients.get_client(&peer).await; + let database = Database::new(&catalog, &schema, client); + database.delete(deletes).await.context(RequestDatanodeSnafu) + }) + })) + .await + .context(JoinTaskSnafu)?; + + let affected_rows = results.into_iter().sum::>()?; + Ok(affected_rows as usize) + } +} + +#[cfg(test)] +mod tests { + use api::v1::column::Values; + use api::v1::{Column, ColumnDataType, DeleteRequest as GrpcDeleteRequest, SemanticType}; + use client::client_manager::DatanodeClients; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_meta::helper::{CatalogValue, SchemaValue}; + use common_meta::key::catalog_name::CatalogNameKey; + use common_meta::key::schema_name::SchemaNameKey; + use common_meta::key::table_name::TableNameKey; + use common_meta::key::table_region::RegionDistribution; + use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::{KvBackend, KvBackendRef}; + use common_meta::rpc::store::PutRequest; + use datatypes::prelude::{ConcreteDataType, VectorRef}; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema}; + use datatypes::vectors::Int32Vector; + use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder}; + + use super::*; + use crate::heartbeat::handler::tests::MockKvCacheInvalidator; + use crate::table::test::create_partition_rule_manager; + + async fn prepare_mocked_backend() -> KvBackendRef { + let backend = Arc::new(MemoryKvBackend::default()); + + let default_catalog = CatalogNameKey { + catalog: DEFAULT_CATALOG_NAME, + } + .to_string(); + let req = PutRequest::new() + .with_key(default_catalog.as_bytes()) + .with_value(CatalogValue.as_bytes().unwrap()); + backend.put(req).await.unwrap(); + + let default_schema = SchemaNameKey { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_SCHEMA_NAME, + } + .to_string(); + let req = PutRequest::new() + .with_key(default_schema.as_bytes()) + .with_value(SchemaValue.as_bytes().unwrap()); + backend.put(req).await.unwrap(); + + backend + } + + async fn create_testing_table( + table_name: &str, + table_metadata_manager: &TableMetadataManagerRef, + ) { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) + .with_time_index(true) + .with_default_constraint(Some(ColumnDefaultConstraint::Function( + "current_timestamp()".to_string(), + ))) + .unwrap(), + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new("value", ConcreteDataType::int32_datatype(), false), + ])); + + let table_meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![1]) + .next_column_id(1) + .build() + .unwrap(); + + let table_id = 1; + let table_info: RawTableInfo = TableInfoBuilder::new(table_name, table_meta) + .table_id(table_id) + .build() + .unwrap() + .into(); + + let key = TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); + assert!(table_metadata_manager + .table_name_manager() + .create(&key, table_id) + .await + .is_ok()); + + assert!(table_metadata_manager + .table_info_manager() + .compare_and_put(table_id, None, table_info) + .await + .is_ok()); + + let _ = table_metadata_manager + .table_region_manager() + .compare_and_put( + 1, + None, + RegionDistribution::from([(1, vec![1]), (2, vec![2]), (3, vec![3])]), + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_split_deletes() { + let backend = prepare_mocked_backend().await; + + let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone())); + let table_name = "one_column_partitioning_table"; + create_testing_table(table_name, &table_metadata_manager).await; + + let catalog_manager = Arc::new(FrontendCatalogManager::new( + backend, + Arc::new(MockKvCacheInvalidator::default()), + create_partition_rule_manager().await, + Arc::new(DatanodeClients::default()), + table_metadata_manager, + )); + + let new_delete_request = |vector: VectorRef| -> DeleteRequest { + DeleteRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + key_column_values: HashMap::from([("a".to_string(), vector)]), + } + }; + let requests = vec![ + new_delete_request(Arc::new(Int32Vector::from(vec![ + Some(1), + Some(11), + Some(50), + ]))), + new_delete_request(Arc::new(Int32Vector::from(vec![ + Some(2), + Some(12), + Some(102), + ]))), + ]; + + let deleter = DistDeleter::new( + DEFAULT_CATALOG_NAME.to_string(), + DEFAULT_SCHEMA_NAME.to_string(), + catalog_manager, + ); + let mut deletes = deleter.split_deletes(requests).await.unwrap(); + + assert_eq!(deletes.len(), 3); + + let new_grpc_delete_request = |column_values: Vec, + null_mask: Vec, + row_count: u32, + region_number: u32| + -> GrpcDeleteRequest { + GrpcDeleteRequest { + table_name: table_name.to_string(), + key_columns: vec![Column { + column_name: "a".to_string(), + semantic_type: SemanticType::Tag as i32, + values: Some(Values { + i32_values: column_values, + ..Default::default() + }), + null_mask, + datatype: ColumnDataType::Int32 as i32, + }], + row_count, + region_number, + } + }; + + // region to datanode placement: + // 1 -> 1 + // 2 -> 2 + // 3 -> 3 + // + // region value ranges: + // 1 -> [50, max) + // 2 -> [10, 50) + // 3 -> (min, 10) + + let datanode_deletes = deletes.remove(&Peer::new(1, "")).unwrap().deletes; + assert_eq!(datanode_deletes.len(), 2); + + assert_eq!( + datanode_deletes[0], + new_grpc_delete_request(vec![50], vec![0], 1, 1) + ); + assert_eq!( + datanode_deletes[1], + new_grpc_delete_request(vec![102], vec![0], 1, 1) + ); + + let datanode_deletes = deletes.remove(&Peer::new(2, "")).unwrap().deletes; + assert_eq!(datanode_deletes.len(), 2); + assert_eq!( + datanode_deletes[0], + new_grpc_delete_request(vec![11], vec![0], 1, 2) + ); + assert_eq!( + datanode_deletes[1], + new_grpc_delete_request(vec![12], vec![0], 1, 2) + ); + + let datanode_deletes = deletes.remove(&Peer::new(3, "")).unwrap().deletes; + assert_eq!(datanode_deletes.len(), 2); + assert_eq!( + datanode_deletes[0], + new_grpc_delete_request(vec![1], vec![0], 1, 3) + ); + assert_eq!( + datanode_deletes[1], + new_grpc_delete_request(vec![2], vec![0], 1, 3) + ); + } +} diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs index dbbaf4b514..fa8b630720 100644 --- a/src/frontend/src/instance/distributed/inserter.rs +++ b/src/frontend/src/instance/distributed/inserter.rs @@ -312,6 +312,7 @@ mod tests { ]; let mut inserts = inserter.split_inserts(requests).await.unwrap(); + assert_eq!(inserts.len(), 3); let new_grpc_insert_request = |column_values: Vec, diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 148a36ea0e..93254b9312 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -90,7 +90,7 @@ impl GrpcQueryHandler for Instance { } } } - Request::Ddl(_) | Request::Delete(_) => { + Request::Ddl(_) | Request::Deletes(_) => { GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone()) .await? } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 52758ad64a..281dcdab26 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::any::Any; -use std::iter; use std::pin::Pin; use std::sync::Arc; @@ -24,7 +23,6 @@ use common_meta::table_name::TableName; use common_query::error::Result as QueryResult; use common_query::logical_plan::Expr; use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef}; -use common_query::Output; use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter; use common_recordbatch::error::{ InitRecordbatchStreamSnafu, PollStreamSnafu, Result as RecordBatchResult, @@ -39,9 +37,8 @@ use datafusion::physical_plan::{ use datafusion_common::DataFusionError; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use futures_util::{Stream, StreamExt}; -use partition::splitter::WriteSplitter; use snafu::prelude::*; -use store_api::storage::{RegionNumber, ScanRequest}; +use store_api::storage::ScanRequest; use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfoRef, TableType}; use table::requests::{DeleteRequest, InsertRequest}; @@ -49,12 +46,12 @@ use table::Table; use tokio::sync::RwLock; use crate::catalog::FrontendCatalogManager; -use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, Result}; +use crate::error::Result; +use crate::instance::distributed::deleter::DistDeleter; use crate::instance::distributed::inserter::DistInserter; -use crate::table::delete::to_grpc_delete_request; use crate::table::scan::{DatanodeInstance, TableScanPlan}; -mod delete; +pub mod delete; pub mod insert; pub(crate) mod scan; @@ -174,58 +171,17 @@ impl Table for DistTable { } async fn delete(&self, request: DeleteRequest) -> table::Result { - let partition_manager = self.catalog_manager.partition_manager(); - - let table_id = self.table_info.table_id(); - let partition_rule = partition_manager - .find_table_partition_rule(table_id) + let deleter = DistDeleter::new( + request.catalog_name.clone(), + request.schema_name.clone(), + self.catalog_manager.clone(), + ); + let affected_rows = deleter + .delete(vec![request]) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; - - let schema = self.schema(); - let time_index = &schema - .timestamp_column() - .with_context(|| table::error::MissingTimeIndexColumnSnafu { - table_name: self.table_name.to_string(), - })? - .name; - - let table_info = self.table_info(); - let key_column_names = table_info - .meta - .row_key_column_names() - .chain(iter::once(time_index)) - .collect::>(); - - let requests = WriteSplitter::with_partition_rule(partition_rule) - .split_delete(request, key_column_names) - .map_err(BoxedError::new) - .and_then(|requests| { - requests - .into_iter() - .map(|(region_number, request)| { - to_grpc_delete_request( - &table_info.meta, - &self.table_name, - region_number, - request, - ) - }) - .collect::>>() - .map_err(BoxedError::new) - }) - .context(TableOperationSnafu)?; - - let output = self - .dist_delete(requests) - .await - .map_err(BoxedError::new) - .context(TableOperationSnafu)?; - let Output::AffectedRows(rows) = output else { - unreachable!() - }; - Ok(rows) + Ok(affected_rows) } } @@ -241,39 +197,6 @@ impl DistTable { catalog_manager, } } - - async fn find_datanode_instances( - &self, - regions: &[RegionNumber], - ) -> Result> { - let table_name = &self.table_name; - let route = self - .catalog_manager - .partition_manager() - .find_table_route(self.table_info.table_id()) - .await - .with_context(|_| FindTableRouteSnafu { - table_name: table_name.to_string(), - })?; - - let datanodes = regions - .iter() - .map(|&n| { - route - .find_region_leader(n) - .context(FindDatanodeSnafu { region: n }) - }) - .collect::>>()?; - - let datanode_clients = self.catalog_manager.datanode_clients(); - let mut instances = Vec::with_capacity(datanodes.len()); - for datanode in datanodes { - let client = datanode_clients.get_client(datanode).await; - let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); - instances.push(DatanodeInstance::new(Arc::new(self.clone()) as _, db)); - } - Ok(instances) - } } fn project_schema(table_schema: SchemaRef, projection: Option<&Vec>) -> SchemaRef { diff --git a/src/frontend/src/table/delete.rs b/src/frontend/src/table/delete.rs index c8a3a709ec..9a6b3056db 100644 --- a/src/frontend/src/table/delete.rs +++ b/src/frontend/src/table/delete.rs @@ -14,41 +14,14 @@ use api::v1::DeleteRequest as GrpcDeleteRequest; use common_meta::table_name::TableName; -use common_query::Output; -use futures::future; -use snafu::ResultExt; use store_api::storage::RegionNumber; use table::metadata::TableMeta; use table::requests::DeleteRequest; -use crate::error::{JoinTaskSnafu, RequestDatanodeSnafu, Result}; +use crate::error::Result; use crate::table::insert::to_grpc_columns; -use crate::table::DistTable; -impl DistTable { - pub(super) async fn dist_delete(&self, requests: Vec) -> Result { - let regions = requests.iter().map(|x| x.region_number).collect::>(); - let instances = self.find_datanode_instances(®ions).await?; - - let results = future::try_join_all(instances.into_iter().zip(requests).map( - |(instance, request)| { - common_runtime::spawn_write(async move { - instance - .grpc_delete(request) - .await - .context(RequestDatanodeSnafu) - }) - }, - )) - .await - .context(JoinTaskSnafu)?; - - let affected_rows = results.into_iter().sum::>()?; - Ok(Output::AffectedRows(affected_rows as _)) - } -} - -pub(super) fn to_grpc_delete_request( +pub fn to_grpc_delete_request( table_meta: &TableMeta, table_name: &TableName, region_number: RegionNumber, @@ -103,7 +76,12 @@ mod tests { "id".to_string(), Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef, )]); - let request = DeleteRequest { key_column_values }; + let request = DeleteRequest { + catalog_name: table_name.catalog_name.to_string(), + schema_name: table_name.schema_name.to_string(), + table_name: table_name.table_name.to_string(), + key_column_values, + }; let result = to_grpc_delete_request(&table_meta, &table_name, region_number, request).unwrap(); diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs index 55b0015185..979eb6ca64 100644 --- a/src/frontend/src/table/scan.rs +++ b/src/frontend/src/table/scan.rs @@ -15,7 +15,6 @@ use std::fmt::Formatter; use std::sync::Arc; -use api::v1::DeleteRequest; use client::Database; use common_meta::table_name::TableName; use common_query::prelude::Expr; @@ -47,10 +46,6 @@ impl DatanodeInstance { Self { table, db } } - pub(crate) async fn grpc_delete(&self, request: DeleteRequest) -> client::Result { - self.db.delete(request).await - } - pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result { let logical_plan = self.build_logical_plan(&plan)?; diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index 7ef3946d87..be533a8ca9 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -843,7 +843,12 @@ async fn test_table_delete_rows() { let del_tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2])); let key_column_values = HashMap::from([("host".to_string(), del_hosts), ("ts".to_string(), del_tss)]); - let del_req = DeleteRequest { key_column_values }; + let del_req = DeleteRequest { + catalog_name: "foo_catalog".to_string(), + schema_name: "foo_schema".to_string(), + table_name: "demo".to_string(), + key_column_values, + }; let _ = table.delete(del_req).await.unwrap(); let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap(); diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 80fbc19d4e..60ac8e8648 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -278,7 +278,6 @@ impl Table for MitoTable { let key_column_values = request.key_column_values.clone(); // Safety: key_column_values isn't empty. let rows_num = key_column_values.values().next().unwrap().len(); - logging::trace!( "Delete from table {} where key_columns are: {:?}", self.table_info().name, diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 60cea2f8fa..e6f3021d5f 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -24,14 +24,14 @@ use datatypes::schema::Schema; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; -use table::requests::InsertRequest; +use table::requests::{DeleteRequest, InsertRequest}; use crate::columns::RangeColumnsPartitionRule; use crate::error::{FindLeaderSnafu, Result}; use crate::partition::{PartitionBound, PartitionDef, PartitionExpr}; use crate::range::RangePartitionRule; use crate::route::TableRoutes; -use crate::splitter::{InsertRequestSplit, WriteSplitter}; +use crate::splitter::{DeleteRequestSplit, InsertRequestSplit, WriteSplitter}; use crate::{error, PartitionRuleRef}; #[async_trait::async_trait] @@ -242,10 +242,21 @@ impl PartitionRuleManager { req: InsertRequest, schema: &Schema, ) -> Result { - let partition_rule = self.find_table_partition_rule(table).await.unwrap(); + let partition_rule = self.find_table_partition_rule(table).await?; let splitter = WriteSplitter::with_partition_rule(partition_rule); splitter.split_insert(req, schema) } + + pub async fn split_delete_request( + &self, + table: TableId, + req: DeleteRequest, + primary_key_column_names: Vec<&String>, + ) -> Result { + let partition_rule = self.find_table_partition_rule(table).await?; + let splitter = WriteSplitter::with_partition_rule(partition_rule); + splitter.split_delete(req, primary_key_column_names) + } } fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result> { diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index 7ff422b257..ceeac2e1da 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -121,7 +121,15 @@ impl WriteSplitter { (column_name.to_string(), builder.to_vector()) }) .collect(); - (region_id, DeleteRequest { key_column_values }) + ( + region_id, + DeleteRequest { + catalog_name: request.catalog_name.clone(), + schema_name: request.schema_name.clone(), + table_name: request.table_name.clone(), + key_column_values, + }, + ) }) .collect(); Ok(requests) @@ -439,7 +447,12 @@ mod tests { "host".to_string(), Arc::new(StringVector::from(vec!["localhost"])) as _, ); - let delete = DeleteRequest { key_column_values }; + let delete = DeleteRequest { + catalog_name: "foo_catalog".to_string(), + schema_name: "foo_schema".to_string(), + table_name: "foo_table".to_string(), + key_column_values, + }; let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef; let spliter = WriteSplitter::with_partition_rule(rule); let ret = spliter diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 7f387d259d..3bccb75054 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -146,6 +146,9 @@ impl DatafusionQueryEngine { table: &TableRef, column_vectors: HashMap, ) -> Result { + let catalog_name = table_name.catalog.to_string(); + let schema_name = table_name.schema.to_string(); + let table_name = table_name.table.to_string(); let table_schema = table.schema(); let ts_column = table_schema .timestamp_column() @@ -165,6 +168,9 @@ impl DatafusionQueryEngine { .collect::>(); let request = DeleteRequest { + catalog_name, + schema_name, + table_name, key_column_values: column_vectors, }; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index e0674204a1..3502807644 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -160,7 +160,7 @@ impl GrpcQueryHandler for DummyInstance { ) -> std::result::Result { let output = match request { Request::Inserts(_) - | Request::Delete(_) + | Request::Deletes(_) | Request::RowInserts(_) | Request::RowDelete(_) => unimplemented!(), Request::Query(query_request) => { diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 879a46b9e9..7ca269c68d 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -251,6 +251,9 @@ pub struct InsertRequest { /// Delete (by primary key) request #[derive(Debug)] pub struct DeleteRequest { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, /// Values of each column in this table's primary key and time index. /// /// The key is the column name, and the value is the column value. diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 1ea5573afa..7447fdfeb8 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -22,8 +22,8 @@ mod test { use api::v1::query_request::Query; use api::v1::{ alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr, - FlushTableExpr, InsertRequest, InsertRequests, QueryRequest, SemanticType, + CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DeleteRequests, + DropTableExpr, FlushTableExpr, InsertRequest, InsertRequests, QueryRequest, SemanticType, }; use common_catalog::consts::MITO_ENGINE; use common_query::Output; @@ -216,7 +216,6 @@ CREATE TABLE {table_name} ( | ts | a | b | +---------------------+---+-------------------+ | 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | -| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | | 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | | 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | | 2023-01-01T07:26:17 | | ts: 1672557977000 | @@ -250,7 +249,6 @@ CREATE TABLE {table_name} ( +---------------------+----+-------------------+ | 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | | 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | -| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | +---------------------+----+-------------------+", ), ]), @@ -527,7 +525,7 @@ CREATE TABLE {table_name} ( +---------------------+----+-------------------+"; assert_eq!(recordbatches.pretty_print().unwrap(), expected); - let delete = DeleteRequest { + let new_grpc_delete_request = |a, b, ts, row_count| DeleteRequest { table_name: table_name.to_string(), region_number: 0, key_columns: vec![ @@ -535,7 +533,7 @@ CREATE TABLE {table_name} ( column_name: "a".to_string(), semantic_type: SemanticType::Field as i32, values: Some(Values { - i32_values: vec![2, 12, 22, 52], + i32_values: a, ..Default::default() }), datatype: ColumnDataType::Int32 as i32, @@ -545,12 +543,7 @@ CREATE TABLE {table_name} ( column_name: "b".to_string(), semantic_type: SemanticType::Tag as i32, values: Some(Values { - string_values: vec![ - "ts: 1672557973000".to_string(), - "ts: 1672557979000".to_string(), - "ts: 1672557982000".to_string(), - "ts: 1672557986000".to_string(), - ], + string_values: b, ..Default::default() }), datatype: ColumnDataType::String as i32, @@ -560,22 +553,43 @@ CREATE TABLE {table_name} ( column_name: "ts".to_string(), semantic_type: SemanticType::Timestamp as i32, values: Some(Values { - ts_millisecond_values: vec![ - 1672557973000, - 1672557979000, - 1672557982000, - 1672557986000, - ], + ts_millisecond_values: ts, ..Default::default() }), datatype: ColumnDataType::TimestampMillisecond as i32, ..Default::default() }, ], - row_count: 4, + row_count, }; - let output = query(instance, Request::Delete(delete)).await; - assert!(matches!(output, Output::AffectedRows(4))); + let delete1 = new_grpc_delete_request( + vec![2, 12, 22, 52], + vec![ + "ts: 1672557973000".to_string(), + "ts: 1672557979000".to_string(), + "ts: 1672557982000".to_string(), + "ts: 1672557986000".to_string(), + ], + vec![1672557973000, 1672557979000, 1672557982000, 1672557986000], + 4, + ); + let delete2 = new_grpc_delete_request( + vec![3, 53], + vec![ + "ts: 1672557974000".to_string(), + "ts: 1672557987000".to_string(), + ], + vec![1672557974000, 1672557987000], + 2, + ); + let output = query( + instance, + Request::Deletes(DeleteRequests { + deletes: vec![delete1, delete2], + }), + ) + .await; + assert!(matches!(output, Output::AffectedRows(6))); let output = query(instance, request).await; let Output::Stream(stream) = output else { @@ -587,7 +601,6 @@ CREATE TABLE {table_name} ( | ts | a | b | +---------------------+----+-------------------+ | 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | -| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | | 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | | 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | | 2023-01-01T07:26:17 | | ts: 1672557977000 | @@ -597,7 +610,6 @@ CREATE TABLE {table_name} ( | 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | | 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | | 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | -| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | +---------------------+----+-------------------+"; assert_eq!(recordbatches.pretty_print().unwrap(), expected); } @@ -765,7 +777,13 @@ CREATE TABLE {table_name} ( row_count: 2, }; - let output = query(instance, Request::Delete(delete)).await; + let output = query( + instance, + Request::Deletes(DeleteRequests { + deletes: vec![delete], + }), + ) + .await; assert!(matches!(output, Output::AffectedRows(2))); let output = query(instance, request).await;