feat: handle multiple grpc deletes (#2150)

* feat: handle multiple grpc deletes

* fix: make DistDeleter::grpc_delete return usize

* fix: remove backtrace from MissingTimeIndexColumn

* fix: avoid using unwrap in PartitionRuleManager::split_delete_request

* fix: simplify MissingTimeIndexColumn
This commit is contained in:
Niwaka
2023-08-15 17:22:46 +09:00
committed by GitHub
parent d4565c0a94
commit a8f2e4468d
22 changed files with 757 additions and 393 deletions

325
Cargo.lock generated
View File

@@ -64,9 +64,9 @@ dependencies = [
[[package]]
name = "aho-corasick"
version = "1.0.2"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
checksum = "86b8f9420f797f2d9e935edf629310eb938a0d839f984e25327f3c7eed22300c"
dependencies = [
"memchr",
]
@@ -183,9 +183,9 @@ dependencies = [
[[package]]
name = "anstyle-wincon"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188"
checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c"
dependencies = [
"anstyle",
"windows-sys 0.48.0",
@@ -469,7 +469,7 @@ version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bebcb57eef570b15afbcf2d07d813eb476fde9f6dd69c81004d6476c197e87e"
dependencies = [
"bitflags 2.3.3",
"bitflags 2.4.0",
"serde",
]
@@ -589,9 +589,9 @@ dependencies = [
[[package]]
name = "async-lock"
version = "2.7.0"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7"
checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
dependencies = [
"event-listener",
]
@@ -631,9 +631,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.72"
version = "0.1.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09"
checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
dependencies = [
"proc-macro2",
"quote",
@@ -712,9 +712,9 @@ dependencies = [
[[package]]
name = "axum"
version = "0.6.19"
version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6a1de45611fdb535bfde7b7de4fd54f4fd2b17b1737c0a59b69bf9b92074b8c"
checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
dependencies = [
"async-trait",
"axum-core",
@@ -849,7 +849,7 @@ name = "benchmarks"
version = "0.3.2"
dependencies = [
"arrow",
"clap 4.3.19",
"clap 4.3.21",
"client",
"indicatif",
"itertools 0.10.5",
@@ -903,7 +903,7 @@ version = "0.66.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2b84e06fc203107bfbad243f4aba2af864eb7db3b1cf46ea0a023b0b433d2a7"
dependencies = [
"bitflags 2.3.3",
"bitflags 2.4.0",
"cexpr",
"clang-sys",
"lazy_static",
@@ -940,9 +940,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.3.3"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42"
checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
[[package]]
name = "bitvec"
@@ -1072,7 +1072,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6798148dccfbff0fae41c7574d2fa8f1ef3492fba0face179de5d8d447d67b05"
dependencies = [
"memchr",
"regex-automata 0.3.4",
"regex-automata 0.3.6",
"serde",
]
@@ -1204,7 +1204,7 @@ checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform",
"semver 1.0.18",
"semver",
"serde",
"serde_json",
]
@@ -1256,7 +1256,7 @@ dependencies = [
"meta-client",
"metrics",
"mito",
"moka 0.11.2",
"moka 0.11.3",
"object-store",
"parking_lot 0.12.1",
"regex",
@@ -1272,11 +1272,12 @@ dependencies = [
[[package]]
name = "cc"
version = "1.0.79"
version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01"
dependencies = [
"jobserver",
"libc",
]
[[package]]
@@ -1446,9 +1447,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.3.19"
version = "4.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d"
checksum = "c27cdf28c0f604ba3f512b0c9a409f8de8513e4816705deb0498b627e7c3a3fd"
dependencies = [
"clap_builder",
"clap_derive 4.3.12",
@@ -1457,9 +1458,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.3.19"
version = "4.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1"
checksum = "08a9f1ab5e9f01a9b81f202e8562eb9a10de70abf9eaeac1be465c28b75aa4aa"
dependencies = [
"anstream",
"anstyle",
@@ -1530,7 +1531,7 @@ dependencies = [
"derive-new",
"enum_dispatch",
"futures-util",
"moka 0.9.8",
"moka 0.9.9",
"parking_lot 0.12.1",
"prost",
"rand",
@@ -2070,9 +2071,9 @@ checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3"
[[package]]
name = "const-oid"
version = "0.9.4"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "795bc6e66a8e340f075fcf6227e417a2dc976b92b91f3cdc778bb858778b6747"
checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f"
[[package]]
name = "const-random"
@@ -2521,7 +2522,7 @@ dependencies = [
"lazy_static",
"sqlparser 0.35.0",
"strum 0.25.0",
"strum_macros 0.25.1",
"strum_macros 0.25.2",
]
[[package]]
@@ -2612,7 +2613,7 @@ dependencies = [
"object_store",
"prost",
"prost-types",
"substrait 0.12.3",
"substrait 0.12.4",
"tokio",
]
@@ -2731,20 +2732,20 @@ dependencies = [
[[package]]
name = "der"
version = "0.7.7"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7ed52955ce76b1554f509074bb357d3fb8ac9b51288a65a3fd480d1dfba946"
checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c"
dependencies = [
"const-oid 0.9.4",
"const-oid 0.9.5",
"pem-rfc7468 0.7.0",
"zeroize",
]
[[package]]
name = "deranged"
version = "0.3.6"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8810e7e2cf385b1e9b50d68264908ec367ba642c96d02edfe61c39e88e2a3c01"
checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929"
[[package]]
name = "derive-new"
@@ -2832,7 +2833,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"const-oid 0.9.4",
"const-oid 0.9.5",
"crypto-common",
"subtle",
]
@@ -3142,7 +3143,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5"
dependencies = [
"cfg-if 1.0.0",
"rustix 0.38.4",
"rustix 0.38.8",
"windows-sys 0.48.0",
]
@@ -3175,13 +3176,13 @@ dependencies = [
[[package]]
name = "filetime"
version = "0.2.21"
version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cbc844cecaee9d4443931972e1289c8ff485cb4cc2767cb03ca139ed6885153"
checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0"
dependencies = [
"cfg-if 1.0.0",
"libc",
"redox_syscall 0.2.16",
"redox_syscall 0.3.5",
"windows-sys 0.48.0",
]
@@ -3216,7 +3217,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640"
dependencies = [
"bitflags 1.3.2",
"rustc_version 0.4.0",
"rustc_version",
]
[[package]]
@@ -3296,7 +3297,7 @@ dependencies = [
"meter-macros",
"metrics",
"mito",
"moka 0.9.8",
"moka 0.9.9",
"object-store",
"openmetrics-parser",
"opentelemetry-proto",
@@ -3744,7 +3745,7 @@ dependencies = [
"bstr 1.6.0",
"itoa",
"thiserror",
"time 0.3.24",
"time 0.3.25",
]
[[package]]
@@ -4135,7 +4136,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9b68af55c050a010f202fcccb22d58f080f0a868#9b68af55c050a010f202fcccb22d58f080f0a868"
source = "git+https://github.com/NiwakaDev/greptime-proto.git?rev=ec402b6500f908a0acfab6c889225cd4dc2228a4#ec402b6500f908a0acfab6c889225cd4dc2228a4"
dependencies = [
"prost",
"serde",
@@ -4374,9 +4375,9 @@ checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
[[package]]
name = "httpdate"
version = "1.0.2"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
@@ -4427,7 +4428,7 @@ dependencies = [
"futures-util",
"http",
"hyper",
"rustls 0.21.5",
"rustls 0.21.6",
"tokio",
"tokio-rustls 0.24.1",
]
@@ -4541,9 +4542,9 @@ dependencies = [
[[package]]
name = "indicatif"
version = "0.17.5"
version = "0.17.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ff8cc23a7393a397ed1d7f56e6365cba772aba9f9912ab968b03043c395d057"
checksum = "0b297dc40733f23a0e52728a58fa9489a5b7638a324932de16b41adc3ef80730"
dependencies = [
"console",
"instant",
@@ -4664,7 +4665,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
dependencies = [
"hermit-abi 0.3.2",
"rustix 0.38.4",
"rustix 0.38.8",
"windows-sys 0.48.0",
]
@@ -4976,9 +4977,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.19"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "log-store"
@@ -5174,9 +5175,9 @@ checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5"
[[package]]
name = "matchit"
version = "0.7.1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67827e6ea8ee8a7c4a72227ef4fc08957040acffdb5f122733b24fa12daff41b"
checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef"
[[package]]
name = "matrixmultiply"
@@ -5563,9 +5564,9 @@ dependencies = [
[[package]]
name = "moka"
version = "0.9.8"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19ca9b167ed904bc89a2f64c4be5014615c26fd9c4ddd2042c6094744c7df11a"
checksum = "b28455ac4363046076054a7e9cfbd7f168019c29dba32a625f59fc0aeffaaea4"
dependencies = [
"async-io",
"async-lock",
@@ -5577,7 +5578,7 @@ dependencies = [
"once_cell",
"parking_lot 0.12.1",
"quanta 0.11.1",
"rustc_version 0.4.0",
"rustc_version",
"scheduled-thread-pool",
"skeptic",
"smallvec",
@@ -5589,9 +5590,9 @@ dependencies = [
[[package]]
name = "moka"
version = "0.11.2"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "206bf83f415b0579fd885fe0804eb828e727636657dc1bf73d80d2f1218e14a1"
checksum = "fa6e72583bf6830c956235bff0d5afec8cf2952f579ebad18ae7821a917d950f"
dependencies = [
"async-io",
"async-lock",
@@ -5602,7 +5603,7 @@ dependencies = [
"once_cell",
"parking_lot 0.12.1",
"quanta 0.11.1",
"rustc_version 0.4.0",
"rustc_version",
"scheduled-thread-pool",
"skeptic",
"smallvec",
@@ -5665,7 +5666,7 @@ dependencies = [
"percent-encoding",
"pin-project",
"priority-queue",
"rustls 0.21.5",
"rustls 0.21.6",
"rustls-pemfile",
"serde",
"serde_json",
@@ -5689,7 +5690,7 @@ dependencies = [
"base64 0.21.2",
"bigdecimal",
"bindgen 0.66.1",
"bitflags 2.3.3",
"bitflags 2.4.0",
"bitvec",
"byteorder",
"bytes",
@@ -5715,7 +5716,7 @@ dependencies = [
"smallvec",
"subprocess",
"thiserror",
"time 0.3.24",
"time 0.3.25",
"uuid",
]
@@ -5868,9 +5869,9 @@ dependencies = [
[[package]]
name = "num-complex"
version = "0.4.3"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d"
checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214"
dependencies = [
"num-traits",
]
@@ -6452,7 +6453,7 @@ dependencies = [
"datafusion-expr",
"datatypes",
"meta-client",
"moka 0.9.8",
"moka 0.9.9",
"serde",
"serde_json",
"snafu",
@@ -6523,9 +6524,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pest"
version = "2.7.1"
version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d2d1d55045829d65aad9d389139882ad623b33b904e7c9f1b10c5b8927298e5"
checksum = "1acb4a4365a13f749a93f1a094a7805e5cfa0955373a9de860d962eaa3a5fe5a"
dependencies = [
"thiserror",
"ucd-trie",
@@ -6533,9 +6534,9 @@ dependencies = [
[[package]]
name = "pest_derive"
version = "2.7.1"
version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f94bca7e7a599d89dea5dfa309e217e7906c3c007fb9c3299c40b10d6a315d3"
checksum = "666d00490d4ac815001da55838c500eafb0320019bbaa44444137c48b443a853"
dependencies = [
"pest",
"pest_generator",
@@ -6543,9 +6544,9 @@ dependencies = [
[[package]]
name = "pest_generator"
version = "2.7.1"
version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d490fe7e8556575ff6911e45567ab95e71617f43781e5c05490dc8d75c965c"
checksum = "68ca01446f50dbda87c1786af8770d535423fa8a53aec03b8f4e3d7eb10e0929"
dependencies = [
"pest",
"pest_meta",
@@ -6556,9 +6557,9 @@ dependencies = [
[[package]]
name = "pest_meta"
version = "2.7.1"
version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2674c66ebb4b4d9036012091b537aae5878970d6999f81a265034d85b136b341"
checksum = "56af0a30af74d0445c0bf6d9d051c979b516a1a5af790d251daee76005420a48"
dependencies = [
"once_cell",
"pest",
@@ -6596,7 +6597,7 @@ dependencies = [
"ring",
"stringprep",
"thiserror",
"time 0.3.24",
"time 0.3.25",
"tokio",
"tokio-rustls 0.24.1",
"tokio-util",
@@ -6653,18 +6654,18 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842"
checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c"
checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
@@ -6673,9 +6674,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.10"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57"
checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05"
[[package]]
name = "pin-utils"
@@ -6700,7 +6701,7 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f"
dependencies = [
"der 0.7.7",
"der 0.7.8",
"pkcs8 0.10.2",
"spki 0.7.2",
]
@@ -6722,7 +6723,7 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
dependencies = [
"der 0.7.7",
"der 0.7.8",
"spki 0.7.2",
]
@@ -7185,9 +7186,9 @@ checksum = "3b7e158a385023d209d6d5f2585c4b468f6dcb3dd5aca9b75c4f1678c05bb375"
[[package]]
name = "pyo3"
version = "0.19.1"
version = "0.19.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffb88ae05f306b4bfcde40ac4a51dc0b05936a9207a4b75b798c7729c4258a59"
checksum = "e681a6cfdc4adcc93b4d3cf993749a4552018ee0a9b65fc0ccfad74352c72a38"
dependencies = [
"cfg-if 1.0.0",
"indoc",
@@ -7202,9 +7203,9 @@ dependencies = [
[[package]]
name = "pyo3-build-config"
version = "0.19.1"
version = "0.19.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "554db24f0b3c180a9c0b1268f91287ab3f17c162e15b54caaae5a6b3773396b0"
checksum = "076c73d0bc438f7a4ef6fdd0c3bb4732149136abd952b110ac93e4edb13a6ba5"
dependencies = [
"once_cell",
"target-lexicon",
@@ -7212,9 +7213,9 @@ dependencies = [
[[package]]
name = "pyo3-ffi"
version = "0.19.1"
version = "0.19.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "922ede8759e8600ad4da3195ae41259654b9c55da4f7eec84a0ccc7d067a70a4"
checksum = "e53cee42e77ebe256066ba8aa77eff722b3bb91f3419177cf4cd0f304d3284d9"
dependencies = [
"libc",
"pyo3-build-config",
@@ -7222,9 +7223,9 @@ dependencies = [
[[package]]
name = "pyo3-macros"
version = "0.19.1"
version = "0.19.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a5caec6a1dd355964a841fcbeeb1b89fe4146c87295573f94228911af3cc5a2"
checksum = "dfeb4c99597e136528c6dd7d5e3de5434d1ceaf487436a3f03b2d56b6fc9efd1"
dependencies = [
"proc-macro2",
"pyo3-macros-backend",
@@ -7234,9 +7235,9 @@ dependencies = [
[[package]]
name = "pyo3-macros-backend"
version = "0.19.1"
version = "0.19.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0b78ccbb160db1556cdb6fd96c50334c5d4ec44dc5e0a968d0a1208fa0efa8b"
checksum = "947dc12175c254889edc0c02e399476c2f652b4b9ebd123aa655c224de259536"
dependencies = [
"proc-macro2",
"quote",
@@ -7536,13 +7537,13 @@ dependencies = [
[[package]]
name = "regex"
version = "1.9.1"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575"
checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a"
dependencies = [
"aho-corasick 1.0.2",
"aho-corasick 1.0.3",
"memchr",
"regex-automata 0.3.4",
"regex-automata 0.3.6",
"regex-syntax 0.7.4",
]
@@ -7557,11 +7558,11 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.3.4"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294"
checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69"
dependencies = [
"aho-corasick 1.0.2",
"aho-corasick 1.0.3",
"memchr",
"regex-syntax 0.7.4",
]
@@ -7662,7 +7663,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.21.5",
"rustls 0.21.6",
"rustls-native-certs",
"rustls-pemfile",
"serde",
@@ -7847,7 +7848,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ab43bb47d23c1a631b4b680199a45255dce26fa9ab2fa902581f624ff13e6a8"
dependencies = [
"byteorder",
"const-oid 0.9.4",
"const-oid 0.9.5",
"digest",
"num-bigint-dig",
"num-integer",
@@ -7871,7 +7872,7 @@ dependencies = [
"futures",
"futures-timer",
"rstest_macros",
"rustc_version 0.4.0",
"rustc_version",
]
[[package]]
@@ -7883,7 +7884,7 @@ dependencies = [
"cfg-if 1.0.0",
"proc-macro2",
"quote",
"rustc_version 0.4.0",
"rustc_version",
"syn 1.0.109",
"unicode-ident",
]
@@ -7896,7 +7897,7 @@ checksum = "45f80dcc84beab3a327bbe161f77db25f336a1452428176787c8c79ac79d7073"
dependencies = [
"quote",
"rand",
"rustc_version 0.4.0",
"rustc_version",
"syn 1.0.109",
]
@@ -7983,22 +7984,13 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee"
dependencies = [
"semver 0.11.0",
]
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver 1.0.18",
"semver",
]
[[package]]
@@ -8031,11 +8023,11 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.4"
version = "0.38.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5"
checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f"
dependencies = [
"bitflags 2.3.3",
"bitflags 2.4.0",
"errno 0.3.2",
"libc",
"linux-raw-sys 0.4.5",
@@ -8056,13 +8048,13 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.5"
version = "0.21.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36"
checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb"
dependencies = [
"log",
"ring",
"rustls-webpki 0.101.2",
"rustls-webpki 0.101.3",
"sct",
]
@@ -8099,9 +8091,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.101.2"
version = "0.101.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59"
checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0"
dependencies = [
"ring",
"untrusted",
@@ -8359,7 +8351,7 @@ dependencies = [
"paste",
"rand",
"result-like",
"rustc_version 0.4.0",
"rustc_version",
"rustpython-ast",
"rustpython-codegen",
"rustpython-common",
@@ -8676,15 +8668,6 @@ dependencies = [
"libc",
]
[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver"
version = "1.0.18"
@@ -8694,15 +8677,6 @@ dependencies = [
"serde",
]
[[package]]
name = "semver-parser"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7"
dependencies = [
"pest",
]
[[package]]
name = "seq-macro"
version = "0.3.5"
@@ -8711,9 +8685,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
[[package]]
name = "serde"
version = "1.0.180"
version = "1.0.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed"
checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
dependencies = [
"serde_derive",
]
@@ -8730,9 +8704,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.180"
version = "1.0.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036"
checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
dependencies = [
"proc-macro2",
"quote",
@@ -8904,7 +8878,7 @@ dependencies = [
"rand",
"regex",
"rust-embed",
"rustls 0.21.5",
"rustls 0.21.6",
"rustls-pemfile",
"schemars",
"script",
@@ -9065,7 +9039,7 @@ dependencies = [
"num-bigint",
"num-traits",
"thiserror",
"time 0.3.24",
"time 0.3.25",
]
[[package]]
@@ -9205,7 +9179,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a"
dependencies = [
"base64ct",
"der 0.7.7",
"der 0.7.8",
]
[[package]]
@@ -9619,7 +9593,7 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
dependencies = [
"strum_macros 0.25.1",
"strum_macros 0.25.2",
]
[[package]]
@@ -9649,9 +9623,9 @@ dependencies = [
[[package]]
name = "strum_macros"
version = "0.25.1"
version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232"
checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059"
dependencies = [
"heck 0.4.1",
"proc-macro2",
@@ -9691,7 +9665,7 @@ dependencies = [
"prost",
"session",
"snafu",
"substrait 0.12.3",
"substrait 0.12.4",
"table",
"tokio",
]
@@ -9709,7 +9683,7 @@ dependencies = [
"prost-build",
"prost-types",
"schemars",
"semver 1.0.18",
"semver",
"serde",
"serde_json",
"serde_yaml",
@@ -9720,9 +9694,9 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.12.3"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ac1ce8315086b127ca0abf162c62279550942bb26ebf7946fe17fe114446472"
checksum = "658f6cbbd29a250869b87e1bb5a4b42db534cacfc1c03284f2536cd36b6c1617"
dependencies = [
"git2",
"heck 0.4.1",
@@ -9731,7 +9705,7 @@ dependencies = [
"prost-build",
"prost-types",
"schemars",
"semver 1.0.18",
"semver",
"serde",
"serde_json",
"serde_yaml",
@@ -9915,14 +9889,14 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.7.0"
version = "3.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998"
checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651"
dependencies = [
"cfg-if 1.0.0",
"fastrand 2.0.0",
"redox_syscall 0.3.5",
"rustix 0.38.4",
"rustix 0.38.8",
"windows-sys 0.48.0",
]
@@ -10156,9 +10130,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b79eabcd964882a646b3584543ccabeae7869e9ac32a46f6f22b7a5bd405308b"
checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea"
dependencies = [
"deranged",
"itoa",
@@ -10226,11 +10200,10 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.29.1"
version = "1.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da"
checksum = "40de3a2ba249dcb097e01be5e67a5ff53cf250397715a071a81543e8a832a920"
dependencies = [
"autocfg",
"backtrace",
"bytes",
"libc",
@@ -10239,7 +10212,7 @@ dependencies = [
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.4.9",
"socket2 0.5.3",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
@@ -10298,7 +10271,7 @@ checksum = "dd5831152cb0d3f79ef5523b357319ba154795d64c7078b2daa95a803b54057f"
dependencies = [
"futures",
"ring",
"rustls 0.21.5",
"rustls 0.21.6",
"tokio",
"tokio-postgres",
"tokio-rustls 0.24.1",
@@ -10321,7 +10294,7 @@ version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls 0.21.5",
"rustls 0.21.6",
"tokio",
]
@@ -10579,7 +10552,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
dependencies = [
"crossbeam-channel",
"time 0.3.24",
"time 0.3.25",
"tracing-subscriber",
]
@@ -10605,7 +10578,7 @@ dependencies = [
"log",
"serde",
"serde_json",
"time 0.3.24",
"time 0.3.25",
"tracing",
"tracing-core",
"tracing-log",
@@ -11142,7 +11115,7 @@ dependencies = [
"getset",
"rustversion",
"thiserror",
"time 0.3.24",
"time 0.3.25",
]
[[package]]
@@ -11153,12 +11126,12 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "vob"
version = "3.0.2"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbdb3eee5dd38a27129832bca4a3171888e699a6ac36de86547975466997986f"
checksum = "c058f4c41e71a043c67744cb76dcc1ae63ece328c1732a72489ccccc2dec23e6"
dependencies = [
"num-traits",
"rustc_version 0.3.3",
"rustc_version",
"serde",
]
@@ -11591,9 +11564,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "winnow"
version = "0.5.2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bd122eb777186e60c3fdf765a58ac76e41c582f1f535fbf3314434c6b58f3f7"
checksum = "5504cc7644f4b593cbc05c4a55bf9bd4e94b867c3c0bd440934174d50482427d"
dependencies = [
"memchr",
]
@@ -11625,7 +11598,7 @@ dependencies = [
"bcder",
"bytes",
"chrono",
"der 0.7.7",
"der 0.7.8",
"hex",
"pem 2.0.1",
"ring",

View File

@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9b68af55c050a010f202fcccb22d58f080f0a868" }
greptime-proto = { git = "https://github.com/NiwakaDev/greptime-proto.git", rev = "ec402b6500f908a0acfab6c889225cd4dc2228a4" }
itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"

View File

@@ -300,9 +300,9 @@ pub fn request_type(request: &Request) -> &'static str {
Request::Inserts(_) => "inserts",
Request::Query(query_req) => query_request_type(query_req),
Request::Ddl(ddl_req) => ddl_request_type(ddl_req),
Request::Delete(_) => "delete",
Request::RowInserts(_) => "row_inserts",
Request::RowDelete(_) => "row_delete",
Request::Deletes(_) => "deletes",
}
}

View File

@@ -17,7 +17,7 @@ use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequest,
AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequests,
DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest,
RequestHeader, TruncateTableExpr,
};
@@ -132,9 +132,9 @@ impl Database {
Ok(stream_inserter)
}
pub async fn delete(&self, request: DeleteRequest) -> Result<u32> {
pub async fn delete(&self, request: DeleteRequests) -> Result<u32> {
let _timer = timer!(metrics::METRIC_GRPC_DELETE);
self.handle(Request::Delete(request)).await
self.handle(Request::Deletes(request)).await
}
async fn handle(&self, request: Request) -> Result<u32> {

View File

@@ -23,7 +23,11 @@ use table::requests::DeleteRequest;
use crate::error::{ColumnDataTypeSnafu, IllegalDeleteRequestSnafu, Result};
use crate::insert::add_values_to_builder;
pub fn to_table_delete_request(request: GrpcDeleteRequest) -> Result<DeleteRequest> {
pub fn to_table_delete_request(
catalog_name: &str,
schema_name: &str,
request: GrpcDeleteRequest,
) -> Result<DeleteRequest> {
let row_count = request.row_count as usize;
let mut key_column_values = HashMap::with_capacity(request.key_columns.len());
@@ -52,7 +56,12 @@ pub fn to_table_delete_request(request: GrpcDeleteRequest) -> Result<DeleteReque
);
}
Ok(DeleteRequest { key_column_values })
Ok(DeleteRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: request.table_name,
key_column_values,
})
}
#[cfg(test)]
@@ -94,8 +103,12 @@ mod tests {
row_count: 3,
};
let mut request = to_table_delete_request(grpc_request).unwrap();
let mut request =
to_table_delete_request("foo_catalog", "foo_schema", grpc_request).unwrap();
assert_eq!(request.catalog_name, "foo_catalog");
assert_eq!(request.schema_name, "foo_schema");
assert_eq!(request.table_name, "foo");
assert_eq!(
Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef,
request.key_column_values.remove("id").unwrap()

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequests};
use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequests, InsertRequests};
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_grpc_expr::insert::to_table_insert_request;
@@ -164,27 +164,38 @@ impl Instance {
Ok(Output::AffectedRows(affected_rows))
}
async fn handle_delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<Output> {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table_name = &request.table_name.clone();
let table_ref = TableReference::full(catalog, schema, table_name);
async fn handle_deletes(
&self,
request: DeleteRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let results = future::try_join_all(request.deletes.into_iter().map(|delete| {
let catalog_manager = self.catalog_manager.clone();
let catalog = ctx.current_catalog().to_string();
let schema = ctx.current_schema().to_string();
common_runtime::spawn_write(async move {
let table_name = delete.table_name.clone();
let table_ref = TableReference::full(&catalog, &schema, &table_name);
let table = catalog_manager
.table(&catalog, &schema, &table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let request =
common_grpc_expr::delete::to_table_delete_request(&catalog, &schema, delete)
.context(DeleteExprToRequestSnafu)?;
let request = common_grpc_expr::delete::to_table_delete_request(request)
.context(DeleteExprToRequestSnafu)?;
let affected_rows = table.delete(request).await.with_context(|_| DeleteSnafu {
table_name: table_ref.to_string(),
})?;
table.delete(request).await.with_context(|_| DeleteSnafu {
table_name: table_ref.to_string(),
})
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<usize>>()?;
Ok(Output::AffectedRows(affected_rows))
}
@@ -211,7 +222,7 @@ impl GrpcQueryHandler for Instance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Inserts(requests) => self.handle_inserts(requests, &ctx).await,
Request::Delete(request) => self.handle_delete(request, ctx).await,
Request::Deletes(request) => self.handle_deletes(request, ctx).await,
Request::Query(query_request) => {
let query = query_request
.query
@@ -310,8 +321,8 @@ mod test {
use api::v1::column::Values;
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, DropTableExpr, InsertRequest, InsertRequests,
QueryRequest, RenameTable, SemanticType, TableId, TruncateTableExpr,
CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr, InsertRequest,
InsertRequests, QueryRequest, RenameTable, SemanticType, TableId, TruncateTableExpr,
};
use common_catalog::consts::MITO_ENGINE;
use common_error::ext::ErrorExt;
@@ -903,7 +914,7 @@ mod test {
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(3)));
let request = DeleteRequest {
let request1 = DeleteRequest {
table_name: "demo".to_string(),
region_number: 0,
key_columns: vec![
@@ -928,13 +939,39 @@ mod test {
],
row_count: 1,
};
let request = Request::Delete(request);
let request2 = DeleteRequest {
table_name: "demo".to_string(),
region_number: 0,
key_columns: vec![
Column {
column_name: "host".to_string(),
values: Some(Values {
string_values: vec!["host3".to_string()],
..Default::default()
}),
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672201026000],
..Default::default()
}),
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 1,
};
let request = Request::Deletes(DeleteRequests {
deletes: vec![request1, request2],
});
let output = instance
.do_query(request, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
assert!(matches!(output, Output::AffectedRows(2)));
let output = exec_selection(instance, "SELECT ts, host, cpu FROM demo").await;
let Output::Stream(stream) = output else {
@@ -946,7 +983,6 @@ mod test {
| ts | host | cpu |
+---------------------+-------+------+
| 2022-12-28T04:17:05 | host1 | 66.6 |
| 2022-12-28T04:17:06 | host3 | 88.8 |
+---------------------+-------+------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}

View File

@@ -209,6 +209,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to split delete request, source: {}", source))]
SplitDelete {
source: partition::error::Error,
location: Location,
},
#[snafu(display("Failed to create table info, source: {}", source))]
CreateTableInfo {
#[snafu(backtrace)]
@@ -409,6 +415,12 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display("Missing time index column: {}", source))]
MissingTimeIndexColumn {
location: Location,
source: table::error::Error,
},
#[snafu(display("Failed to start script manager, source: {}", source))]
StartScriptManager {
#[snafu(backtrace)]
@@ -644,6 +656,8 @@ impl ErrorExt for Error {
source.status_code()
}
Error::MissingTimeIndexColumn { source, .. } => source.status_code(),
Error::FindDatanode { .. }
| Error::CreateTableRoute { .. }
| Error::FindRegionRoute { .. }
@@ -693,7 +707,8 @@ impl ErrorExt for Error {
Error::DeserializePartition { source, .. }
| Error::FindTablePartitionRule { source, .. }
| Error::FindTableRoute { source, .. }
| Error::SplitInsert { source, .. } => source.status_code(),
| Error::SplitInsert { source, .. }
| Error::SplitDelete { source, .. } => source.status_code(),
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod deleter;
pub(crate) mod inserter;
use std::collections::HashMap;
@@ -21,7 +22,7 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::{
column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest,
column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests,
FlushTableExpr, InsertRequests, TruncateTableExpr,
};
use async_trait::async_trait;
@@ -56,7 +57,6 @@ use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::statement::Statement;
use sql::statements::{self, sql_value_to_value};
use store_api::storage::RegionNumber;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType};
use table::requests::{AlterTableRequest, TableOptions};
use table::TableRef;
@@ -66,9 +66,10 @@ use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu,
TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu,
TableNotFoundSnafu, TableSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::table::DistTable;
@@ -626,27 +627,15 @@ impl DistInstance {
async fn handle_dist_delete(
&self,
request: DeleteRequest,
request: DeleteRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table_name = &request.table_name;
let table_ref = TableReference::full(catalog, schema, table_name);
let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let request = common_grpc_expr::delete::to_table_delete_request(request)
.context(ToTableDeleteRequestSnafu)?;
let affected_rows = table.delete(request).await.context(TableSnafu)?;
let deleter = DistDeleter::new(
ctx.current_catalog().to_string(),
ctx.current_schema().to_string(),
self.catalog_manager(),
);
let affected_rows = deleter.grpc_delete(request).await?;
Ok(Output::AffectedRows(affected_rows))
}
@@ -676,11 +665,11 @@ impl GrpcQueryHandler for DistInstance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
Request::Delete(request) => self.handle_dist_delete(request, ctx).await,
Request::RowInserts(_) | Request::RowDelete(_) => NotSupportedSnafu {
feat: "row insert/delete",
}
.fail(),
Request::Deletes(requests) => self.handle_dist_delete(requests, ctx).await,
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")
}

View File

@@ -0,0 +1,386 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::iter;
use std::sync::Arc;
use api::v1::DeleteRequests;
use catalog::CatalogManager;
use client::Database;
use common_grpc_expr::delete::to_table_delete_request;
use common_meta::peer::Peer;
use common_meta::table_name::TableName;
use futures::future;
use snafu::{OptionExt, ResultExt};
use table::requests::DeleteRequest;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu,
MissingTimeIndexColumnSnafu, RequestDatanodeSnafu, Result, SplitDeleteSnafu,
TableNotFoundSnafu, ToTableDeleteRequestSnafu,
};
use crate::table::delete::to_grpc_delete_request;
/// A distributed deleter. It ingests GRPC [DeleteRequests] or table [DeleteRequest] (so it can be
/// used in protocol handlers or table deletion API).
///
/// Table data partitioning and Datanode requests batching are handled inside.
///
/// Note that the deleter is confined to a single catalog and schema. I.e., it cannot handle
/// multiple deletes requests with different catalog or schema (will throw "NotSupported" error).
/// This is because we currently do not have this kind of requirements. Let's keep it simple for now.
pub(crate) struct DistDeleter {
catalog: String,
schema: String,
catalog_manager: Arc<FrontendCatalogManager>,
}
impl DistDeleter {
pub(crate) fn new(
catalog: String,
schema: String,
catalog_manager: Arc<FrontendCatalogManager>,
) -> Self {
Self {
catalog,
schema,
catalog_manager,
}
}
pub async fn grpc_delete(&self, requests: DeleteRequests) -> Result<usize> {
let deletes = requests
.deletes
.into_iter()
.map(|delete| {
to_table_delete_request(&self.catalog, &self.schema, delete)
.context(ToTableDeleteRequestSnafu)
})
.collect::<Result<Vec<_>>>()?;
self.delete(deletes).await
}
pub(crate) async fn delete(&self, requests: Vec<DeleteRequest>) -> Result<usize> {
debug_assert!(requests
.iter()
.all(|x| x.catalog_name == self.catalog && x.schema_name == self.schema));
let deletes = self.split_deletes(requests).await?;
self.request_datanodes(deletes).await
}
async fn split_deletes(
&self,
requests: Vec<DeleteRequest>,
) -> Result<HashMap<Peer, DeleteRequests>> {
let partition_manager = self.catalog_manager.partition_manager();
let mut deletes = HashMap::new();
for request in requests {
let table_name = &request.table_name;
let table = self
.catalog_manager
.table(&self.catalog, &self.schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
let table_info = table.table_info();
let table_meta = &table_info.meta;
let table_id = table_info.table_id();
let table_name = &request.table_name;
let schema = table.schema();
let time_index = &schema
.timestamp_column()
.with_context(|| table::error::MissingTimeIndexColumnSnafu {
table_name: table_name.to_string(),
})
.context(MissingTimeIndexColumnSnafu)?
.name;
let primary_key_column_names = table_info
.meta
.row_key_column_names()
.chain(iter::once(time_index))
.collect::<Vec<_>>();
let table_name = request.table_name.clone();
let split = partition_manager
.split_delete_request(table_id, request, primary_key_column_names)
.await
.context(SplitDeleteSnafu)?;
let table_route = partition_manager
.find_table_route(table_id)
.await
.with_context(|_| FindTableRouteSnafu {
table_name: table_name.to_string(),
})?;
for (region_number, delete) in split {
let datanode =
table_route
.find_region_leader(region_number)
.context(FindDatanodeSnafu {
region: region_number,
})?;
let table_name = TableName::new(&self.catalog, &self.schema, &table_name);
let delete =
to_grpc_delete_request(table_meta, &table_name, region_number, delete)?;
deletes
.entry(datanode.clone())
.or_insert_with(|| DeleteRequests { deletes: vec![] })
.deletes
.push(delete);
}
}
Ok(deletes)
}
async fn request_datanodes(&self, deletes: HashMap<Peer, DeleteRequests>) -> Result<usize> {
let results = future::try_join_all(deletes.into_iter().map(|(peer, deletes)| {
let datanode_clients = self.catalog_manager.datanode_clients();
let catalog = self.catalog.clone();
let schema = self.schema.clone();
common_runtime::spawn_write(async move {
let client = datanode_clients.get_client(&peer).await;
let database = Database::new(&catalog, &schema, client);
database.delete(deletes).await.context(RequestDatanodeSnafu)
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u32>>()?;
Ok(affected_rows as usize)
}
}
#[cfg(test)]
mod tests {
use api::v1::column::Values;
use api::v1::{Column, ColumnDataType, DeleteRequest as GrpcDeleteRequest, SemanticType};
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::helper::{CatalogValue, SchemaValue};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_region::RegionDistribution;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::PutRequest;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder};
use super::*;
use crate::heartbeat::handler::tests::MockKvCacheInvalidator;
use crate::table::test::create_partition_rule_manager;
async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
let default_catalog = CatalogNameKey {
catalog: DEFAULT_CATALOG_NAME,
}
.to_string();
let req = PutRequest::new()
.with_key(default_catalog.as_bytes())
.with_value(CatalogValue.as_bytes().unwrap());
backend.put(req).await.unwrap();
let default_schema = SchemaNameKey {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
}
.to_string();
let req = PutRequest::new()
.with_key(default_schema.as_bytes())
.with_value(SchemaValue.as_bytes().unwrap());
backend.put(req).await.unwrap();
backend
}
async fn create_testing_table(
table_name: &str,
table_metadata_manager: &TableMetadataManagerRef,
) {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
"current_timestamp()".to_string(),
)))
.unwrap(),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("value", ConcreteDataType::int32_datatype(), false),
]));
let table_meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![1])
.next_column_id(1)
.build()
.unwrap();
let table_id = 1;
let table_info: RawTableInfo = TableInfoBuilder::new(table_name, table_meta)
.table_id(table_id)
.build()
.unwrap()
.into();
let key = TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
assert!(table_metadata_manager
.table_name_manager()
.create(&key, table_id)
.await
.is_ok());
assert!(table_metadata_manager
.table_info_manager()
.compare_and_put(table_id, None, table_info)
.await
.is_ok());
let _ = table_metadata_manager
.table_region_manager()
.compare_and_put(
1,
None,
RegionDistribution::from([(1, vec![1]), (2, vec![2]), (3, vec![3])]),
)
.await
.unwrap();
}
#[tokio::test]
async fn test_split_deletes() {
let backend = prepare_mocked_backend().await;
let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone()));
let table_name = "one_column_partitioning_table";
create_testing_table(table_name, &table_metadata_manager).await;
let catalog_manager = Arc::new(FrontendCatalogManager::new(
backend,
Arc::new(MockKvCacheInvalidator::default()),
create_partition_rule_manager().await,
Arc::new(DatanodeClients::default()),
table_metadata_manager,
));
let new_delete_request = |vector: VectorRef| -> DeleteRequest {
DeleteRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
key_column_values: HashMap::from([("a".to_string(), vector)]),
}
};
let requests = vec![
new_delete_request(Arc::new(Int32Vector::from(vec![
Some(1),
Some(11),
Some(50),
]))),
new_delete_request(Arc::new(Int32Vector::from(vec![
Some(2),
Some(12),
Some(102),
]))),
];
let deleter = DistDeleter::new(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
catalog_manager,
);
let mut deletes = deleter.split_deletes(requests).await.unwrap();
assert_eq!(deletes.len(), 3);
let new_grpc_delete_request = |column_values: Vec<i32>,
null_mask: Vec<u8>,
row_count: u32,
region_number: u32|
-> GrpcDeleteRequest {
GrpcDeleteRequest {
table_name: table_name.to_string(),
key_columns: vec![Column {
column_name: "a".to_string(),
semantic_type: SemanticType::Tag as i32,
values: Some(Values {
i32_values: column_values,
..Default::default()
}),
null_mask,
datatype: ColumnDataType::Int32 as i32,
}],
row_count,
region_number,
}
};
// region to datanode placement:
// 1 -> 1
// 2 -> 2
// 3 -> 3
//
// region value ranges:
// 1 -> [50, max)
// 2 -> [10, 50)
// 3 -> (min, 10)
let datanode_deletes = deletes.remove(&Peer::new(1, "")).unwrap().deletes;
assert_eq!(datanode_deletes.len(), 2);
assert_eq!(
datanode_deletes[0],
new_grpc_delete_request(vec![50], vec![0], 1, 1)
);
assert_eq!(
datanode_deletes[1],
new_grpc_delete_request(vec![102], vec![0], 1, 1)
);
let datanode_deletes = deletes.remove(&Peer::new(2, "")).unwrap().deletes;
assert_eq!(datanode_deletes.len(), 2);
assert_eq!(
datanode_deletes[0],
new_grpc_delete_request(vec![11], vec![0], 1, 2)
);
assert_eq!(
datanode_deletes[1],
new_grpc_delete_request(vec![12], vec![0], 1, 2)
);
let datanode_deletes = deletes.remove(&Peer::new(3, "")).unwrap().deletes;
assert_eq!(datanode_deletes.len(), 2);
assert_eq!(
datanode_deletes[0],
new_grpc_delete_request(vec![1], vec![0], 1, 3)
);
assert_eq!(
datanode_deletes[1],
new_grpc_delete_request(vec![2], vec![0], 1, 3)
);
}
}

View File

@@ -312,6 +312,7 @@ mod tests {
];
let mut inserts = inserter.split_inserts(requests).await.unwrap();
assert_eq!(inserts.len(), 3);
let new_grpc_insert_request = |column_values: Vec<i32>,

View File

@@ -90,7 +90,7 @@ impl GrpcQueryHandler for Instance {
}
}
}
Request::Ddl(_) | Request::Delete(_) => {
Request::Ddl(_) | Request::Deletes(_) => {
GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone())
.await?
}

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::any::Any;
use std::iter;
use std::pin::Pin;
use std::sync::Arc;
@@ -24,7 +23,6 @@ use common_meta::table_name::TableName;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
use common_query::Output;
use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
use common_recordbatch::error::{
InitRecordbatchStreamSnafu, PollStreamSnafu, Result as RecordBatchResult,
@@ -39,9 +37,8 @@ use datafusion::physical_plan::{
use datafusion_common::DataFusionError;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use futures_util::{Stream, StreamExt};
use partition::splitter::WriteSplitter;
use snafu::prelude::*;
use store_api::storage::{RegionNumber, ScanRequest};
use store_api::storage::ScanRequest;
use table::error::TableOperationSnafu;
use table::metadata::{FilterPushDownType, TableInfoRef, TableType};
use table::requests::{DeleteRequest, InsertRequest};
@@ -49,12 +46,12 @@ use table::Table;
use tokio::sync::RwLock;
use crate::catalog::FrontendCatalogManager;
use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, Result};
use crate::error::Result;
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::table::delete::to_grpc_delete_request;
use crate::table::scan::{DatanodeInstance, TableScanPlan};
mod delete;
pub mod delete;
pub mod insert;
pub(crate) mod scan;
@@ -174,58 +171,17 @@ impl Table for DistTable {
}
async fn delete(&self, request: DeleteRequest) -> table::Result<usize> {
let partition_manager = self.catalog_manager.partition_manager();
let table_id = self.table_info.table_id();
let partition_rule = partition_manager
.find_table_partition_rule(table_id)
let deleter = DistDeleter::new(
request.catalog_name.clone(),
request.schema_name.clone(),
self.catalog_manager.clone(),
);
let affected_rows = deleter
.delete(vec![request])
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let schema = self.schema();
let time_index = &schema
.timestamp_column()
.with_context(|| table::error::MissingTimeIndexColumnSnafu {
table_name: self.table_name.to_string(),
})?
.name;
let table_info = self.table_info();
let key_column_names = table_info
.meta
.row_key_column_names()
.chain(iter::once(time_index))
.collect::<Vec<_>>();
let requests = WriteSplitter::with_partition_rule(partition_rule)
.split_delete(request, key_column_names)
.map_err(BoxedError::new)
.and_then(|requests| {
requests
.into_iter()
.map(|(region_number, request)| {
to_grpc_delete_request(
&table_info.meta,
&self.table_name,
region_number,
request,
)
})
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)
})
.context(TableOperationSnafu)?;
let output = self
.dist_delete(requests)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let Output::AffectedRows(rows) = output else {
unreachable!()
};
Ok(rows)
Ok(affected_rows)
}
}
@@ -241,39 +197,6 @@ impl DistTable {
catalog_manager,
}
}
async fn find_datanode_instances(
&self,
regions: &[RegionNumber],
) -> Result<Vec<DatanodeInstance>> {
let table_name = &self.table_name;
let route = self
.catalog_manager
.partition_manager()
.find_table_route(self.table_info.table_id())
.await
.with_context(|_| FindTableRouteSnafu {
table_name: table_name.to_string(),
})?;
let datanodes = regions
.iter()
.map(|&n| {
route
.find_region_leader(n)
.context(FindDatanodeSnafu { region: n })
})
.collect::<Result<Vec<_>>>()?;
let datanode_clients = self.catalog_manager.datanode_clients();
let mut instances = Vec::with_capacity(datanodes.len());
for datanode in datanodes {
let client = datanode_clients.get_client(datanode).await;
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
instances.push(DatanodeInstance::new(Arc::new(self.clone()) as _, db));
}
Ok(instances)
}
}
fn project_schema(table_schema: SchemaRef, projection: Option<&Vec<usize>>) -> SchemaRef {

View File

@@ -14,41 +14,14 @@
use api::v1::DeleteRequest as GrpcDeleteRequest;
use common_meta::table_name::TableName;
use common_query::Output;
use futures::future;
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use table::metadata::TableMeta;
use table::requests::DeleteRequest;
use crate::error::{JoinTaskSnafu, RequestDatanodeSnafu, Result};
use crate::error::Result;
use crate::table::insert::to_grpc_columns;
use crate::table::DistTable;
impl DistTable {
pub(super) async fn dist_delete(&self, requests: Vec<GrpcDeleteRequest>) -> Result<Output> {
let regions = requests.iter().map(|x| x.region_number).collect::<Vec<_>>();
let instances = self.find_datanode_instances(&regions).await?;
let results = future::try_join_all(instances.into_iter().zip(requests).map(
|(instance, request)| {
common_runtime::spawn_write(async move {
instance
.grpc_delete(request)
.await
.context(RequestDatanodeSnafu)
})
},
))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u32>>()?;
Ok(Output::AffectedRows(affected_rows as _))
}
}
pub(super) fn to_grpc_delete_request(
pub fn to_grpc_delete_request(
table_meta: &TableMeta,
table_name: &TableName,
region_number: RegionNumber,
@@ -103,7 +76,12 @@ mod tests {
"id".to_string(),
Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef,
)]);
let request = DeleteRequest { key_column_values };
let request = DeleteRequest {
catalog_name: table_name.catalog_name.to_string(),
schema_name: table_name.schema_name.to_string(),
table_name: table_name.table_name.to_string(),
key_column_values,
};
let result =
to_grpc_delete_request(&table_meta, &table_name, region_number, request).unwrap();

View File

@@ -15,7 +15,6 @@
use std::fmt::Formatter;
use std::sync::Arc;
use api::v1::DeleteRequest;
use client::Database;
use common_meta::table_name::TableName;
use common_query::prelude::Expr;
@@ -47,10 +46,6 @@ impl DatanodeInstance {
Self { table, db }
}
pub(crate) async fn grpc_delete(&self, request: DeleteRequest) -> client::Result<u32> {
self.db.delete(request).await
}
pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result<RecordBatches> {
let logical_plan = self.build_logical_plan(&plan)?;

View File

@@ -843,7 +843,12 @@ async fn test_table_delete_rows() {
let del_tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2]));
let key_column_values =
HashMap::from([("host".to_string(), del_hosts), ("ts".to_string(), del_tss)]);
let del_req = DeleteRequest { key_column_values };
let del_req = DeleteRequest {
catalog_name: "foo_catalog".to_string(),
schema_name: "foo_schema".to_string(),
table_name: "demo".to_string(),
key_column_values,
};
let _ = table.delete(del_req).await.unwrap();
let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap();

View File

@@ -278,7 +278,6 @@ impl<R: Region> Table for MitoTable<R> {
let key_column_values = request.key_column_values.clone();
// Safety: key_column_values isn't empty.
let rows_num = key_column_values.values().next().unwrap().len();
logging::trace!(
"Delete from table {} where key_columns are: {:?}",
self.table_info().name,

View File

@@ -24,14 +24,14 @@ use datatypes::schema::Schema;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use table::requests::InsertRequest;
use table::requests::{DeleteRequest, InsertRequest};
use crate::columns::RangeColumnsPartitionRule;
use crate::error::{FindLeaderSnafu, Result};
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::route::TableRoutes;
use crate::splitter::{InsertRequestSplit, WriteSplitter};
use crate::splitter::{DeleteRequestSplit, InsertRequestSplit, WriteSplitter};
use crate::{error, PartitionRuleRef};
#[async_trait::async_trait]
@@ -242,10 +242,21 @@ impl PartitionRuleManager {
req: InsertRequest,
schema: &Schema,
) -> Result<InsertRequestSplit> {
let partition_rule = self.find_table_partition_rule(table).await.unwrap();
let partition_rule = self.find_table_partition_rule(table).await?;
let splitter = WriteSplitter::with_partition_rule(partition_rule);
splitter.split_insert(req, schema)
}
pub async fn split_delete_request(
&self,
table: TableId,
req: DeleteRequest,
primary_key_column_names: Vec<&String>,
) -> Result<DeleteRequestSplit> {
let partition_rule = self.find_table_partition_rule(table).await?;
let splitter = WriteSplitter::with_partition_rule(partition_rule);
splitter.split_delete(req, primary_key_column_names)
}
}
fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result<HashSet<RegionNumber>> {

View File

@@ -121,7 +121,15 @@ impl WriteSplitter {
(column_name.to_string(), builder.to_vector())
})
.collect();
(region_id, DeleteRequest { key_column_values })
(
region_id,
DeleteRequest {
catalog_name: request.catalog_name.clone(),
schema_name: request.schema_name.clone(),
table_name: request.table_name.clone(),
key_column_values,
},
)
})
.collect();
Ok(requests)
@@ -439,7 +447,12 @@ mod tests {
"host".to_string(),
Arc::new(StringVector::from(vec!["localhost"])) as _,
);
let delete = DeleteRequest { key_column_values };
let delete = DeleteRequest {
catalog_name: "foo_catalog".to_string(),
schema_name: "foo_schema".to_string(),
table_name: "foo_table".to_string(),
key_column_values,
};
let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef;
let spliter = WriteSplitter::with_partition_rule(rule);
let ret = spliter

View File

@@ -146,6 +146,9 @@ impl DatafusionQueryEngine {
table: &TableRef,
column_vectors: HashMap<String, VectorRef>,
) -> Result<usize> {
let catalog_name = table_name.catalog.to_string();
let schema_name = table_name.schema.to_string();
let table_name = table_name.table.to_string();
let table_schema = table.schema();
let ts_column = table_schema
.timestamp_column()
@@ -165,6 +168,9 @@ impl DatafusionQueryEngine {
.collect::<HashMap<_, _>>();
let request = DeleteRequest {
catalog_name,
schema_name,
table_name,
key_column_values: column_vectors,
};

View File

@@ -160,7 +160,7 @@ impl GrpcQueryHandler for DummyInstance {
) -> std::result::Result<Output, Self::Error> {
let output = match request {
Request::Inserts(_)
| Request::Delete(_)
| Request::Deletes(_)
| Request::RowInserts(_)
| Request::RowDelete(_) => unimplemented!(),
Request::Query(query_request) => {

View File

@@ -251,6 +251,9 @@ pub struct InsertRequest {
/// Delete (by primary key) request
#[derive(Debug)]
pub struct DeleteRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
/// Values of each column in this table's primary key and time index.
///
/// The key is the column name, and the value is the column value.

View File

@@ -22,8 +22,8 @@ mod test {
use api::v1::query_request::Query;
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr,
FlushTableExpr, InsertRequest, InsertRequests, QueryRequest, SemanticType,
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DeleteRequests,
DropTableExpr, FlushTableExpr, InsertRequest, InsertRequests, QueryRequest, SemanticType,
};
use common_catalog::consts::MITO_ENGINE;
use common_query::Output;
@@ -216,7 +216,6 @@ CREATE TABLE {table_name} (
| ts | a | b |
+---------------------+---+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
@@ -250,7 +249,6 @@ CREATE TABLE {table_name} (
+---------------------+----+-------------------+
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+",
),
]),
@@ -527,7 +525,7 @@ CREATE TABLE {table_name} (
+---------------------+----+-------------------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let delete = DeleteRequest {
let new_grpc_delete_request = |a, b, ts, row_count| DeleteRequest {
table_name: table_name.to_string(),
region_number: 0,
key_columns: vec![
@@ -535,7 +533,7 @@ CREATE TABLE {table_name} (
column_name: "a".to_string(),
semantic_type: SemanticType::Field as i32,
values: Some(Values {
i32_values: vec![2, 12, 22, 52],
i32_values: a,
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
@@ -545,12 +543,7 @@ CREATE TABLE {table_name} (
column_name: "b".to_string(),
semantic_type: SemanticType::Tag as i32,
values: Some(Values {
string_values: vec![
"ts: 1672557973000".to_string(),
"ts: 1672557979000".to_string(),
"ts: 1672557982000".to_string(),
"ts: 1672557986000".to_string(),
],
string_values: b,
..Default::default()
}),
datatype: ColumnDataType::String as i32,
@@ -560,22 +553,43 @@ CREATE TABLE {table_name} (
column_name: "ts".to_string(),
semantic_type: SemanticType::Timestamp as i32,
values: Some(Values {
ts_millisecond_values: vec![
1672557973000,
1672557979000,
1672557982000,
1672557986000,
],
ts_millisecond_values: ts,
..Default::default()
}),
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 4,
row_count,
};
let output = query(instance, Request::Delete(delete)).await;
assert!(matches!(output, Output::AffectedRows(4)));
let delete1 = new_grpc_delete_request(
vec![2, 12, 22, 52],
vec![
"ts: 1672557973000".to_string(),
"ts: 1672557979000".to_string(),
"ts: 1672557982000".to_string(),
"ts: 1672557986000".to_string(),
],
vec![1672557973000, 1672557979000, 1672557982000, 1672557986000],
4,
);
let delete2 = new_grpc_delete_request(
vec![3, 53],
vec![
"ts: 1672557974000".to_string(),
"ts: 1672557987000".to_string(),
],
vec![1672557974000, 1672557987000],
2,
);
let output = query(
instance,
Request::Deletes(DeleteRequests {
deletes: vec![delete1, delete2],
}),
)
.await;
assert!(matches!(output, Output::AffectedRows(6)));
let output = query(instance, request).await;
let Output::Stream(stream) = output else {
@@ -587,7 +601,6 @@ CREATE TABLE {table_name} (
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
@@ -597,7 +610,6 @@ CREATE TABLE {table_name} (
| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 |
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
@@ -765,7 +777,13 @@ CREATE TABLE {table_name} (
row_count: 2,
};
let output = query(instance, Request::Delete(delete)).await;
let output = query(
instance,
Request::Deletes(DeleteRequests {
deletes: vec![delete],
}),
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
let output = query(instance, request).await;