mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-15 09:42:58 +00:00
Compare commits
3 Commits
main
...
chore/test
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8853e08a7d | ||
|
|
9cba14f904 | ||
|
|
09ba24b7a9 |
227
Cargo.lock
generated
227
Cargo.lock
generated
@@ -725,7 +725,7 @@ dependencies = [
|
||||
"memchr",
|
||||
"num",
|
||||
"regex",
|
||||
"regex-syntax 0.8.7",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -742,7 +742,7 @@ dependencies = [
|
||||
"memchr",
|
||||
"num-traits",
|
||||
"regex",
|
||||
"regex-syntax 0.8.7",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1482,7 +1482,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"regex-automata 0.4.13",
|
||||
"regex-automata",
|
||||
"serde",
|
||||
]
|
||||
|
||||
@@ -2013,7 +2013,7 @@ dependencies = [
|
||||
"humantime",
|
||||
"meta-client",
|
||||
"meta-srv",
|
||||
"nu-ansi-term",
|
||||
"nu-ansi-term 0.46.0",
|
||||
"object-store",
|
||||
"operator",
|
||||
"paste",
|
||||
@@ -2155,7 +2155,7 @@ dependencies = [
|
||||
"metric-engine",
|
||||
"mito2",
|
||||
"moka",
|
||||
"nu-ansi-term",
|
||||
"nu-ansi-term 0.46.0",
|
||||
"object-store",
|
||||
"parquet",
|
||||
"plugins",
|
||||
@@ -2844,10 +2844,10 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"lazy_static",
|
||||
"once_cell",
|
||||
"opentelemetry 0.30.0",
|
||||
"opentelemetry",
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"opentelemetry_sdk 0.30.0",
|
||||
"opentelemetry_sdk",
|
||||
"parking_lot 0.12.4",
|
||||
"prometheus",
|
||||
"serde",
|
||||
@@ -4074,7 +4074,7 @@ dependencies = [
|
||||
"log",
|
||||
"recursive",
|
||||
"regex",
|
||||
"regex-syntax 0.8.7",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4976,8 +4976,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e24cb5a94bcae1e5408b0effca5cd7172ea3c5755049c5f3af4cd283a165298"
|
||||
dependencies = [
|
||||
"bit-set",
|
||||
"regex-automata 0.4.13",
|
||||
"regex-syntax 0.8.7",
|
||||
"regex-automata",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5323,7 +5323,7 @@ dependencies = [
|
||||
"meta-client",
|
||||
"meta-srv",
|
||||
"num_cpus",
|
||||
"opentelemetry-proto 0.31.0",
|
||||
"opentelemetry-proto",
|
||||
"operator",
|
||||
"otel-arrow-rust",
|
||||
"partition",
|
||||
@@ -6526,8 +6526,8 @@ dependencies = [
|
||||
"rand 0.9.1",
|
||||
"rand_chacha 0.9.0",
|
||||
"regex",
|
||||
"regex-automata 0.4.13",
|
||||
"roaring",
|
||||
"regex-automata",
|
||||
"roaring 0.10.12",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu 0.8.6",
|
||||
@@ -7119,7 +7119,7 @@ dependencies = [
|
||||
"lalrpop-util",
|
||||
"petgraph 0.7.1",
|
||||
"regex",
|
||||
"regex-syntax 0.8.7",
|
||||
"regex-syntax",
|
||||
"sha3",
|
||||
"string_cache",
|
||||
"term",
|
||||
@@ -7133,7 +7133,7 @@ version = "0.22.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5baa5e9ff84f1aefd264e6869907646538a52147a755d494517a8007fb48733"
|
||||
dependencies = [
|
||||
"regex-automata 0.4.13",
|
||||
"regex-automata",
|
||||
"rustversion",
|
||||
]
|
||||
|
||||
@@ -7527,7 +7527,7 @@ dependencies = [
|
||||
"num-traits",
|
||||
"quote",
|
||||
"regex",
|
||||
"regex-syntax 0.8.7",
|
||||
"regex-syntax",
|
||||
"serde",
|
||||
"vergen",
|
||||
]
|
||||
@@ -7653,11 +7653,11 @@ checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
|
||||
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
|
||||
dependencies = [
|
||||
"regex-automata 0.1.10",
|
||||
"regex-automata",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -8061,7 +8061,7 @@ dependencies = [
|
||||
"rand 0.9.1",
|
||||
"rayon",
|
||||
"regex",
|
||||
"roaring",
|
||||
"roaring 0.10.12",
|
||||
"rskafka",
|
||||
"rstest 0.25.0",
|
||||
"rstest_reuse",
|
||||
@@ -8523,6 +8523,15 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.50.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
|
||||
dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num"
|
||||
version = "0.4.3"
|
||||
@@ -8942,20 +8951,6 @@ version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"js-sys",
|
||||
"pin-project-lite",
|
||||
"thiserror 2.0.17",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.31.0"
|
||||
@@ -8972,48 +8967,36 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-http"
|
||||
version = "0.30.0"
|
||||
version = "0.31.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d"
|
||||
checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"http 1.3.1",
|
||||
"opentelemetry 0.30.0",
|
||||
"opentelemetry",
|
||||
"reqwest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-otlp"
|
||||
version = "0.30.0"
|
||||
version = "0.31.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b"
|
||||
checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf"
|
||||
dependencies = [
|
||||
"http 1.3.1",
|
||||
"opentelemetry 0.30.0",
|
||||
"opentelemetry",
|
||||
"opentelemetry-http",
|
||||
"opentelemetry-proto 0.30.0",
|
||||
"opentelemetry_sdk 0.30.0",
|
||||
"prost 0.13.5",
|
||||
"opentelemetry-proto",
|
||||
"opentelemetry_sdk",
|
||||
"prost 0.14.1",
|
||||
"reqwest",
|
||||
"thiserror 2.0.17",
|
||||
"tokio",
|
||||
"tonic 0.13.1",
|
||||
"tonic 0.14.2",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-proto"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc"
|
||||
dependencies = [
|
||||
"opentelemetry 0.30.0",
|
||||
"opentelemetry_sdk 0.30.0",
|
||||
"prost 0.13.5",
|
||||
"tonic 0.13.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-proto"
|
||||
version = "0.31.0"
|
||||
@@ -9022,8 +9005,8 @@ checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"const-hex",
|
||||
"opentelemetry 0.31.0",
|
||||
"opentelemetry_sdk 0.31.0",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"prost 0.14.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -9033,27 +9016,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-semantic-conventions"
|
||||
version = "0.30.0"
|
||||
version = "0.31.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry_sdk"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"opentelemetry 0.30.0",
|
||||
"percent-encoding",
|
||||
"rand 0.9.1",
|
||||
"serde_json",
|
||||
"thiserror 2.0.17",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
]
|
||||
checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry_sdk"
|
||||
@@ -9064,10 +9029,12 @@ dependencies = [
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"opentelemetry 0.31.0",
|
||||
"opentelemetry",
|
||||
"percent-encoding",
|
||||
"rand 0.9.1",
|
||||
"thiserror 2.0.17",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -9227,7 +9194,7 @@ checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1"
|
||||
[[package]]
|
||||
name = "otel-arrow-rust"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=5da284414e9b14f678344b51e5292229e4b5f8d2#5da284414e9b14f678344b51e5292229e4b5f8d2"
|
||||
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=452821e455b16e9a397a09d299340e197eb91571#452821e455b16e9a397a09d299340e197eb91571"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"arrow 56.2.0",
|
||||
@@ -9243,6 +9210,7 @@ dependencies = [
|
||||
"prost-build 0.14.1",
|
||||
"rand 0.9.1",
|
||||
"replace_with",
|
||||
"roaring 0.11.3",
|
||||
"serde",
|
||||
"smallvec",
|
||||
"snafu 0.8.6",
|
||||
@@ -9255,7 +9223,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "otlp-derive"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=5da284414e9b14f678344b51e5292229e4b5f8d2#5da284414e9b14f678344b51e5292229e4b5f8d2"
|
||||
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=452821e455b16e9a397a09d299340e197eb91571#452821e455b16e9a397a09d299340e197eb91571"
|
||||
dependencies = [
|
||||
"convert_case 0.8.0",
|
||||
"otlp-model",
|
||||
@@ -9267,7 +9235,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "otlp-model"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=5da284414e9b14f678344b51e5292229e4b5f8d2#5da284414e9b14f678344b51e5292229e4b5f8d2"
|
||||
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=452821e455b16e9a397a09d299340e197eb91571#452821e455b16e9a397a09d299340e197eb91571"
|
||||
dependencies = [
|
||||
"tonic-prost-build",
|
||||
]
|
||||
@@ -10281,7 +10249,7 @@ dependencies = [
|
||||
"rand 0.9.1",
|
||||
"rand_chacha 0.9.0",
|
||||
"rand_xorshift",
|
||||
"regex-syntax 0.8.7",
|
||||
"regex-syntax",
|
||||
"unarray",
|
||||
]
|
||||
|
||||
@@ -11041,17 +11009,8 @@ checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-automata 0.4.13",
|
||||
"regex-syntax 0.8.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex-automata"
|
||||
version = "0.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
|
||||
dependencies = [
|
||||
"regex-syntax 0.6.29",
|
||||
"regex-automata",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -11062,7 +11021,7 @@ checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-syntax 0.8.7",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -11076,7 +11035,7 @@ dependencies = [
|
||||
"itertools 0.13.0",
|
||||
"nohash",
|
||||
"regex",
|
||||
"regex-syntax 0.8.7",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -11085,12 +11044,6 @@ version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.6.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.8.7"
|
||||
@@ -11295,6 +11248,16 @@ dependencies = [
|
||||
"byteorder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "roaring"
|
||||
version = "0.11.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8ba9ce64a8f45d7fc86358410bb1a82e8c987504c0d4900e9141d69a9f26c885"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "robust"
|
||||
version = "1.2.0"
|
||||
@@ -12174,7 +12137,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"openmetrics-parser",
|
||||
"opensrv-mysql",
|
||||
"opentelemetry-proto 0.31.0",
|
||||
"opentelemetry-proto",
|
||||
"operator",
|
||||
"otel-arrow-rust",
|
||||
"parking_lot 0.12.4",
|
||||
@@ -13434,7 +13397,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d60769b80ad7953d8a7b2c70cdfe722bbcdcac6bccc8ac934c40c034d866fc18"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"regex-syntax 0.8.7",
|
||||
"regex-syntax",
|
||||
"utf8-ranges",
|
||||
]
|
||||
|
||||
@@ -13659,7 +13622,7 @@ dependencies = [
|
||||
"moka",
|
||||
"mysql_async",
|
||||
"object-store",
|
||||
"opentelemetry-proto 0.31.0",
|
||||
"opentelemetry-proto",
|
||||
"operator",
|
||||
"otel-arrow-rust",
|
||||
"partition",
|
||||
@@ -14124,32 +14087,6 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.1",
|
||||
"http-body-util",
|
||||
"hyper 1.6.0",
|
||||
"hyper-timeout 0.5.2",
|
||||
"hyper-util",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost 0.13.5",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tower 0.5.2",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.14.2"
|
||||
@@ -14340,9 +14277,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.41"
|
||||
version = "0.1.44"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
|
||||
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
|
||||
dependencies = [
|
||||
"log",
|
||||
"pin-project-lite",
|
||||
@@ -14364,9 +14301,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-attributes"
|
||||
version = "0.1.28"
|
||||
version = "0.1.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
|
||||
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -14375,9 +14312,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.34"
|
||||
version = "0.1.36"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
|
||||
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"valuable",
|
||||
@@ -14396,14 +14333,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-opentelemetry"
|
||||
version = "0.31.0"
|
||||
version = "0.32.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c"
|
||||
checksum = "1ac28f2d093c6c477eaa76b23525478f38de514fa9aeb1285738d4b97a9552fc"
|
||||
dependencies = [
|
||||
"js-sys",
|
||||
"once_cell",
|
||||
"opentelemetry 0.30.0",
|
||||
"opentelemetry_sdk 0.30.0",
|
||||
"opentelemetry",
|
||||
"smallvec",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
@@ -14424,14 +14359,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-subscriber"
|
||||
version = "0.3.19"
|
||||
version = "0.3.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
|
||||
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"nu-ansi-term 0.50.3",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"regex-automata",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sharded-slab",
|
||||
|
||||
@@ -181,7 +181,7 @@ opentelemetry-proto = { version = "0.31", features = [
|
||||
"logs",
|
||||
] }
|
||||
ordered-float = { version = "4.3", features = ["serde"] }
|
||||
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "5da284414e9b14f678344b51e5292229e4b5f8d2", features = [
|
||||
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "452821e455b16e9a397a09d299340e197eb91571", features = [
|
||||
"server",
|
||||
] }
|
||||
parking_lot = "0.12"
|
||||
@@ -191,8 +191,8 @@ pin-project = "1.0"
|
||||
pretty_assertions = "1.4.0"
|
||||
prometheus = { version = "0.13.3", features = ["process"] }
|
||||
promql-parser = { version = "0.7.1", features = ["ser"] }
|
||||
prost = { version = "0.14", features = ["no-recursion-limit"] }
|
||||
prost-types = "0.14"
|
||||
prost = { version = "=0.14.1", features = ["no-recursion-limit"] }
|
||||
prost-types = "=0.14.1"
|
||||
raft-engine = { version = "0.4.1", default-features = false }
|
||||
rand = "0.9"
|
||||
ratelimit = "0.10"
|
||||
@@ -240,7 +240,7 @@ tower = "0.5"
|
||||
tower-http = "0.6"
|
||||
tracing = "0.1"
|
||||
tracing-appender = "0.2"
|
||||
tracing-opentelemetry = "0.31.0"
|
||||
tracing-opentelemetry = "0.32.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
|
||||
typetag = "0.2"
|
||||
uuid = { version = "1.17", features = ["serde", "v4", "fast-rng"] }
|
||||
|
||||
@@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info(
|
||||
raw_table_info: &RawTableInfo,
|
||||
physical_table_id: TableId,
|
||||
) -> Result<CreateRequestBuilder> {
|
||||
let template = build_template_from_raw_table_info(raw_table_info, false)?;
|
||||
let template = build_template_from_raw_table_info(raw_table_info)?;
|
||||
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
|
||||
}
|
||||
|
||||
@@ -20,9 +20,7 @@ use api::v1::region::{CreateRequest, RegionColumnDef};
|
||||
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
|
||||
use common_telemetry::warn;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metric_engine_consts::{
|
||||
LOGICAL_TABLE_METADATA_KEY, is_metric_engine_internal_column,
|
||||
};
|
||||
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
|
||||
@@ -32,45 +30,34 @@ use crate::wal_provider::prepare_wal_options;
|
||||
/// Constructs a [CreateRequest] based on the provided [RawTableInfo].
|
||||
///
|
||||
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
|
||||
pub fn build_template_from_raw_table_info(
|
||||
raw_table_info: &RawTableInfo,
|
||||
skip_internal_columns: bool,
|
||||
) -> Result<CreateRequest> {
|
||||
pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
|
||||
let primary_key_indices = &raw_table_info.meta.primary_key_indices;
|
||||
let filtered = raw_table_info
|
||||
let column_defs = raw_table_info
|
||||
.meta
|
||||
.schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, c)| !skip_internal_columns || !is_metric_engine_internal_column(&c.name))
|
||||
.map(|(i, c)| {
|
||||
let is_primary_key = primary_key_indices.contains(&i);
|
||||
let column_def = try_as_column_def(c, is_primary_key)
|
||||
.context(error::ConvertColumnDefSnafu { column: &c.name })?;
|
||||
Ok((
|
||||
is_primary_key.then_some(i),
|
||||
RegionColumnDef {
|
||||
column_def: Some(column_def),
|
||||
// The column id will be overridden by the metric engine.
|
||||
// So we just use the index as the column id.
|
||||
column_id: i as u32,
|
||||
},
|
||||
))
|
||||
})
|
||||
.collect::<Result<Vec<(Option<usize>, RegionColumnDef)>>>()?;
|
||||
|
||||
let (new_primary_key_indices, column_defs): (Vec<_>, Vec<_>) = filtered.into_iter().unzip();
|
||||
Ok(RegionColumnDef {
|
||||
column_def: Some(column_def),
|
||||
// The column id will be overridden by the metric engine.
|
||||
// So we just use the index as the column id.
|
||||
column_id: i as u32,
|
||||
})
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let options = HashMap::from(&raw_table_info.meta.options);
|
||||
let template = CreateRequest {
|
||||
region_id: 0,
|
||||
engine: raw_table_info.meta.engine.clone(),
|
||||
column_defs,
|
||||
primary_key: new_primary_key_indices
|
||||
.iter()
|
||||
.flatten()
|
||||
.map(|i| *i as u32)
|
||||
.collect(),
|
||||
primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
|
||||
path: String::new(),
|
||||
options,
|
||||
partition: None,
|
||||
|
||||
@@ -17,7 +17,6 @@ use std::fmt::{Display, Formatter};
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
use store_api::region_engine::SyncRegionFromRequest;
|
||||
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
|
||||
use strum::Display;
|
||||
use table::metadata::TableId;
|
||||
@@ -531,25 +530,6 @@ impl Display for EnterStagingRegion {
|
||||
}
|
||||
}
|
||||
|
||||
/// Instruction payload for syncing a region from a manifest or another region.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct SyncRegion {
|
||||
/// Region id to sync.
|
||||
pub region_id: RegionId,
|
||||
/// Request to sync the region.
|
||||
pub request: SyncRegionFromRequest,
|
||||
}
|
||||
|
||||
impl Display for SyncRegion {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"SyncRegion(region_id={}, request={:?})",
|
||||
self.region_id, self.request
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct RemapManifest {
|
||||
pub region_id: RegionId,
|
||||
@@ -622,11 +602,8 @@ pub enum Instruction {
|
||||
Suspend,
|
||||
/// Makes regions enter staging state.
|
||||
EnterStagingRegions(Vec<EnterStagingRegion>),
|
||||
/// Syncs regions.
|
||||
SyncRegions(Vec<SyncRegion>),
|
||||
/// Remaps manifests for a region.
|
||||
RemapManifest(RemapManifest),
|
||||
|
||||
/// Applies staging manifests for a region.
|
||||
ApplyStagingManifests(Vec<ApplyStagingManifest>),
|
||||
}
|
||||
@@ -692,13 +669,6 @@ impl Instruction {
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_sync_regions(self) -> Option<Vec<SyncRegion>> {
|
||||
match self {
|
||||
Self::SyncRegions(sync_regions) => Some(sync_regions),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The reply of [UpgradeRegion].
|
||||
@@ -814,31 +784,6 @@ impl EnterStagingRegionsReply {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reply for a single region sync request.
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
pub struct SyncRegionReply {
|
||||
/// Region id of the synced region.
|
||||
pub region_id: RegionId,
|
||||
/// Returns true if the region is successfully synced and ready.
|
||||
pub ready: bool,
|
||||
/// Indicates whether the region exists.
|
||||
pub exists: bool,
|
||||
/// Return error message if any during the operation.
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
/// Reply for a batch of region sync requests.
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
pub struct SyncRegionsReply {
|
||||
pub replies: Vec<SyncRegionReply>,
|
||||
}
|
||||
|
||||
impl SyncRegionsReply {
|
||||
pub fn new(replies: Vec<SyncRegionReply>) -> Self {
|
||||
Self { replies }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
pub struct RemapManifestReply {
|
||||
/// Returns false if the region does not exist.
|
||||
@@ -902,7 +847,6 @@ pub enum InstructionReply {
|
||||
GetFileRefs(GetFileRefsReply),
|
||||
GcRegions(GcRegionsReply),
|
||||
EnterStagingRegions(EnterStagingRegionsReply),
|
||||
SyncRegions(SyncRegionsReply),
|
||||
RemapManifest(RemapManifestReply),
|
||||
ApplyStagingManifests(ApplyStagingManifestsReply),
|
||||
}
|
||||
@@ -928,9 +872,6 @@ impl Display for InstructionReply {
|
||||
reply.replies
|
||||
)
|
||||
}
|
||||
Self::SyncRegions(reply) => {
|
||||
write!(f, "InstructionReply::SyncRegions({:?})", reply.replies)
|
||||
}
|
||||
Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
|
||||
Self::ApplyStagingManifests(reply) => write!(
|
||||
f,
|
||||
@@ -985,13 +926,6 @@ impl InstructionReply {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn expect_sync_regions_reply(self) -> Vec<SyncRegionReply> {
|
||||
match self {
|
||||
Self::SyncRegions(reply) => reply.replies,
|
||||
_ => panic!("Expected SyncRegion reply"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
|
||||
match self {
|
||||
Self::RemapManifest(reply) => reply,
|
||||
|
||||
@@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info(
|
||||
raw_table_info: &RawTableInfo,
|
||||
physical_table_id: TableId,
|
||||
) -> Result<CreateRequestBuilder> {
|
||||
let template = build_template_from_raw_table_info(raw_table_info, false)?;
|
||||
let template = build_template_from_raw_table_info(raw_table_info)?;
|
||||
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
|
||||
}
|
||||
|
||||
|
||||
@@ -21,12 +21,12 @@ greptime-proto.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
lazy_static.workspace = true
|
||||
once_cell.workspace = true
|
||||
opentelemetry = { version = "0.30.0", default-features = false, features = [
|
||||
opentelemetry = { version = "0.31.0", default-features = false, features = [
|
||||
"trace",
|
||||
] }
|
||||
opentelemetry-otlp = { version = "0.30.0", features = ["trace", "grpc-tonic", "http-proto"] }
|
||||
opentelemetry-semantic-conventions = { version = "0.30.0", features = ["semconv_experimental"] }
|
||||
opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio", "trace"] }
|
||||
opentelemetry-otlp = { version = "0.31.0", features = ["trace", "grpc-tonic", "http-proto"] }
|
||||
opentelemetry-semantic-conventions = { version = "0.31.0", features = ["semconv_experimental"] }
|
||||
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "trace"] }
|
||||
parking_lot.workspace = true
|
||||
prometheus.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
@@ -73,7 +73,7 @@ impl TracingContext {
|
||||
|
||||
/// Attach the given span as a child of the context. Returns the attached span.
|
||||
pub fn attach(&self, span: tracing::Span) -> tracing::Span {
|
||||
span.set_parent(self.0.clone());
|
||||
let _ = span.set_parent(self.0.clone());
|
||||
span
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,6 @@ mod flush_region;
|
||||
mod gc_worker;
|
||||
mod open_region;
|
||||
mod remap_manifest;
|
||||
mod sync_region;
|
||||
mod upgrade_region;
|
||||
|
||||
use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
|
||||
@@ -43,7 +42,6 @@ use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
|
||||
use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
|
||||
use crate::heartbeat::handler::open_region::OpenRegionsHandler;
|
||||
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
|
||||
use crate::heartbeat::handler::sync_region::SyncRegionHandler;
|
||||
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
|
||||
use crate::heartbeat::task_tracker::TaskTracker;
|
||||
use crate::region_server::RegionServer;
|
||||
@@ -134,7 +132,6 @@ impl RegionHeartbeatResponseHandler {
|
||||
Instruction::EnterStagingRegions(_) => {
|
||||
Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
|
||||
}
|
||||
Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))),
|
||||
Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
|
||||
Instruction::ApplyStagingManifests(_) => {
|
||||
Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
|
||||
@@ -153,7 +150,6 @@ pub enum InstructionHandlers {
|
||||
GetFileRefs(GetFileRefsHandler),
|
||||
GcRegions(GcRegionsHandler),
|
||||
EnterStagingRegions(EnterStagingRegionsHandler),
|
||||
SyncRegions(SyncRegionHandler),
|
||||
RemapManifest(RemapManifestHandler),
|
||||
ApplyStagingManifests(ApplyStagingManifestsHandler),
|
||||
}
|
||||
@@ -179,7 +175,6 @@ impl_from_handler!(
|
||||
GetFileRefsHandler => GetFileRefs,
|
||||
GcRegionsHandler => GcRegions,
|
||||
EnterStagingRegionsHandler => EnterStagingRegions,
|
||||
SyncRegionHandler => SyncRegions,
|
||||
RemapManifestHandler => RemapManifest,
|
||||
ApplyStagingManifestsHandler => ApplyStagingManifests
|
||||
);
|
||||
@@ -227,7 +222,6 @@ dispatch_instr!(
|
||||
GetFileRefs => GetFileRefs,
|
||||
GcRegions => GcRegions,
|
||||
EnterStagingRegions => EnterStagingRegions,
|
||||
SyncRegions => SyncRegions,
|
||||
RemapManifest => RemapManifest,
|
||||
ApplyStagingManifests => ApplyStagingManifests,
|
||||
);
|
||||
|
||||
@@ -48,32 +48,19 @@ impl ApplyStagingManifestsHandler {
|
||||
ctx: &HandlerContext,
|
||||
request: ApplyStagingManifest,
|
||||
) -> ApplyStagingManifestReply {
|
||||
let ApplyStagingManifest {
|
||||
region_id,
|
||||
ref partition_expr,
|
||||
central_region_id,
|
||||
ref manifest_path,
|
||||
} = request;
|
||||
common_telemetry::info!(
|
||||
"Datanode received apply staging manifest request, region_id: {}, central_region_id: {}, partition_expr: {}, manifest_path: {}",
|
||||
region_id,
|
||||
central_region_id,
|
||||
partition_expr,
|
||||
manifest_path
|
||||
);
|
||||
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
|
||||
warn!("Region: {} is not found", region_id);
|
||||
let Some(leader) = ctx.region_server.is_region_leader(request.region_id) else {
|
||||
warn!("Region: {} is not found", request.region_id);
|
||||
return ApplyStagingManifestReply {
|
||||
region_id,
|
||||
region_id: request.region_id,
|
||||
exists: false,
|
||||
ready: false,
|
||||
error: None,
|
||||
};
|
||||
};
|
||||
if !leader {
|
||||
warn!("Region: {} is not leader", region_id);
|
||||
warn!("Region: {} is not leader", request.region_id);
|
||||
return ApplyStagingManifestReply {
|
||||
region_id,
|
||||
region_id: request.region_id,
|
||||
exists: true,
|
||||
ready: false,
|
||||
error: Some("Region is not leader".into()),
|
||||
@@ -83,25 +70,25 @@ impl ApplyStagingManifestsHandler {
|
||||
match ctx
|
||||
.region_server
|
||||
.handle_request(
|
||||
region_id,
|
||||
request.region_id,
|
||||
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
|
||||
partition_expr: partition_expr.clone(),
|
||||
central_region_id,
|
||||
manifest_path: manifest_path.clone(),
|
||||
partition_expr: request.partition_expr,
|
||||
central_region_id: request.central_region_id,
|
||||
manifest_path: request.manifest_path,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => ApplyStagingManifestReply {
|
||||
region_id,
|
||||
region_id: request.region_id,
|
||||
exists: true,
|
||||
ready: true,
|
||||
error: None,
|
||||
},
|
||||
Err(err) => {
|
||||
error!(err; "Failed to apply staging manifest, region_id: {}", region_id);
|
||||
error!(err; "Failed to apply staging manifest");
|
||||
ApplyStagingManifestReply {
|
||||
region_id,
|
||||
region_id: request.region_id,
|
||||
exists: true,
|
||||
ready: false,
|
||||
error: Some(format!("{err:?}")),
|
||||
|
||||
@@ -51,11 +51,6 @@ impl EnterStagingRegionsHandler {
|
||||
partition_expr,
|
||||
}: EnterStagingRegion,
|
||||
) -> EnterStagingRegionReply {
|
||||
common_telemetry::info!(
|
||||
"Datanode received enter staging region: {}, partition_expr: {}",
|
||||
region_id,
|
||||
partition_expr
|
||||
);
|
||||
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
|
||||
warn!("Region: {} is not found", region_id);
|
||||
return EnterStagingRegionReply {
|
||||
@@ -90,7 +85,7 @@ impl EnterStagingRegionsHandler {
|
||||
error: None,
|
||||
},
|
||||
Err(err) => {
|
||||
error!(err; "Failed to enter staging region, region_id: {}", region_id);
|
||||
error!(err; "Failed to enter staging region");
|
||||
EnterStagingRegionReply {
|
||||
region_id,
|
||||
ready: false,
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
|
||||
use common_telemetry::{error, info, warn};
|
||||
use common_telemetry::warn;
|
||||
use store_api::region_engine::RemapManifestsRequest;
|
||||
|
||||
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
|
||||
@@ -34,12 +34,6 @@ impl InstructionHandler for RemapManifestHandler {
|
||||
region_mapping,
|
||||
new_partition_exprs,
|
||||
} = request;
|
||||
info!(
|
||||
"Datanode received remap manifest request, region_id: {}, input_regions: {}, target_regions: {}",
|
||||
region_id,
|
||||
input_regions.len(),
|
||||
new_partition_exprs.len()
|
||||
);
|
||||
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
|
||||
warn!("Region: {} is not found", region_id);
|
||||
return Some(InstructionReply::RemapManifest(RemapManifestReply {
|
||||
@@ -73,18 +67,11 @@ impl InstructionHandler for RemapManifestHandler {
|
||||
manifest_paths: result.manifest_paths,
|
||||
error: None,
|
||||
}),
|
||||
Err(e) => {
|
||||
error!(
|
||||
e;
|
||||
"Remap manifests failed on datanode, region_id: {}",
|
||||
region_id
|
||||
);
|
||||
InstructionReply::RemapManifest(RemapManifestReply {
|
||||
exists: true,
|
||||
manifest_paths: Default::default(),
|
||||
error: Some(format!("{e:?}")),
|
||||
})
|
||||
}
|
||||
Err(e) => InstructionReply::RemapManifest(RemapManifestReply {
|
||||
exists: true,
|
||||
manifest_paths: Default::default(),
|
||||
error: Some(format!("{e:?}")),
|
||||
}),
|
||||
};
|
||||
|
||||
Some(reply)
|
||||
|
||||
@@ -1,192 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::instruction::{InstructionReply, SyncRegion, SyncRegionReply, SyncRegionsReply};
|
||||
use common_telemetry::{error, info, warn};
|
||||
use futures::future::join_all;
|
||||
|
||||
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
|
||||
|
||||
/// Handler for [SyncRegion] instruction.
|
||||
/// It syncs the region from a manifest or another region.
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub struct SyncRegionHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl InstructionHandler for SyncRegionHandler {
|
||||
type Instruction = Vec<SyncRegion>;
|
||||
|
||||
/// Handles a batch of [SyncRegion] instructions.
|
||||
async fn handle(
|
||||
&self,
|
||||
ctx: &HandlerContext,
|
||||
regions: Self::Instruction,
|
||||
) -> Option<InstructionReply> {
|
||||
let futures = regions
|
||||
.into_iter()
|
||||
.map(|sync_region| Self::handle_sync_region(ctx, sync_region));
|
||||
let results = join_all(futures).await;
|
||||
|
||||
Some(InstructionReply::SyncRegions(SyncRegionsReply::new(
|
||||
results,
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
impl SyncRegionHandler {
|
||||
/// Handles a single [SyncRegion] instruction.
|
||||
async fn handle_sync_region(
|
||||
ctx: &HandlerContext,
|
||||
SyncRegion { region_id, request }: SyncRegion,
|
||||
) -> SyncRegionReply {
|
||||
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
|
||||
warn!("Region: {} is not found", region_id);
|
||||
return SyncRegionReply {
|
||||
region_id,
|
||||
ready: false,
|
||||
exists: false,
|
||||
error: None,
|
||||
};
|
||||
};
|
||||
|
||||
if !writable {
|
||||
warn!("Region: {} is not writable", region_id);
|
||||
return SyncRegionReply {
|
||||
region_id,
|
||||
ready: false,
|
||||
exists: true,
|
||||
error: Some("Region is not writable".into()),
|
||||
};
|
||||
}
|
||||
|
||||
match ctx.region_server.sync_region(region_id, request).await {
|
||||
Ok(_) => {
|
||||
info!("Successfully synced region: {}", region_id);
|
||||
SyncRegionReply {
|
||||
region_id,
|
||||
ready: true,
|
||||
exists: true,
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!(e; "Failed to sync region: {}", region_id);
|
||||
SyncRegionReply {
|
||||
region_id,
|
||||
ready: false,
|
||||
exists: true,
|
||||
error: Some(format!("{:?}", e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
|
||||
use store_api::region_engine::{RegionRole, SyncRegionFromRequest};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::heartbeat::handler::sync_region::SyncRegionHandler;
|
||||
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
|
||||
use crate::tests::{MockRegionEngine, mock_region_server};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handle_sync_region_not_found() {
|
||||
let mut mock_region_server = mock_region_server();
|
||||
let (mock_engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME);
|
||||
mock_region_server.register_engine(mock_engine);
|
||||
|
||||
let handler_context = HandlerContext::new_for_test(mock_region_server);
|
||||
let handler = SyncRegionHandler;
|
||||
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
let sync_region = common_meta::instruction::SyncRegion {
|
||||
region_id,
|
||||
request: SyncRegionFromRequest::from_manifest(Default::default()),
|
||||
};
|
||||
|
||||
let reply = handler
|
||||
.handle(&handler_context, vec![sync_region])
|
||||
.await
|
||||
.unwrap()
|
||||
.expect_sync_regions_reply();
|
||||
|
||||
assert_eq!(reply.len(), 1);
|
||||
assert_eq!(reply[0].region_id, region_id);
|
||||
assert!(!reply[0].exists);
|
||||
assert!(!reply[0].ready);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handle_sync_region_not_writable() {
|
||||
let mock_region_server = mock_region_server();
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
|
||||
r.mock_role = Some(Some(RegionRole::Follower));
|
||||
});
|
||||
mock_region_server.register_test_region(region_id, mock_engine);
|
||||
|
||||
let handler_context = HandlerContext::new_for_test(mock_region_server);
|
||||
let handler = SyncRegionHandler;
|
||||
|
||||
let sync_region = common_meta::instruction::SyncRegion {
|
||||
region_id,
|
||||
request: SyncRegionFromRequest::from_manifest(Default::default()),
|
||||
};
|
||||
|
||||
let reply = handler
|
||||
.handle(&handler_context, vec![sync_region])
|
||||
.await
|
||||
.unwrap()
|
||||
.expect_sync_regions_reply();
|
||||
|
||||
assert_eq!(reply.len(), 1);
|
||||
assert_eq!(reply[0].region_id, region_id);
|
||||
assert!(reply[0].exists);
|
||||
assert!(!reply[0].ready);
|
||||
assert!(reply[0].error.is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handle_sync_region_success() {
|
||||
let mock_region_server = mock_region_server();
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
|
||||
r.mock_role = Some(Some(RegionRole::Leader));
|
||||
});
|
||||
mock_region_server.register_test_region(region_id, mock_engine);
|
||||
|
||||
let handler_context = HandlerContext::new_for_test(mock_region_server);
|
||||
let handler = SyncRegionHandler;
|
||||
|
||||
let sync_region = common_meta::instruction::SyncRegion {
|
||||
region_id,
|
||||
request: SyncRegionFromRequest::from_manifest(Default::default()),
|
||||
};
|
||||
|
||||
let reply = handler
|
||||
.handle(&handler_context, vec![sync_region])
|
||||
.await
|
||||
.unwrap()
|
||||
.expect_sync_regions_reply();
|
||||
|
||||
assert_eq!(reply.len(), 1);
|
||||
assert_eq!(reply[0].region_id, region_id);
|
||||
assert!(reply[0].exists);
|
||||
assert!(reply[0].ready);
|
||||
assert!(reply[0].error.is_none());
|
||||
}
|
||||
}
|
||||
@@ -115,17 +115,12 @@ pub type MockSetReadonlyGracefullyHandler =
|
||||
pub type MockGetMetadataHandler =
|
||||
Box<dyn Fn(RegionId) -> Result<RegionMetadataRef, Error> + Send + Sync>;
|
||||
|
||||
pub type MockSyncRegionHandler = Box<
|
||||
dyn Fn(RegionId, SyncRegionFromRequest) -> Result<SyncRegionFromResponse, Error> + Send + Sync,
|
||||
>;
|
||||
|
||||
pub struct MockRegionEngine {
|
||||
sender: Sender<(RegionId, RegionRequest)>,
|
||||
pub(crate) handle_request_delay: Option<Duration>,
|
||||
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
|
||||
pub(crate) handle_set_readonly_gracefully_mock_fn: Option<MockSetReadonlyGracefullyHandler>,
|
||||
pub(crate) handle_get_metadata_mock_fn: Option<MockGetMetadataHandler>,
|
||||
pub(crate) handle_sync_region_mock_fn: Option<MockSyncRegionHandler>,
|
||||
pub(crate) mock_role: Option<Option<RegionRole>>,
|
||||
engine: String,
|
||||
}
|
||||
@@ -141,7 +136,6 @@ impl MockRegionEngine {
|
||||
handle_request_mock_fn: None,
|
||||
handle_set_readonly_gracefully_mock_fn: None,
|
||||
handle_get_metadata_mock_fn: None,
|
||||
handle_sync_region_mock_fn: None,
|
||||
mock_role: None,
|
||||
engine: engine.to_string(),
|
||||
}),
|
||||
@@ -162,7 +156,6 @@ impl MockRegionEngine {
|
||||
handle_request_mock_fn: Some(mock_fn),
|
||||
handle_set_readonly_gracefully_mock_fn: None,
|
||||
handle_get_metadata_mock_fn: None,
|
||||
handle_sync_region_mock_fn: None,
|
||||
mock_role: None,
|
||||
engine: engine.to_string(),
|
||||
}),
|
||||
@@ -183,7 +176,6 @@ impl MockRegionEngine {
|
||||
handle_request_mock_fn: None,
|
||||
handle_set_readonly_gracefully_mock_fn: None,
|
||||
handle_get_metadata_mock_fn: Some(mock_fn),
|
||||
handle_sync_region_mock_fn: None,
|
||||
mock_role: None,
|
||||
engine: engine.to_string(),
|
||||
}),
|
||||
@@ -205,7 +197,6 @@ impl MockRegionEngine {
|
||||
handle_request_mock_fn: None,
|
||||
handle_set_readonly_gracefully_mock_fn: None,
|
||||
handle_get_metadata_mock_fn: None,
|
||||
handle_sync_region_mock_fn: None,
|
||||
mock_role: None,
|
||||
engine: engine.to_string(),
|
||||
};
|
||||
@@ -295,14 +286,10 @@ impl RegionEngine for MockRegionEngine {
|
||||
|
||||
async fn sync_region(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
request: SyncRegionFromRequest,
|
||||
_region_id: RegionId,
|
||||
_request: SyncRegionFromRequest,
|
||||
) -> Result<SyncRegionFromResponse, BoxedError> {
|
||||
if let Some(mock_fn) = &self.handle_sync_region_mock_fn {
|
||||
return mock_fn(region_id, request).map_err(BoxedError::new);
|
||||
};
|
||||
|
||||
Ok(SyncRegionFromResponse::Mito { synced: true })
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn remap_manifests(
|
||||
|
||||
@@ -14,15 +14,19 @@
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
|
||||
use common_procedure::{Context as ProcedureContext, Status};
|
||||
use common_telemetry::{info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::OptionExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::handler::HeartbeatMailbox;
|
||||
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
|
||||
use crate::procedure::region_migration::{Context, State};
|
||||
use crate::procedure::utils;
|
||||
use crate::service::mailbox::Channel;
|
||||
|
||||
/// Flushes the leader region before downgrading it.
|
||||
///
|
||||
@@ -57,6 +61,15 @@ impl State for PreFlushRegion {
|
||||
}
|
||||
|
||||
impl PreFlushRegion {
|
||||
/// Builds flush leader region instruction.
|
||||
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
|
||||
let pc = &ctx.persistent_ctx;
|
||||
Instruction::FlushRegions(FlushRegions::sync_batch(
|
||||
pc.region_ids.clone(),
|
||||
FlushErrorStrategy::TryAll,
|
||||
))
|
||||
}
|
||||
|
||||
/// Tries to flush a leader region.
|
||||
///
|
||||
/// Ignore:
|
||||
@@ -76,18 +89,109 @@ impl PreFlushRegion {
|
||||
.context(error::ExceededDeadlineSnafu {
|
||||
operation: "Flush leader region",
|
||||
})?;
|
||||
let flush_instruction = self.build_flush_leader_region_instruction(ctx);
|
||||
let region_ids = &ctx.persistent_ctx.region_ids;
|
||||
let leader = &ctx.persistent_ctx.from_peer;
|
||||
|
||||
utils::flush_region(
|
||||
&ctx.mailbox,
|
||||
&ctx.server_addr,
|
||||
region_ids,
|
||||
leader,
|
||||
operation_timeout,
|
||||
utils::ErrorStrategy::Ignore,
|
||||
let msg = MailboxMessage::json_message(
|
||||
&format!("Flush leader region: {:?}", region_ids),
|
||||
&format!("Metasrv@{}", ctx.server_addr()),
|
||||
&format!("Datanode-{}@{}", leader.id, leader.addr),
|
||||
common_time::util::current_time_millis(),
|
||||
&flush_instruction,
|
||||
)
|
||||
.await
|
||||
.with_context(|_| error::SerializeToJsonSnafu {
|
||||
input: flush_instruction.to_string(),
|
||||
})?;
|
||||
|
||||
let ch = Channel::Datanode(leader.id);
|
||||
let now = Instant::now();
|
||||
let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
|
||||
|
||||
match result {
|
||||
Ok(receiver) => match receiver.await {
|
||||
Ok(msg) => {
|
||||
let reply = HeartbeatMailbox::json_reply(&msg)?;
|
||||
info!(
|
||||
"Received flush leader region reply: {:?}, region: {:?}, elapsed: {:?}",
|
||||
reply,
|
||||
region_ids,
|
||||
now.elapsed()
|
||||
);
|
||||
|
||||
let reply_result = match reply {
|
||||
InstructionReply::FlushRegions(flush_reply) => {
|
||||
if flush_reply.results.len() != region_ids.len() {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
reason: format!(
|
||||
"expect {} region flush result, but got {}",
|
||||
region_ids.len(),
|
||||
flush_reply.results.len()
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
match flush_reply.overall_success {
|
||||
true => (true, None),
|
||||
false => (
|
||||
false,
|
||||
Some(
|
||||
flush_reply
|
||||
.results
|
||||
.iter()
|
||||
.filter_map(|(region_id, result)| match result {
|
||||
Ok(_) => None,
|
||||
Err(e) => Some(format!("{}: {}", region_id, e)),
|
||||
})
|
||||
.collect::<Vec<String>>()
|
||||
.join("; "),
|
||||
),
|
||||
),
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
reason: "expect flush region reply",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
let (result, error) = reply_result;
|
||||
|
||||
if let Some(error) = error {
|
||||
warn!(
|
||||
"Failed to flush leader regions {:?} on datanode {:?}, error: {}. Skip flush operation.",
|
||||
region_ids, leader, &error
|
||||
);
|
||||
} else if result {
|
||||
info!(
|
||||
"The flush leader regions {:?} on datanode {:?} is successful, elapsed: {:?}",
|
||||
region_ids,
|
||||
leader,
|
||||
now.elapsed()
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
|
||||
operation: "Flush leader regions",
|
||||
}
|
||||
.fail(),
|
||||
Err(err) => Err(err),
|
||||
},
|
||||
Err(Error::PusherNotFound { .. }) => {
|
||||
warn!(
|
||||
"Failed to flush leader regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
|
||||
region_ids, leader
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,13 +202,11 @@ mod tests {
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
|
||||
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
|
||||
use crate::procedure::test_util::{
|
||||
new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
|
||||
};
|
||||
use crate::service::mailbox::Channel;
|
||||
|
||||
fn new_persistent_context() -> PersistentContext {
|
||||
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
|
||||
|
||||
@@ -47,7 +47,7 @@ use common_procedure::{
|
||||
BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
|
||||
ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
|
||||
};
|
||||
use common_telemetry::{error, info};
|
||||
use common_telemetry::error;
|
||||
use partition::expr::PartitionExpr;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -232,10 +232,7 @@ impl Context {
|
||||
&new_region_routes,
|
||||
table_id,
|
||||
)?;
|
||||
info!(
|
||||
"Updating table route for table: {}, new region routes: {:?}",
|
||||
table_id, new_region_routes
|
||||
);
|
||||
|
||||
self.table_metadata_manager
|
||||
.update_table_route(
|
||||
table_id,
|
||||
@@ -265,13 +262,6 @@ impl Context {
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the next operation timeout.
|
||||
///
|
||||
/// If the next operation timeout is not set, it will return `None`.
|
||||
pub fn next_operation_timeout(&self) -> Option<std::time::Duration> {
|
||||
Some(std::time::Duration::from_secs(10))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -345,13 +335,6 @@ impl Procedure for RepartitionProcedure {
|
||||
|
||||
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
|
||||
let state = &mut self.state;
|
||||
let state_name = state.name();
|
||||
// Log state transition
|
||||
common_telemetry::info!(
|
||||
"Repartition procedure executing state: {}, table_id: {}",
|
||||
state_name,
|
||||
self.context.persistent_ctx.table_id
|
||||
);
|
||||
match state.next(&mut self.context, _ctx).await {
|
||||
Ok((next, status)) => {
|
||||
*state = next;
|
||||
|
||||
@@ -65,12 +65,6 @@ impl State for AllocateRegion {
|
||||
&mut next_region_number,
|
||||
&self.plan_entries,
|
||||
);
|
||||
let plan_count = repartition_plan_entries.len();
|
||||
let to_allocate = Self::count_regions_to_allocate(&repartition_plan_entries);
|
||||
info!(
|
||||
"Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
|
||||
table_id, plan_count, to_allocate
|
||||
);
|
||||
|
||||
// If no region to allocate, directly dispatch the plan.
|
||||
if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 {
|
||||
@@ -105,20 +99,6 @@ impl State for AllocateRegion {
|
||||
.await
|
||||
.context(error::AllocateWalOptionsSnafu { table_id })?;
|
||||
|
||||
let new_region_count = new_allocated_region_routes.len();
|
||||
let new_regions_brief: Vec<_> = new_allocated_region_routes
|
||||
.iter()
|
||||
.map(|route| {
|
||||
let region_id = route.region.id;
|
||||
let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
|
||||
format!("region_id: {}, peer: {}", region_id, peer)
|
||||
})
|
||||
.collect();
|
||||
info!(
|
||||
"Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
|
||||
table_id, new_region_count, new_regions_brief
|
||||
);
|
||||
|
||||
let _operating_guards = Self::register_operating_regions(
|
||||
&ctx.memory_region_keeper,
|
||||
&new_allocated_region_routes,
|
||||
@@ -157,6 +137,7 @@ impl AllocateRegion {
|
||||
Self { plan_entries }
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn register_operating_regions(
|
||||
memory_region_keeper: &MemoryRegionKeeperRef,
|
||||
region_routes: &[RegionRoute],
|
||||
@@ -174,6 +155,7 @@ impl AllocateRegion {
|
||||
Ok(operating_guards)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn generate_region_routes(
|
||||
region_routes: &[RegionRoute],
|
||||
new_allocated_region_ids: &[RegionRoute],
|
||||
@@ -195,6 +177,7 @@ impl AllocateRegion {
|
||||
///
|
||||
/// This method takes the allocation plan entries and converts them to repartition plan entries,
|
||||
/// updating `next_region_number` for each newly allocated region.
|
||||
#[allow(dead_code)]
|
||||
fn convert_to_repartition_plans(
|
||||
table_id: TableId,
|
||||
next_region_number: &mut RegionNumber,
|
||||
@@ -213,6 +196,7 @@ impl AllocateRegion {
|
||||
}
|
||||
|
||||
/// Collects all regions that need to be allocated from the repartition plan entries.
|
||||
#[allow(dead_code)]
|
||||
fn collect_allocate_regions(
|
||||
repartition_plan_entries: &[RepartitionPlanEntry],
|
||||
) -> Vec<&RegionDescriptor> {
|
||||
@@ -223,6 +207,7 @@ impl AllocateRegion {
|
||||
}
|
||||
|
||||
/// Prepares region allocation data: region numbers and their partition expressions.
|
||||
#[allow(dead_code)]
|
||||
fn prepare_region_allocation_data(
|
||||
allocate_regions: &[&RegionDescriptor],
|
||||
) -> Result<Vec<(RegionNumber, String)>> {
|
||||
@@ -240,6 +225,7 @@ impl AllocateRegion {
|
||||
}
|
||||
|
||||
/// Calculates the total number of regions that need to be allocated.
|
||||
#[allow(dead_code)]
|
||||
fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
|
||||
repartition_plan_entries
|
||||
.iter()
|
||||
@@ -248,10 +234,12 @@ impl AllocateRegion {
|
||||
}
|
||||
|
||||
/// Gets the next region number from the physical table route.
|
||||
#[allow(dead_code)]
|
||||
fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
|
||||
max_region_number + 1
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
async fn allocate_regions(
|
||||
node_manager: &NodeManagerRef,
|
||||
raw_table_info: &RawTableInfo,
|
||||
@@ -264,14 +252,12 @@ impl AllocateRegion {
|
||||
&raw_table_info.name,
|
||||
);
|
||||
let table_id = raw_table_info.ident.table_id;
|
||||
let request = build_template_from_raw_table_info(raw_table_info, true)
|
||||
let request = build_template_from_raw_table_info(raw_table_info)
|
||||
.context(error::BuildCreateRequestSnafu { table_id })?;
|
||||
let builder = CreateRequestBuilder::new(request, None);
|
||||
let region_count = region_routes.len();
|
||||
let wal_region_count = wal_options.len();
|
||||
info!(
|
||||
"Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
|
||||
table_id, region_count, wal_region_count
|
||||
"Allocating regions for table: {}, region_routes: {:?}, wal_options: {:?}",
|
||||
table_id, region_routes, wal_options
|
||||
);
|
||||
let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
|
||||
executor
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::any::Any;
|
||||
|
||||
use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
|
||||
use common_telemetry::{error, info};
|
||||
use common_telemetry::error;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
|
||||
@@ -64,10 +64,9 @@ impl Collect {
|
||||
impl State for Collect {
|
||||
async fn next(
|
||||
&mut self,
|
||||
ctx: &mut Context,
|
||||
_ctx: &mut Context,
|
||||
procedure_ctx: &ProcedureContext,
|
||||
) -> Result<(Box<dyn State>, Status)> {
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
for procedure_meta in self.inflight_procedures.iter() {
|
||||
let procedure_id = procedure_meta.procedure_id;
|
||||
let group_id = procedure_meta.group_id;
|
||||
@@ -94,16 +93,7 @@ impl State for Collect {
|
||||
}
|
||||
}
|
||||
|
||||
let inflight = self.inflight_procedures.len();
|
||||
let succeeded = self.succeeded_procedures.len();
|
||||
let failed = self.failed_procedures.len();
|
||||
let unknown = self.unknown_procedures.len();
|
||||
info!(
|
||||
"Collected repartition group results for table_id: {}, inflight: {}, succeeded: {}, failed: {}, unknown: {}",
|
||||
table_id, inflight, succeeded, failed, unknown
|
||||
);
|
||||
|
||||
if failed > 0 || unknown > 0 {
|
||||
if !self.failed_procedures.is_empty() || !self.unknown_procedures.is_empty() {
|
||||
// TODO(weny): retry the failed or unknown procedures.
|
||||
}
|
||||
|
||||
|
||||
@@ -62,10 +62,9 @@ impl State for DeallocateRegion {
|
||||
.flat_map(|p| p.pending_deallocate_region_ids.iter())
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>();
|
||||
let dealloc_count = pending_deallocate_region_ids.len();
|
||||
info!(
|
||||
"Deallocating regions for repartition, table_id: {}, count: {}, regions: {:?}",
|
||||
table_id, dealloc_count, pending_deallocate_region_ids
|
||||
"Deallocating regions: {:?} for table: {} during repartition procedure",
|
||||
pending_deallocate_region_ids, table_id
|
||||
);
|
||||
|
||||
let table_lock = TableLock::Write(table_id).into();
|
||||
@@ -112,6 +111,7 @@ impl State for DeallocateRegion {
|
||||
}
|
||||
|
||||
impl DeallocateRegion {
|
||||
#[allow(dead_code)]
|
||||
async fn deallocate_regions(
|
||||
node_manager: &NodeManagerRef,
|
||||
leader_region_registry: &LeaderRegionRegistryRef,
|
||||
@@ -136,6 +136,7 @@ impl DeallocateRegion {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn filter_deallocatable_region_routes(
|
||||
table_id: TableId,
|
||||
region_routes: &[RegionRoute],
|
||||
@@ -160,6 +161,7 @@ impl DeallocateRegion {
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn generate_region_routes(
|
||||
region_routes: &[RegionRoute],
|
||||
pending_deallocate_region_ids: &HashSet<RegionId>,
|
||||
|
||||
@@ -16,9 +16,7 @@ use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
|
||||
use common_telemetry::info;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::Result;
|
||||
@@ -30,6 +28,7 @@ use crate::procedure::repartition::{self, Context, State};
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Dispatch;
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn build_region_mapping(
|
||||
source_regions: &[RegionDescriptor],
|
||||
target_regions: &[RegionDescriptor],
|
||||
@@ -58,12 +57,8 @@ impl State for Dispatch {
|
||||
_procedure_ctx: &ProcedureContext,
|
||||
) -> Result<(Box<dyn State>, Status)> {
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
let table_info_value = ctx.get_table_info_value().await?;
|
||||
let table_engine = table_info_value.table_info.meta.engine;
|
||||
let sync_region = table_engine == METRIC_ENGINE_NAME;
|
||||
let plan_count = ctx.persistent_ctx.plans.len();
|
||||
let mut procedures = Vec::with_capacity(plan_count);
|
||||
let mut procedure_metas = Vec::with_capacity(plan_count);
|
||||
let mut procedures = Vec::with_capacity(ctx.persistent_ctx.plans.len());
|
||||
let mut procedure_metas = Vec::with_capacity(ctx.persistent_ctx.plans.len());
|
||||
for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() {
|
||||
let region_mapping = build_region_mapping(
|
||||
&plan.source_regions,
|
||||
@@ -78,9 +73,6 @@ impl State for Dispatch {
|
||||
plan.source_regions.clone(),
|
||||
plan.target_regions.clone(),
|
||||
region_mapping,
|
||||
sync_region,
|
||||
plan.allocated_region_ids.clone(),
|
||||
plan.pending_deallocate_region_ids.clone(),
|
||||
);
|
||||
|
||||
let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx);
|
||||
@@ -93,14 +85,6 @@ impl State for Dispatch {
|
||||
procedures.push(procedure);
|
||||
}
|
||||
|
||||
let group_ids: Vec<_> = procedure_metas.iter().map(|m| m.group_id).collect();
|
||||
info!(
|
||||
"Dispatch repartition groups for table_id: {}, group_count: {}, group_ids: {:?}",
|
||||
table_id,
|
||||
group_ids.len(),
|
||||
group_ids
|
||||
);
|
||||
|
||||
Ok((
|
||||
Box::new(Collect::new(procedure_metas)),
|
||||
Status::suspended(procedures, true),
|
||||
|
||||
@@ -17,7 +17,6 @@ pub(crate) mod enter_staging_region;
|
||||
pub(crate) mod remap_manifest;
|
||||
pub(crate) mod repartition_end;
|
||||
pub(crate) mod repartition_start;
|
||||
pub(crate) mod sync_region;
|
||||
pub(crate) mod update_metadata;
|
||||
pub(crate) mod utils;
|
||||
|
||||
@@ -41,7 +40,7 @@ use common_procedure::{
|
||||
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
|
||||
Result as ProcedureResult, Status, StringKey, UserMetadata,
|
||||
};
|
||||
use common_telemetry::{error, info};
|
||||
use common_telemetry::error;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
@@ -56,6 +55,7 @@ use crate::service::mailbox::MailboxRef;
|
||||
|
||||
pub type GroupId = Uuid;
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct RepartitionGroupProcedure {
|
||||
state: Box<dyn State>,
|
||||
context: Context,
|
||||
@@ -113,14 +113,6 @@ impl Procedure for RepartitionGroupProcedure {
|
||||
|
||||
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
|
||||
let state = &mut self.state;
|
||||
let state_name = state.name();
|
||||
// Log state transition
|
||||
common_telemetry::info!(
|
||||
"Repartition group procedure executing state: {}, group id: {}, table id: {}",
|
||||
state_name,
|
||||
self.context.persistent_ctx.group_id,
|
||||
self.context.persistent_ctx.table_id
|
||||
);
|
||||
|
||||
match state.next(&mut self.context, _ctx).await {
|
||||
Ok((next, status)) => {
|
||||
@@ -229,16 +221,9 @@ pub struct PersistentContext {
|
||||
/// The staging manifest paths of the repartition group.
|
||||
/// The value will be set in [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state.
|
||||
pub staging_manifest_paths: HashMap<RegionId, String>,
|
||||
/// Whether sync region is needed for this group.
|
||||
pub sync_region: bool,
|
||||
/// The region ids of the newly allocated regions.
|
||||
pub allocated_region_ids: Vec<RegionId>,
|
||||
/// The region ids of the regions that are pending deallocation.
|
||||
pub pending_deallocate_region_ids: Vec<RegionId>,
|
||||
}
|
||||
|
||||
impl PersistentContext {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
group_id: GroupId,
|
||||
table_id: TableId,
|
||||
@@ -247,9 +232,6 @@ impl PersistentContext {
|
||||
sources: Vec<RegionDescriptor>,
|
||||
targets: Vec<RegionDescriptor>,
|
||||
region_mapping: HashMap<RegionId, Vec<RegionId>>,
|
||||
sync_region: bool,
|
||||
allocated_region_ids: Vec<RegionId>,
|
||||
pending_deallocate_region_ids: Vec<RegionId>,
|
||||
) -> Self {
|
||||
Self {
|
||||
group_id,
|
||||
@@ -261,9 +243,6 @@ impl PersistentContext {
|
||||
region_mapping,
|
||||
group_prepare_result: None,
|
||||
staging_manifest_paths: HashMap::new(),
|
||||
sync_region,
|
||||
allocated_region_ids,
|
||||
pending_deallocate_region_ids,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -355,7 +334,6 @@ impl Context {
|
||||
new_region_routes: Vec<RegionRoute>,
|
||||
) -> Result<()> {
|
||||
let table_id = self.persistent_ctx.table_id;
|
||||
let group_id = self.persistent_ctx.group_id;
|
||||
// Safety: prepare result is set in [RepartitionStart] state.
|
||||
let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
|
||||
let central_region_datanode_table_value = self
|
||||
@@ -367,10 +345,6 @@ impl Context {
|
||||
..
|
||||
} = ¢ral_region_datanode_table_value.region_info;
|
||||
|
||||
info!(
|
||||
"Updating table route for table: {}, group_id: {}, new region routes: {:?}",
|
||||
table_id, group_id, new_region_routes
|
||||
);
|
||||
self.table_metadata_manager
|
||||
.update_table_route(
|
||||
table_id,
|
||||
|
||||
@@ -31,7 +31,7 @@ use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::handler::HeartbeatMailbox;
|
||||
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
|
||||
use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
|
||||
use crate::procedure::repartition::group::utils::{
|
||||
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
|
||||
};
|
||||
@@ -52,10 +52,7 @@ impl State for ApplyStagingManifest {
|
||||
) -> Result<(Box<dyn State>, Status)> {
|
||||
self.apply_staging_manifests(ctx).await?;
|
||||
|
||||
Ok((
|
||||
Box::new(UpdateMetadata::ExitStaging),
|
||||
Status::executing(true),
|
||||
))
|
||||
Ok((Box::new(RepartitionEnd), Status::executing(true)))
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
@@ -128,6 +125,7 @@ impl ApplyStagingManifest {
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
async fn apply_staging_manifests(&self, ctx: &mut Context) -> Result<()> {
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
let group_id = ctx.persistent_ctx.group_id;
|
||||
@@ -152,7 +150,6 @@ impl ApplyStagingManifest {
|
||||
operation: "Apply staging manifests",
|
||||
})?;
|
||||
|
||||
let instruction_region_count: usize = instructions.values().map(|v| v.len()).sum();
|
||||
let (peers, tasks): (Vec<_>, Vec<_>) = instructions
|
||||
.iter()
|
||||
.map(|(peer, apply_staging_manifests)| {
|
||||
@@ -169,11 +166,8 @@ impl ApplyStagingManifest {
|
||||
})
|
||||
.unzip();
|
||||
info!(
|
||||
"Sent apply staging manifests instructions, table_id: {}, group_id: {}, peers: {}, regions: {}",
|
||||
table_id,
|
||||
group_id,
|
||||
peers.len(),
|
||||
instruction_region_count
|
||||
"Sent apply staging manifests instructions to peers: {:?} for repartition table {}, group id {}",
|
||||
peers, table_id, group_id
|
||||
);
|
||||
|
||||
let format_err_msg = |idx: usize, error: &Error| {
|
||||
@@ -298,7 +292,11 @@ impl ApplyStagingManifest {
|
||||
match receiver.await {
|
||||
Ok(msg) => {
|
||||
let reply = HeartbeatMailbox::json_reply(&msg)?;
|
||||
let elapsed = now.elapsed();
|
||||
info!(
|
||||
"Received apply staging manifests reply: {:?}, elapsed: {:?}",
|
||||
reply,
|
||||
now.elapsed()
|
||||
);
|
||||
let InstructionReply::ApplyStagingManifests(ApplyStagingManifestsReply { replies }) =
|
||||
reply
|
||||
else {
|
||||
@@ -308,23 +306,9 @@ impl ApplyStagingManifest {
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let total = replies.len();
|
||||
let (mut ready, mut not_ready, mut with_error) = (0, 0, 0);
|
||||
let region_ids = replies.iter().map(|r| r.region_id).collect::<Vec<_>>();
|
||||
for reply in replies {
|
||||
if reply.error.is_some() {
|
||||
with_error += 1;
|
||||
} else if reply.ready {
|
||||
ready += 1;
|
||||
} else {
|
||||
not_ready += 1;
|
||||
}
|
||||
Self::handle_apply_staging_manifest_reply(&reply, &now, peer)?;
|
||||
}
|
||||
info!(
|
||||
"Received apply staging manifests reply, peer: {:?}, total_regions: {}, regions:{:?}, ready: {}, not_ready: {}, with_error: {}, elapsed: {:?}",
|
||||
peer, total, region_ids, ready, not_ready, with_error, elapsed
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ use common_meta::instruction::{
|
||||
use common_meta::peer::Peer;
|
||||
use common_procedure::{Context as ProcedureContext, Status};
|
||||
use common_telemetry::info;
|
||||
use futures::future::{join_all, try_join_all};
|
||||
use futures::future::join_all;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
|
||||
@@ -35,7 +35,6 @@ use crate::procedure::repartition::group::utils::{
|
||||
};
|
||||
use crate::procedure::repartition::group::{Context, GroupPrepareResult, State};
|
||||
use crate::procedure::repartition::plan::RegionDescriptor;
|
||||
use crate::procedure::utils::{self, ErrorStrategy};
|
||||
use crate::service::mailbox::{Channel, MailboxRef};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@@ -49,7 +48,6 @@ impl State for EnterStagingRegion {
|
||||
ctx: &mut Context,
|
||||
_procedure_ctx: &ProcedureContext,
|
||||
) -> Result<(Box<dyn State>, Status)> {
|
||||
self.flush_pending_deallocate_regions(ctx).await?;
|
||||
self.enter_staging_regions(ctx).await?;
|
||||
|
||||
Ok((Box::new(RemapManifest), Status::executing(true)))
|
||||
@@ -96,6 +94,7 @@ impl EnterStagingRegion {
|
||||
Ok(instructions)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
async fn enter_staging_regions(&self, ctx: &mut Context) -> Result<()> {
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
let group_id = ctx.persistent_ctx.group_id;
|
||||
@@ -103,8 +102,6 @@ impl EnterStagingRegion {
|
||||
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
|
||||
let targets = &ctx.persistent_ctx.targets;
|
||||
let instructions = Self::build_enter_staging_instructions(prepare_result, targets)?;
|
||||
let target_region_count = targets.len();
|
||||
let peer_count = instructions.len();
|
||||
let operation_timeout =
|
||||
ctx.next_operation_timeout()
|
||||
.context(error::ExceededDeadlineSnafu {
|
||||
@@ -126,8 +123,8 @@ impl EnterStagingRegion {
|
||||
})
|
||||
.unzip();
|
||||
info!(
|
||||
"Sent enter staging regions instructions, table_id: {}, group_id: {}, peers: {}, target_regions: {}",
|
||||
table_id, group_id, peer_count, target_region_count
|
||||
"Sent enter staging regions instructions to peers: {:?} for repartition table {}, group id {}",
|
||||
peers, table_id, group_id
|
||||
);
|
||||
|
||||
let format_err_msg = |idx: usize, error: &Error| {
|
||||
@@ -245,7 +242,11 @@ impl EnterStagingRegion {
|
||||
match receiver.await {
|
||||
Ok(msg) => {
|
||||
let reply = HeartbeatMailbox::json_reply(&msg)?;
|
||||
let elapsed = now.elapsed();
|
||||
info!(
|
||||
"Received enter staging regions reply: {:?}, elapsed: {:?}",
|
||||
reply,
|
||||
now.elapsed()
|
||||
);
|
||||
let InstructionReply::EnterStagingRegions(EnterStagingRegionsReply { replies }) =
|
||||
reply
|
||||
else {
|
||||
@@ -255,22 +256,9 @@ impl EnterStagingRegion {
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let total = replies.len();
|
||||
let (mut ready, mut not_ready, mut with_error) = (0, 0, 0);
|
||||
for reply in replies {
|
||||
if reply.error.is_some() {
|
||||
with_error += 1;
|
||||
} else if reply.ready {
|
||||
ready += 1;
|
||||
} else {
|
||||
not_ready += 1;
|
||||
}
|
||||
Self::handle_enter_staging_region_reply(&reply, &now, peer)?;
|
||||
}
|
||||
info!(
|
||||
"Received enter staging regions reply, peer: {:?}, total_regions: {}, ready: {}, not_ready: {}, with_error: {}, elapsed: {:?}",
|
||||
peer, total, ready, not_ready, with_error, elapsed
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -332,61 +320,6 @@ impl EnterStagingRegion {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn flush_pending_deallocate_regions(&self, ctx: &mut Context) -> Result<()> {
|
||||
let pending_deallocate_region_ids = &ctx.persistent_ctx.pending_deallocate_region_ids;
|
||||
if pending_deallocate_region_ids.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
let group_id = ctx.persistent_ctx.group_id;
|
||||
let operation_timeout =
|
||||
ctx.next_operation_timeout()
|
||||
.context(error::ExceededDeadlineSnafu {
|
||||
operation: "Flush pending deallocate regions",
|
||||
})?;
|
||||
let result = &ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
|
||||
let source_routes = result
|
||||
.source_routes
|
||||
.iter()
|
||||
.filter(|route| pending_deallocate_region_ids.contains(&route.region.id))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
let peer_region_ids_map = group_region_routes_by_peer(&source_routes);
|
||||
info!(
|
||||
"Flushing pending deallocate regions, table_id: {}, group_id: {}, peer_region_ids_map: {:?}",
|
||||
table_id, group_id, peer_region_ids_map
|
||||
);
|
||||
let now = Instant::now();
|
||||
let tasks = peer_region_ids_map
|
||||
.iter()
|
||||
.map(|(peer, region_ids)| {
|
||||
utils::flush_region(
|
||||
&ctx.mailbox,
|
||||
&ctx.server_addr,
|
||||
region_ids,
|
||||
peer,
|
||||
operation_timeout,
|
||||
ErrorStrategy::Retry,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
try_join_all(tasks).await?;
|
||||
info!(
|
||||
"Flushed pending deallocate regions: {:?}, table_id: {}, group_id: {}, elapsed: {:?}",
|
||||
source_routes
|
||||
.iter()
|
||||
.map(|route| route.region.id)
|
||||
.collect::<Vec<_>>(),
|
||||
table_id,
|
||||
group_id,
|
||||
now.elapsed()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -65,13 +65,6 @@ impl State for RemapManifest {
|
||||
.await?;
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
let group_id = ctx.persistent_ctx.group_id;
|
||||
let manifest_count = manifest_paths.len();
|
||||
let input_region_count = ctx.persistent_ctx.sources.len();
|
||||
let target_region_count = ctx.persistent_ctx.targets.len();
|
||||
info!(
|
||||
"Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}",
|
||||
table_id, group_id, input_region_count, target_region_count, manifest_count
|
||||
);
|
||||
|
||||
if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
|
||||
warn!(
|
||||
@@ -163,7 +156,11 @@ impl RemapManifest {
|
||||
match receiver.await {
|
||||
Ok(msg) => {
|
||||
let reply = HeartbeatMailbox::json_reply(&msg)?;
|
||||
let elapsed = now.elapsed();
|
||||
info!(
|
||||
"Received remap manifest reply: {:?}, elapsed: {:?}",
|
||||
reply,
|
||||
now.elapsed()
|
||||
);
|
||||
let InstructionReply::RemapManifest(reply) = reply else {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
@@ -171,11 +168,6 @@ impl RemapManifest {
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let manifest_count = reply.manifest_paths.len();
|
||||
info!(
|
||||
"Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}",
|
||||
remap.region_id, manifest_count, elapsed
|
||||
);
|
||||
|
||||
Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use common_procedure::{Context as ProcedureContext, Status};
|
||||
@@ -22,7 +22,6 @@ use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::procedure::repartition::group::sync_region::SyncRegion;
|
||||
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
|
||||
use crate::procedure::repartition::group::{
|
||||
Context, GroupId, GroupPrepareResult, State, region_routes,
|
||||
@@ -57,6 +56,7 @@ impl RepartitionStart {
|
||||
/// Ensures that both source and target regions are present in the region routes.
|
||||
///
|
||||
/// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning).
|
||||
#[allow(dead_code)]
|
||||
fn ensure_route_present(
|
||||
group_id: GroupId,
|
||||
region_routes: &[RegionRoute],
|
||||
@@ -172,28 +172,6 @@ impl State for RepartitionStart {
|
||||
ctx.persistent_ctx.targets.len()
|
||||
);
|
||||
|
||||
if ctx.persistent_ctx.sync_region {
|
||||
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
|
||||
let allocated_region_ids: HashSet<_> = ctx
|
||||
.persistent_ctx
|
||||
.allocated_region_ids
|
||||
.iter()
|
||||
.copied()
|
||||
.collect();
|
||||
let region_routes: Vec<_> = prepare_result
|
||||
.target_routes
|
||||
.iter()
|
||||
.filter(|route| allocated_region_ids.contains(&route.region.id))
|
||||
.cloned()
|
||||
.collect();
|
||||
if !region_routes.is_empty() {
|
||||
return Ok((
|
||||
Box::new(SyncRegion { region_routes }),
|
||||
Status::executing(true),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok((
|
||||
Box::new(UpdateMetadata::ApplyStaging),
|
||||
Status::executing(true),
|
||||
|
||||
@@ -1,445 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use common_procedure::{Context as ProcedureContext, Status};
|
||||
use common_telemetry::info;
|
||||
use futures::future::join_all;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use store_api::region_engine::SyncRegionFromRequest;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::handler::HeartbeatMailbox;
|
||||
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
|
||||
use crate::procedure::repartition::group::utils::{
|
||||
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
|
||||
};
|
||||
use crate::procedure::repartition::group::{Context, State};
|
||||
use crate::procedure::utils::ErrorStrategy;
|
||||
use crate::service::mailbox::{Channel, MailboxRef};
|
||||
|
||||
const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
|
||||
|
||||
/// The state of syncing regions for a repartition group.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SyncRegion {
|
||||
pub region_routes: Vec<RegionRoute>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[typetag::serde]
|
||||
impl State for SyncRegion {
|
||||
async fn next(
|
||||
&mut self,
|
||||
ctx: &mut Context,
|
||||
_procedure_ctx: &ProcedureContext,
|
||||
) -> Result<(Box<dyn State>, Status)> {
|
||||
Self::flush_central_region(ctx).await?;
|
||||
self.sync_regions(ctx).await?;
|
||||
|
||||
Ok((
|
||||
Box::new(UpdateMetadata::ApplyStaging),
|
||||
Status::executing(true),
|
||||
))
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl SyncRegion {
|
||||
async fn flush_central_region(ctx: &mut Context) -> Result<()> {
|
||||
let operation_timeout =
|
||||
ctx.next_operation_timeout()
|
||||
.context(error::ExceededDeadlineSnafu {
|
||||
operation: "Flush central region",
|
||||
})?;
|
||||
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
|
||||
|
||||
crate::procedure::utils::flush_region(
|
||||
&ctx.mailbox,
|
||||
&ctx.server_addr,
|
||||
&[prepare_result.central_region],
|
||||
&prepare_result.central_region_datanode,
|
||||
operation_timeout,
|
||||
ErrorStrategy::Retry,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Builds instructions to sync regions on datanodes.
|
||||
fn build_sync_region_instructions(
|
||||
central_region: RegionId,
|
||||
region_routes: &[RegionRoute],
|
||||
) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
|
||||
let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
|
||||
let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
|
||||
|
||||
for (peer, region_ids) in target_region_routes_by_peer {
|
||||
let sync_regions = region_ids
|
||||
.into_iter()
|
||||
.map(|region_id| {
|
||||
let request = SyncRegionFromRequest::FromRegion {
|
||||
source_region_id: central_region,
|
||||
parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
|
||||
};
|
||||
common_meta::instruction::SyncRegion { region_id, request }
|
||||
})
|
||||
.collect();
|
||||
instructions.insert((*peer).clone(), sync_regions);
|
||||
}
|
||||
|
||||
instructions
|
||||
}
|
||||
|
||||
/// Syncs regions on datanodes.
|
||||
async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
|
||||
let instructions = Self::build_sync_region_instructions(
|
||||
prepare_result.central_region,
|
||||
&self.region_routes,
|
||||
);
|
||||
let operation_timeout =
|
||||
ctx.next_operation_timeout()
|
||||
.context(error::ExceededDeadlineSnafu {
|
||||
operation: "Sync regions",
|
||||
})?;
|
||||
|
||||
let (peers, tasks): (Vec<_>, Vec<_>) = instructions
|
||||
.iter()
|
||||
.map(|(peer, sync_regions)| {
|
||||
(
|
||||
peer,
|
||||
Self::sync_region(
|
||||
&ctx.mailbox,
|
||||
&ctx.server_addr,
|
||||
peer,
|
||||
sync_regions,
|
||||
operation_timeout,
|
||||
),
|
||||
)
|
||||
})
|
||||
.unzip();
|
||||
|
||||
info!(
|
||||
"Sent sync regions instructions to peers: {:?} for repartition table {}",
|
||||
peers, table_id
|
||||
);
|
||||
|
||||
let format_err_msg = |idx: usize, error: &Error| {
|
||||
let peer = peers[idx];
|
||||
format!(
|
||||
"Failed to sync regions on datanode {:?}, error: {:?}",
|
||||
peer, error
|
||||
)
|
||||
};
|
||||
|
||||
let results = join_all(tasks).await;
|
||||
let result = handle_multiple_results(&results);
|
||||
|
||||
match result {
|
||||
HandleMultipleResult::AllSuccessful => Ok(()),
|
||||
HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"All retryable errors during syncing regions for repartition table {}: {:?}",
|
||||
table_id,
|
||||
retryable_errors
|
||||
.iter()
|
||||
.map(|(idx, error)| format_err_msg(*idx, error))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
),
|
||||
}
|
||||
.fail(),
|
||||
HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"All non retryable errors during syncing regions for repartition table {}: {:?}",
|
||||
table_id,
|
||||
non_retryable_errors
|
||||
.iter()
|
||||
.map(|(idx, error)| format_err_msg(*idx, error))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
),
|
||||
}
|
||||
.fail(),
|
||||
HandleMultipleResult::PartialRetryable {
|
||||
retryable_errors,
|
||||
non_retryable_errors,
|
||||
} => error::UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
|
||||
table_id,
|
||||
retryable_errors
|
||||
.iter()
|
||||
.map(|(idx, error)| format_err_msg(*idx, error))
|
||||
.collect::<Vec<_>>()
|
||||
.join(","),
|
||||
non_retryable_errors
|
||||
.iter()
|
||||
.map(|(idx, error)| format_err_msg(*idx, error))
|
||||
.collect::<Vec<_>>()
|
||||
.join(","),
|
||||
),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Syncs regions on a datanode.
|
||||
async fn sync_region(
|
||||
mailbox: &MailboxRef,
|
||||
server_addr: &str,
|
||||
peer: &Peer,
|
||||
sync_regions: &[common_meta::instruction::SyncRegion],
|
||||
timeout: Duration,
|
||||
) -> Result<()> {
|
||||
let ch = Channel::Datanode(peer.id);
|
||||
let instruction = Instruction::SyncRegions(sync_regions.to_vec());
|
||||
let message = MailboxMessage::json_message(
|
||||
&format!(
|
||||
"Sync regions: {:?}",
|
||||
sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
|
||||
),
|
||||
&format!("Metasrv@{}", server_addr),
|
||||
&format!("Datanode-{}@{}", peer.id, peer.addr),
|
||||
common_time::util::current_time_millis(),
|
||||
&instruction,
|
||||
)
|
||||
.with_context(|_| error::SerializeToJsonSnafu {
|
||||
input: instruction.to_string(),
|
||||
})?;
|
||||
|
||||
let now = std::time::Instant::now();
|
||||
let receiver = mailbox.send(&ch, message, timeout).await;
|
||||
|
||||
let receiver = match receiver {
|
||||
Ok(receiver) => receiver,
|
||||
Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
|
||||
peer,
|
||||
now.elapsed()
|
||||
),
|
||||
}
|
||||
.fail()?,
|
||||
Err(err) => {
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
match receiver.await {
|
||||
Ok(msg) => {
|
||||
let reply = HeartbeatMailbox::json_reply(&msg)?;
|
||||
info!(
|
||||
"Received sync regions reply: {:?}, elapsed: {:?}",
|
||||
reply,
|
||||
now.elapsed()
|
||||
);
|
||||
let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
reason: "expect sync regions reply",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
for reply in replies {
|
||||
Self::handle_sync_region_reply(&reply, &now, peer)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(error::Error::MailboxTimeout { .. }) => {
|
||||
let reason = format!(
|
||||
"Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
|
||||
peer,
|
||||
now.elapsed()
|
||||
);
|
||||
error::RetryLaterSnafu { reason }.fail()
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_sync_region_reply(
|
||||
SyncRegionReply {
|
||||
region_id,
|
||||
ready,
|
||||
exists,
|
||||
error,
|
||||
}: &SyncRegionReply,
|
||||
now: &Instant,
|
||||
peer: &Peer,
|
||||
) -> Result<()> {
|
||||
ensure!(
|
||||
exists,
|
||||
error::UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
|
||||
region_id,
|
||||
peer,
|
||||
now.elapsed()
|
||||
)
|
||||
}
|
||||
);
|
||||
|
||||
if let Some(error) = error {
|
||||
return error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
|
||||
region_id,
|
||||
peer,
|
||||
error,
|
||||
now.elapsed()
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
ensure!(
|
||||
ready,
|
||||
error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"Region {} failed to sync on datanode {:?}, elapsed: {:?}",
|
||||
region_id,
|
||||
peer,
|
||||
now.elapsed()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::router::{Region, RegionRoute};
|
||||
use store_api::region_engine::SyncRegionFromRequest;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::procedure::repartition::group::GroupPrepareResult;
|
||||
use crate::procedure::repartition::group::sync_region::SyncRegion;
|
||||
use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
|
||||
use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
|
||||
use crate::service::mailbox::Channel;
|
||||
|
||||
#[test]
|
||||
fn test_build_sync_region_instructions() {
|
||||
let table_id = 1024;
|
||||
let central_region = RegionId::new(table_id, 1);
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region {
|
||||
id: RegionId::new(table_id, 3),
|
||||
..Default::default()
|
||||
},
|
||||
leader_peer: Some(Peer::empty(1)),
|
||||
..Default::default()
|
||||
}];
|
||||
|
||||
let instructions =
|
||||
SyncRegion::build_sync_region_instructions(central_region, ®ion_routes);
|
||||
assert_eq!(instructions.len(), 1);
|
||||
let peer_instructions = instructions.get(&Peer::empty(1)).unwrap();
|
||||
assert_eq!(peer_instructions.len(), 1);
|
||||
assert_eq!(peer_instructions[0].region_id, RegionId::new(table_id, 3));
|
||||
let SyncRegionFromRequest::FromRegion {
|
||||
source_region_id, ..
|
||||
} = &peer_instructions[0].request
|
||||
else {
|
||||
panic!("expect from region request");
|
||||
};
|
||||
assert_eq!(*source_region_id, central_region);
|
||||
}
|
||||
|
||||
fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
|
||||
GroupPrepareResult {
|
||||
source_routes: vec![],
|
||||
target_routes: vec![],
|
||||
central_region: RegionId::new(table_id, 1),
|
||||
central_region_datanode: Peer::empty(1),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sync_regions_all_successful() {
|
||||
let mut env = TestingEnv::new();
|
||||
let table_id = 1024;
|
||||
let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
|
||||
persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
env.mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
|
||||
.await;
|
||||
send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
|
||||
Ok(new_sync_region_reply(
|
||||
id,
|
||||
RegionId::new(1024, 3),
|
||||
true,
|
||||
true,
|
||||
None,
|
||||
))
|
||||
});
|
||||
|
||||
let mut ctx = env.create_context(persistent_context);
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region {
|
||||
id: RegionId::new(table_id, 3),
|
||||
..Default::default()
|
||||
},
|
||||
leader_peer: Some(Peer::empty(1)),
|
||||
..Default::default()
|
||||
}];
|
||||
let sync_region = SyncRegion { region_routes };
|
||||
|
||||
sync_region.sync_regions(&mut ctx).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sync_regions_retryable() {
|
||||
let env = TestingEnv::new();
|
||||
let table_id = 1024;
|
||||
let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
|
||||
persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
|
||||
|
||||
let mut ctx = env.create_context(persistent_context);
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region {
|
||||
id: RegionId::new(table_id, 3),
|
||||
..Default::default()
|
||||
},
|
||||
leader_peer: Some(Peer::empty(1)),
|
||||
..Default::default()
|
||||
}];
|
||||
let sync_region = SyncRegion { region_routes };
|
||||
|
||||
let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
|
||||
assert_matches!(err, Error::RetryLater { .. });
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,6 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub(crate) mod apply_staging_region;
|
||||
pub(crate) mod exit_staging_region;
|
||||
pub(crate) mod rollback_staging_region;
|
||||
|
||||
use std::any::Any;
|
||||
@@ -29,14 +28,11 @@ use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
|
||||
use crate::procedure::repartition::group::{Context, State};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
pub enum UpdateMetadata {
|
||||
/// Applies the new partition expressions for staging regions.
|
||||
ApplyStaging,
|
||||
/// Rolls back the new partition expressions for staging regions.
|
||||
RollbackStaging,
|
||||
/// Exits the staging regions.
|
||||
ExitStaging,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -66,18 +62,7 @@ impl State for UpdateMetadata {
|
||||
|
||||
if let Err(err) = ctx.invalidate_table_cache().await {
|
||||
warn!(
|
||||
err;
|
||||
"Failed to broadcast the invalidate table cache message during the rollback staging regions"
|
||||
);
|
||||
};
|
||||
Ok((Box::new(RepartitionEnd), Status::executing(false)))
|
||||
}
|
||||
UpdateMetadata::ExitStaging => {
|
||||
self.exit_staging_regions(ctx).await?;
|
||||
if let Err(err) = ctx.invalidate_table_cache().await {
|
||||
warn!(
|
||||
err;
|
||||
"Failed to broadcast the invalidate table cache message during the exit staging regions"
|
||||
"Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}"
|
||||
);
|
||||
};
|
||||
Ok((Box::new(RepartitionEnd), Status::executing(false)))
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::collections::HashMap;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use common_telemetry::{error, info};
|
||||
use common_telemetry::error;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
@@ -77,6 +77,7 @@ impl UpdateMetadata {
|
||||
/// - Source region not found.
|
||||
/// - Failed to update the table route.
|
||||
/// - Central region datanode table value not found.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn apply_staging_regions(&self, ctx: &mut Context) -> Result<()> {
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
let group_id = ctx.persistent_ctx.group_id;
|
||||
@@ -89,13 +90,6 @@ impl UpdateMetadata {
|
||||
region_routes,
|
||||
)?;
|
||||
|
||||
let source_count = ctx.persistent_ctx.sources.len();
|
||||
let target_count = ctx.persistent_ctx.targets.len();
|
||||
info!(
|
||||
"Apply staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
|
||||
table_id, group_id, source_count, target_count
|
||||
);
|
||||
|
||||
if let Err(err) = ctx
|
||||
.update_table_route(¤t_table_route_value, new_region_routes)
|
||||
.await
|
||||
|
||||
@@ -1,104 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use common_telemetry::{error, info};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
|
||||
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
|
||||
use crate::procedure::repartition::plan::RegionDescriptor;
|
||||
|
||||
impl UpdateMetadata {
|
||||
fn exit_staging_region_routes(
|
||||
group_id: GroupId,
|
||||
sources: &[RegionDescriptor],
|
||||
targets: &[RegionDescriptor],
|
||||
current_region_routes: &[RegionRoute],
|
||||
) -> Result<Vec<RegionRoute>> {
|
||||
let mut region_routes = current_region_routes.to_vec();
|
||||
let mut region_routes_map = region_routes
|
||||
.iter_mut()
|
||||
.map(|route| (route.region.id, route))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
for target in targets {
|
||||
let region_route = region_routes_map.get_mut(&target.region_id).context(
|
||||
error::RepartitionTargetRegionMissingSnafu {
|
||||
group_id,
|
||||
region_id: target.region_id,
|
||||
},
|
||||
)?;
|
||||
region_route.clear_leader_staging();
|
||||
}
|
||||
|
||||
for source in sources {
|
||||
let region_route = region_routes_map.get_mut(&source.region_id).context(
|
||||
error::RepartitionSourceRegionMissingSnafu {
|
||||
group_id,
|
||||
region_id: source.region_id,
|
||||
},
|
||||
)?;
|
||||
region_route.clear_leader_staging();
|
||||
}
|
||||
|
||||
Ok(region_routes)
|
||||
}
|
||||
|
||||
/// Exits the staging regions.
|
||||
///
|
||||
/// Abort:
|
||||
/// - Table route is not physical.
|
||||
/// - Target region not found.
|
||||
/// - Source region not found.
|
||||
/// - Failed to update the table route.
|
||||
/// - Central region datanode table value not found.
|
||||
pub(crate) async fn exit_staging_regions(&self, ctx: &mut Context) -> Result<()> {
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
let group_id = ctx.persistent_ctx.group_id;
|
||||
let current_table_route_value = ctx.get_table_route_value().await?;
|
||||
let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?;
|
||||
let new_region_routes = Self::exit_staging_region_routes(
|
||||
group_id,
|
||||
&ctx.persistent_ctx.sources,
|
||||
&ctx.persistent_ctx.targets,
|
||||
region_routes,
|
||||
)?;
|
||||
|
||||
let source_count = ctx.persistent_ctx.sources.len();
|
||||
let target_count = ctx.persistent_ctx.targets.len();
|
||||
info!(
|
||||
"Exit staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
|
||||
table_id, group_id, source_count, target_count
|
||||
);
|
||||
|
||||
if let Err(err) = ctx
|
||||
.update_table_route(¤t_table_route_value, new_region_routes)
|
||||
.await
|
||||
{
|
||||
error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}");
|
||||
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
|
||||
reason: format!(
|
||||
"Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"
|
||||
),
|
||||
});
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,7 @@ use std::collections::HashMap;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use common_telemetry::{error, info};
|
||||
use common_telemetry::error;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
@@ -29,6 +29,7 @@ impl UpdateMetadata {
|
||||
/// Abort:
|
||||
/// - Source region not found.
|
||||
/// - Target region not found.
|
||||
#[allow(dead_code)]
|
||||
fn rollback_staging_region_routes(
|
||||
group_id: GroupId,
|
||||
source_routes: &[RegionRoute],
|
||||
@@ -73,6 +74,7 @@ impl UpdateMetadata {
|
||||
/// - Target region not found.
|
||||
/// - Failed to update the table route.
|
||||
/// - Central region datanode table value not found.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn rollback_staging_regions(&self, ctx: &mut Context) -> Result<()> {
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
let group_id = ctx.persistent_ctx.group_id;
|
||||
@@ -87,13 +89,6 @@ impl UpdateMetadata {
|
||||
region_routes,
|
||||
)?;
|
||||
|
||||
let source_count = prepare_result.source_routes.len();
|
||||
let target_count = prepare_result.target_routes.len();
|
||||
info!(
|
||||
"Rollback staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
|
||||
table_id, group_id, source_count, target_count
|
||||
);
|
||||
|
||||
if let Err(err) = ctx
|
||||
.update_table_route(¤t_table_route_value, new_region_routes)
|
||||
.await
|
||||
|
||||
@@ -16,7 +16,6 @@ use std::any::Any;
|
||||
|
||||
use common_meta::key::table_route::PhysicalTableRouteValue;
|
||||
use common_procedure::{Context as ProcedureContext, Status};
|
||||
use common_telemetry::debug;
|
||||
use partition::expr::PartitionExpr;
|
||||
use partition::subtask::{self, RepartitionSubtask};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -70,17 +69,6 @@ impl State for RepartitionStart {
|
||||
);
|
||||
|
||||
let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?;
|
||||
let plan_count = plans.len();
|
||||
let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum();
|
||||
let total_target_regions: usize =
|
||||
plans.iter().map(|p| p.target_partition_exprs.len()).sum();
|
||||
common_telemetry::info!(
|
||||
"Repartition start, table_id: {}, plans: {}, total_source_regions: {}, total_target_regions: {}",
|
||||
table_id,
|
||||
plan_count,
|
||||
total_source_regions,
|
||||
total_target_regions
|
||||
);
|
||||
|
||||
if plans.is_empty() {
|
||||
return Ok((Box::new(RepartitionEnd), Status::done()));
|
||||
@@ -98,6 +86,7 @@ impl State for RepartitionStart {
|
||||
}
|
||||
|
||||
impl RepartitionStart {
|
||||
#[allow(dead_code)]
|
||||
fn build_plan(
|
||||
physical_route: &PhysicalTableRouteValue,
|
||||
from_exprs: &[PartitionExpr],
|
||||
@@ -117,6 +106,7 @@ impl RepartitionStart {
|
||||
))
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn build_plan_entries(
|
||||
subtasks: Vec<RepartitionSubtask>,
|
||||
source_index: &[RegionDescriptor],
|
||||
@@ -169,9 +159,8 @@ impl RepartitionStart {
|
||||
.find_map(|(region_id, existing_expr)| {
|
||||
(existing_expr == &expr_json).then_some(*region_id)
|
||||
})
|
||||
.with_context(|| error::RepartitionSourceExprMismatchSnafu { expr: &expr_json })
|
||||
.inspect_err(|_| {
|
||||
debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions);
|
||||
.with_context(|| error::RepartitionSourceExprMismatchSnafu {
|
||||
expr: expr_json,
|
||||
})?;
|
||||
|
||||
Ok(RegionDescriptor {
|
||||
|
||||
@@ -96,8 +96,5 @@ pub fn new_persistent_context(
|
||||
region_mapping: HashMap::new(),
|
||||
group_prepare_result: None,
|
||||
staging_manifest_paths: HashMap::new(),
|
||||
sync_region: false,
|
||||
allocated_region_ids: vec![],
|
||||
pending_deallocate_region_ids: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,8 +18,7 @@ use api::v1::meta::mailbox_message::Payload;
|
||||
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
|
||||
use common_meta::instruction::{
|
||||
DowngradeRegionReply, DowngradeRegionsReply, EnterStagingRegionReply, EnterStagingRegionsReply,
|
||||
FlushRegionReply, InstructionReply, SimpleReply, SyncRegionReply, SyncRegionsReply,
|
||||
UpgradeRegionReply, UpgradeRegionsReply,
|
||||
FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, UpgradeRegionsReply,
|
||||
};
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::key::table_route::TableRouteValue;
|
||||
@@ -254,34 +253,6 @@ pub fn new_enter_staging_region_reply(
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a [InstructionReply::SyncRegions] reply.
|
||||
pub fn new_sync_region_reply(
|
||||
id: u64,
|
||||
region_id: RegionId,
|
||||
ready: bool,
|
||||
exists: bool,
|
||||
error: Option<String>,
|
||||
) -> MailboxMessage {
|
||||
MailboxMessage {
|
||||
id,
|
||||
subject: "mock".to_string(),
|
||||
from: "datanode".to_string(),
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::SyncRegions(SyncRegionsReply::new(vec![
|
||||
SyncRegionReply {
|
||||
region_id,
|
||||
ready,
|
||||
exists,
|
||||
error,
|
||||
},
|
||||
])))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Mock the test data for WAL pruning.
|
||||
pub async fn new_wal_prune_metadata(
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
|
||||
@@ -12,185 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
|
||||
use common_meta::peer::Peer;
|
||||
use common_telemetry::{info, warn};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::handler::HeartbeatMailbox;
|
||||
use crate::service::mailbox::{Channel, MailboxRef};
|
||||
|
||||
pub(crate) enum ErrorStrategy {
|
||||
Ignore,
|
||||
Retry,
|
||||
}
|
||||
|
||||
fn handle_flush_region_reply(
|
||||
reply: &InstructionReply,
|
||||
region_ids: &[RegionId],
|
||||
msg: &MailboxMessage,
|
||||
) -> Result<(bool, Option<String>)> {
|
||||
let result = match reply {
|
||||
InstructionReply::FlushRegions(flush_reply) => {
|
||||
if flush_reply.results.len() != region_ids.len() {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
reason: format!(
|
||||
"expect {} region flush result, but got {}",
|
||||
region_ids.len(),
|
||||
flush_reply.results.len()
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
match flush_reply.overall_success {
|
||||
true => (true, None),
|
||||
false => (
|
||||
false,
|
||||
Some(
|
||||
flush_reply
|
||||
.results
|
||||
.iter()
|
||||
.filter_map(|(region_id, result)| match result {
|
||||
Ok(_) => None,
|
||||
Err(e) => Some(format!("{}: {:?}", region_id, e)),
|
||||
})
|
||||
.collect::<Vec<String>>()
|
||||
.join("; "),
|
||||
),
|
||||
),
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
reason: "expect flush region reply",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Flushes the regions on the datanode.
|
||||
///
|
||||
/// Retry Or Ignore:
|
||||
/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
|
||||
/// - Failed to flush region on the Datanode.
|
||||
///
|
||||
/// Abort:
|
||||
/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
|
||||
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
|
||||
/// - [ExceededDeadline](error::Error::ExceededDeadline)
|
||||
/// - Invalid JSON.
|
||||
pub(crate) async fn flush_region(
|
||||
mailbox: &MailboxRef,
|
||||
server_addr: &str,
|
||||
region_ids: &[RegionId],
|
||||
datanode: &Peer,
|
||||
timeout: Duration,
|
||||
error_strategy: ErrorStrategy,
|
||||
) -> Result<()> {
|
||||
let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
|
||||
region_ids.to_vec(),
|
||||
FlushErrorStrategy::TryAll,
|
||||
));
|
||||
|
||||
let msg = MailboxMessage::json_message(
|
||||
&format!("Flush regions: {:?}", region_ids),
|
||||
&format!("Metasrv@{}", server_addr),
|
||||
&format!("Datanode-{}@{}", datanode.id, datanode.addr),
|
||||
common_time::util::current_time_millis(),
|
||||
&flush_instruction,
|
||||
)
|
||||
.with_context(|_| error::SerializeToJsonSnafu {
|
||||
input: flush_instruction.to_string(),
|
||||
})?;
|
||||
|
||||
let ch = Channel::Datanode(datanode.id);
|
||||
let now = Instant::now();
|
||||
let receiver = mailbox.send(&ch, msg, timeout).await;
|
||||
let receiver = match receiver {
|
||||
Ok(receiver) => receiver,
|
||||
Err(error::Error::PusherNotFound { .. }) => match error_strategy {
|
||||
ErrorStrategy::Ignore => {
|
||||
warn!(
|
||||
"Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
|
||||
region_ids, datanode
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
ErrorStrategy::Retry => error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
|
||||
datanode,
|
||||
now.elapsed()
|
||||
),
|
||||
}
|
||||
.fail()?,
|
||||
},
|
||||
Err(err) => {
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
match receiver.await {
|
||||
Ok(msg) => {
|
||||
let reply = HeartbeatMailbox::json_reply(&msg)?;
|
||||
info!(
|
||||
"Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
|
||||
reply,
|
||||
region_ids,
|
||||
now.elapsed()
|
||||
);
|
||||
let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
|
||||
if let Some(error) = error {
|
||||
match error_strategy {
|
||||
ErrorStrategy::Ignore => {
|
||||
warn!(
|
||||
"Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
|
||||
region_ids, datanode, error
|
||||
);
|
||||
}
|
||||
ErrorStrategy::Retry => {
|
||||
return error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"Failed to flush regions {:?}, the datanode({}) error is retried: {}",
|
||||
region_ids,
|
||||
datanode,
|
||||
error,
|
||||
),
|
||||
}
|
||||
.fail()?;
|
||||
}
|
||||
}
|
||||
} else if result {
|
||||
info!(
|
||||
"The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
|
||||
region_ids,
|
||||
datanode,
|
||||
now.elapsed()
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
|
||||
operation: "Flush regions",
|
||||
}
|
||||
.fail(),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "mock"))]
|
||||
pub mod mock {
|
||||
use std::io::Error;
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
//! Drop a metric region
|
||||
|
||||
use common_telemetry::{debug, info};
|
||||
use common_telemetry::info;
|
||||
use snafu::ResultExt;
|
||||
use store_api::region_engine::RegionEngine;
|
||||
use store_api::region_request::{AffectedRows, RegionDropRequest, RegionRequest};
|
||||
@@ -46,15 +46,6 @@ impl MetricEngineInner {
|
||||
.physical_region_states()
|
||||
.get(&data_region_id)
|
||||
{
|
||||
debug!(
|
||||
"Physical region {} is busy, there are still some logical regions: {:?}",
|
||||
data_region_id,
|
||||
state
|
||||
.logical_regions()
|
||||
.iter()
|
||||
.map(|id| id.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
(true, !state.logical_regions().is_empty())
|
||||
} else {
|
||||
// the second argument is not used, just pass in a dummy value
|
||||
|
||||
@@ -314,8 +314,11 @@ impl MitoRegion {
|
||||
|
||||
/// Sets the dropping state.
|
||||
/// You should call this method in the worker loop.
|
||||
pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
|
||||
self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
|
||||
pub(crate) fn set_dropping(&self) -> Result<()> {
|
||||
self.compare_exchange_state(
|
||||
RegionLeaderState::Writable,
|
||||
RegionRoleState::Leader(RegionLeaderState::Dropping),
|
||||
)
|
||||
}
|
||||
|
||||
/// Sets the truncating state.
|
||||
|
||||
@@ -31,7 +31,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
let region_id = request.region_id;
|
||||
let source_region_id = request.source_region_id;
|
||||
let sender = request.sender;
|
||||
let region = match self.regions.writable_non_staging_region(region_id) {
|
||||
let region = match self.regions.writable_region(region_id) {
|
||||
Ok(region) => region,
|
||||
Err(e) => {
|
||||
let _ = sender.send(Err(e));
|
||||
|
||||
@@ -42,18 +42,12 @@ where
|
||||
&mut self,
|
||||
region_id: RegionId,
|
||||
) -> Result<AffectedRows> {
|
||||
let region = self.regions.writable_region(region_id)?;
|
||||
let region = self.regions.writable_non_staging_region(region_id)?;
|
||||
|
||||
info!("Try to drop region: {}, worker: {}", region_id, self.id);
|
||||
|
||||
let is_staging = region.is_staging();
|
||||
let expect_state = if is_staging {
|
||||
RegionLeaderState::Staging
|
||||
} else {
|
||||
RegionLeaderState::Writable
|
||||
};
|
||||
// Marks the region as dropping.
|
||||
region.set_dropping(expect_state)?;
|
||||
region.set_dropping()?;
|
||||
// Writes dropping marker
|
||||
// We rarely drop a region so we still operate in the worker loop.
|
||||
let region_dir = region.access_layer.build_region_dir(region_id);
|
||||
|
||||
@@ -638,7 +638,7 @@ impl RegionStatistic {
|
||||
}
|
||||
|
||||
/// Request to sync the region from a manifest or a region.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum SyncRegionFromRequest {
|
||||
/// Syncs the region using manifest information.
|
||||
/// Used in leader-follower manifest sync scenarios.
|
||||
|
||||
@@ -99,155 +99,3 @@ DROP TABLE alter_repartition_table;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- Metric engine repartition test
|
||||
CREATE TABLE metric_physical_table (
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
host STRING,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
)
|
||||
PARTITION ON COLUMNS (host) (
|
||||
host < 'h1',
|
||||
host >= 'h1' AND host < 'h2',
|
||||
host >= 'h2'
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH (
|
||||
physical_metric_table = "true"
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE logical_table_v1 (
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
host STRING PRIMARY KEY,
|
||||
cpu DOUBLE,
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH (
|
||||
on_physical_table = "metric_physical_table"
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE logical_table_v2 (
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
host STRING PRIMARY KEY,
|
||||
cpu DOUBLE,
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH (
|
||||
on_physical_table = "metric_physical_table"
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- Split physical table partition
|
||||
ALTER TABLE metric_physical_table SPLIT PARTITION (
|
||||
host < 'h1'
|
||||
) INTO (
|
||||
host < 'h0',
|
||||
host >= 'h0' AND host < 'h1'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE metric_physical_table;
|
||||
|
||||
+-----------------------+------------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-----------------------+------------------------------------------------------+
|
||||
| metric_physical_table | CREATE TABLE IF NOT EXISTS "metric_physical_table" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "host" STRING NULL, |
|
||||
| | "cpu" DOUBLE NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("host") |
|
||||
| | ) |
|
||||
| | PARTITION ON COLUMNS ("host") ( |
|
||||
| | host < 'h0', |
|
||||
| | host >= 'h1' AND host < 'h2', |
|
||||
| | host >= 'h2', |
|
||||
| | host >= 'h0' AND host < 'h1' |
|
||||
| | ) |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | physical_metric_table = 'true' |
|
||||
| | ) |
|
||||
+-----------------------+------------------------------------------------------+
|
||||
|
||||
-- Verify select * works and returns empty
|
||||
SELECT * FROM metric_physical_table;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
SELECT * FROM logical_table_v1;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
SELECT * FROM logical_table_v2;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
-- Merge physical table partition
|
||||
ALTER TABLE metric_physical_table MERGE PARTITION (
|
||||
host < 'h0',
|
||||
host >= 'h0' AND host < 'h1'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE metric_physical_table;
|
||||
|
||||
+-----------------------+------------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-----------------------+------------------------------------------------------+
|
||||
| metric_physical_table | CREATE TABLE IF NOT EXISTS "metric_physical_table" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "host" STRING NULL, |
|
||||
| | "cpu" DOUBLE NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("host") |
|
||||
| | ) |
|
||||
| | PARTITION ON COLUMNS ("host") ( |
|
||||
| | host < 'h0' OR host >= 'h0' AND host < 'h1', |
|
||||
| | host >= 'h1' AND host < 'h2', |
|
||||
| | host >= 'h2' |
|
||||
| | ) |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | physical_metric_table = 'true' |
|
||||
| | ) |
|
||||
+-----------------------+------------------------------------------------------+
|
||||
|
||||
-- Verify select * works and returns empty
|
||||
SELECT * FROM metric_physical_table;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
SELECT * FROM logical_table_v1;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
SELECT * FROM logical_table_v2;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
DROP TABLE logical_table_v1;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE logical_table_v2;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE metric_physical_table;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
|
||||
@@ -46,78 +46,3 @@ ALTER TABLE alter_repartition_table REPARTITION (
|
||||
);
|
||||
|
||||
DROP TABLE alter_repartition_table;
|
||||
|
||||
-- Metric engine repartition test
|
||||
CREATE TABLE metric_physical_table (
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
host STRING,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
)
|
||||
PARTITION ON COLUMNS (host) (
|
||||
host < 'h1',
|
||||
host >= 'h1' AND host < 'h2',
|
||||
host >= 'h2'
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH (
|
||||
physical_metric_table = "true"
|
||||
);
|
||||
|
||||
CREATE TABLE logical_table_v1 (
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
host STRING PRIMARY KEY,
|
||||
cpu DOUBLE,
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH (
|
||||
on_physical_table = "metric_physical_table"
|
||||
);
|
||||
|
||||
CREATE TABLE logical_table_v2 (
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
host STRING PRIMARY KEY,
|
||||
cpu DOUBLE,
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH (
|
||||
on_physical_table = "metric_physical_table"
|
||||
);
|
||||
|
||||
-- Split physical table partition
|
||||
ALTER TABLE metric_physical_table SPLIT PARTITION (
|
||||
host < 'h1'
|
||||
) INTO (
|
||||
host < 'h0',
|
||||
host >= 'h0' AND host < 'h1'
|
||||
);
|
||||
|
||||
SHOW CREATE TABLE metric_physical_table;
|
||||
|
||||
-- Verify select * works and returns empty
|
||||
SELECT * FROM metric_physical_table;
|
||||
|
||||
SELECT * FROM logical_table_v1;
|
||||
|
||||
SELECT * FROM logical_table_v2;
|
||||
|
||||
-- Merge physical table partition
|
||||
ALTER TABLE metric_physical_table MERGE PARTITION (
|
||||
host < 'h0',
|
||||
host >= 'h0' AND host < 'h1'
|
||||
);
|
||||
|
||||
SHOW CREATE TABLE metric_physical_table;
|
||||
|
||||
-- Verify select * works and returns empty
|
||||
SELECT * FROM metric_physical_table;
|
||||
|
||||
SELECT * FROM logical_table_v1;
|
||||
|
||||
SELECT * FROM logical_table_v2;
|
||||
|
||||
DROP TABLE logical_table_v1;
|
||||
|
||||
DROP TABLE logical_table_v2;
|
||||
|
||||
DROP TABLE metric_physical_table;
|
||||
|
||||
Reference in New Issue
Block a user