Compare commits

...

17 Commits

Author SHA1 Message Date
Ning Sun
ef80503454 Merge branch 'main' into feature/df-binary-operator-nested-data 2025-12-02 14:58:46 +08:00
Ning Sun
69f0249039 feat: update pg-catalog for describe table (#7321) 2025-12-02 01:38:36 +00:00
dennis zhuang
1f91422bae feat!: improve mysql/pg compatibility (#7315)
* feat(mysql): add SHOW WARNINGS support and return warnings for unsupported SET variables

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat(function): add MySQL IF() function and PostgreSQL description functions for connector compatibility

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: show tables for mysql

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: partitions table in information_schema and add starrocks external catalog compatibility

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* refactor: async udf

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: set warnings

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: impl pg_my_temp_schema and make description functions simple

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add test for issue 7313

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: apply suggestions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: partition_expression and partition_description

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: test

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: unit tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: search_path only works for pg

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: improve warnings processing

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: warnings while writing affected rows and refactor

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: improve ShobjDescriptionFunction signature

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* refactor: array_to_boolean

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-12-01 20:41:14 +00:00
jeremyhi
377373b8fd fix: request limiter test case fix (#7323)
Signed-off-by: jeremyhi <fengjiachun@gmail.com>
2025-12-01 20:12:32 +00:00
fys
e107030d85 chore: add more fields to DdlManagerConfigureContext (#7310)
* feat: add more context for configurator

* move the flow grpc configure context to plugins crate

* move context to plugins crate

* add more fields

* fix: cargo check

* refactor: some

* refactor some

* adjust context

* fix: cargo check

* fix: ut
2025-12-01 08:03:12 +00:00
Weny Xu
18875eed4d feat: implement Display trait for FlushRegions (#7320)
feat: implement Display trait for FlushRegions

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-01 06:33:23 +00:00
discord9
ee76d50569 test: gc integration test (#7306)
* test: basic infra for set gc

Signed-off-by: discord9 <discord9@163.com>

* more stuff

Signed-off-by: discord9 <discord9@163.com>

* test: basic gc integration test

Signed-off-by: discord9 <discord9@163.com>

* rm unused

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* refactor: remove loader

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* fix: allow default endpoint

Signed-off-by: discord9 <discord9@163.com>

* filter out files

Signed-off-by: discord9 <discord9@163.com>

* chore: rm minio support

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-12-01 04:10:42 +00:00
Weny Xu
5d634aeba0 feat: implement metadata update for repartition group procedure (#7311)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-01 03:15:12 +00:00
Weny Xu
8346acb900 feat: introduce EnterStagingRequest for RegionEngine (#7261)
* feat: introduce `EnterStagingRequest` for region engine

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: improve error handling in staging mode entry

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-11-28 09:02:32 +00:00
LFC
fdab75ce27 feat: simple read write new json type values (#7175)
feat: basic json read and write

Signed-off-by: luofucong <luofc@foxmail.com>
2025-11-27 12:40:35 +00:00
Ruihang Xia
4c07d2d5de fix: metric engine deadlock when altering a group of tables (#7308)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-11-27 09:45:06 +00:00
fys
020477994b feat: add some configurable points (#7227)
* feat: enhance extension

* fix: cr

* move information schema table factories trait to standalone

* fix: self cr

* remove extension factory

* refactor

* remove extension field from greptime options struct

* refactor

* minor refactor

* fix: cargo check

* fix: clippy

* fix: license check

* feat: enhance grpc and http configurator in servers crate

* grpc builder configurator

* remove unused file

* complete the remaining expansion points.

* fix: self-cr

* rename

* fix: typo
2025-11-27 09:21:46 +00:00
Yingwen
afefc0c604 fix: implement bulk write for time partitions and bulk memtable (#7293)
* feat: implement convert_bulk_part

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: convert bulk part in TimePartitions

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: fill missing columns for bulk parts

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comments

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: cast to dictionary type

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add unit tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: do not convert part if bulk is written by write()

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-27 08:01:45 +00:00
Weny Xu
e44323c433 feat: add region repartition group procedure infrastructure (#7299)
* feat: add region repartition group procedure infrastructure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-11-27 04:57:45 +00:00
discord9
0aeaf405c7 feat: add batch gc procedure (#7296)
* feat: add batch gc procedure

Signed-off-by: discord9 <discord9@163.com>

* chore

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* per even review

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-11-27 03:58:15 +00:00
Yingwen
b5cbc35a0d fix: partition tree metric should be the delta (#7307)
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-27 03:49:02 +00:00
Ning Sun
30ca2d7652 feat: switch to a datafusion branch that has binary operator support for nested data 2025-10-24 08:02:48 +08:00
158 changed files with 7376 additions and 1072 deletions

Cargo.lock generated
View File

@@ -3274,7 +3274,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"arrow-ipc",
@@ -3329,7 +3329,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"async-trait",
@@ -3353,7 +3353,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog-listing"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"async-trait",
@@ -3375,7 +3375,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3398,7 +3398,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"futures",
"log",
@@ -3408,7 +3408,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"async-compression 0.4.19",
@@ -3442,7 +3442,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource-csv"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"async-trait",
@@ -3464,7 +3464,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource-json"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"async-trait",
@@ -3485,7 +3485,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource-parquet"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"async-trait",
@@ -3514,12 +3514,12 @@ dependencies = [
[[package]]
name = "datafusion-doc"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
[[package]]
name = "datafusion-execution"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"async-trait",
@@ -3538,7 +3538,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"async-trait",
@@ -3560,7 +3560,7 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"datafusion-common",
@@ -3572,7 +3572,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"arrow-buffer",
@@ -3600,7 +3600,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3620,7 +3620,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate-common"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3632,7 +3632,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"arrow-ord",
@@ -3654,7 +3654,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-table"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"async-trait",
@@ -3669,7 +3669,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"datafusion-common",
@@ -3686,7 +3686,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window-common"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"datafusion-common",
"datafusion-physical-expr-common",
@@ -3695,7 +3695,7 @@ dependencies = [
[[package]]
name = "datafusion-macros"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"datafusion-doc",
"quote",
@@ -3705,7 +3705,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"chrono",
@@ -3741,9 +3741,9 @@ dependencies = [
[[package]]
name = "datafusion-pg-catalog"
version = "0.12.1"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15824c98ff2009c23b0398d441499b147f7c5ac0e5ee993e7a473d79040e3626"
checksum = "755393864c0c2dd95575ceed4b25e348686028e1b83d06f8f39914209999f821"
dependencies = [
"async-trait",
"datafusion",
@@ -3756,7 +3756,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3777,7 +3777,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-adapter"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"datafusion-common",
@@ -3791,7 +3791,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3804,7 +3804,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"datafusion-common",
@@ -3822,7 +3822,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3852,7 +3852,7 @@ dependencies = [
[[package]]
name = "datafusion-pruning"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"datafusion-common",
@@ -3868,7 +3868,7 @@ dependencies = [
[[package]]
name = "datafusion-session"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"async-trait",
"datafusion-common",
@@ -3881,7 +3881,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"arrow",
"bigdecimal 0.4.8",
@@ -3898,7 +3898,7 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
dependencies = [
"async-recursion",
"async-trait",
@@ -7445,6 +7445,7 @@ dependencies = [
"once_cell",
"ordered-float 4.6.0",
"parking_lot 0.12.4",
"partition",
"prometheus",
"prost 0.13.5",
"rand 0.9.1",
@@ -8361,6 +8362,7 @@ dependencies = [
"common-macro",
"common-telemetry",
"common-test-util",
"derive_builder 0.20.2",
"futures",
"humantime-serde",
"lazy_static",
@@ -9499,6 +9501,7 @@ name = "plugins"
version = "1.0.0-beta.2"
dependencies = [
"auth",
"catalog",
"clap 4.5.40",
"cli",
"common-base",
@@ -9507,6 +9510,7 @@ dependencies = [
"datanode",
"flow",
"frontend",
"meta-client",
"meta-srv",
"serde",
"snafu 0.8.6",
@@ -13063,6 +13067,7 @@ dependencies = [
"loki-proto",
"meta-client",
"meta-srv",
"mito2",
"moka",
"mysql_async",
"object-store",

View File

@@ -131,7 +131,7 @@ datafusion-functions = "50"
datafusion-functions-aggregate-common = "50"
datafusion-optimizer = "50"
datafusion-orc = "0.5"
datafusion-pg-catalog = "0.12.1"
datafusion-pg-catalog = "0.12.2"
datafusion-physical-expr = "50"
datafusion-physical-plan = "50"
datafusion-sql = "50"
@@ -316,18 +316,18 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x"
[profile.release]

View File

@@ -23,11 +23,9 @@ use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth
use datatypes::json::value::{JsonNumber, JsonValue, JsonValueRef, JsonVariant};
use datatypes::prelude::{ConcreteDataType, ValueRef};
use datatypes::types::{
IntervalType, JsonFormat, StructField, StructType, TimeType, TimestampType,
};
use datatypes::value::{
ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value,
IntervalType, JsonFormat, JsonType, StructField, StructType, TimeType, TimestampType,
};
use datatypes::value::{ListValueRef, OrderedF32, OrderedF64, StructValueRef, Value};
use datatypes::vectors::VectorRef;
use greptime_proto::v1::column_data_type_extension::TypeExt;
use greptime_proto::v1::ddl_request::Expr;
@@ -82,6 +80,10 @@ impl ColumnDataTypeWrapper {
pub fn to_parts(&self) -> (ColumnDataType, Option<ColumnDataTypeExtension>) {
(self.datatype, self.datatype_ext.clone())
}
pub fn into_parts(self) -> (ColumnDataType, Option<ColumnDataTypeExtension>) {
(self.datatype, self.datatype_ext)
}
}
impl From<ColumnDataTypeWrapper> for ConcreteDataType {
@@ -127,6 +129,7 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
};
ConcreteDataType::json_native_datatype(inner_type.into())
}
None => ConcreteDataType::Json(JsonType::null()),
_ => {
// invalid state, type extension is missing or invalid
ConcreteDataType::null_datatype()
@@ -441,18 +444,22 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
JsonFormat::Jsonb => Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
JsonFormat::Native(inner) => {
let inner_type = ColumnDataTypeWrapper::try_from(
ConcreteDataType::from(inner.as_ref()),
)?;
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonNativeType(Box::new(
JsonNativeTypeExtension {
datatype: inner_type.datatype.into(),
datatype_extension: inner_type.datatype_ext.map(Box::new),
},
))),
})
JsonFormat::Native(native_type) => {
if native_type.is_null() {
None
} else {
let native_type = ConcreteDataType::from(native_type.as_ref());
let (datatype, datatype_extension) =
ColumnDataTypeWrapper::try_from(native_type)?.into_parts();
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonNativeType(Box::new(
JsonNativeTypeExtension {
datatype: datatype as i32,
datatype_extension: datatype_extension.map(Box::new),
},
))),
})
}
}
}
} else {
@@ -887,111 +894,6 @@ pub fn is_column_type_value_eq(
.unwrap_or(false)
}
/// Convert value into proto's value.
pub fn to_proto_value(value: Value) -> v1::Value {
match value {
Value::Null => v1::Value { value_data: None },
Value::Boolean(v) => v1::Value {
value_data: Some(ValueData::BoolValue(v)),
},
Value::UInt8(v) => v1::Value {
value_data: Some(ValueData::U8Value(v.into())),
},
Value::UInt16(v) => v1::Value {
value_data: Some(ValueData::U16Value(v.into())),
},
Value::UInt32(v) => v1::Value {
value_data: Some(ValueData::U32Value(v)),
},
Value::UInt64(v) => v1::Value {
value_data: Some(ValueData::U64Value(v)),
},
Value::Int8(v) => v1::Value {
value_data: Some(ValueData::I8Value(v.into())),
},
Value::Int16(v) => v1::Value {
value_data: Some(ValueData::I16Value(v.into())),
},
Value::Int32(v) => v1::Value {
value_data: Some(ValueData::I32Value(v)),
},
Value::Int64(v) => v1::Value {
value_data: Some(ValueData::I64Value(v)),
},
Value::Float32(v) => v1::Value {
value_data: Some(ValueData::F32Value(*v)),
},
Value::Float64(v) => v1::Value {
value_data: Some(ValueData::F64Value(*v)),
},
Value::String(v) => v1::Value {
value_data: Some(ValueData::StringValue(v.as_utf8().to_string())),
},
Value::Binary(v) => v1::Value {
value_data: Some(ValueData::BinaryValue(v.to_vec())),
},
Value::Date(v) => v1::Value {
value_data: Some(ValueData::DateValue(v.val())),
},
Value::Timestamp(v) => match v.unit() {
TimeUnit::Second => v1::Value {
value_data: Some(ValueData::TimestampSecondValue(v.value())),
},
TimeUnit::Millisecond => v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(v.value())),
},
TimeUnit::Microsecond => v1::Value {
value_data: Some(ValueData::TimestampMicrosecondValue(v.value())),
},
TimeUnit::Nanosecond => v1::Value {
value_data: Some(ValueData::TimestampNanosecondValue(v.value())),
},
},
Value::Time(v) => match v.unit() {
TimeUnit::Second => v1::Value {
value_data: Some(ValueData::TimeSecondValue(v.value())),
},
TimeUnit::Millisecond => v1::Value {
value_data: Some(ValueData::TimeMillisecondValue(v.value())),
},
TimeUnit::Microsecond => v1::Value {
value_data: Some(ValueData::TimeMicrosecondValue(v.value())),
},
TimeUnit::Nanosecond => v1::Value {
value_data: Some(ValueData::TimeNanosecondValue(v.value())),
},
},
Value::IntervalYearMonth(v) => v1::Value {
value_data: Some(ValueData::IntervalYearMonthValue(v.to_i32())),
},
Value::IntervalDayTime(v) => v1::Value {
value_data: Some(ValueData::IntervalDayTimeValue(v.to_i64())),
},
Value::IntervalMonthDayNano(v) => v1::Value {
value_data: Some(ValueData::IntervalMonthDayNanoValue(
convert_month_day_nano_to_pb(v),
)),
},
Value::Decimal128(v) => v1::Value {
value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
},
Value::List(list_value) => v1::Value {
value_data: Some(ValueData::ListValue(v1::ListValue {
items: convert_list_to_pb_values(list_value),
})),
},
Value::Struct(struct_value) => v1::Value {
value_data: Some(ValueData::StructValue(v1::StructValue {
items: convert_struct_to_pb_values(struct_value),
})),
},
Value::Json(v) => v1::Value {
value_data: Some(ValueData::JsonValue(encode_json_value(*v))),
},
Value::Duration(_) => v1::Value { value_data: None },
}
}
fn encode_json_value(value: JsonValue) -> v1::JsonValue {
fn helper(json: JsonVariant) -> v1::JsonValue {
let value = match json {
@@ -1052,22 +954,6 @@ fn decode_json_value(value: &v1::JsonValue) -> JsonValueRef<'_> {
}
}
fn convert_list_to_pb_values(list_value: ListValue) -> Vec<v1::Value> {
list_value
.take_items()
.into_iter()
.map(to_proto_value)
.collect()
}
fn convert_struct_to_pb_values(struct_value: StructValue) -> Vec<v1::Value> {
struct_value
.take_items()
.into_iter()
.map(to_proto_value)
.collect()
}
/// Returns the [ColumnDataTypeWrapper] of the value.
///
/// If value is null, returns `None`.
@@ -1114,14 +1000,14 @@ pub fn vectors_to_rows<'a>(
let mut rows = vec![Row { values: vec![] }; row_count];
for column in columns {
for (row_index, row) in rows.iter_mut().enumerate() {
row.values.push(value_to_grpc_value(column.get(row_index)))
row.values.push(to_grpc_value(column.get(row_index)))
}
}
rows
}
pub fn value_to_grpc_value(value: Value) -> GrpcValue {
pub fn to_grpc_value(value: Value) -> GrpcValue {
GrpcValue {
value_data: match value {
Value::Null => None,
@@ -1161,7 +1047,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
let items = list_value
.take_items()
.into_iter()
.map(value_to_grpc_value)
.map(to_grpc_value)
.collect();
Some(ValueData::ListValue(v1::ListValue { items }))
}
@@ -1169,7 +1055,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
let items = struct_value
.take_items()
.into_iter()
.map(value_to_grpc_value)
.map(to_grpc_value)
.collect();
Some(ValueData::StructValue(v1::StructValue { items }))
}
@@ -1269,6 +1155,7 @@ mod tests {
use common_time::interval::IntervalUnit;
use datatypes::scalars::ScalarVector;
use datatypes::types::{Int8Type, Int32Type, UInt8Type, UInt32Type};
use datatypes::value::{ListValue, StructValue};
use datatypes::vectors::{
BooleanVector, DateVector, Float32Vector, PrimitiveVector, StringVector,
};
@@ -1872,7 +1759,7 @@ mod tests {
Arc::new(ConcreteDataType::boolean_datatype()),
));
let pb_value = to_proto_value(value);
let pb_value = to_grpc_value(value);
match pb_value.value_data.unwrap() {
ValueData::ListValue(pb_list_value) => {
@@ -1901,7 +1788,7 @@ mod tests {
.unwrap(),
);
let pb_value = to_proto_value(value);
let pb_value = to_grpc_value(value);
match pb_value.value_data.unwrap() {
ValueData::StructValue(pb_struct_value) => {

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[features]
enterprise = []
testing = []
[lints]

View File

@@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
mod builder;
mod client;
mod manager;
mod table_cache;
pub use builder::KvBackendCatalogManagerBuilder;
pub use builder::{
CatalogManagerConfigurator, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
pub use manager::KvBackendCatalogManager;
pub use table_cache::{TableCache, TableCacheRef, new_table_cache};

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryRef;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
@@ -23,24 +25,34 @@ use common_procedure::ProcedureManagerRef;
use moka::sync::Cache;
use partition::manager::PartitionRuleManager;
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::information_schema::{
InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::kvbackend::manager::{CATALOG_CACHE_MAX_CAPACITY, SystemCatalog};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
/// The configurator that customizes or enhances the [`KvBackendCatalogManagerBuilder`].
#[async_trait::async_trait]
pub trait CatalogManagerConfigurator<C>: Send + Sync {
async fn configure(
&self,
builder: KvBackendCatalogManagerBuilder,
ctx: C,
) -> std::result::Result<KvBackendCatalogManagerBuilder, BoxedError>;
}
pub type CatalogManagerConfiguratorRef<C> = Arc<dyn CatalogManagerConfigurator<C>>;
pub struct KvBackendCatalogManagerBuilder {
information_extension: InformationExtensionRef,
backend: KvBackendRef,
cache_registry: LayeredCacheRegistryRef,
procedure_manager: Option<ProcedureManagerRef>,
process_manager: Option<ProcessManagerRef>,
#[cfg(feature = "enterprise")]
extra_information_table_factories:
std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
extra_information_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
}
impl KvBackendCatalogManagerBuilder {
@@ -55,8 +67,7 @@ impl KvBackendCatalogManagerBuilder {
cache_registry,
procedure_manager: None,
process_manager: None,
#[cfg(feature = "enterprise")]
extra_information_table_factories: std::collections::HashMap::new(),
extra_information_table_factories: HashMap::new(),
}
}
@@ -71,10 +82,9 @@ impl KvBackendCatalogManagerBuilder {
}
/// Sets the extra information tables.
#[cfg(feature = "enterprise")]
pub fn with_extra_information_table_factories(
mut self,
factories: std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
factories: HashMap<String, InformationSchemaTableFactoryRef>,
) -> Self {
self.extra_information_table_factories = factories;
self
@@ -87,7 +97,6 @@ impl KvBackendCatalogManagerBuilder {
cache_registry,
procedure_manager,
process_manager,
#[cfg(feature = "enterprise")]
extra_information_table_factories,
} = self;
Arc::new_cyclic(|me| KvBackendCatalogManager {
@@ -111,7 +120,6 @@ impl KvBackendCatalogManagerBuilder {
process_manager.clone(),
backend.clone(),
);
#[cfg(feature = "enterprise")]
let provider = provider
.with_extra_table_factories(extra_information_table_factories.clone());
Arc::new(provider)
@@ -123,7 +131,6 @@ impl KvBackendCatalogManagerBuilder {
numbers_table_provider: NumbersTableProvider,
backend,
process_manager,
#[cfg(feature = "enterprise")]
extra_information_table_factories,
},
cache_registry,
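
Illustrative sketch (not part of this diff): a plugin-side implementation of the new CatalogManagerConfigurator trait shown above might look roughly like the following. The ExtraTablesConfigurator type is hypothetical, and the import paths are assumed from the re-exports added in this changeset.

use std::collections::HashMap;

use catalog::information_schema::InformationSchemaTableFactoryRef;
use catalog::kvbackend::{CatalogManagerConfigurator, KvBackendCatalogManagerBuilder};
use common_error::ext::BoxedError;

// Hypothetical configurator that hands extra information_schema table
// factories to the catalog manager builder before it is built.
pub struct ExtraTablesConfigurator {
    factories: HashMap<String, InformationSchemaTableFactoryRef>,
}

#[async_trait::async_trait]
impl<C: Send + Sync + 'static> CatalogManagerConfigurator<C> for ExtraTablesConfigurator {
    async fn configure(
        &self,
        builder: KvBackendCatalogManagerBuilder,
        _ctx: C,
    ) -> Result<KvBackendCatalogManagerBuilder, BoxedError> {
        // The start paths later in this diff expect the configured builder back.
        Ok(builder.with_extra_information_table_factories(self.factories.clone()))
    }
}

The frontend and standalone start commands further down in this changeset look such a configurator up from Plugins (as CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>) and apply it to the builder when present.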

View File

@@ -53,9 +53,9 @@ use crate::error::{
CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::information_schema::{
InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef,
};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::SystemSchemaProvider;
@@ -557,7 +557,6 @@ pub(super) struct SystemCatalog {
pub(super) numbers_table_provider: NumbersTableProvider,
pub(super) backend: KvBackendRef,
pub(super) process_manager: Option<ProcessManagerRef>,
#[cfg(feature = "enterprise")]
pub(super) extra_information_table_factories:
std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}
@@ -628,7 +627,6 @@ impl SystemCatalog {
self.process_manager.clone(),
self.backend.clone(),
);
#[cfg(feature = "enterprise")]
let provider = provider
.with_extra_table_factories(self.extra_information_table_factories.clone());
Arc::new(provider)

View File

@@ -117,7 +117,6 @@ macro_rules! setup_memory_table {
};
}
#[cfg(feature = "enterprise")]
pub struct MakeInformationTableRequest {
pub catalog_name: String,
pub catalog_manager: Weak<dyn CatalogManager>,
@@ -128,12 +127,10 @@ pub struct MakeInformationTableRequest {
///
/// This trait allows for extensibility of the information schema by providing
/// a way to dynamically create custom information schema tables.
#[cfg(feature = "enterprise")]
pub trait InformationSchemaTableFactory {
fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
}
#[cfg(feature = "enterprise")]
pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
/// The `information_schema` tables info provider.
@@ -143,9 +140,7 @@ pub struct InformationSchemaProvider {
process_manager: Option<ProcessManagerRef>,
flow_metadata_manager: Arc<FlowMetadataManager>,
tables: HashMap<String, TableRef>,
#[allow(dead_code)]
kv_backend: KvBackendRef,
#[cfg(feature = "enterprise")]
extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
}
@@ -166,7 +161,6 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
}
fn system_table(&self, name: &str) -> Option<SystemTableRef> {
#[cfg(feature = "enterprise")]
if let Some(factory) = self.extra_table_factories.get(name) {
let req = MakeInformationTableRequest {
catalog_name: self.catalog_name.clone(),
@@ -281,7 +275,6 @@ impl InformationSchemaProvider {
process_manager,
tables: HashMap::new(),
kv_backend,
#[cfg(feature = "enterprise")]
extra_table_factories: HashMap::new(),
};
@@ -290,7 +283,6 @@ impl InformationSchemaProvider {
provider
}
#[cfg(feature = "enterprise")]
pub(crate) fn with_extra_table_factories(
mut self,
factories: HashMap<String, InformationSchemaTableFactoryRef>,
@@ -358,7 +350,6 @@ impl InformationSchemaProvider {
if let Some(process_list) = self.build_table(PROCESS_LIST) {
tables.insert(PROCESS_LIST.to_string(), process_list);
}
#[cfg(feature = "enterprise")]
for name in self.extra_table_factories.keys() {
tables.insert(name.clone(), self.build_table(name).expect(name));
}
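
Illustrative sketch (not part of this diff): with the enterprise cfg-gates removed, InformationSchemaTableFactory can be implemented outside the enterprise build. A minimal factory, where StaticTableFactory is hypothetical and the SystemTableRef export path and its Arc-based Clone are assumptions:

use catalog::information_schema::{InformationSchemaTableFactory, MakeInformationTableRequest};
use catalog::system_schema::SystemTableRef; // export path assumed

// Hypothetical factory that always serves the same pre-built system table,
// ignoring the per-catalog request details.
pub struct StaticTableFactory {
    table: SystemTableRef,
}

impl InformationSchemaTableFactory for StaticTableFactory {
    fn make_information_table(&self, _req: MakeInformationTableRequest) -> SystemTableRef {
        self.table.clone()
    }
}

Registered under a table name via with_extra_information_table_factories, the provider's build path shown above would then surface it as an additional information_schema table.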

View File

@@ -211,6 +211,7 @@ struct InformationSchemaPartitionsBuilder {
partition_names: StringVectorBuilder,
partition_ordinal_positions: Int64VectorBuilder,
partition_expressions: StringVectorBuilder,
partition_descriptions: StringVectorBuilder,
create_times: TimestampSecondVectorBuilder,
partition_ids: UInt64VectorBuilder,
}
@@ -231,6 +232,7 @@ impl InformationSchemaPartitionsBuilder {
partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
partition_descriptions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
create_times: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
}
@@ -319,6 +321,21 @@ impl InformationSchemaPartitionsBuilder {
return;
}
// Get partition column names (shared by all partitions)
// In MySQL, PARTITION_EXPRESSION is the partitioning function expression (e.g., column name)
let partition_columns: String = table_info
.meta
.partition_column_names()
.cloned()
.collect::<Vec<_>>()
.join(", ");
let partition_expr_str = if partition_columns.is_empty() {
None
} else {
Some(partition_columns)
};
for (index, partition) in partitions.iter().enumerate() {
let partition_name = format!("p{index}");
@@ -328,8 +345,12 @@ impl InformationSchemaPartitionsBuilder {
self.partition_names.push(Some(&partition_name));
self.partition_ordinal_positions
.push(Some((index + 1) as i64));
let expression = partition.partition_expr.as_ref().map(|e| e.to_string());
self.partition_expressions.push(expression.as_deref());
// PARTITION_EXPRESSION: partition column names (same for all partitions)
self.partition_expressions
.push(partition_expr_str.as_deref());
// PARTITION_DESCRIPTION: partition boundary expression (different for each partition)
let description = partition.partition_expr.as_ref().map(|e| e.to_string());
self.partition_descriptions.push(description.as_deref());
self.create_times.push(Some(TimestampSecond::from(
table_info.meta.created_on.timestamp(),
)));
@@ -369,7 +390,7 @@ impl InformationSchemaPartitionsBuilder {
null_string_vector.clone(),
Arc::new(self.partition_expressions.finish()),
null_string_vector.clone(),
null_string_vector.clone(),
Arc::new(self.partition_descriptions.finish()),
// TODO(dennis): rows and index statistics info
null_i64_vector.clone(),
null_i64_vector.clone(),

View File

@@ -16,7 +16,7 @@ default = [
"meta-srv/pg_kvbackend",
"meta-srv/mysql_kvbackend",
]
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise", "catalog/enterprise"]
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"]
tokio-console = ["common-telemetry/tokio-console"]
[lints]

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -39,12 +40,14 @@ use flow::{
get_flow_auth_options,
};
use meta_client::{MetaClientOptions, MetaClientType};
use plugins::flownode::context::GrpcConfigureContext;
use servers::configurator::GrpcBuilderConfiguratorRef;
use snafu::{OptionExt, ResultExt, ensure};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
MissingConfigSnafu, OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
@@ -55,33 +58,14 @@ type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
pub struct Instance {
flownode: FlownodeInstance,
// The components of flownode, which make it easier to expand based
// on the components.
#[cfg(feature = "enterprise")]
components: Components,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
#[cfg(feature = "enterprise")]
pub struct Components {
pub catalog_manager: catalog::CatalogManagerRef,
pub fe_client: Arc<FrontendClient>,
pub kv_backend: common_meta::kv_backend::KvBackendRef,
}
impl Instance {
pub fn new(
flownode: FlownodeInstance,
#[cfg(feature = "enterprise")] components: Components,
guard: Vec<WorkerGuard>,
) -> Self {
pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
Self {
flownode,
#[cfg(feature = "enterprise")]
components,
_guard: guard,
}
}
@@ -94,11 +78,6 @@ impl Instance {
pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}
#[cfg(feature = "enterprise")]
pub fn components(&self) -> &Components {
&self.components
}
}
#[async_trait::async_trait]
@@ -396,7 +375,7 @@ impl StartCommand {
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(
opts.clone(),
plugins,
plugins.clone(),
table_metadata_manager,
catalog_manager.clone(),
flow_metadata_manager,
@@ -405,8 +384,29 @@ impl StartCommand {
.with_heartbeat_task(heartbeat_task);
let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
let builder =
FlownodeServiceBuilder::grpc_server_builder(&opts, flownode.flownode_server());
let builder = if let Some(configurator) =
plugins.get::<GrpcBuilderConfiguratorRef<GrpcConfigureContext>>()
{
let context = GrpcConfigureContext {
kv_backend: cached_meta_backend.clone(),
fe_client: frontend_client.clone(),
flownode_id: member_id,
catalog_manager: catalog_manager.clone(),
};
configurator
.configure(builder, context)
.await
.context(OtherSnafu)?
} else {
builder
};
let grpc_server = builder.build();
let services = FlownodeServiceBuilder::new(&opts)
.with_default_grpc_server(flownode.flownode_server())
.with_grpc_server(grpc_server)
.enable_http_service()
.build()
.context(StartFlownodeSnafu)?;
@@ -430,16 +430,6 @@ impl StartCommand {
.set_frontend_invoker(invoker)
.await;
#[cfg(feature = "enterprise")]
let components = Components {
catalog_manager: catalog_manager.clone(),
fe_client: frontend_client,
kv_backend: cached_meta_backend,
};
#[cfg(not(feature = "enterprise"))]
return Ok(Instance::new(flownode, guard));
#[cfg(feature = "enterprise")]
Ok(Instance::new(flownode, components, guard))
Ok(Instance::new(flownode, guard))
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -19,7 +20,10 @@ use std::time::Duration;
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use catalog::kvbackend::{
CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
MetaKvBackend,
};
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::client_manager::NodeClients;
@@ -42,13 +46,16 @@ use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use plugins::frontend::context::{
CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
};
use servers::addrs;
use servers::grpc::GrpcOptions;
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, Result};
use crate::error::{self, OtherSnafu, Result};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
@@ -416,9 +423,18 @@ impl StartCommand {
layered_cache_registry.clone(),
)
.with_process_manager(process_manager.clone());
#[cfg(feature = "enterprise")]
let builder = if let Some(factories) = plugins.get() {
builder.with_extra_information_table_factories(factories)
let builder = if let Some(configurator) =
plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
{
let ctx = DistributedCatalogManagerConfigureContext {
meta_client: meta_client.clone(),
};
let ctx = CatalogManagerConfigureContext::Distributed(ctx);
configurator
.configure(builder, ctx)
.await
.context(OtherSnafu)?
} else {
builder
};

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::fmt::{self, Debug};
use std::path::Path;
use std::time::Duration;
@@ -23,7 +23,7 @@ use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_version::{short_version, verbose_version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::bootstrap::{MetasrvInstance, metasrv_builder};
use meta_srv::metasrv::BackendImpl;
use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;
@@ -177,7 +177,7 @@ pub struct StartCommand {
backend: Option<BackendImpl>,
}
impl fmt::Debug for StartCommand {
impl Debug for StartCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StartCommand")
.field("rpc_bind_addr", &self.rpc_bind_addr)
@@ -341,7 +341,7 @@ impl StartCommand {
.await
.context(StartMetaServerSnafu)?;
let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins, None)
let builder = metasrv_builder(&opts, plugins, None)
.await
.context(error::BuildMetaServerSnafu)?;
let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
@@ -20,7 +21,7 @@ use std::{fs, path};
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::InformationExtensionRef;
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::kvbackend::{CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder};
use catalog::process_manager::ProcessManager;
use clap::Parser;
use common_base::Plugins;
@@ -31,7 +32,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -57,13 +58,17 @@ use frontend::instance::StandaloneDatanodeManager;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use plugins::frontend::context::{
CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
};
use plugins::standalone::context::DdlManagerConfigureContext;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use standalone::StandaloneInformationExtension;
use standalone::options::StandaloneOptions;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{Result, StartFlownodeSnafu};
use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
@@ -116,34 +121,15 @@ pub struct Instance {
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
// The components of standalone, which make it easier to expand based
// on the components.
#[cfg(feature = "enterprise")]
components: Components,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
#[cfg(feature = "enterprise")]
pub struct Components {
pub plugins: Plugins,
pub kv_backend: KvBackendRef,
pub frontend_client: Arc<FrontendClient>,
pub catalog_manager: catalog::CatalogManagerRef,
}
impl Instance {
/// Find the socket addr of a server by its `name`.
pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
self.frontend.server_handlers().addr(name)
}
#[cfg(feature = "enterprise")]
pub fn components(&self) -> &Components {
&self.components
}
}
#[async_trait]
@@ -415,6 +401,13 @@ impl StartCommand {
plugins.insert::<InformationExtensionRef>(information_extension.clone());
let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
// for standalone not use grpc, but get a handler to frontend grpc client without
// actually make a connection
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler(opts.query.clone());
let frontend_client = Arc::new(frontend_client);
let builder = KvBackendCatalogManagerBuilder::new(
information_extension.clone(),
kv_backend.clone(),
@@ -422,9 +415,17 @@ impl StartCommand {
)
.with_procedure_manager(procedure_manager.clone())
.with_process_manager(process_manager.clone());
#[cfg(feature = "enterprise")]
let builder = if let Some(factories) = plugins.get() {
builder.with_extra_information_table_factories(factories)
let builder = if let Some(configurator) =
plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
{
let ctx = StandaloneCatalogManagerConfigureContext {
fe_client: frontend_client.clone(),
};
let ctx = CatalogManagerConfigureContext::Standalone(ctx);
configurator
.configure(builder, ctx)
.await
.context(OtherSnafu)?
} else {
builder
};
@@ -439,11 +440,6 @@ impl StartCommand {
..Default::default()
};
// for standalone not use grpc, but get a handler to frontend grpc client without
// actually make a connection
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler(opts.query.clone());
let frontend_client = Arc::new(frontend_client);
let flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
@@ -514,11 +510,21 @@ impl StartCommand {
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
.context(error::InitDdlManagerSnafu)?;
#[cfg(feature = "enterprise")]
let ddl_manager = {
let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
plugins.get();
ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
let ddl_manager = if let Some(configurator) =
plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()
{
let ctx = DdlManagerConfigureContext {
kv_backend: kv_backend.clone(),
fe_client: frontend_client.clone(),
catalog_manager: catalog_manager.clone(),
};
configurator
.configure(ddl_manager, ctx)
.await
.context(OtherSnafu)?
} else {
ddl_manager
};
let procedure_executor = Arc::new(LocalProcedureExecutor::new(
@@ -574,22 +580,12 @@ impl StartCommand {
heartbeat_task: None,
};
#[cfg(feature = "enterprise")]
let components = Components {
plugins,
kv_backend,
frontend_client,
catalog_manager,
};
Ok(Instance {
datanode,
frontend,
flownode,
procedure_manager,
wal_options_allocator,
#[cfg(feature = "enterprise")]
components,
_guard: guard,
})
}

View File

@@ -32,7 +32,12 @@ impl Plugins {
pub fn insert<T: 'static + Send + Sync>(&self, value: T) {
let last = self.write().insert(value);
assert!(last.is_none(), "each type of plugins must be one and only");
if last.is_some() {
panic!(
"Plugin of type {} already exists",
std::any::type_name::<T>()
);
}
}
pub fn get<T: 'static + Send + Sync + Clone>(&self) -> Option<T> {
@@ -140,7 +145,7 @@ mod tests {
}
#[test]
#[should_panic(expected = "each type of plugins must be one and only")]
#[should_panic(expected = "Plugin of type i32 already exists")]
fn test_plugin_uniqueness() {
let plugins = Plugins::new();
plugins.insert(1i32);

View File

@@ -14,6 +14,7 @@
mod binary;
mod ctx;
mod if_func;
mod is_null;
mod unary;
@@ -22,6 +23,7 @@ pub use ctx::EvalContext;
pub use unary::scalar_unary_op;
use crate::function_registry::FunctionRegistry;
use crate::scalars::expression::if_func::IfFunction;
use crate::scalars::expression::is_null::IsNullFunction;
pub(crate) struct ExpressionFunction;
@@ -29,5 +31,6 @@ pub(crate) struct ExpressionFunction;
impl ExpressionFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(IsNullFunction::default());
registry.register_scalar(IfFunction::default());
}
}

View File

@@ -0,0 +1,404 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::fmt::Display;
use arrow::array::ArrowNativeTypeOp;
use arrow::datatypes::ArrowPrimitiveType;
use datafusion::arrow::array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray};
use datafusion::arrow::compute::kernels::zip::zip;
use datafusion::arrow::datatypes::DataType;
use datafusion_common::DataFusionError;
use datafusion_expr::type_coercion::binary::comparison_coercion;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use crate::function::Function;
const NAME: &str = "if";
/// MySQL-compatible IF function: IF(condition, true_value, false_value)
///
/// Returns true_value if condition is TRUE (not NULL and not 0),
/// otherwise returns false_value.
///
/// MySQL truthy rules:
/// - NULL -> false
/// - 0 (numeric zero) -> false
/// - Any non-zero numeric -> true
/// - Boolean true/false -> use directly
#[derive(Clone, Debug)]
pub struct IfFunction {
signature: Signature,
}
impl Default for IfFunction {
fn default() -> Self {
Self {
signature: Signature::any(3, Volatility::Immutable),
}
}
}
impl Display for IfFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
impl Function for IfFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, input_types: &[DataType]) -> datafusion_common::Result<DataType> {
// Return the common type of true_value and false_value (args[1] and args[2])
if input_types.len() < 3 {
return Err(DataFusionError::Plan(format!(
"{} requires 3 arguments, got {}",
NAME,
input_types.len()
)));
}
let true_type = &input_types[1];
let false_type = &input_types[2];
// Use comparison_coercion to find common type
comparison_coercion(true_type, false_type).ok_or_else(|| {
DataFusionError::Plan(format!(
"Cannot find common type for IF function between {:?} and {:?}",
true_type, false_type
))
})
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
if args.args.len() != 3 {
return Err(DataFusionError::Plan(format!(
"{} requires exactly 3 arguments, got {}",
NAME,
args.args.len()
)));
}
let condition = &args.args[0];
let true_value = &args.args[1];
let false_value = &args.args[2];
// Convert condition to boolean array using MySQL truthy rules
let bool_array = to_boolean_array(condition, args.number_rows)?;
// Convert true and false values to arrays
let true_array = true_value.to_array(args.number_rows)?;
let false_array = false_value.to_array(args.number_rows)?;
// Use zip to select values based on condition
// zip expects &dyn Datum, and ArrayRef (Arc<dyn Array>) implements Datum
let result = zip(&bool_array, &true_array, &false_array)?;
Ok(ColumnarValue::Array(result))
}
}
/// Convert a ColumnarValue to a BooleanArray using MySQL truthy rules:
/// - NULL -> false
/// - 0 (any numeric zero) -> false
/// - Non-zero numeric -> true
/// - Boolean -> use directly
fn to_boolean_array(
value: &ColumnarValue,
num_rows: usize,
) -> datafusion_common::Result<BooleanArray> {
let array = value.to_array(num_rows)?;
array_to_bool(array)
}
/// Convert an integer PrimitiveArray to BooleanArray using MySQL truthy rules:
/// NULL -> false, 0 -> false, non-zero -> true
fn int_array_to_bool<T>(array: &PrimitiveArray<T>) -> BooleanArray
where
T: ArrowPrimitiveType,
T::Native: ArrowNativeTypeOp,
{
BooleanArray::from_iter(
array
.iter()
.map(|opt| Some(opt.is_some_and(|v| !v.is_zero()))),
)
}
/// Convert a float PrimitiveArray to BooleanArray using MySQL truthy rules:
/// NULL -> false, 0 (including -0.0) -> false, NaN -> true, other non-zero -> true
fn float_array_to_bool<T>(array: &PrimitiveArray<T>) -> BooleanArray
where
T: ArrowPrimitiveType,
T::Native: ArrowNativeTypeOp + num_traits::Float,
{
use num_traits::Float;
BooleanArray::from_iter(
array
.iter()
.map(|opt| Some(opt.is_some_and(|v| v.is_nan() || !v.is_zero()))),
)
}
/// Convert an Array to BooleanArray using MySQL truthy rules
fn array_to_bool(array: ArrayRef) -> datafusion_common::Result<BooleanArray> {
use arrow::datatypes::*;
match array.data_type() {
DataType::Boolean => {
let bool_array = array.as_boolean();
Ok(BooleanArray::from_iter(
bool_array.iter().map(|opt| Some(opt.unwrap_or(false))),
))
}
DataType::Int8 => Ok(int_array_to_bool(array.as_primitive::<Int8Type>())),
DataType::Int16 => Ok(int_array_to_bool(array.as_primitive::<Int16Type>())),
DataType::Int32 => Ok(int_array_to_bool(array.as_primitive::<Int32Type>())),
DataType::Int64 => Ok(int_array_to_bool(array.as_primitive::<Int64Type>())),
DataType::UInt8 => Ok(int_array_to_bool(array.as_primitive::<UInt8Type>())),
DataType::UInt16 => Ok(int_array_to_bool(array.as_primitive::<UInt16Type>())),
DataType::UInt32 => Ok(int_array_to_bool(array.as_primitive::<UInt32Type>())),
DataType::UInt64 => Ok(int_array_to_bool(array.as_primitive::<UInt64Type>())),
// Float16 needs special handling since half::f16 doesn't implement num_traits::Float
DataType::Float16 => {
let typed_array = array.as_primitive::<Float16Type>();
Ok(BooleanArray::from_iter(typed_array.iter().map(|opt| {
Some(opt.is_some_and(|v| {
let f = v.to_f32();
f.is_nan() || !f.is_zero()
}))
})))
}
DataType::Float32 => Ok(float_array_to_bool(array.as_primitive::<Float32Type>())),
DataType::Float64 => Ok(float_array_to_bool(array.as_primitive::<Float64Type>())),
// Null type is always false.
// Note: NullArray::is_null() returns false (physical null), so we must handle it explicitly.
// See: https://github.com/apache/arrow-rs/issues/4840
DataType::Null => Ok(BooleanArray::from(vec![false; array.len()])),
// For other types, treat non-null as true
_ => {
let len = array.len();
Ok(BooleanArray::from_iter(
(0..len).map(|i| Some(!array.is_null(i))),
))
}
}
}
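A minimal sketch of the truthy conversion above, assuming it sits next to the existing tests in this file: NULL and zero map to false, while any non-zero value (including NaN) maps to true.
#[cfg(test)]
mod truthy_sketch {
    use std::sync::Arc;
    use datafusion::arrow::array::{ArrayRef, BooleanArray, Float64Array, Int32Array};
    use super::array_to_bool;
    #[test]
    fn int_and_float_truthiness() {
        // Integers: NULL and 0 are false, any other value is true.
        let ints: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), Some(7), None]));
        assert_eq!(
            array_to_bool(ints).unwrap(),
            BooleanArray::from(vec![false, true, false])
        );
        // Floats: 0.0 and -0.0 are false, NaN and other non-zero values are true.
        let floats: ArrayRef = Arc::new(Float64Array::from(vec![0.0, -0.0, f64::NAN, 2.5]));
        assert_eq!(
            array_to_bool(floats).unwrap(),
            BooleanArray::from(vec![false, false, true, true])
        );
    }
}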
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow_schema::Field;
use datafusion_common::ScalarValue;
use datafusion_common::arrow::array::{AsArray, Int32Array, StringArray};
use super::*;
#[test]
fn test_if_function_basic() {
let if_func = IfFunction::default();
assert_eq!("if", if_func.name());
// Test IF(true, 'yes', 'no') -> 'yes'
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "yes");
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_if_function_false() {
let if_func = IfFunction::default();
// Test IF(false, 'yes', 'no') -> 'no'
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "no");
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_if_function_null_is_false() {
let if_func = IfFunction::default();
// Test IF(NULL, 'yes', 'no') -> 'no' (NULL is treated as false)
// Using Boolean(None) - typed null
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Boolean(None)),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "no");
} else {
panic!("Expected Array result");
}
// Test IF(NULL, 'yes', 'no') -> 'no' using ScalarValue::Null (untyped null from SQL NULL literal)
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Null),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "no");
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_if_function_numeric_truthy() {
let if_func = IfFunction::default();
// Test IF(1, 'yes', 'no') -> 'yes' (non-zero is true)
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Int32(Some(1))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "yes");
} else {
panic!("Expected Array result");
}
// Test IF(0, 'yes', 'no') -> 'no' (zero is false)
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Int32(Some(0))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "no");
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_if_function_with_arrays() {
let if_func = IfFunction::default();
// Test with array condition
let condition = Int32Array::from(vec![Some(1), Some(0), None, Some(5)]);
let true_val = StringArray::from(vec!["yes", "yes", "yes", "yes"]);
let false_val = StringArray::from(vec!["no", "no", "no", "no"]);
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(condition)),
ColumnarValue::Array(Arc::new(true_val)),
ColumnarValue::Array(Arc::new(false_val)),
],
arg_fields: vec![],
number_rows: 4,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "yes"); // 1 is true
assert_eq!(str_arr.value(1), "no"); // 0 is false
assert_eq!(str_arr.value(2), "no"); // NULL is false
assert_eq!(str_arr.value(3), "yes"); // 5 is true
} else {
panic!("Expected Array result");
}
}
}

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use common_catalog::consts::{
DEFAULT_PRIVATE_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use datafusion::arrow::array::{ArrayRef, StringArray, as_boolean_array};
use datafusion::arrow::array::{ArrayRef, StringArray, StringBuilder, as_boolean_array};
use datafusion::catalog::TableFunction;
use datafusion::common::ScalarValue;
use datafusion::common::utils::SingleRowListArrayBuilder;
@@ -34,10 +34,15 @@ const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const CURRENT_SCHEMAS_FUNCTION_NAME: &str = "current_schemas";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const CURRENT_DATABASE_FUNCTION_NAME: &str = "current_database";
const OBJ_DESCRIPTION_FUNCTION_NAME: &str = "obj_description";
const COL_DESCRIPTION_FUNCTION_NAME: &str = "col_description";
const SHOBJ_DESCRIPTION_FUNCTION_NAME: &str = "shobj_description";
const PG_MY_TEMP_SCHEMA_FUNCTION_NAME: &str = "pg_my_temp_schema";
define_nullary_udf!(CurrentSchemaFunction);
define_nullary_udf!(SessionUserFunction);
define_nullary_udf!(CurrentDatabaseFunction);
define_nullary_udf!(PgMyTempSchemaFunction);
impl Function for CurrentDatabaseFunction {
fn name(&self) -> &str {
@@ -173,6 +178,175 @@ impl Function for CurrentSchemasFunction {
}
}
/// PostgreSQL obj_description - returns NULL for compatibility
#[derive(Display, Debug, Clone)]
#[display("{}", self.name())]
pub(super) struct ObjDescriptionFunction {
signature: Signature,
}
impl ObjDescriptionFunction {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Int64]),
TypeSignature::Exact(vec![DataType::UInt32]),
],
Volatility::Stable,
),
}
}
}
impl Function for ObjDescriptionFunction {
fn name(&self) -> &str {
OBJ_DESCRIPTION_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Utf8)
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let num_rows = args.number_rows;
let mut builder = StringBuilder::with_capacity(num_rows, 0);
for _ in 0..num_rows {
builder.append_null();
}
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
/// PostgreSQL col_description - returns NULL for compatibility
#[derive(Display, Debug, Clone)]
#[display("{}", self.name())]
pub(super) struct ColDescriptionFunction {
signature: Signature,
}
impl ColDescriptionFunction {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Int64, DataType::Int32]),
TypeSignature::Exact(vec![DataType::UInt32, DataType::Int32]),
TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]),
TypeSignature::Exact(vec![DataType::UInt32, DataType::Int64]),
],
Volatility::Stable,
),
}
}
}
impl Function for ColDescriptionFunction {
fn name(&self) -> &str {
COL_DESCRIPTION_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Utf8)
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let num_rows = args.number_rows;
let mut builder = StringBuilder::with_capacity(num_rows, 0);
for _ in 0..num_rows {
builder.append_null();
}
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
/// PostgreSQL shobj_description - returns NULL for compatibility
#[derive(Display, Debug, Clone)]
#[display("{}", self.name())]
pub(super) struct ShobjDescriptionFunction {
signature: Signature,
}
impl ShobjDescriptionFunction {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::UInt64, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Int32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
],
Volatility::Stable,
),
}
}
}
impl Function for ShobjDescriptionFunction {
fn name(&self) -> &str {
SHOBJ_DESCRIPTION_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Utf8)
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let num_rows = args.number_rows;
let mut builder = StringBuilder::with_capacity(num_rows, 0);
for _ in 0..num_rows {
builder.append_null();
}
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
/// PostgreSQL pg_my_temp_schema - returns 0 (no temp schema) for compatibility
impl Function for PgMyTempSchemaFunction {
fn name(&self) -> &str {
PG_MY_TEMP_SCHEMA_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::UInt32)
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
_args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(0))))
}
}
pub(super) struct PGCatalogFunction;
impl PGCatalogFunction {
@@ -212,5 +386,98 @@ impl PGCatalogFunction {
registry.register(pg_catalog::create_pg_total_relation_size_udf());
registry.register(pg_catalog::create_pg_stat_get_numscans());
registry.register(pg_catalog::create_pg_get_constraintdef());
registry.register(pg_catalog::create_pg_get_partition_ancestors_udf());
registry.register_scalar(ObjDescriptionFunction::new());
registry.register_scalar(ColDescriptionFunction::new());
registry.register_scalar(ShobjDescriptionFunction::new());
registry.register_scalar(PgMyTempSchemaFunction::default());
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow_schema::Field;
use datafusion::arrow::array::Array;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use super::*;
fn create_test_args(args: Vec<ColumnarValue>, number_rows: usize) -> ScalarFunctionArgs {
ScalarFunctionArgs {
args,
arg_fields: vec![],
number_rows,
return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
}
}
#[test]
fn test_obj_description_function() {
let func = ObjDescriptionFunction::new();
assert_eq!("obj_description", func.name());
assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());
let args = create_test_args(
vec![
ColumnarValue::Scalar(ScalarValue::Int64(Some(1234))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("pg_class".to_string()))),
],
1,
);
let result = func.invoke_with_args(args).unwrap();
if let ColumnarValue::Array(arr) = result {
assert_eq!(1, arr.len());
assert!(arr.is_null(0));
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_col_description_function() {
let func = ColDescriptionFunction::new();
assert_eq!("col_description", func.name());
assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());
let args = create_test_args(
vec![
ColumnarValue::Scalar(ScalarValue::Int64(Some(1234))),
ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
],
1,
);
let result = func.invoke_with_args(args).unwrap();
if let ColumnarValue::Array(arr) = result {
assert_eq!(1, arr.len());
assert!(arr.is_null(0));
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_shobj_description_function() {
let func = ShobjDescriptionFunction::new();
assert_eq!("shobj_description", func.name());
assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());
let args = create_test_args(
vec![
ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("pg_database".to_string()))),
],
1,
);
let result = func.invoke_with_args(args).unwrap();
if let ColumnarValue::Array(arr) = result {
assert_eq!(1, arr.len());
assert!(arr.is_null(0));
} else {
panic!("Expected Array result");
}
}
}

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_procedure::{
BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher,
};
@@ -66,6 +67,19 @@ use crate::rpc::ddl::{
};
use crate::rpc::router::RegionRoute;
/// A configurator that customizes or enhances a [`DdlManager`].
#[async_trait::async_trait]
pub trait DdlManagerConfigurator<C>: Send + Sync {
/// Configures the given [`DdlManager`] using the provided [`DdlManagerConfigureContext`].
async fn configure(
&self,
ddl_manager: DdlManager,
ctx: C,
) -> std::result::Result<DdlManager, BoxedError>;
}
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;
pub type DdlManagerRef = Arc<DdlManager>;
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
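A minimal sketch of how a plugin could implement this hook; `MyDdlContext` and `NoopDdlConfigurator` are hypothetical names used only for illustration.
struct MyDdlContext;
struct NoopDdlConfigurator;

#[async_trait::async_trait]
impl DdlManagerConfigurator<MyDdlContext> for NoopDdlConfigurator {
    async fn configure(
        &self,
        ddl_manager: DdlManager,
        _ctx: MyDdlContext,
    ) -> std::result::Result<DdlManager, BoxedError> {
        // Return the manager unchanged; a real configurator would attach extra DDL handling here.
        Ok(ddl_manager)
    }
}
Such a configurator would then be registered into `Plugins` and picked up by the standalone start path shown earlier.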
@@ -148,11 +162,8 @@ impl DdlManager {
}
#[cfg(feature = "enterprise")]
pub fn with_trigger_ddl_manager(
mut self,
trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Self {
self.trigger_ddl_manager = trigger_ddl_manager;
pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
self.trigger_ddl_manager = Some(trigger_ddl_manager);
self
}

View File

@@ -339,6 +339,16 @@ pub struct FlushRegions {
pub error_strategy: FlushErrorStrategy,
}
impl Display for FlushRegions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
self.region_ids, self.strategy, self.error_strategy
)
}
}
impl FlushRegions {
/// Create synchronous single-region flush
pub fn sync_single(region_id: RegionId) -> Self {

View File

@@ -34,6 +34,8 @@ pub mod memory;
#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
pub mod rds;
pub mod test;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod txn;
pub mod util;
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;

View File

@@ -0,0 +1,125 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use derive_builder::Builder;
use crate::error::Result;
use crate::kv_backend::txn::{Txn, TxnResponse};
use crate::kv_backend::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, KvBackend, PutRequest, PutResponse,
RangeRequest, RangeResponse, TxnService,
};
pub type MockFn<Req, Resp> = Arc<dyn Fn(Req) -> Result<Resp> + Send + Sync>;
/// A mock kv backend for testing.
#[derive(Builder)]
pub struct MockKvBackend {
#[builder(setter(strip_option), default)]
pub range_fn: Option<MockFn<RangeRequest, RangeResponse>>,
#[builder(setter(strip_option), default)]
pub put_fn: Option<MockFn<PutRequest, PutResponse>>,
#[builder(setter(strip_option), default)]
pub batch_put_fn: Option<MockFn<BatchPutRequest, BatchPutResponse>>,
#[builder(setter(strip_option), default)]
pub batch_get_fn: Option<MockFn<BatchGetRequest, BatchGetResponse>>,
#[builder(setter(strip_option), default)]
pub delete_range_fn: Option<MockFn<DeleteRangeRequest, DeleteRangeResponse>>,
#[builder(setter(strip_option), default)]
pub batch_delete_fn: Option<MockFn<BatchDeleteRequest, BatchDeleteResponse>>,
#[builder(setter(strip_option), default)]
pub txn: Option<MockFn<Txn, TxnResponse>>,
#[builder(setter(strip_option), default)]
pub max_txn_ops: Option<usize>,
}
#[async_trait::async_trait]
impl TxnService for MockKvBackend {
type Error = crate::error::Error;
async fn txn(&self, txn: Txn) -> Result<TxnResponse> {
if let Some(f) = &self.txn {
f(txn)
} else {
unimplemented!()
}
}
fn max_txn_ops(&self) -> usize {
self.max_txn_ops.unwrap()
}
}
#[async_trait::async_trait]
impl KvBackend for MockKvBackend {
fn name(&self) -> &str {
"mock_kv_backend"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
if let Some(f) = &self.range_fn {
f(req)
} else {
unimplemented!()
}
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
if let Some(f) = &self.put_fn {
f(req)
} else {
unimplemented!()
}
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
if let Some(f) = &self.batch_put_fn {
f(req)
} else {
unimplemented!()
}
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
if let Some(f) = &self.batch_get_fn {
f(req)
} else {
unimplemented!()
}
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
if let Some(f) = &self.delete_range_fn {
f(req)
} else {
unimplemented!()
}
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
if let Some(f) = &self.batch_delete_fn {
f(req)
} else {
unimplemented!()
}
}
}

View File

@@ -246,14 +246,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Loader for {type_name} is not implemented: {reason}"))]
ProcedureLoaderNotImplemented {
#[snafu(implicit)]
location: Location,
type_name: String,
reason: String,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -274,8 +266,7 @@ impl ErrorExt for Error {
Error::ToJson { .. }
| Error::DeleteState { .. }
| Error::FromJson { .. }
| Error::WaitWatcher { .. }
| Error::ProcedureLoaderNotImplemented { .. } => StatusCode::Internal,
| Error::WaitWatcher { .. } => StatusCode::Internal,
Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }

View File

@@ -188,6 +188,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to align JSON array, reason: {reason}"))]
AlignJsonArray {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -203,7 +210,8 @@ impl ErrorExt for Error {
| Error::ToArrowScalar { .. }
| Error::ProjectArrowRecordBatch { .. }
| Error::PhysicalExpr { .. }
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
| Error::RecordBatchSliceIndexOverflow { .. }
| Error::AlignJsonArray { .. } => StatusCode::Internal,
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,

View File

@@ -18,7 +18,7 @@ pub mod adapter;
pub mod cursor;
pub mod error;
pub mod filter;
mod recordbatch;
pub mod recordbatch;
pub mod util;
use std::fmt;

View File

@@ -20,7 +20,8 @@ use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions};
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array};
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::DataType;
use datatypes::schema::SchemaRef;
use datatypes::vectors::{Helper, VectorRef};
@@ -30,8 +31,8 @@ use snafu::{OptionExt, ResultExt, ensure};
use crate::DfRecordBatch;
use crate::error::{
self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu,
Result,
self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result,
};
/// A two-dimensional batch of column-oriented data with a defined schema.
@@ -59,6 +60,8 @@ impl RecordBatch {
// TODO(LFC): Remove the casting here once `Batch` is no longer used.
let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
let arrow_arrays = maybe_align_json_array_with_schema(schema.arrow_schema(), arrow_arrays)?;
let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
.context(error::NewDfRecordBatchSnafu)?;
@@ -327,12 +330,111 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
Ok(RecordBatch::from_df_record_batch(schema, record_batch))
}
/// Align a json array `json_array` to the json type `schema_type`. The `schema_type` is often the
/// "largest" json type after some insertions in the table schema, while the json array previously
/// written in the SST could lag behind it. So it's important to "amend" the json array's
/// missing fields with null arrays, to align the array's data type with the provided one.
///
/// # Panics
///
/// - The json array is not an Arrow [StructArray], or the provided data type `schema_type` is not
/// of Struct type. Neither should happen unless we change how a json array is physically
/// stored.
pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result<ArrayRef> {
let json_type = json_array.data_type();
if json_type == schema_type {
return Ok(json_array.clone());
}
let json_array = json_array.as_struct();
let array_fields = json_array.fields();
let array_columns = json_array.columns();
let ArrowDataType::Struct(schema_fields) = schema_type else {
unreachable!()
};
let mut aligned = Vec::with_capacity(schema_fields.len());
// Compare the fields in the json array and the to-be-aligned schema, amending with null arrays
// on the way. It's very important to note that fields in the json array and in the json type
// are both SORTED.
let mut i = 0; // point to the schema fields
let mut j = 0; // point to the array fields
while i < schema_fields.len() && j < array_fields.len() {
let schema_field = &schema_fields[i];
let array_field = &array_fields[j];
if schema_field.name() == array_field.name() {
if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) {
// A `StructArray` in a json array must be another json array (like a nested json
// object in a json value).
aligned.push(align_json_array(
&array_columns[j],
schema_field.data_type(),
)?);
} else {
aligned.push(array_columns[j].clone());
}
j += 1;
} else {
aligned.push(new_null_array(schema_field.data_type(), json_array.len()));
}
i += 1;
}
if i < schema_fields.len() {
for field in &schema_fields[i..] {
aligned.push(new_null_array(field.data_type(), json_array.len()));
}
}
ensure!(
j == array_fields.len(),
AlignJsonArraySnafu {
reason: format!(
"this json array has more fields {:?}",
array_fields[j..]
.iter()
.map(|x| x.name())
.collect::<Vec<_>>(),
)
}
);
let json_array =
StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned())
.context(NewDfRecordBatchSnafu)?;
Ok(Arc::new(json_array))
}
fn maybe_align_json_array_with_schema(
schema: &ArrowSchemaRef,
arrays: Vec<ArrayRef>,
) -> Result<Vec<ArrayRef>> {
if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
return Ok(arrays);
}
let mut aligned = Vec::with_capacity(arrays.len());
for (field, array) in schema.fields().iter().zip(arrays.into_iter()) {
if !is_json_extension_type(field) {
aligned.push(array);
continue;
}
let json_array = align_json_array(&array, field.data_type())?;
aligned.push(json_array);
}
Ok(aligned)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::arrow::array::{AsArray, UInt32Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type};
use datatypes::arrow::array::{
AsArray, BooleanArray, Float64Array, Int64Array, ListArray, UInt32Array,
};
use datatypes::arrow::datatypes::{
DataType, Field, Fields, Int64Type, Schema as ArrowSchema, UInt32Type,
};
use datatypes::arrow_array::StringArray;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
@@ -340,6 +442,165 @@ mod tests {
use super::*;
#[test]
fn test_align_json_array() -> Result<()> {
struct TestCase {
json_array: ArrayRef,
schema_type: DataType,
expected: std::result::Result<ArrayRef, String>,
}
impl TestCase {
fn new(
json_array: StructArray,
schema_type: Fields,
expected: std::result::Result<Vec<ArrayRef>, String>,
) -> Self {
Self {
json_array: Arc::new(json_array),
schema_type: DataType::Struct(schema_type.clone()),
expected: expected
.map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
}
}
fn test(self) -> Result<()> {
let result = align_json_array(&self.json_array, &self.schema_type);
match (result, self.expected) {
(Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
(Ok(json_array), Err(e)) => {
panic!("expecting error {e} but actually get: {json_array:?}")
}
(Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
(Err(e), Ok(_)) => return Err(e),
}
Ok(())
}
}
// Test empty json array can be aligned with a complex json type.
TestCase::new(
StructArray::new_empty_fields(2, None),
Fields::from(vec![
Field::new("int", DataType::Int64, true),
Field::new_struct(
"nested",
vec![Field::new("bool", DataType::Boolean, true)],
true,
),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(Int64Array::new_null(2)) as ArrayRef,
Arc::new(StructArray::new_null(
Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
2,
)),
Arc::new(StringArray::new_null(2)),
]),
)
.test()?;
// Test simple json array alignment.
TestCase::new(
StructArray::from(vec![(
Arc::new(Field::new("float", DataType::Float64, true)),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
)]),
Fields::from(vec![
Field::new("float", DataType::Float64, true),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
Arc::new(StringArray::new_null(3)),
]),
)
.test()?;
// Test complex json array alignment.
TestCase::new(
StructArray::from(vec![
(
Arc::new(Field::new_list(
"list",
Field::new_list_field(DataType::Int64, true),
true,
)),
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1)]),
None,
Some(vec![Some(2), Some(3)]),
])) as ArrayRef,
),
(
Arc::new(Field::new_struct(
"nested",
vec![Field::new("int", DataType::Int64, true)],
true,
)),
Arc::new(StructArray::from(vec![(
Arc::new(Field::new("int", DataType::Int64, true)),
Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
)])),
),
(
Arc::new(Field::new("string", DataType::Utf8, true)),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
),
]),
Fields::from(vec![
Field::new("bool", DataType::Boolean, true),
Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
Field::new_struct(
"nested",
vec![
Field::new("float", DataType::Float64, true),
Field::new("int", DataType::Int64, true),
],
true,
),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(BooleanArray::new_null(3)) as ArrayRef,
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1)]),
None,
Some(vec![Some(2), Some(3)]),
])),
Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("float", DataType::Float64, true)),
Arc::new(Float64Array::new_null(3)) as ArrayRef,
),
(
Arc::new(Field::new("int", DataType::Int64, true)),
Arc::new(Int64Array::from(vec![-1, -2, -3])),
),
])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
]),
)
.test()?;
// Test align failed.
TestCase::new(
StructArray::try_from(vec![
("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
])
.unwrap(),
Fields::from(vec![Field::new("i", DataType::Int64, true)]),
Err(
r#"Failed to align JSON array, reason: this json array has more fields ["j"]"#
.to_string(),
),
)
.test()?;
Ok(())
}
#[test]
fn test_record_batch() {
let arrow_schema = Arc::new(ArrowSchema::new(vec![

View File

@@ -231,13 +231,15 @@ pub fn sql_value_to_value(
}
}
if value.data_type() != *data_type {
let value_datatype = value.data_type();
// The datatype of a json value is determined by its actual data, so we can't simply "cast" it here.
if value_datatype.is_json() || value_datatype == *data_type {
Ok(value)
} else {
datatypes::types::cast(value, data_type).with_context(|_| InvalidCastSnafu {
sql_value: sql_val.clone(),
datatype: data_type,
})
} else {
Ok(value)
}
}

View File

@@ -16,6 +16,7 @@ use common_time::timezone::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use snafu::ensure;
use sqlparser::ast::ValueWithSpan;
pub use sqlparser::ast::{
BinaryOperator, ColumnDef, ColumnOption, ColumnOptionDef, DataType, Expr, Function,
@@ -37,6 +38,14 @@ pub fn parse_column_default_constraint(
.iter()
.find(|o| matches!(o.option, ColumnOption::Default(_)))
{
ensure!(
!data_type.is_json(),
UnsupportedDefaultValueSnafu {
column_name,
reason: "json column cannot have a default value",
}
);
let default_constraint = match &opt.option {
ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value(
sql_value_to_value(column_name, data_type, &v.value, timezone, None, false)?,
@@ -82,7 +91,7 @@ pub fn parse_column_default_constraint(
} else {
return UnsupportedDefaultValueSnafu {
column_name,
expr: *expr.clone(),
reason: format!("expr '{expr}' not supported"),
}
.fail();
}
@@ -90,14 +99,14 @@ pub fn parse_column_default_constraint(
ColumnOption::Default(others) => {
return UnsupportedDefaultValueSnafu {
column_name,
expr: others.clone(),
reason: format!("expr '{others}' not supported"),
}
.fail();
}
_ => {
return UnsupportedDefaultValueSnafu {
column_name,
expr: Expr::Value(SqlValue::Null.into()),
reason: format!("option '{}' not supported", opt.option),
}
.fail();
}

View File

@@ -55,13 +55,11 @@ pub enum Error {
},
#[snafu(display(
"Unsupported expr in default constraint: {} for column: {}",
expr,
column_name
"Unsupported default constraint for column: '{column_name}', reason: {reason}"
))]
UnsupportedDefaultValue {
column_name: String,
expr: Expr,
reason: String,
#[snafu(implicit)]
location: Location,
},

View File

@@ -320,4 +320,15 @@ mod tests {
assert!(flush_reply.results[0].1.is_ok());
assert!(flush_reply.results[1].1.is_err());
}
#[test]
fn test_flush_regions_display() {
let region_id = RegionId::new(1024, 1);
let flush_regions = FlushRegions::sync_single(region_id);
let display = format!("{}", flush_regions);
assert_eq!(
display,
"FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast)"
);
}
}

View File

@@ -1200,7 +1200,8 @@ impl RegionServerInner {
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_)
| RegionRequest::BuildIndex(_) => RegionChange::None,
| RegionRequest::BuildIndex(_)
| RegionRequest::EnterStaging(_) => RegionChange::None,
RegionRequest::Catchup(_) => RegionChange::Catchup,
};

View File

@@ -15,7 +15,6 @@
use std::fmt;
use std::sync::Arc;
use arrow::compute::cast as arrow_array_cast;
use arrow::datatypes::{
DataType as ArrowDataType, IntervalUnit as ArrowIntervalUnit, TimeUnit as ArrowTimeUnit,
};
@@ -368,8 +367,10 @@ impl ConcreteDataType {
/// Checks if the data type can be cast to another data type.
pub fn can_arrow_type_cast_to(&self, to_type: &ConcreteDataType) -> bool {
let array = arrow_array::new_empty_array(&self.as_arrow_type());
arrow_array_cast(array.as_ref(), &to_type.as_arrow_type()).is_ok()
match (self, to_type) {
(ConcreteDataType::Json(this), ConcreteDataType::Json(that)) => that.is_include(this),
_ => arrow::compute::can_cast_types(&self.as_arrow_type(), &to_type.as_arrow_type()),
}
}
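A minimal sketch of the json-aware branch above, assuming the `JsonType::new_native` constructor and module paths used elsewhere in this change: a json value can only be cast to a json type that includes all of its fields.
#[cfg(test)]
mod json_cast_sketch {
    use super::ConcreteDataType;
    use crate::types::JsonType;
    use crate::types::json_type::{JsonNativeType, JsonObjectType};

    #[test]
    fn json_cast_requires_inclusion() {
        let wide = ConcreteDataType::Json(JsonType::new_native(JsonNativeType::Object(
            JsonObjectType::from([
                ("a".to_string(), JsonNativeType::String),
                ("b".to_string(), JsonNativeType::i64()),
            ]),
        )));
        let narrow = ConcreteDataType::Json(JsonType::new_native(JsonNativeType::Object(
            JsonObjectType::from([("a".to_string(), JsonNativeType::String)]),
        )));
        // The wider type includes all fields of the narrower one, but not vice versa.
        assert!(narrow.can_arrow_type_cast_to(&wide));
        assert!(!wide.can_arrow_type_cast_to(&narrow));
    }
}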
/// Try to cast data type as a [`DurationType`].

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use arrow_schema::extension::ExtensionType;
use arrow_schema::{ArrowError, DataType};
use arrow_schema::{ArrowError, DataType, FieldRef};
use serde::{Deserialize, Serialize};
use crate::json::JsonStructureSettings;
@@ -102,3 +102,8 @@ impl ExtensionType for JsonExtensionType {
Ok(json)
}
}
/// Check if this field is to be treated as a json extension type.
pub fn is_json_extension_type(field: &FieldRef) -> bool {
field.extension_type_name() == Some(JsonExtensionType::NAME)
}
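A small usage sketch, under the assumption that arrow-rs exposes an `EXTENSION_TYPE_NAME_KEY` constant for the extension-name metadata entry; the field and metadata setup here are illustrative only.
#[cfg(test)]
mod is_json_extension_sketch {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_schema::extension::{EXTENSION_TYPE_NAME_KEY, ExtensionType};
    use arrow_schema::{DataType, Field, FieldRef, Fields};

    use super::{JsonExtensionType, is_json_extension_type};

    #[test]
    fn detects_json_extension_fields() {
        // A plain field carries no extension-name metadata.
        let plain: FieldRef = Arc::new(Field::new("f", DataType::Utf8, true));
        assert!(!is_json_extension_type(&plain));

        // A field tagged with the json extension name is recognized.
        let json_field: FieldRef = Arc::new(
            Field::new("j", DataType::Struct(Fields::empty()), true).with_metadata(
                HashMap::from([(
                    EXTENSION_TYPE_NAME_KEY.to_string(),
                    JsonExtensionType::NAME.to_string(),
                )]),
            ),
        );
        assert!(is_json_extension_type(&json_field));
    }
}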

View File

@@ -260,7 +260,7 @@ impl JsonValue {
ConcreteDataType::Json(self.json_type().clone())
}
pub(crate) fn json_type(&self) -> &JsonType {
pub fn json_type(&self) -> &JsonType {
self.json_type.get_or_init(|| self.json_variant.json_type())
}
@@ -268,6 +268,14 @@ impl JsonValue {
matches!(self.json_variant, JsonVariant::Null)
}
/// Check if this JSON value is an empty object.
pub fn is_empty_object(&self) -> bool {
match &self.json_variant {
JsonVariant::Object(object) => object.is_empty(),
_ => false,
}
}
pub(crate) fn as_i64(&self) -> Option<i64> {
match self.json_variant {
JsonVariant::Number(n) => n.as_i64(),

View File

@@ -273,8 +273,9 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
_ => None,
};
if let Some(extype) = extype {
let metadata = HashMap::from([(TYPE_KEY.to_string(), extype.to_string())]);
field = field.with_metadata(metadata);
field
.metadata_mut()
.insert(TYPE_KEY.to_string(), extype.to_string());
}
fields.push(field);
ensure!(

View File

@@ -20,7 +20,7 @@ mod decimal_type;
mod dictionary_type;
mod duration_type;
mod interval_type;
pub(crate) mod json_type;
pub mod json_type;
mod list_type;
mod null_type;
mod primitive_type;

View File

@@ -18,7 +18,6 @@ use std::str::FromStr;
use std::sync::Arc;
use arrow::datatypes::DataType as ArrowDataType;
use arrow_schema::Fields;
use common_base::bytes::Bytes;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -36,7 +35,7 @@ use crate::vectors::json::builder::JsonVectorBuilder;
use crate::vectors::{BinaryVectorBuilder, MutableVector};
pub const JSON_TYPE_NAME: &str = "Json";
const JSON_PLAIN_FIELD_NAME: &str = "__plain__";
const JSON_PLAIN_FIELD_NAME: &str = "__json_plain__";
const JSON_PLAIN_FIELD_METADATA_KEY: &str = "is_plain_json";
pub type JsonObjectType = BTreeMap<String, JsonNativeType>;
@@ -59,6 +58,10 @@ pub enum JsonNativeType {
}
impl JsonNativeType {
pub fn is_null(&self) -> bool {
matches!(self, JsonNativeType::Null)
}
pub fn u64() -> Self {
Self::Number(JsonNumberType::U64)
}
@@ -187,7 +190,7 @@ impl JsonType {
}
}
pub(crate) fn empty() -> Self {
pub fn null() -> Self {
Self {
format: JsonFormat::Native(Box::new(JsonNativeType::Null)),
}
@@ -208,7 +211,7 @@ impl JsonType {
}
/// Try to merge this json type with another, returning an error on datatype conflict.
pub(crate) fn merge(&mut self, other: &JsonType) -> Result<()> {
pub fn merge(&mut self, other: &JsonType) -> Result<()> {
match (&self.format, &other.format) {
(JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()),
(JsonFormat::Native(this), JsonFormat::Native(that)) => {
@@ -223,7 +226,8 @@ impl JsonType {
}
}
pub(crate) fn is_mergeable(&self, other: &JsonType) -> bool {
/// Check whether this json type can be merged with the `other` json type.
pub fn is_mergeable(&self, other: &JsonType) -> bool {
match (&self.format, &other.format) {
(JsonFormat::Jsonb, JsonFormat::Jsonb) => true,
(JsonFormat::Native(this), JsonFormat::Native(that)) => {
@@ -232,6 +236,43 @@ impl JsonType {
_ => false,
}
}
/// Check whether this json type includes all fields of the `other` json type.
pub fn is_include(&self, other: &JsonType) -> bool {
match (&self.format, &other.format) {
(JsonFormat::Jsonb, JsonFormat::Jsonb) => true,
(JsonFormat::Native(this), JsonFormat::Native(that)) => {
is_include(this.as_ref(), that.as_ref())
}
_ => false,
}
}
}
fn is_include(this: &JsonNativeType, that: &JsonNativeType) -> bool {
fn is_include_object(this: &JsonObjectType, that: &JsonObjectType) -> bool {
for (type_name, that_type) in that {
let Some(this_type) = this.get(type_name) else {
return false;
};
if !is_include(this_type, that_type) {
return false;
}
}
true
}
match (this, that) {
(this, that) if this == that => true,
(JsonNativeType::Array(this), JsonNativeType::Array(that)) => {
is_include(this.as_ref(), that.as_ref())
}
(JsonNativeType::Object(this), JsonNativeType::Object(that)) => {
is_include_object(this, that)
}
(_, JsonNativeType::Null) => true,
_ => false,
}
}
/// A special struct type for denoting a "plain" (not object) json value. It has only one field, with
@@ -317,14 +358,14 @@ impl DataType for JsonType {
fn as_arrow_type(&self) -> ArrowDataType {
match self.format {
JsonFormat::Jsonb => ArrowDataType::Binary,
JsonFormat::Native(_) => ArrowDataType::Struct(Fields::empty()),
JsonFormat::Native(_) => self.as_struct_type().as_arrow_type(),
}
}
fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
match self.format {
match &self.format {
JsonFormat::Jsonb => Box::new(BinaryVectorBuilder::with_capacity(capacity)),
JsonFormat::Native(_) => Box::new(JsonVectorBuilder::with_capacity(capacity)),
JsonFormat::Native(x) => Box::new(JsonVectorBuilder::new(*x.clone(), capacity)),
}
}
@@ -336,6 +377,12 @@ impl DataType for JsonType {
}
}
impl Display for JsonType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name())
}
}
/// Converts a json type value to string
pub fn jsonb_to_string(val: &[u8]) -> Result<String> {
match jsonb::from_slice(val) {
@@ -366,6 +413,204 @@ mod tests {
use super::*;
use crate::json::JsonStructureSettings;
#[test]
fn test_json_type_include() {
fn test(this: &JsonNativeType, that: &JsonNativeType, expected: bool) {
assert_eq!(is_include(this, that), expected);
}
test(&JsonNativeType::Null, &JsonNativeType::Null, true);
test(&JsonNativeType::Null, &JsonNativeType::Bool, false);
test(&JsonNativeType::Bool, &JsonNativeType::Null, true);
test(&JsonNativeType::Bool, &JsonNativeType::Bool, true);
test(&JsonNativeType::Bool, &JsonNativeType::u64(), false);
test(&JsonNativeType::u64(), &JsonNativeType::Null, true);
test(&JsonNativeType::u64(), &JsonNativeType::u64(), true);
test(&JsonNativeType::u64(), &JsonNativeType::String, false);
test(&JsonNativeType::String, &JsonNativeType::Null, true);
test(&JsonNativeType::String, &JsonNativeType::String, true);
test(
&JsonNativeType::String,
&JsonNativeType::Array(Box::new(JsonNativeType::f64())),
false,
);
test(
&JsonNativeType::Array(Box::new(JsonNativeType::f64())),
&JsonNativeType::Null,
true,
);
test(
&JsonNativeType::Array(Box::new(JsonNativeType::f64())),
&JsonNativeType::Array(Box::new(JsonNativeType::Null)),
true,
);
test(
&JsonNativeType::Array(Box::new(JsonNativeType::f64())),
&JsonNativeType::Array(Box::new(JsonNativeType::f64())),
true,
);
test(
&JsonNativeType::Array(Box::new(JsonNativeType::f64())),
&JsonNativeType::String,
false,
);
test(
&JsonNativeType::Array(Box::new(JsonNativeType::f64())),
&JsonNativeType::Object(JsonObjectType::new()),
false,
);
let simple_json_object = &JsonNativeType::Object(JsonObjectType::from([(
"foo".to_string(),
JsonNativeType::String,
)]));
test(simple_json_object, &JsonNativeType::Null, true);
test(simple_json_object, simple_json_object, true);
test(simple_json_object, &JsonNativeType::i64(), false);
test(
simple_json_object,
&JsonNativeType::Object(JsonObjectType::from([(
"bar".to_string(),
JsonNativeType::i64(),
)])),
false,
);
let complex_json_object = &JsonNativeType::Object(JsonObjectType::from([
(
"nested".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"a".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"b".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"c".to_string(),
JsonNativeType::String,
)])),
)])),
)])),
),
("bar".to_string(), JsonNativeType::i64()),
]));
test(complex_json_object, &JsonNativeType::Null, true);
test(complex_json_object, &JsonNativeType::String, false);
test(complex_json_object, complex_json_object, true);
test(
complex_json_object,
&JsonNativeType::Object(JsonObjectType::from([(
"bar".to_string(),
JsonNativeType::i64(),
)])),
true,
);
test(
complex_json_object,
&JsonNativeType::Object(JsonObjectType::from([
(
"nested".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"a".to_string(),
JsonNativeType::Null,
)])),
),
("bar".to_string(), JsonNativeType::i64()),
])),
true,
);
test(
complex_json_object,
&JsonNativeType::Object(JsonObjectType::from([
(
"nested".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"a".to_string(),
JsonNativeType::String,
)])),
),
("bar".to_string(), JsonNativeType::i64()),
])),
false,
);
test(
complex_json_object,
&JsonNativeType::Object(JsonObjectType::from([
(
"nested".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"a".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"b".to_string(),
JsonNativeType::String,
)])),
)])),
),
("bar".to_string(), JsonNativeType::i64()),
])),
false,
);
test(
complex_json_object,
&JsonNativeType::Object(JsonObjectType::from([
(
"nested".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"a".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"b".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"c".to_string(),
JsonNativeType::Null,
)])),
)])),
)])),
),
("bar".to_string(), JsonNativeType::i64()),
])),
true,
);
test(
complex_json_object,
&JsonNativeType::Object(JsonObjectType::from([
(
"nested".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"a".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"b".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"c".to_string(),
JsonNativeType::Bool,
)])),
)])),
)])),
),
("bar".to_string(), JsonNativeType::i64()),
])),
false,
);
test(
complex_json_object,
&JsonNativeType::Object(JsonObjectType::from([(
"nested".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"a".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"b".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"c".to_string(),
JsonNativeType::String,
)])),
)])),
)])),
)])),
true,
);
}
#[test]
fn test_merge_json_type() -> Result<()> {
fn test(

View File

@@ -20,6 +20,7 @@ use crate::data_type::ConcreteDataType;
use crate::error::{Result, TryFromValueSnafu, UnsupportedOperationSnafu};
use crate::json::value::JsonValueRef;
use crate::prelude::{ValueRef, Vector, VectorRef};
use crate::types::json_type::JsonNativeType;
use crate::types::{JsonType, json_type};
use crate::value::StructValueRef;
use crate::vectors::{MutableVector, StructVectorBuilder};
@@ -181,9 +182,9 @@ pub(crate) struct JsonVectorBuilder {
}
impl JsonVectorBuilder {
pub(crate) fn with_capacity(capacity: usize) -> Self {
pub(crate) fn new(json_type: JsonNativeType, capacity: usize) -> Self {
Self {
merged_type: JsonType::empty(),
merged_type: JsonType::new_native(json_type),
capacity,
builders: vec![],
}
@@ -326,18 +327,18 @@ mod tests {
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: Array[Bool]",
),
];
let mut builder = JsonVectorBuilder::with_capacity(1);
let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1);
for (json, result) in jsons.into_iter().zip(results.into_iter()) {
push(json, &mut builder, result);
}
let vector = builder.to_vector();
let expected = r#"
+----------------+
| StructVector |
+----------------+
| {__plain__: 1} |
| {__plain__: 2} |
+----------------+"#;
+---------------------+
| StructVector |
+---------------------+
| {__json_plain__: 1} |
| {__json_plain__: 2} |
+---------------------+"#;
assert_eq!(pretty_print(vector), expected.trim());
Ok(())
}
@@ -386,7 +387,7 @@ mod tests {
"object": {"timestamp": 1761523203000}
}"#,
];
let mut builder = JsonVectorBuilder::with_capacity(1);
let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1);
for json in jsons {
push(json, &mut builder, Ok(()));
}

View File

@@ -379,10 +379,8 @@ impl MutableVector for StructVectorBuilder {
},
StructValueRef::Ref(val) => self.push_struct_value(val)?,
StructValueRef::RefList { val, fields } => {
let struct_value = StructValue::try_new(
val.iter().map(|v| Value::from(v.clone())).collect(),
fields.clone(),
)?;
let struct_value =
StructValue::try_new(val.into_iter().map(Value::from).collect(), fields)?;
self.push_struct_value(&struct_value)?;
}
}
@@ -429,12 +427,17 @@ impl ScalarVectorBuilder for StructVectorBuilder {
.value_builders
.iter_mut()
.map(|b| b.to_vector().to_arrow_array())
.collect();
let struct_array = StructArray::new(
self.fields.as_arrow_fields(),
arrays,
self.null_buffer.finish(),
);
.collect::<Vec<_>>();
let struct_array = if arrays.is_empty() {
StructArray::new_empty_fields(self.len(), self.null_buffer.finish())
} else {
StructArray::new(
self.fields.as_arrow_fields(),
arrays,
self.null_buffer.finish(),
)
};
StructVector::try_new(self.fields.clone(), struct_array).unwrap()
}

View File

@@ -17,7 +17,7 @@
mod relation;
use api::helper::{pb_value_to_value_ref, value_to_grpc_value};
use api::helper::{pb_value_to_value_ref, to_grpc_value};
use api::v1::Row as ProtoRow;
use datatypes::data_type::ConcreteDataType;
use datatypes::types::cast;
@@ -201,11 +201,7 @@ impl From<ProtoRow> for Row {
impl From<Row> for ProtoRow {
fn from(row: Row) -> Self {
let values = row
.unpack()
.into_iter()
.map(value_to_grpc_value)
.collect_vec();
let values = row.unpack().into_iter().map(to_grpc_value).collect_vec();
ProtoRow { values }
}
}

View File

@@ -32,15 +32,18 @@ use operator::flow::FlowServiceOperator;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use operator::statement::{
ExecutorConfigureContext, StatementExecutor, StatementExecutorConfiguratorRef,
StatementExecutorRef,
};
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use pipeline::pipeline_operator::PipelineOperator;
use query::QueryEngineFactory;
use query::region_query::RegionQueryHandlerFactoryRef;
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::error::{self, ExternalSnafu, Result};
use crate::events::EventHandlerImpl;
use crate::frontend::FrontendOptions;
use crate::instance::Instance;
@@ -187,10 +190,15 @@ impl FrontendBuilder {
Some(process_manager.clone()),
);
#[cfg(feature = "enterprise")]
let statement_executor =
if let Some(factory) = plugins.get::<operator::statement::TriggerQuerierFactoryRef>() {
statement_executor.with_trigger_querier(factory.create(kv_backend.clone()))
if let Some(configurator) = plugins.get::<StatementExecutorConfiguratorRef>() {
let ctx = ExecutorConfigureContext {
kv_backend: kv_backend.clone(),
};
configurator
.configure(statement_executor, ctx)
.await
.context(ExternalSnafu)?
} else {
statement_executor
};

View File

@@ -64,6 +64,7 @@ lazy_static.workspace = true
once_cell.workspace = true
ordered-float.workspace = true
parking_lot.workspace = true
partition.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true

View File

@@ -29,7 +29,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_telemetry::info;
use either::Either;
use servers::configurator::ConfiguratorRef;
use servers::configurator::GrpcRouterConfiguratorRef;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
@@ -44,6 +44,7 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::election::etcd::EtcdElection;
use crate::error::OtherSnafu;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{
BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
@@ -131,8 +132,15 @@ impl MetasrvInstance {
// Start gRPC server with admin services for backward compatibility
let mut router = router(self.metasrv.clone());
if let Some(configurator) = self.metasrv.plugins().get::<ConfiguratorRef>() {
router = configurator.config_grpc(router);
if let Some(configurator) = self
.metasrv
.plugins()
.get::<GrpcRouterConfiguratorRef<()>>()
{
router = configurator
.configure_grpc_router(router, ())
.await
.context(OtherSnafu)?;
}
let (serve_state_tx, serve_state_rx) = oneshot::channel();

View File

@@ -23,6 +23,7 @@ use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use uuid::Uuid;
use crate::metasrv::SelectTarget;
use crate::pubsub::Message;
@@ -982,6 +983,52 @@ pub enum Error {
#[snafu(source)]
source: common_meta::error::Error,
},
#[snafu(display(
"Repartition group {} source region missing, region id: {}",
group_id,
region_id
))]
RepartitionSourceRegionMissing {
group_id: Uuid,
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Repartition group {} target region missing, region id: {}",
group_id,
region_id
))]
RepartitionTargetRegionMissing {
group_id: Uuid,
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize partition expression: {}", source))]
SerializePartitionExpr {
#[snafu(source)]
source: partition::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Partition expression mismatch, region id: {}, expected: {}, actual: {}",
region_id,
expected,
actual
))]
PartitionExprMismatch {
region_id: RegionId,
expected: String,
actual: String,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -1041,6 +1088,7 @@ impl ErrorExt for Error {
| Error::MailboxChannelClosed { .. }
| Error::IsNotLeader { .. } => StatusCode::IllegalState,
Error::RetryLaterWithSource { source, .. } => source.status_code(),
Error::SerializePartitionExpr { source, .. } => source.status_code(),
Error::Unsupported { .. } => StatusCode::Unsupported,
@@ -1062,7 +1110,10 @@ impl ErrorExt for Error {
| Error::TooManyPartitions { .. }
| Error::TomlFormat { .. }
| Error::HandlerNotFound { .. }
| Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
| Error::LeaderPeerChanged { .. }
| Error::RepartitionSourceRegionMissing { .. }
| Error::RepartitionTargetRegionMissing { .. }
| Error::PartitionExprMismatch { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
| Error::InvalidRegionKeyFromUtf8 { .. }

View File

@@ -28,9 +28,10 @@ mod procedure;
mod scheduler;
mod tracker;
pub(crate) use options::GcSchedulerOptions;
pub use options::GcSchedulerOptions;
pub use procedure::BatchGcProcedure;
pub(crate) use scheduler::{GcScheduler, GcTickerRef};
pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
pub type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;

View File

@@ -32,7 +32,7 @@ use table::metadata::TableId;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu};
use crate::gc::Region2Peers;
use crate::gc::procedure::GcRegionProcedure;
use crate::gc::procedure::{BatchGcProcedure, GcRegionProcedure};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
@@ -84,25 +84,6 @@ impl DefaultGcSchedulerCtx {
mailbox: MailboxRef,
server_addr: String,
) -> Result<Self> {
// register a noop loader for `GcRegionProcedure` to avoid error when deserializing procedure when rebooting
procedure_manager
.register_loader(
GcRegionProcedure::TYPE_NAME,
Box::new(move |json| {
common_procedure::error::ProcedureLoaderNotImplementedSnafu {
type_name: GcRegionProcedure::TYPE_NAME.to_string(),
reason:
"GC procedure should be retried by scheduler, not reloaded from storage"
.to_string(),
}
.fail()
}),
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: GcRegionProcedure::TYPE_NAME,
});
Ok(Self {
table_metadata_manager,
procedure_manager,

View File

@@ -13,11 +13,12 @@
// limitations under the License.
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{self, GcRegions, InstructionReply};
use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
use common_meta::lock_key::RegionLock;
use common_meta::peer::Peer;
use common_procedure::error::ToJsonSnafu;
@@ -25,16 +26,126 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use common_telemetry::error;
use common_telemetry::{debug, error, info, warn};
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::GcReport;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use crate::error::{self, Result, SerializeToJsonSnafu};
use crate::gc::Region2Peers;
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
/// Helper function to send GetFileRefs instruction and wait for reply.
async fn send_get_file_refs(
mailbox: &MailboxRef,
server_addr: &str,
peer: &Peer,
instruction: GetFileRefs,
timeout: Duration,
) -> Result<GetFileRefsReply> {
let instruction = instruction::Instruction::GetFileRefs(instruction);
let msg = MailboxMessage::json_message(
&format!("Get file references: {}", instruction),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let mailbox_rx = mailbox
.send(&Channel::Datanode(peer.id), msg, timeout)
.await?;
let reply = match mailbox_rx.await {
Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
Err(e) => {
error!(
"Failed to receive reply from datanode {} for GetFileRefs: {}",
peer, e
);
return Err(e);
}
};
let InstructionReply::GetFileRefs(reply) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: format!("{:?}", reply),
reason: "Unexpected reply of the GetFileRefs instruction",
}
.fail();
};
Ok(reply)
}
/// Helper function to send GcRegions instruction and wait for reply.
async fn send_gc_regions(
mailbox: &MailboxRef,
peer: &Peer,
gc_regions: GcRegions,
server_addr: &str,
timeout: Duration,
description: &str,
) -> Result<GcReport> {
let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
let msg = MailboxMessage::json_message(
&format!("{}: {}", description, instruction),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let mailbox_rx = mailbox
.send(&Channel::Datanode(peer.id), msg, timeout)
.await?;
let reply = match mailbox_rx.await {
Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
Err(e) => {
error!(
"Failed to receive reply from datanode {} for {}: {}",
peer, description, e
);
return Err(e);
}
};
let InstructionReply::GcRegions(reply) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: format!("{:?}", reply),
reason: "Unexpected reply of the GcRegions instruction",
}
.fail();
};
let res = reply.result;
match res {
Ok(report) => Ok(report),
Err(e) => {
error!(
"Datanode {} reported error during GC for regions {:?}: {}",
peer, gc_regions, e
);
error::UnexpectedSnafu {
violated: format!(
"Datanode {} reported error during GC for regions {:?}: {}",
peer, gc_regions, e
),
}
.fail()
}
}
}
/// TODO(discord9): another procedure which does both get file refs and gc regions.
pub struct GcRegionProcedure {
mailbox: MailboxRef,
@@ -74,60 +185,15 @@ impl GcRegionProcedure {
}
async fn send_gc_instr(&self) -> Result<GcReport> {
let peer = &self.data.peer;
let instruction = instruction::Instruction::GcRegions(self.data.gc_regions.clone());
let msg = MailboxMessage::json_message(
&format!("{}: {}", self.data.description, instruction),
&format!("Metasrv@{}", self.data.server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
send_gc_regions(
&self.mailbox,
&self.data.peer,
self.data.gc_regions.clone(),
&self.data.server_addr,
self.data.timeout,
&self.data.description,
)
.with_context(|_| SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let mailbox_rx = self
.mailbox
.send(&Channel::Datanode(peer.id), msg, self.data.timeout)
.await?;
let reply = match mailbox_rx.await {
Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
Err(e) => {
error!(
"Failed to receive reply from datanode {} for {}: {}",
peer, self.data.description, e
);
return Err(e);
}
};
let InstructionReply::GcRegions(reply) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: format!("{:?}", reply),
reason: "Unexpected reply of the GcRegions instruction",
}
.fail();
};
let res = reply.result;
match res {
Ok(report) => Ok(report),
Err(e) => {
error!(
"Datanode {} reported error during GC for regions {:?}: {}",
peer, self.data.gc_regions, e
);
Err(error::UnexpectedSnafu {
violated: format!(
"Datanode {} reported error during GC for regions {:?}: {}",
peer, self.data.gc_regions, e
),
}
.fail()?)
}
}
.await
}
pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
@@ -164,10 +230,10 @@ impl Procedure for GcRegionProcedure {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
/// Write lock all regions involved in this GC procedure.
/// Read lock all regions involved in this GC procedure.
/// So that, e.g., region migration won't happen during GC and cause race conditions.
///
/// only write lock the regions not catatlog/schema because it can run concurrently with other procedures(i.e. drop database/table)
/// only read lock the regions, not the catalog/schema, because it can run concurrently with other procedures (i.e. drop database/table)
/// TODO:(discord9): integration test to verify this
fn lock_key(&self) -> LockKey {
let lock_key: Vec<_> = self
@@ -182,3 +248,297 @@ impl Procedure for GcRegionProcedure {
LockKey::new(lock_key)
}
}
/// Procedure that fetches file references and then performs batch GC for multiple regions. It should only be used by the admin function
/// for triggering a manual GC, since it holds locks on all involved regions for the whole duration of the procedure.
pub struct BatchGcProcedure {
mailbox: MailboxRef,
data: BatchGcData,
}
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
state: State,
server_addr: String,
/// The regions to be GC-ed
regions: Vec<RegionId>,
full_file_listing: bool,
region_routes: Region2Peers,
/// Related regions (e.g., for shared files). Map: RegionId -> List of related RegionIds.
related_regions: HashMap<RegionId, Vec<RegionId>>,
/// Acquired file references (Populated in Acquiring state)
file_refs: FileRefsManifest,
/// mailbox timeout duration
timeout: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
/// Initial state
Start,
/// Fetching file references from datanodes
Acquiring,
/// Sending GC instruction to the target datanode
Gcing,
}
impl BatchGcProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
pub fn new(
mailbox: MailboxRef,
server_addr: String,
regions: Vec<RegionId>,
full_file_listing: bool,
region_routes: Region2Peers,
related_regions: HashMap<RegionId, Vec<RegionId>>,
timeout: Duration,
) -> Self {
Self {
mailbox,
data: BatchGcData {
state: State::Start,
server_addr,
regions,
full_file_listing,
region_routes,
related_regions,
file_refs: FileRefsManifest::default(),
timeout,
},
}
}
/// Get file references from all datanodes that host the regions
async fn get_file_references(&self) -> Result<FileRefsManifest> {
use std::collections::{HashMap, HashSet};
let query_regions = &self.data.regions;
let related_regions = &self.data.related_regions;
let region_routes = &self.data.region_routes;
let timeout = self.data.timeout;
// Group regions by datanode to minimize RPC calls
let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
for region_id in query_regions {
if let Some((leader, followers)) = region_routes.get(region_id) {
datanode2query_regions
.entry(leader.clone())
.or_default()
.push(*region_id);
// also request file refs for follower regions, in case a query is running on a follower
for follower in followers {
datanode2query_regions
.entry(follower.clone())
.or_default()
.push(*region_id);
}
} else {
return error::UnexpectedSnafu {
violated: format!(
"region_routes: {region_routes:?} does not contain region_id: {region_id}",
),
}
.fail();
}
}
let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
HashMap::new();
for (related_region, queries) in related_regions {
if let Some((leader, _followers)) = region_routes.get(related_region) {
datanode2related_regions
.entry(leader.clone())
.or_default()
.insert(*related_region, queries.clone());
} // related regions are read from the manifest, so no need to send to followers
}
// Send GetFileRefs instructions to each datanode
let mut all_file_refs: HashMap<RegionId, HashSet<store_api::storage::FileId>> =
HashMap::new();
let mut all_manifest_versions = HashMap::new();
for (peer, regions) in datanode2query_regions {
let related_regions_for_peer =
datanode2related_regions.remove(&peer).unwrap_or_default();
let instruction = GetFileRefs {
query_regions: regions.clone(),
related_regions: related_regions_for_peer,
};
let reply = send_get_file_refs(
&self.mailbox,
&self.data.server_addr,
&peer,
instruction,
timeout,
)
.await?;
if !reply.success {
return error::UnexpectedSnafu {
violated: format!(
"Failed to get file references from datanode {}: {:?}",
peer, reply.error
),
}
.fail();
}
// Merge the file references from this datanode
for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
all_file_refs
.entry(region_id)
.or_default()
.extend(file_refs);
}
// region manifest version should be the smallest one among all peers, so outdated region can be detected
for (region_id, version) in reply.file_refs_manifest.manifest_version {
let entry = all_manifest_versions.entry(region_id).or_insert(version);
*entry = (*entry).min(version);
}
}
Ok(FileRefsManifest {
file_refs: all_file_refs,
manifest_version: all_manifest_versions,
})
}
/// Sends GC instructions to all datanodes that host the regions and
/// returns the regions that need a retry.
async fn send_gc_instructions(&self) -> Result<Vec<RegionId>> {
let regions = &self.data.regions;
let region_routes = &self.data.region_routes;
let file_refs = &self.data.file_refs;
let timeout = self.data.timeout;
// Group regions by datanode
let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
for region_id in regions {
if let Some((leader, _followers)) = region_routes.get(region_id) {
datanode2regions
.entry(leader.clone())
.or_default()
.push(*region_id);
} else {
return error::UnexpectedSnafu {
violated: format!(
"region_routes: {region_routes:?} does not contain region_id: {region_id}",
),
}
.fail();
}
}
let mut all_need_retry = HashSet::new();
// Send GC instructions to each datanode
for (peer, regions_for_peer) in datanode2regions {
let gc_regions = GcRegions {
regions: regions_for_peer.clone(),
// file_refs_manifest can be large; cloning for each datanode is acceptable here since this is an admin-only operation.
file_refs_manifest: file_refs.clone(),
full_file_listing: self.data.full_file_listing,
};
let report = send_gc_regions(
&self.mailbox,
&peer,
gc_regions,
self.data.server_addr.as_str(),
timeout,
"Batch GC",
)
.await?;
let success = report.deleted_files.keys().collect_vec();
let need_retry = report.need_retry_regions.iter().cloned().collect_vec();
if need_retry.is_empty() {
info!(
"GC report from datanode {}: successfully deleted files for regions {:?}",
peer, success
);
} else {
warn!(
"GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
peer, success, need_retry
);
}
all_need_retry.extend(report.need_retry_regions);
}
Ok(all_need_retry.into_iter().collect())
}
}
#[async_trait::async_trait]
impl Procedure for BatchGcProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.data.state {
State::Start => {
// Transition to Acquiring state
self.data.state = State::Acquiring;
Ok(Status::executing(false))
}
State::Acquiring => {
// Get file references from all datanodes
match self.get_file_references().await {
Ok(file_refs) => {
self.data.file_refs = file_refs;
self.data.state = State::Gcing;
Ok(Status::executing(false))
}
Err(e) => {
error!("Failed to get file references: {}", e);
Err(ProcedureError::external(e))
}
}
}
State::Gcing => {
// Send GC instructions to all datanodes
// TODO(discord9): handle need-retry regions
match self.send_gc_instructions().await {
Ok(_) => {
info!(
"Batch GC completed successfully for regions {:?}",
self.data.regions
);
Ok(Status::done())
}
Err(e) => {
error!("Failed to send GC instructions: {}", e);
Err(ProcedureError::external(e))
}
}
}
}
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
/// Read lock all regions involved in this GC procedure.
/// So that, e.g., region migration won't happen during GC and cause race conditions.
fn lock_key(&self) -> LockKey {
let lock_key: Vec<_> = self
.data
.regions
.iter()
.sorted() // sort to have a deterministic lock order
.map(|id| RegionLock::Read(*id).into())
.collect();
LockKey::new(lock_key)
}
}
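The merge step in `get_file_references` above is easy to get wrong, so here is a minimal standalone sketch of the rule it applies when combining replies from several datanodes: file references are unioned per region, while the per-region manifest version is the smallest one reported, so an outdated region can still be detected. The sketch uses plain `u64` ids and `HashMap`s instead of the real `RegionId`/`FileRefsManifest` types.

```rust
use std::collections::{HashMap, HashSet};

// Sketch only: plain u64 ids stand in for RegionId and FileId.
// Each reply is (file refs per region, manifest version per region).
type Reply = (HashMap<u64, HashSet<u64>>, HashMap<u64, u64>);

fn merge_replies(replies: Vec<Reply>) -> Reply {
    let mut all_refs: HashMap<u64, HashSet<u64>> = HashMap::new();
    let mut all_versions: HashMap<u64, u64> = HashMap::new();
    for (refs, versions) in replies {
        for (region, files) in refs {
            // Union the file references reported for the same region.
            all_refs.entry(region).or_default().extend(files);
        }
        for (region, version) in versions {
            // Keep the smallest manifest version seen among all peers.
            let entry = all_versions.entry(region).or_insert(version);
            *entry = (*entry).min(version);
        }
    }
    (all_refs, all_versions)
}

fn main() {
    let reply_a = (
        HashMap::from([(1, HashSet::from([10, 11]))]),
        HashMap::from([(1, 7)]),
    );
    let reply_b = (
        HashMap::from([(1, HashSet::from([11, 12]))]),
        HashMap::from([(1, 5)]),
    );
    let (refs, versions) = merge_replies(vec![reply_a, reply_b]);
    assert_eq!(refs[&1].len(), 3); // union of {10, 11} and {11, 12}
    assert_eq!(versions[&1], 5); // the smallest version wins
}
```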

View File

@@ -129,27 +129,20 @@ impl HeartbeatHandler for RegionLeaseHandler {
#[cfg(test)]
mod test {
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
use common_meta::distributed_time_constants;
use common_meta::error::Result as MetaResult;
use common_meta::key::TableMetadataManager;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::kv_backend::test_util::MockKvBackendBuilder;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
@@ -425,63 +418,19 @@ mod test {
assert_eq!(granted, expected);
}
struct MockKvBackend;
#[async_trait::async_trait]
impl TxnService for MockKvBackend {
type Error = common_meta::error::Error;
async fn txn(&self, _txn: Txn) -> MetaResult<TxnResponse> {
unimplemented!()
}
fn max_txn_ops(&self) -> usize {
unimplemented!()
}
}
#[async_trait::async_trait]
impl KvBackend for MockKvBackend {
fn name(&self) -> &str {
"mock_kv_backend"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, _req: RangeRequest) -> MetaResult<RangeResponse> {
unimplemented!()
}
async fn put(&self, _req: PutRequest) -> MetaResult<PutResponse> {
unimplemented!()
}
async fn batch_put(&self, _req: BatchPutRequest) -> MetaResult<BatchPutResponse> {
unimplemented!()
}
async fn batch_get(&self, _req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
common_meta::error::UnexpectedSnafu {
err_msg: "mock err",
}
.fail()
}
async fn delete_range(&self, _req: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
unimplemented!()
}
async fn batch_delete(&self, _req: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
unimplemented!()
}
}
#[tokio::test]
async fn test_handle_renew_region_lease_failure() {
common_telemetry::init_default_ut_logging();
let kvbackend = Arc::new(MockKvBackend);
let kv = MockKvBackendBuilder::default()
.batch_get_fn(Arc::new(|_| {
common_meta::error::UnexpectedSnafu {
err_msg: "mock err",
}
.fail()
}) as _)
.build()
.unwrap();
let kvbackend = Arc::new(kv);
let table_metadata_manager = Arc::new(TableMetadataManager::new(kvbackend));
let datanode_id = 1;

View File

@@ -28,7 +28,7 @@ use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocato
use common_meta::ddl::{
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
};
use common_meta::ddl_manager::DdlManager;
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
use common_meta::distributed_time_constants::{self};
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
@@ -54,7 +54,7 @@ use store_api::storage::MAX_REGION_SEQ;
use crate::bootstrap::build_default_meta_peer_client;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
use crate::error::{self, BuildWalOptionsAllocatorSnafu, OtherSnafu, Result};
use crate::events::EventHandlerImpl;
use crate::gc::GcScheduler;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
@@ -402,13 +402,23 @@ impl MetasrvBuilder {
let procedure_manager_c = procedure_manager.clone();
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
.context(error::InitDdlManagerSnafu)?;
#[cfg(feature = "enterprise")]
let ddl_manager = {
let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| {
plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>()
});
ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
let ddl_manager = if let Some(configurator) = plugins
.as_ref()
.and_then(|p| p.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>())
{
let ctx = DdlManagerConfigureContext {
kv_backend: kv_backend.clone(),
meta_peer_client: meta_peer_client.clone(),
};
configurator
.configure(ddl_manager, ctx)
.await
.context(OtherSnafu)?
} else {
ddl_manager
};
let ddl_manager = Arc::new(ddl_manager);
let region_flush_ticker = if is_remote_wal {
@@ -628,3 +638,9 @@ impl Default for MetasrvBuilder {
Self::new()
}
}
/// The context for [`DdlManagerConfiguratorRef`].
pub struct DdlManagerConfigureContext {
pub kv_backend: KvBackendRef,
pub meta_peer_client: MetaPeerClientRef,
}
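For readers unfamiliar with the plugin hook introduced here, the following is a hypothetical, simplified sketch of the configurator pattern this change adopts (all `Stub*` names are made up for the sketch; the real traits are `DdlManagerConfiguratorRef` and `StatementExecutorConfiguratorRef`, and the sketch assumes the `async-trait` and `tokio` crates): a registered plugin receives the half-built component plus a context struct and returns the possibly wrapped component; otherwise the component is used unchanged.

```rust
use std::sync::Arc;

// Illustrative stubs, not the real GreptimeDB types.
struct StubDdlManager;
struct StubConfigureContext {
    tenant: String,
}

#[async_trait::async_trait]
trait StubConfigurator: Send + Sync {
    async fn configure(
        &self,
        ddl_manager: StubDdlManager,
        ctx: StubConfigureContext,
    ) -> Result<StubDdlManager, String>;
}

struct LoggingConfigurator;

#[async_trait::async_trait]
impl StubConfigurator for LoggingConfigurator {
    async fn configure(
        &self,
        ddl_manager: StubDdlManager,
        ctx: StubConfigureContext,
    ) -> Result<StubDdlManager, String> {
        // A real configurator could wrap the manager with extra features;
        // this stub only records that it ran.
        println!("configuring ddl manager for tenant {}", ctx.tenant);
        Ok(ddl_manager)
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Stands in for `plugins.get::<DdlManagerConfiguratorRef<_>>()`.
    let plugin: Option<Arc<dyn StubConfigurator>> = Some(Arc::new(LoggingConfigurator));

    let ddl_manager = StubDdlManager;
    // Same shape as the builder code: configure when a plugin is present,
    // otherwise keep the component as-is.
    let _ddl_manager = if let Some(configurator) = plugin {
        let ctx = StubConfigureContext {
            tenant: "example".to_string(),
        };
        configurator.configure(ddl_manager, ctx).await?
    } else {
        ddl_manager
    };
    Ok(())
}
```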

View File

@@ -19,6 +19,7 @@ use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
pub mod region_migration;
pub mod repartition;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]

View File

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod group;
pub mod plan;
#[cfg(test)]
pub mod test_util;

View File

@@ -0,0 +1,284 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod repartition_start;
pub(crate) mod update_metadata;
use std::any::Any;
use std::fmt::Debug;
use common_error::ext::BoxedError;
use common_meta::DatanodeId;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;
use crate::error::{self, Result};
use crate::procedure::repartition::plan::RegionDescriptor;
pub type GroupId = Uuid;
pub struct RepartitionGroupProcedure {}
pub struct Context {
pub persistent_ctx: PersistentContext,
pub cache_invalidator: CacheInvalidatorRef,
pub table_metadata_manager: TableMetadataManagerRef,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GroupPrepareResult {
pub source_routes: Vec<RegionRoute>,
pub target_routes: Vec<RegionRoute>,
pub central_region: RegionId,
pub central_region_datanode_id: DatanodeId,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
pub group_id: GroupId,
/// The table id of the repartition group.
pub table_id: TableId,
/// The source regions of the repartition group.
pub sources: Vec<RegionDescriptor>,
/// The target regions of the repartition group.
pub targets: Vec<RegionDescriptor>,
/// The result of group prepare.
/// The value will be set in [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state.
pub group_prepare_result: Option<GroupPrepareResult>,
}
impl Context {
/// Retrieves the table route value for the given table id.
///
/// Retry:
/// - Failed to retrieve the metadata of table.
///
/// Abort:
/// - Table route not found.
pub async fn get_table_route_value(
&self,
) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
let table_id = self.persistent_ctx.table_id;
let group_id = self.persistent_ctx.group_id;
let table_route_value = self
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to get table route for table: {}, repartition group: {}",
table_id, group_id
),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;
Ok(table_route_value)
}
/// Returns the `datanode_table_value`
///
/// Retry:
/// - Failed to retrieve the metadata of datanode table.
pub async fn get_datanode_table_value(
&self,
table_id: TableId,
datanode_id: u64,
) -> Result<DatanodeTableValue> {
let datanode_table_value = self
.table_metadata_manager
.datanode_table_manager()
.get(&DatanodeTableKey {
datanode_id,
table_id,
})
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get DatanodeTable: {table_id}"),
})?
.context(error::DatanodeTableNotFoundSnafu {
table_id,
datanode_id,
})?;
Ok(datanode_table_value)
}
/// Broadcasts the invalidate table cache message.
pub async fn invalidate_table_cache(&self) -> Result<()> {
let table_id = self.persistent_ctx.table_id;
let group_id = self.persistent_ctx.group_id;
let subject = format!(
"Invalidate table cache for repartition table, group: {}, table: {}",
group_id, table_id,
);
let ctx = common_meta::cache_invalidator::Context {
subject: Some(subject),
};
let _ = self
.cache_invalidator
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
.await;
Ok(())
}
/// Updates the table route.
///
/// Retry:
/// - Failed to retrieve the metadata of datanode table.
///
/// Abort:
/// - Table route not found.
/// - Failed to update the table route.
pub async fn update_table_route(
&self,
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
new_region_routes: Vec<RegionRoute>,
) -> Result<()> {
let table_id = self.persistent_ctx.table_id;
// Safety: prepare result is set in [RepartitionStart] state.
let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
let central_region_datanode_table_value = self
.get_datanode_table_value(table_id, prepare_result.central_region_datanode_id)
.await?;
let RegionInfo {
region_options,
region_wal_options,
..
} = &central_region_datanode_table_value.region_info;
self.table_metadata_manager
.update_table_route(
table_id,
central_region_datanode_table_value.region_info.clone(),
current_table_route_value,
new_region_routes,
region_options,
region_wal_options,
)
.await
.context(error::TableMetadataManagerSnafu)
}
}
/// Returns the region routes of the given table route value.
///
/// Abort:
/// - Table route value is not physical.
pub fn region_routes(
table_id: TableId,
table_route_value: &TableRouteValue,
) -> Result<&Vec<RegionRoute>> {
table_route_value
.region_routes()
.with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!(
"TableRoute({:?}) is a non-physical TableRouteValue.",
table_id
),
})
}
#[async_trait::async_trait]
#[typetag::serde(tag = "repartition_group_state")]
pub(crate) trait State: Sync + Send + Debug {
fn name(&self) -> &'static str {
let type_name = std::any::type_name::<Self>();
// short name
type_name.split("::").last().unwrap_or(type_name)
}
/// Yields the next [State] and [Status].
async fn next(
&mut self,
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)>;
fn as_any(&self) -> &dyn Any;
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::test_util::MockKvBackendBuilder;
use crate::error::Error;
use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
#[tokio::test]
async fn test_get_table_route_value_not_found_error() {
let env = TestingEnv::new();
let persistent_context = new_persistent_context(1024, vec![], vec![]);
let ctx = env.create_context(persistent_context);
let err = ctx.get_table_route_value().await.unwrap_err();
assert_matches!(err, Error::TableRouteNotFound { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_get_table_route_value_retry_error() {
let kv = MockKvBackendBuilder::default()
.range_fn(Arc::new(|_| {
common_meta::error::UnexpectedSnafu {
err_msg: "mock err",
}
.fail()
}))
.build()
.unwrap();
let mut env = TestingEnv::new();
env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
let persistent_context = new_persistent_context(1024, vec![], vec![]);
let ctx = env.create_context(persistent_context);
let err = ctx.get_table_route_value().await.unwrap_err();
assert!(err.is_retryable());
}
#[tokio::test]
async fn test_get_datanode_table_value_retry_error() {
let kv = MockKvBackendBuilder::default()
.range_fn(Arc::new(|_| {
common_meta::error::UnexpectedSnafu {
err_msg: "mock err",
}
.fail()
}))
.build()
.unwrap();
let mut env = TestingEnv::new();
env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
let persistent_context = new_persistent_context(1024, vec![], vec![]);
let ctx = env.create_context(persistent_context);
let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
assert!(err.is_retryable());
}
}

View File

@@ -0,0 +1,273 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::debug;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Result};
use crate::procedure::repartition::group::{
Context, GroupId, GroupPrepareResult, State, region_routes,
};
use crate::procedure::repartition::plan::RegionDescriptor;
#[derive(Debug, Serialize, Deserialize)]
pub struct RepartitionStart;
/// Ensures that the partition expression of the region route matches the partition expression of the region descriptor.
fn ensure_region_route_expr_match(
region_route: &RegionRoute,
region_descriptor: &RegionDescriptor,
) -> Result<RegionRoute> {
let actual = &region_route.region.partition_expr;
let expected = region_descriptor
.partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu)?;
ensure!(
actual == &expected,
error::PartitionExprMismatchSnafu {
region_id: region_route.region.id,
expected,
actual,
}
);
Ok(region_route.clone())
}
impl RepartitionStart {
/// Ensures that both source and target regions are present in the region routes.
///
/// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning).
#[allow(dead_code)]
fn ensure_route_present(
group_id: GroupId,
region_routes: &[RegionRoute],
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
) -> Result<GroupPrepareResult> {
ensure!(
!sources.is_empty(),
error::UnexpectedSnafu {
violated: "Sources are empty"
}
);
let region_routes_map = region_routes
.iter()
.map(|r| (r.region.id, r))
.collect::<HashMap<_, _>>();
let source_region_routes = sources
.iter()
.map(|s| {
region_routes_map
.get(&s.region_id)
.context(error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: s.region_id,
})
.and_then(|r| ensure_region_route_expr_match(r, s))
})
.collect::<Result<Vec<_>>>()?;
let target_region_routes = targets
.iter()
.map(|t| {
region_routes_map
.get(&t.region_id)
.context(error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: t.region_id,
})
.map(|r| (*r).clone())
})
.collect::<Result<Vec<_>>>()?;
let central_region = sources[0].region_id;
let central_region_datanode_id = source_region_routes[0]
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
violated: format!(
"Leader peer is not set for central region: {}",
central_region
),
})?
.id;
Ok(GroupPrepareResult {
source_routes: source_region_routes,
target_routes: target_region_routes,
central_region,
central_region_datanode_id,
})
}
#[allow(dead_code)]
fn next_state() -> (Box<dyn State>, Status) {
// TODO(weny): change it later.
(Box::new(RepartitionStart), Status::executing(true))
}
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for RepartitionStart {
/// Captures the group prepare result.
///
/// Retry:
/// - Failed to get the table route.
///
/// Abort:
/// - Table route not found.
/// - Table route is not physical.
/// - Failed to ensure the route is present.
/// - Failed to capture the group prepare result.
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
if ctx.persistent_ctx.group_prepare_result.is_some() {
return Ok(Self::next_state());
}
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let table_route_value = ctx.get_table_route_value().await?.into_inner();
let region_routes = region_routes(table_id, &table_route_value)?;
let group_prepare_result = Self::ensure_route_present(
group_id,
region_routes,
&ctx.persistent_ctx.sources,
&ctx.persistent_ctx.targets,
)?;
ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result);
debug!(
"Repartition group {}: captured {} sources, {} targets",
group_id,
ctx.persistent_ctx.sources.len(),
ctx.persistent_ctx.targets.len()
);
Ok(Self::next_state())
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::error::Error;
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::test_util::range_expr;
#[test]
fn test_ensure_route_present_missing_source_region() {
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(1024, 2),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let err = RepartitionStart::ensure_route_present(
Uuid::new_v4(),
&region_routes,
&[source_region],
&[target_region],
)
.unwrap_err();
assert_matches!(err, Error::RepartitionSourceRegionMissing { .. });
}
#[test]
fn test_ensure_route_present_partition_expr_mismatch() {
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let err = RepartitionStart::ensure_route_present(
Uuid::new_v4(),
&region_routes,
&[source_region],
&[target_region],
)
.unwrap_err();
assert_matches!(err, Error::PartitionExprMismatch { .. });
}
#[test]
fn test_ensure_route_present_missing_target_region() {
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let err = RepartitionStart::ensure_route_present(
Uuid::new_v4(),
&region_routes,
&[source_region],
&[target_region],
)
.unwrap_err();
assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
}
}

View File

@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod apply_staging_region;
pub(crate) mod rollback_staging_region;
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::group::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
pub enum UpdateMetadata {
/// Applies the new partition expressions for staging regions.
ApplyStaging,
/// Rolls back the new partition expressions for staging regions.
RollbackStaging,
}
impl UpdateMetadata {
#[allow(dead_code)]
fn next_state() -> (Box<dyn State>, Status) {
// TODO(weny): change it later.
(Box::new(RepartitionStart), Status::executing(true))
}
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpdateMetadata {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
match self {
UpdateMetadata::ApplyStaging => {
// TODO(weny): If all metadata have already been updated, skip applying staging regions.
self.apply_staging_regions(ctx).await?;
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
"Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}"
);
};
Ok(Self::next_state())
}
UpdateMetadata::RollbackStaging => {
self.rollback_staging_regions(ctx).await?;
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
"Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}"
);
};
Ok(Self::next_state())
}
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,181 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::error;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
use crate::procedure::repartition::plan::RegionDescriptor;
impl UpdateMetadata {
/// Applies the new partition expressions for staging regions.
///
/// Abort:
/// - Target region not found.
/// - Source region not found.
fn apply_staging_region_routes(
group_id: GroupId,
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
let mut region_routes_map = region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
for target in targets {
let region_route = region_routes_map.get_mut(&target.region_id).context(
error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: target.region_id,
},
)?;
region_route.region.partition_expr = target
.partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu)?;
region_route.set_leader_staging();
}
for source in sources {
let region_route = region_routes_map.get_mut(&source.region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region_id,
},
)?;
region_route.set_leader_staging();
}
Ok(region_routes)
}
/// Applies the new partition expressions for staging regions.
///
/// Abort:
/// - Table route is not physical.
/// - Target region not found.
/// - Source region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
#[allow(dead_code)]
pub(crate) async fn apply_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let current_table_route_value = ctx.get_table_route_value().await?;
let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?;
let new_region_routes = Self::apply_staging_region_routes(
group_id,
&ctx.persistent_ctx.sources,
&ctx.persistent_ctx.targets,
region_routes,
)?;
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await
{
error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}");
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"
),
});
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::test_util::range_expr;
#[test]
fn test_generate_region_routes() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let region_routes = vec![
RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 2),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
];
let source_region = RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 0, 10),
};
let new_region_routes = UpdateMetadata::apply_staging_region_routes(
group_id,
&[source_region],
&[target_region],
&region_routes,
)
.unwrap();
assert!(new_region_routes[0].is_leader_staging());
assert_eq!(
new_region_routes[0].region.partition_expr,
range_expr("x", 0, 100).as_json_str().unwrap()
);
assert_eq!(
new_region_routes[1].region.partition_expr,
range_expr("x", 0, 10).as_json_str().unwrap()
);
assert!(new_region_routes[1].is_leader_staging());
assert!(!new_region_routes[2].is_leader_staging());
}
}

View File

@@ -0,0 +1,187 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::error;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
impl UpdateMetadata {
/// Rolls back the staging regions.
///
/// Abort:
/// - Source region not found.
/// - Target region not found.
#[allow(dead_code)]
fn rollback_staging_region_routes(
group_id: GroupId,
source_routes: &[RegionRoute],
target_routes: &[RegionRoute],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
let mut region_routes_map = region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
for source in source_routes {
let region_route = region_routes_map.get_mut(&source.region.id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region.id,
},
)?;
region_route.region.partition_expr = source.region.partition_expr.clone();
region_route.clear_leader_staging();
}
for target in target_routes {
let region_route = region_routes_map.get_mut(&target.region.id).context(
error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: target.region.id,
},
)?;
region_route.clear_leader_staging();
}
Ok(region_routes)
}
/// Rolls back the metadata for staging regions.
///
/// Abort:
/// - Table route is not physical.
/// - Source region not found.
/// - Target region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
#[allow(dead_code)]
pub(crate) async fn rollback_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let current_table_route_value = ctx.get_table_route_value().await?;
let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?;
// Safety: prepare result is set in [RepartitionStart] state.
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let new_region_routes = Self::rollback_staging_region_routes(
group_id,
&prepare_result.source_routes,
&prepare_result.target_routes,
region_routes,
)?;
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await
{
error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}");
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"
),
});
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use common_meta::peer::Peer;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::test_util::range_expr;
#[test]
fn test_rollback_staging_region_routes() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let region_routes = vec![
RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Staging),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 2),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Staging),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
];
let source_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 20).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let target_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 0, 20).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let new_region_routes = UpdateMetadata::rollback_staging_region_routes(
group_id,
&source_routes,
&target_routes,
&region_routes,
)
.unwrap();
assert!(!new_region_routes[0].is_leader_staging());
assert_eq!(
new_region_routes[0].region.partition_expr,
range_expr("x", 0, 20).as_json_str().unwrap(),
);
assert!(!new_region_routes[1].is_leader_staging());
assert!(new_region_routes[2].is_leader_downgrading());
}
}

View File

@@ -0,0 +1,26 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
/// Metadata describing a region involved in the plan.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionDescriptor {
/// The region id of the region involved in the plan.
pub region_id: RegionId,
/// The new partition expression of the region.
pub partition_expr: PartitionExpr,
}

View File

@@ -0,0 +1,91 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::SequenceBuilder;
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
use store_api::storage::TableId;
use uuid::Uuid;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::metasrv::MetasrvInfo;
use crate::procedure::repartition::group::{Context, PersistentContext};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::test_util::MailboxContext;
/// `TestingEnv` provides components during the tests.
pub struct TestingEnv {
pub table_metadata_manager: TableMetadataManagerRef,
pub mailbox_ctx: MailboxContext,
}
impl Default for TestingEnv {
fn default() -> Self {
Self::new()
}
}
impl TestingEnv {
pub fn new() -> Self {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
Self {
table_metadata_manager,
mailbox_ctx,
}
}
pub fn create_context(self, persistent_context: PersistentContext) -> Context {
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
self.mailbox_ctx.mailbox().clone(),
MetasrvInfo {
server_addr: String::new(),
},
));
Context {
persistent_ctx: persistent_context,
table_metadata_manager: self.table_metadata_manager.clone(),
cache_invalidator,
}
}
}
pub fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
col(col_name)
.gt_eq(Value::Int64(start))
.and(col(col_name).lt(Value::Int64(end)))
}
pub fn new_persistent_context(
table_id: TableId,
sources: Vec<RegionDescriptor>,
targets: Vec<RegionDescriptor>,
) -> PersistentContext {
PersistentContext {
group_id: Uuid::new_v4(),
table_id,
sources,
targets,
group_prepare_result: None,
}
}

View File

@@ -23,6 +23,7 @@ mod options;
mod put;
mod read;
mod region_metadata;
mod staging;
mod state;
mod sync;
@@ -211,6 +212,13 @@ impl RegionEngine for MetricEngine {
let mut extension_return_value = HashMap::new();
let result = match request {
RegionRequest::EnterStaging(_) => {
if self.inner.is_physical_region(region_id) {
self.handle_enter_staging_request(region_id, request).await
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Create(create) => {
self.inner

View File

@@ -15,7 +15,7 @@
mod extract_new_columns;
mod validate;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use extract_new_columns::extract_new_columns;
use snafu::{OptionExt, ResultExt, ensure};
@@ -143,16 +143,20 @@ impl MetricEngineInner {
};
let data_region_id = to_data_region_id(physical_region_id);
let mut write_guards = HashMap::with_capacity(requests.len());
for (region_id, _) in requests.iter() {
if write_guards.contains_key(region_id) {
continue;
}
let _write_guard = self
.metadata_region
.write_lock_logical_region(*region_id)
.await?;
write_guards.insert(*region_id, _write_guard);
// Acquire logical region locks in a deterministic order to avoid deadlocks when multiple
// alter operations target overlapping regions concurrently.
let region_ids = requests
.iter()
.map(|(region_id, _)| *region_id)
.collect::<BTreeSet<_>>();
let mut write_guards = Vec::with_capacity(region_ids.len());
for region_id in region_ids {
write_guards.push(
self.metadata_region
.write_lock_logical_region(region_id)
.await?,
);
}
self.data_region
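The comment above is the whole point of this hunk: collecting the region ids into a `BTreeSet` fixes the acquisition order. As a standalone illustration of why that prevents deadlocks (std-only, synchronous locks rather than the engine's async write guards), two callers that sort their key sets first always take overlapping locks in the same order:

```rust
use std::collections::BTreeSet;
use std::sync::{Mutex, MutexGuard};

// Acquire the locks whose indices appear in `wanted`, always in ascending
// order. Because every caller sorts first, two callers with overlapping
// sets acquire the shared locks in the same order and cannot deadlock.
fn lock_in_order<'a>(locks: &'a [Mutex<()>], wanted: &[usize]) -> Vec<MutexGuard<'a, ()>> {
    let ordered: BTreeSet<usize> = wanted.iter().copied().collect();
    ordered.into_iter().map(|i| locks[i].lock().unwrap()).collect()
}

fn main() {
    let locks: Vec<Mutex<()>> = (0..4).map(|_| Mutex::new(())).collect();
    // Both requests acquire locks 1 -> 2 -> 3 after sorting,
    // even though the callers listed them in different orders.
    let guards = lock_in_order(&locks, &[3, 1, 2]);
    drop(guards);
    let _guards = lock_in_order(&locks, &[2, 3, 1]);
}
```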

View File

@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::AffectedRows;
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::engine::MetricEngine;
use crate::error::{MitoEnterStagingOperationSnafu, Result};
use crate::utils;
impl MetricEngine {
/// Handles the enter staging request for the given region.
pub(crate) async fn handle_enter_staging_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows> {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
// The metadata region doesn't care about the partition expr, so we can just pass an empty string.
self.inner
.mito
.handle_request(
metadata_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: String::new(),
}),
)
.await
.context(MitoEnterStagingOperationSnafu)?;
self.inner
.mito
.handle_request(data_region_id, request)
.await
.context(MitoEnterStagingOperationSnafu)
.map(|response| response.affected_rows)
}
}
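// Illustrative caller-side sketch (not part of this diff): a physical metric region enters staging via
//
//   engine
//       .handle_request(
//           region_id,
//           RegionRequest::EnterStaging(EnterStagingRequest {
//               partition_expr: expr.to_string(),
//           }),
//       )
//       .await?;
//
// while logical regions reject the request with an unsupported-request error, as shown in the
// `RegionRequest::EnterStaging` arm earlier in this change.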

View File

@@ -156,6 +156,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Mito enter staging operation fails"))]
MitoEnterStagingOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect record batch stream"))]
CollectRecordBatchStream {
source: common_recordbatch::error::Error,
@@ -360,6 +367,7 @@ impl ErrorExt for Error {
| MitoWriteOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoSyncOperation { source, .. }
| MitoEnterStagingOperation { source, .. }
| BatchOpenMitoRegion { source, .. }
| BatchCatchupMitoRegion { source, .. } => source.status_code(),

View File

@@ -55,7 +55,7 @@ lazy_static = "1.4"
log-store = { workspace = true }
mito-codec.workspace = true
moka = { workspace = true, features = ["sync", "future"] }
object-store.workspace = true
object-store = { workspace = true, features = ["testing"] }
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
pin-project.workspace = true

View File

@@ -501,7 +501,7 @@ impl Compactor for DefaultCompactor {
// TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
compaction_region
.manifest_ctx
.update_manifest(RegionLeaderState::Writable, action_list)
.update_manifest(RegionLeaderState::Writable, action_list, false)
.await?;
Ok(edit)

View File

@@ -117,7 +117,7 @@ impl CompactionTaskImpl {
};
if let Err(e) = compaction_region
.manifest_ctx
.update_manifest(current_region_state, action_list)
.update_manifest(current_region_state, action_list, false)
.await
{
warn!(

View File

@@ -901,7 +901,7 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) {
check_ttl(&engine, &Duration::from_secs(500));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();
@@ -952,6 +952,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
.await
.unwrap();
});
// Make sure the loop is handling the alter request.
tokio::time::sleep(Duration::from_millis(100)).await;
let column_schemas_cloned = column_schemas.clone();
let engine_cloned = engine.clone();
@@ -962,6 +964,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
};
put_rows(&engine_cloned, region_id, rows).await;
});
// Make sure the loop is handling the put request.
tokio::time::sleep(Duration::from_millis(100)).await;
listener.wake_notify();
alter_job.await.unwrap();

View File

@@ -74,6 +74,9 @@ pub trait EventListener: Send + Sync {
/// Notifies the listener that region starts to send a region change result to worker.
async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {}
/// Notifies the listener that region starts to send an enter staging result to worker.
async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {}
/// Notifies the listener that the index build task is executed successfully.
async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {}
@@ -307,6 +310,37 @@ impl EventListener for NotifyRegionChangeResultListener {
region_id
);
self.notify.notified().await;
info!(
"Continue to sending region change result for region {}",
region_id
);
}
}
#[derive(Default)]
pub struct NotifyEnterStagingResultListener {
notify: Notify,
}
impl NotifyEnterStagingResultListener {
/// Continues sending the enter staging result.
pub fn wake_notify(&self) {
self.notify.notify_one();
}
}
#[async_trait]
impl EventListener for NotifyEnterStagingResultListener {
async fn on_enter_staging_result_begin(&self, region_id: RegionId) {
info!(
"Wait on notify to start notify enter staging result for region {}",
region_id
);
self.notify.notified().await;
info!(
"Continue to sending enter staging result for region {}",
region_id
);
}
}

View File

@@ -14,17 +14,30 @@
//! Integration tests for staging state functionality.
use std::assert_matches::assert_matches;
use std::fs;
use std::sync::Arc;
use std::time::Duration;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use object_store::Buffer;
use object_store::layers::mock::{
Entry, Error as MockError, ErrorKind, List, Lister, Metadata, MockLayerBuilder,
Result as MockResult, Write, Writer,
};
use store_api::region_engine::{RegionEngine, SettableRegionRoleState};
use store_api::region_request::{
RegionAlterRequest, RegionFlushRequest, RegionRequest, RegionTruncateRequest,
EnterStagingRequest, RegionAlterRequest, RegionFlushRequest, RegionRequest,
RegionTruncateRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::NotifyEnterStagingResultListener;
use crate::error::Error;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::request::WorkerRequest;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
@@ -214,6 +227,8 @@ async fn test_staging_state_validation_patterns() {
);
}
const PARTITION_EXPR: &str = "partition_expr";
#[tokio::test]
async fn test_staging_manifest_directory() {
test_staging_manifest_directory_with_format(false).await;
@@ -221,6 +236,7 @@ async fn test_staging_manifest_directory() {
}
async fn test_staging_manifest_directory_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
@@ -255,9 +271,57 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
// Now test staging mode manifest creation
// Set region to staging mode using the engine API
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
assert_eq!(staging_partition_expr.unwrap(), PARTITION_EXPR);
{
let manager = region.manifest_ctx.manifest_manager.read().await;
assert_eq!(
manager
.staging_manifest()
.unwrap()
.metadata
.partition_expr
.as_deref()
.unwrap(),
PARTITION_EXPR
);
assert!(manager.manifest().metadata.partition_expr.is_none());
}
// Should be ok to enter staging mode again with the same partition expr
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap();
// Should throw error if try to enter staging mode again with a different partition expr
let err = engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: "".to_string(),
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::StagingPartitionExprMismatch { .. }
);
// Put some data and flush in staging mode
let rows_data = Rows {
@@ -312,6 +376,7 @@ async fn test_staging_exit_success_with_manifests() {
}
async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
@@ -330,16 +395,28 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.await
.unwrap();
// Add some data and flush in staging mode to generate staging manifests
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows_data).await;
// Enter staging mode
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap();
// Add some data and flush in staging mode to generate staging manifests
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 5),
rows: build_rows(3, 8),
};
put_rows(&engine, region_id, rows_data).await;
@@ -357,7 +434,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
// Add more data and flush again to generate multiple staging manifests
let rows_data2 = Rows {
schema: column_schemas.clone(),
rows: build_rows(5, 10),
rows: build_rows(8, 10),
};
put_rows(&engine, region_id, rows_data2).await;
@@ -382,8 +459,11 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.unwrap();
assert_eq!(
staging_files_before.len(),
2,
"Staging manifest directory should contain two files before exit"
// Two files for flush operation
// One file for entering staging mode
3,
"Staging manifest directory should contain 3 files before exit, got: {:?}",
staging_files_before
);
// Count normal manifest files before exit
@@ -394,8 +474,11 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.unwrap();
let normal_count_before = normal_files_before.len();
assert_eq!(
normal_count_before, 1,
"Normal manifest directory should initially contain one file"
// One file for table creation
// One file for flush operation
normal_count_before,
2,
"Normal manifest directory should initially contain 2 files"
);
// Try read data before exiting staging, SST files should be invisible
@@ -403,8 +486,8 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(
scanner.num_files(),
0,
"No SST files should be scanned before exit"
1,
"1 SST files should be scanned before exit"
);
assert_eq!(
scanner.num_memtables(),
@@ -415,14 +498,20 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let batches = RecordBatches::try_collect(stream).await.unwrap();
let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(
total_rows, 0,
"No data should be readable before exit staging mode"
total_rows, 3,
"3 rows should be readable before exit staging mode"
);
// Inspect SSTs from manifest
let sst_entries = engine.all_ssts_from_manifest().await;
assert_eq!(sst_entries.len(), 2);
assert!(sst_entries.iter().all(|e| !e.visible));
assert_eq!(
sst_entries.len(),
3,
"sst entries should be 3, got: {:?}",
sst_entries
);
assert_eq!(sst_entries.iter().filter(|e| e.visible).count(), 1);
assert_eq!(sst_entries.iter().filter(|e| !e.visible).count(), 2);
// Exit staging mode successfully
engine
@@ -470,7 +559,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(
scanner.num_files(),
2,
3,
"SST files should be scanned after exit"
);
@@ -482,6 +571,209 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
// Inspect SSTs from manifest
let sst_entries = engine.all_ssts_from_manifest().await;
assert_eq!(sst_entries.len(), 2);
assert_eq!(sst_entries.len(), 3);
assert!(sst_entries.iter().all(|e| e.visible));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_write_stall_on_enter_staging() {
test_write_stall_on_enter_staging_with_format(false).await;
test_write_stall_on_enter_staging_with_format(true).await;
}
async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let listener = Arc::new(NotifyEnterStagingResultListener::default());
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
},
None,
Some(listener.clone()),
None,
)
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let engine_cloned = engine.clone();
let alter_job = tokio::spawn(async move {
engine_cloned
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap();
});
// Make sure the loop is handling the enter staging request.
tokio::time::sleep(Duration::from_millis(100)).await;
let column_schemas_cloned = column_schemas.clone();
let engine_cloned = engine.clone();
let put_job = tokio::spawn(async move {
let rows = Rows {
schema: column_schemas_cloned,
rows: build_rows(0, 3),
};
put_rows(&engine_cloned, region_id, rows).await;
});
// Make sure the loop is handling the put request.
tokio::time::sleep(Duration::from_millis(100)).await;
listener.wake_notify();
alter_job.await.unwrap();
put_job.await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).await.unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_enter_staging_clean_staging_manifest_error() {
common_telemetry::init_default_ut_logging();
test_enter_staging_clean_staging_manifest_error_with_format(false).await;
test_enter_staging_clean_staging_manifest_error_with_format(true).await;
}
struct MockLister {
path: String,
inner: Lister,
}
impl List for MockLister {
async fn next(&mut self) -> MockResult<Option<Entry>> {
if self.path.contains("staging") {
return Err(MockError::new(ErrorKind::Unexpected, "mock error"));
}
self.inner.next().await
}
}
struct MockWriter {
path: String,
inner: Writer,
}
impl Write for MockWriter {
async fn write(&mut self, bs: Buffer) -> MockResult<()> {
self.inner.write(bs).await
}
async fn close(&mut self) -> MockResult<Metadata> {
if self.path.contains("staging") {
return Err(MockError::new(ErrorKind::Unexpected, "mock error"));
}
self.inner.close().await
}
async fn abort(&mut self) -> MockResult<()> {
self.inner.abort().await
}
}
async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) {
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1024, 0);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let err = engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
let region = engine.get_region(region_id).unwrap();
assert!(
region
.manifest_ctx
.manifest_manager
.read()
.await
.staging_manifest()
.is_none()
);
let state = region.state();
assert_eq!(state, RegionRoleState::Leader(RegionLeaderState::Writable));
}
async fn test_enter_staging_clean_staging_manifest_error_with_format(flat_format: bool) {
let mock_layer = MockLayerBuilder::default()
.lister_factory(Arc::new(|path, _args, lister| {
Box::new(MockLister {
path: path.to_string(),
inner: lister,
})
}))
.build()
.unwrap();
let mut env = TestEnv::new().await.with_mock_layer(mock_layer);
test_enter_staging_error(&mut env, flat_format).await;
}
#[tokio::test]
async fn test_enter_staging_save_staging_manifest_error() {
common_telemetry::init_default_ut_logging();
test_enter_staging_save_staging_manifest_error_with_format(false).await;
test_enter_staging_save_staging_manifest_error_with_format(true).await;
}
async fn test_enter_staging_save_staging_manifest_error_with_format(flat_format: bool) {
let mock_layer = MockLayerBuilder::default()
.writer_factory(Arc::new(|path, _args, lister| {
Box::new(MockWriter {
path: path.to_string(),
inner: lister,
})
}))
.build()
.unwrap();
let mut env = TestEnv::new().await.with_mock_layer(mock_layer);
test_enter_staging_error(&mut env, flat_format).await;
}

View File

@@ -1150,6 +1150,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Staging partition expr mismatch, manifest: {:?}, request: {}",
manifest_expr,
request_expr
))]
StagingPartitionExprMismatch {
manifest_expr: Option<String>,
request_expr: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1196,7 +1208,8 @@ impl ErrorExt for Error {
| InstallManifestTo { .. }
| Unexpected { .. }
| SerializeColumnMetadata { .. }
| SerializeManifest { .. } => StatusCode::Unexpected,
| SerializeManifest { .. }
| StagingPartitionExprMismatch { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }

View File

@@ -208,7 +208,7 @@ impl WriteBufferManager for WriteBufferManagerImpl {
}
/// Reason of a flush task.
#[derive(Debug, IntoStaticStr)]
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
/// Other reasons.
Others,
@@ -222,6 +222,8 @@ pub enum FlushReason {
Periodically,
/// Flush memtable during downgrading state.
Downgrading,
/// Enter staging mode.
EnterStaging,
}
impl FlushReason {
@@ -253,6 +255,8 @@ pub(crate) struct RegionFlushTask {
pub(crate) index_options: IndexOptions,
/// Semaphore to control flush concurrency.
pub(crate) flush_semaphore: Arc<Semaphore>,
/// Whether the region is in staging mode.
pub(crate) is_staging: bool,
}
impl RegionFlushTask {
@@ -316,6 +320,7 @@ impl RegionFlushTask {
_timer: timer,
edit,
memtables_to_remove,
is_staging: self.is_staging,
};
WorkerRequest::Background {
region_id: self.region_id,
@@ -398,7 +403,10 @@ impl RegionFlushTask {
flushed_sequence: Some(version_data.committed_sequence),
committed_sequence: None,
};
info!("Applying {edit:?} to region {}", self.region_id);
info!(
"Applying {edit:?} to region {}, is_staging: {}",
self.region_id, self.is_staging
);
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
@@ -417,11 +425,12 @@ impl RegionFlushTask {
// add a cleanup job to remove them later.
let version = self
.manifest_ctx
.update_manifest(expected_state, action_list)
.update_manifest(expected_state, action_list, self.is_staging)
.await?;
info!(
"Successfully update manifest version to {version}, region: {}, reason: {}",
"Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
self.region_id,
self.is_staging,
self.reason.as_str()
);
@@ -1292,6 +1301,7 @@ mod tests {
.await,
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
@@ -1334,6 +1344,7 @@ mod tests {
manifest_ctx: manifest_ctx.clone(),
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
})
.collect();
// Schedule first task.

View File

@@ -208,10 +208,6 @@ impl LocalGcWorker {
}
/// Get tmp ref files for all current regions
///
/// Outdated regions are added to `outdated_regions` set, which means their manifest version in
/// self.file_ref_manifest is older than the current manifest version on datanode.
/// so they need to retry GC later by metasrv with updated tmp ref files.
pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileId>>> {
let mut tmp_ref_files = HashMap::new();
for (region_id, file_refs) in &self.file_ref_manifest.file_refs {

View File

@@ -25,7 +25,6 @@ use crate::manifest::action::{RegionCheckpoint, RegionManifest};
use crate::manifest::manager::RegionManifestOptions;
use crate::manifest::storage::ManifestObjectStore;
use crate::metrics::MANIFEST_OP_ELAPSED;
use crate::region::{RegionLeaderState, RegionRoleState};
/// [`Checkpointer`] is responsible for doing checkpoint for a region, in an asynchronous way.
#[derive(Debug)]
@@ -137,20 +136,7 @@ impl Checkpointer {
/// Check if it's needed to do checkpoint for the region by the checkpoint distance.
/// If needed, and there's no currently running checkpoint task, it will start a new checkpoint
/// task running in the background.
pub(crate) fn maybe_do_checkpoint(
&self,
manifest: &RegionManifest,
region_state: RegionRoleState,
) {
// Skip checkpoint if region is in staging state
if region_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
info!(
"Skipping checkpoint for region {} in staging mode, manifest version: {}",
manifest.metadata.region_id, manifest.manifest_version
);
return;
}
pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) {
if self.manifest_options.checkpoint_distance == 0 {
return;
}

View File

@@ -151,6 +151,10 @@ pub struct RegionManifestManager {
last_version: Arc<AtomicU64>,
checkpointer: Checkpointer,
manifest: Arc<RegionManifest>,
// The staging manifest stores the manifest of the staging region before it becomes available.
// It is initially inherited from the previous manifest (i.e., `self.manifest`).
// When the staging manifest becomes available, it will be used to construct the new manifest.
staging_manifest: Option<Arc<RegionManifest>>,
stats: ManifestStats,
stopped: bool,
}
@@ -229,6 +233,7 @@ impl RegionManifestManager {
last_version: manifest_version,
checkpointer,
manifest: Arc::new(manifest),
staging_manifest: None,
stats: stats.clone(),
stopped: false,
})
@@ -334,6 +339,8 @@ impl RegionManifestManager {
last_version: manifest_version,
checkpointer,
manifest: Arc::new(manifest),
// TODO(weny): open the staging manifest if it exists.
staging_manifest: None,
stats: stats.clone(),
stopped: false,
}))
@@ -504,7 +511,7 @@ impl RegionManifestManager {
pub async fn update(
&mut self,
action_list: RegionMetaActionList,
region_state: RegionRoleState,
is_staging: bool,
) -> Result<ManifestVersion> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["update"])
@@ -518,13 +525,19 @@ impl RegionManifestManager {
);
let version = self.increase_version();
let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging);
self.store
.save(version, &action_list.encode()?, is_staging)
.await?;
// For a staging region, the manifest is initially inherited from the previous manifest (i.e., `self.manifest`).
// When the staging manifest becomes available, it will be used to construct the new manifest.
let mut manifest_builder =
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
} else {
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
};
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
@@ -544,17 +557,27 @@ impl RegionManifestManager {
}
}
}
let new_manifest = manifest_builder.try_build()?;
new_manifest
.removed_files
.update_file_removed_cnt_to_stats(&self.stats);
let updated_manifest = self
.checkpointer
.update_manifest_removed_files(new_manifest)?;
self.manifest = Arc::new(updated_manifest);
self.checkpointer
.maybe_do_checkpoint(self.manifest.as_ref(), region_state);
if is_staging {
let new_manifest = manifest_builder.try_build()?;
self.staging_manifest = Some(Arc::new(new_manifest));
info!(
"Skipping checkpoint for region {} in staging mode, manifest version: {}",
self.manifest.metadata.region_id, self.manifest.manifest_version
);
} else {
let new_manifest = manifest_builder.try_build()?;
new_manifest
.removed_files
.update_file_removed_cnt_to_stats(&self.stats);
let updated_manifest = self
.checkpointer
.update_manifest_removed_files(new_manifest)?;
self.manifest = Arc::new(updated_manifest);
self.checkpointer
.maybe_do_checkpoint(self.manifest.as_ref());
}
Ok(version)
}
@@ -575,6 +598,11 @@ impl RegionManifestManager {
self.manifest.clone()
}
/// Retrieves the current staging [RegionManifest], if any.
pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
self.staging_manifest.clone()
}
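// Illustrative call pattern (a sketch, not part of this diff):
//   manager.update(actions, true).await?;  // recorded in `staging_manifest`, checkpointing skipped
//   manager.update(actions, false).await?; // recorded in `manifest`, may trigger a checkpoint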
/// Returns total manifest size.
pub fn manifest_usage(&self) -> u64 {
self.store.total_manifest_size()
@@ -711,6 +739,22 @@ impl RegionManifestManager {
Ok(Some(RegionMetaActionList::new(merged_actions)))
}
/// Unsets the staging manifest.
pub(crate) fn unset_staging_manifest(&mut self) {
self.staging_manifest = None;
}
/// Clears the in-memory staging manifest and removes all persisted staging manifests.
pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
self.staging_manifest = None;
self.store.clear_staging_manifests().await?;
info!(
"Cleared all staging manifests for region {}",
self.manifest.metadata.region_id
);
Ok(())
}
}
#[cfg(test)]
@@ -837,13 +881,7 @@ mod test {
sst_format: FormatType::PrimaryKey,
}));
let current_version = manager
.update(
action_list,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
let current_version = manager.update(action_list, false).await.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1);
@@ -906,13 +944,7 @@ mod test {
sst_format: FormatType::PrimaryKey,
}));
let current_version = manager
.update(
action_list,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
let current_version = manager.update(action_list, false).await.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1);
@@ -933,7 +965,7 @@ mod test {
flushed_sequence: None,
committed_sequence: None,
})]),
RegionRoleState::Leader(RegionLeaderState::Writable),
false,
)
.await
.unwrap();

View File

@@ -27,7 +27,6 @@ use crate::manifest::action::{
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::storage::CheckpointMetadata;
use crate::manifest::tests::utils::basic_region_metadata;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::sst::file::FileMeta;
use crate::test_util::TestEnv;
@@ -87,13 +86,7 @@ async fn manager_without_checkpoint() {
// apply 10 actions
for _ in 0..10 {
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
manager.update(nop_action(), false).await.unwrap();
}
// no checkpoint
@@ -138,13 +131,7 @@ async fn manager_with_checkpoint_distance_1() {
// apply 10 actions
for _ in 0..10 {
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
manager.update(nop_action(), false).await.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -205,13 +192,7 @@ async fn test_corrupted_data_causing_checksum_error() {
// Apply actions
for _ in 0..10 {
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
manager.update(nop_action(), false).await.unwrap();
}
// Wait for the checkpoint to finish.
@@ -302,10 +283,7 @@ async fn generate_checkpoint_with_compression_types(
let (_env, mut manager) = build_manager(1, compress_type).await;
for action in actions {
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
manager.update(action, false).await.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -361,10 +339,7 @@ async fn manifest_install_manifest_to() {
let (env, mut manager) = build_manager(0, CompressionType::Uncompressed).await;
let (files, actions) = generate_action_lists(10);
for action in actions {
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
manager.update(action, false).await.unwrap();
}
// Nothing to install
@@ -402,10 +377,7 @@ async fn manifest_install_manifest_to_with_checkpoint() {
let (env, mut manager) = build_manager(3, CompressionType::Uncompressed).await;
let (files, actions) = generate_action_lists(10);
for action in actions {
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
manager.update(action, false).await.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -477,13 +449,7 @@ async fn test_checkpoint_bypass_in_staging_mode() {
// Apply actions in staging mode - checkpoint should be bypassed
for _ in 0..15 {
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Staging),
)
.await
.unwrap();
manager.update(nop_action(), true).await.unwrap();
}
assert!(!manager.checkpointer().is_doing_checkpoint());
@@ -498,13 +464,7 @@ async fn test_checkpoint_bypass_in_staging_mode() {
);
// Now switch to normal mode and apply one more action
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
manager.update(nop_action(), false).await.unwrap();
// Wait for potential checkpoint
while manager.checkpointer().is_doing_checkpoint() {

View File

@@ -14,11 +14,11 @@
//! Bulk part encoder/decoder.
use std::collections::VecDeque;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
@@ -34,7 +34,9 @@ use datatypes::arrow::array::{
UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
@@ -51,14 +53,15 @@ use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber, SequenceRange};
use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;
use crate::error::{
self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
@@ -167,6 +170,86 @@ impl BulkPart {
}
}
/// Fills missing columns in the BulkPart batch with default values.
///
/// This function checks if the batch schema matches the region metadata schema,
/// and if there are missing columns, it fills them with default values (or null
/// for nullable columns).
///
/// # Arguments
///
/// * `region_metadata` - The region metadata containing the expected schema
pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
// Builds a map of existing columns in the batch
let batch_schema = self.batch.schema();
let batch_columns: HashSet<_> = batch_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.collect();
// Finds columns that need to be filled
let mut columns_to_fill = Vec::new();
for column_meta in &region_metadata.column_metadatas {
// TODO(yingwen): Return an error for impure default values once we support filling
// bulk insert requests in the frontend.
if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
columns_to_fill.push(column_meta);
}
}
if columns_to_fill.is_empty() {
return Ok(());
}
let num_rows = self.batch.num_rows();
let mut new_columns = Vec::new();
let mut new_fields = Vec::new();
// First, adds all existing columns
new_fields.extend(batch_schema.fields().iter().cloned());
new_columns.extend_from_slice(self.batch.columns());
let region_id = region_metadata.region_id;
// Then adds the missing columns with default values
for column_meta in columns_to_fill {
let default_vector = column_meta
.column_schema
.create_default_vector(num_rows)
.context(CreateDefaultSnafu {
region_id,
column: &column_meta.column_schema.name,
})?
.with_context(|| InvalidRequestSnafu {
region_id,
reason: format!(
"column {} does not have default value",
column_meta.column_schema.name
),
})?;
let arrow_array = default_vector.to_arrow_array();
new_fields.push(Arc::new(Field::new(
column_meta.column_schema.name.clone(),
column_meta.column_schema.data_type.as_arrow_type(),
column_meta.column_schema.is_nullable(),
)));
new_columns.push(arrow_array);
}
// Create a new schema and batch with the filled columns
let new_schema = Arc::new(Schema::new(new_fields));
let new_batch =
RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
// Update the batch
self.batch = new_batch;
Ok(())
}
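// Illustrative usage (a sketch, not part of this diff): for a bulk batch that only carries the
// time index and one field column, filling against the region metadata appends every remaining
// column using its default value (null for nullable columns without an explicit default):
//
//   part.fill_missing_columns(&region_metadata)?;
//   assert_eq!(part.batch.num_columns(), region_metadata.column_metadatas.len());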
/// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
let vectors = region_metadata
@@ -185,7 +268,7 @@ impl BulkPart {
let values = (0..self.batch.num_columns())
.map(|col_idx| {
if let Some(v) = &vectors[col_idx] {
value_to_grpc_value(v.get(row_idx))
to_grpc_value(v.get(row_idx))
} else {
api::v1::Value { value_data: None }
}
@@ -667,6 +750,196 @@ fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}
/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
/// with the same format as produced by [BulkPartConverter].
///
/// This function takes a `BulkPart` where:
/// - For dense encoding: Primary key columns may be stored as individual columns
/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
/// - The batch may not be sorted
///
/// And produces a `BulkPart` where:
/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
/// - An encoded `__primary_key` dictionary column is present
/// - The batch is sorted by (primary_key, timestamp, sequence desc)
///
/// # Arguments
///
/// * `part` - The input `BulkPart` to convert
/// * `region_metadata` - Region metadata containing schema information
/// * `primary_key_codec` - Codec for encoding primary keys
/// * `schema` - Target schema for the output batch
/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
///
/// # Returns
///
/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
/// encoded primary keys and sorted data.
pub fn convert_bulk_part(
part: BulkPart,
region_metadata: &RegionMetadataRef,
primary_key_codec: Arc<dyn PrimaryKeyCodec>,
schema: SchemaRef,
store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
if part.num_rows() == 0 {
return Ok(None);
}
let num_rows = part.num_rows();
let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
// Builds a column name-to-index map for efficient lookups
let input_schema = part.batch.schema();
let column_indices: HashMap<&str, usize> = input_schema
.fields()
.iter()
.enumerate()
.map(|(idx, field)| (field.name().as_str(), idx))
.collect();
// Determines the structure of the input batch by looking up columns by name
let mut output_columns = Vec::new();
// Extracts primary key columns if we need to encode them (dense encoding)
let pk_array = if is_sparse {
// For sparse encoding, the input should already have the __primary_key column
// We need to find it in the input batch
None
} else {
// For dense encoding, extract and encode primary key columns by name
let pk_vectors: Result<Vec<_>> = region_metadata
.primary_key_columns()
.map(|col_meta| {
let col_idx = column_indices
.get(col_meta.column_schema.name.as_str())
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})?;
let col = part.batch.column(*col_idx);
Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
})
.collect();
let pk_vectors = pk_vectors?;
let mut key_array_builder = PrimaryKeyArrayBuilder::new();
let mut encode_buf = Vec::new();
for row_idx in 0..num_rows {
encode_buf.clear();
// Collects primary key values with column IDs for this row
let pk_values_with_ids: Vec<_> = region_metadata
.primary_key
.iter()
.zip(pk_vectors.iter())
.map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
.collect();
// Encodes the primary key
primary_key_codec
.encode_value_refs(&pk_values_with_ids, &mut encode_buf)
.context(EncodeSnafu)?;
key_array_builder
.append(&encode_buf)
.context(ComputeArrowSnafu)?;
}
Some(key_array_builder.finish())
};
// Adds primary key columns if storing them (only for dense encoding)
if store_primary_key_columns && !is_sparse {
for col_meta in region_metadata.primary_key_columns() {
let col_idx = column_indices
.get(col_meta.column_schema.name.as_str())
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})?;
let col = part.batch.column(*col_idx);
// Converts to dictionary if needed for string types
let col = if col_meta.column_schema.data_type.is_string() {
let target_type = ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Utf8),
);
arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
} else {
col.clone()
};
output_columns.push(col);
}
}
// Adds field columns
for col_meta in region_metadata.field_columns() {
let col_idx = column_indices
.get(col_meta.column_schema.name.as_str())
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})?;
output_columns.push(part.batch.column(*col_idx).clone());
}
// Adds timestamp column
let new_timestamp_index = output_columns.len();
let ts_col_idx = column_indices
.get(
region_metadata
.time_index_column()
.column_schema
.name
.as_str(),
)
.context(ColumnNotFoundSnafu {
column: &region_metadata.time_index_column().column_schema.name,
})?;
output_columns.push(part.batch.column(*ts_col_idx).clone());
// Adds encoded primary key dictionary column
let pk_dictionary = if let Some(pk_dict_array) = pk_array {
Arc::new(pk_dict_array) as ArrayRef
} else {
let pk_col_idx =
column_indices
.get(PRIMARY_KEY_COLUMN_NAME)
.context(ColumnNotFoundSnafu {
column: PRIMARY_KEY_COLUMN_NAME,
})?;
let col = part.batch.column(*pk_col_idx);
// Casts to dictionary type if needed
let target_type = ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Binary),
);
arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
};
output_columns.push(pk_dictionary);
let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
output_columns.push(Arc::new(sequence_array) as ArrayRef);
let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
output_columns.push(Arc::new(op_type_array) as ArrayRef);
let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
// Sorts the batch by (primary_key, timestamp, sequence desc)
let sorted_batch = sort_primary_key_record_batch(&batch)?;
Ok(Some(BulkPart {
batch: sorted_batch,
max_timestamp: part.max_timestamp,
min_timestamp: part.min_timestamp,
sequence: part.sequence,
timestamp_index: new_timestamp_index,
raw_data: None,
}))
}
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
data: Bytes,
@@ -1189,11 +1462,14 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
mod tests {
use std::collections::VecDeque;
use api::v1::{Row, WriteHint};
use api::v1::{Row, SemanticType, WriteHint};
use datafusion_common::ScalarValue;
use datatypes::arrow::array::Float64Array;
use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use store_api::storage::consts::ReservedColumnId;
use super::*;
@@ -2166,4 +2442,379 @@ mod tests {
);
}
}
#[test]
fn test_convert_bulk_part_empty() {
let metadata = metadata_for_test();
let schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let primary_key_codec = build_primary_key_codec(&metadata);
// Create empty batch
let empty_batch = RecordBatch::new_empty(schema.clone());
let empty_part = BulkPart {
batch: empty_batch,
max_timestamp: 0,
min_timestamp: 0,
sequence: 0,
timestamp_index: 0,
raw_data: None,
};
let result =
convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
assert!(result.is_none());
}
#[test]
fn test_convert_bulk_part_dense_with_pk_columns() {
let metadata = metadata_for_test();
let primary_key_codec = build_primary_key_codec(&metadata);
let k0_array = Arc::new(arrow::array::StringArray::from(vec![
"key1", "key2", "key1",
]));
let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
let input_schema = Arc::new(Schema::new(vec![
Field::new("k0", ArrowDataType::Utf8, false),
Field::new("k1", ArrowDataType::UInt32, false),
Field::new("v0", ArrowDataType::Int64, true),
Field::new("v1", ArrowDataType::Float64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
]));
let input_batch = RecordBatch::try_new(
input_schema,
vec![k0_array, k1_array, v0_array, v1_array, ts_array],
)
.unwrap();
let part = BulkPart {
batch: input_batch,
max_timestamp: 2000,
min_timestamp: 1000,
sequence: 5,
timestamp_index: 4,
raw_data: None,
};
let output_schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let result = convert_bulk_part(
part,
&metadata,
primary_key_codec,
output_schema,
true, // store primary key columns
)
.unwrap();
let converted = result.unwrap();
assert_eq!(converted.num_rows(), 3);
assert_eq!(converted.max_timestamp, 2000);
assert_eq!(converted.min_timestamp, 1000);
assert_eq!(converted.sequence, 5);
let schema = converted.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec![
"k0",
"k1",
"v0",
"v1",
"ts",
"__primary_key",
"__sequence",
"__op_type"
]
);
let k0_col = converted.batch.column_by_name("k0").unwrap();
assert!(matches!(
k0_col.data_type(),
ArrowDataType::Dictionary(_, _)
));
let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
let dict_array = pk_col
.as_any()
.downcast_ref::<DictionaryArray<UInt32Type>>()
.unwrap();
let keys = dict_array.keys();
assert_eq!(keys.len(), 3);
}
#[test]
fn test_convert_bulk_part_dense_without_pk_columns() {
let metadata = metadata_for_test();
let primary_key_codec = build_primary_key_codec(&metadata);
// Create input batch with primary key columns (k0, k1)
let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
let input_schema = Arc::new(Schema::new(vec![
Field::new("k0", ArrowDataType::Utf8, false),
Field::new("k1", ArrowDataType::UInt32, false),
Field::new("v0", ArrowDataType::Int64, true),
Field::new("v1", ArrowDataType::Float64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
]));
let input_batch = RecordBatch::try_new(
input_schema,
vec![k0_array, k1_array, v0_array, v1_array, ts_array],
)
.unwrap();
let part = BulkPart {
batch: input_batch,
max_timestamp: 2000,
min_timestamp: 1000,
sequence: 3,
timestamp_index: 4,
raw_data: None,
};
let output_schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: true,
},
);
let result = convert_bulk_part(
part,
&metadata,
primary_key_codec,
output_schema,
false, // don't store primary key columns
)
.unwrap();
let converted = result.unwrap();
assert_eq!(converted.num_rows(), 2);
assert_eq!(converted.max_timestamp, 2000);
assert_eq!(converted.min_timestamp, 1000);
assert_eq!(converted.sequence, 3);
// Verify schema does NOT include individual primary key columns
let schema = converted.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
);
// Verify __primary_key column is present and is a dictionary
let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
assert!(matches!(
pk_col.data_type(),
ArrowDataType::Dictionary(_, _)
));
}
#[test]
fn test_convert_bulk_part_sparse_encoding() {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 4,
})
.primary_key(vec![0, 1])
.primary_key_encoding(PrimaryKeyEncoding::Sparse);
let metadata = Arc::new(builder.build().unwrap());
let primary_key_codec = build_primary_key_codec(&metadata);
// Create input batch with __primary_key column (sparse encoding)
let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
b"encoded_key_1".as_slice(),
b"encoded_key_2".as_slice(),
]));
let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
let input_schema = Arc::new(Schema::new(vec![
Field::new("__primary_key", ArrowDataType::Binary, false),
Field::new("v0", ArrowDataType::Int64, true),
Field::new("v1", ArrowDataType::Float64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
]));
let input_batch =
RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
.unwrap();
let part = BulkPart {
batch: input_batch,
max_timestamp: 2000,
min_timestamp: 1000,
sequence: 7,
timestamp_index: 3,
raw_data: None,
};
let output_schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let result = convert_bulk_part(
part,
&metadata,
primary_key_codec,
output_schema,
true, // store_primary_key_columns (ignored for sparse)
)
.unwrap();
let converted = result.unwrap();
assert_eq!(converted.num_rows(), 2);
assert_eq!(converted.max_timestamp, 2000);
assert_eq!(converted.min_timestamp, 1000);
assert_eq!(converted.sequence, 7);
// Verify schema does NOT include individual primary key columns (sparse encoding)
let schema = converted.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
);
// Verify __primary_key is dictionary encoded
let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
assert!(matches!(
pk_col.data_type(),
ArrowDataType::Dictionary(_, _)
));
}
#[test]
fn test_convert_bulk_part_sorting_with_multiple_series() {
let metadata = metadata_for_test();
let primary_key_codec = build_primary_key_codec(&metadata);
// Create unsorted batch with multiple series and timestamps
let k0_array = Arc::new(arrow::array::StringArray::from(vec![
"series_b", "series_a", "series_b", "series_a",
]));
let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2000, 1000, 4000, 3000,
]));
let input_schema = Arc::new(Schema::new(vec![
Field::new("k0", ArrowDataType::Utf8, false),
Field::new("k1", ArrowDataType::UInt32, false),
Field::new("v0", ArrowDataType::Int64, true),
Field::new("v1", ArrowDataType::Float64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
]));
let input_batch = RecordBatch::try_new(
input_schema,
vec![k0_array, k1_array, v0_array, v1_array, ts_array],
)
.unwrap();
let part = BulkPart {
batch: input_batch,
max_timestamp: 4000,
min_timestamp: 1000,
sequence: 10,
timestamp_index: 4,
raw_data: None,
};
let output_schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let result =
convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
let converted = result.unwrap();
assert_eq!(converted.num_rows(), 4);
// Verify data is sorted by (primary_key, timestamp, sequence desc)
let ts_col = converted.batch.column(converted.timestamp_index);
let ts_array = ts_col
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
// After sorting by (pk, ts), we should have:
// series_a,1: ts=1000, 3000
// series_b,2: ts=2000, 4000
let timestamps: Vec<i64> = ts_array.values().to_vec();
assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
}
}

View File

@@ -103,7 +103,7 @@ impl KeyDictBuilder {
self.key_bytes_in_index += full_primary_key.len() + sparse_key_len;
// Adds key size of index to the metrics.
MEMTABLE_DICT_BYTES.add(self.key_bytes_in_index as i64);
MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64);
pk_index
}

View File
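The hunk above fixes the gauge arithmetic: `key_bytes_in_index` is a running total, so adding it on every insertion over-counts. For keys of 10 and 20 bytes the gauge previously grew by 10 + 30 = 40, while adding only the size of the key just inserted grows it by 10 + 20 = 30, matching the bytes actually held by the dictionary.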

@@ -261,7 +261,7 @@ impl TimePartitions {
converter.append_key_values(kvs)?;
let part = converter.convert()?;
return self.write_bulk(part);
return self.write_bulk_inner(part);
}
// Get all parts.
@@ -291,7 +291,31 @@ impl TimePartitions {
self.write_multi_parts(kvs, &parts)
}
/// Writes a bulk part.
pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
// Converts the bulk part if `bulk_schema` is `Some`.
let part = if let Some(bulk_schema) = &self.bulk_schema {
let converted = crate::memtable::bulk::part::convert_bulk_part(
part,
&self.metadata,
self.primary_key_codec.clone(),
bulk_schema.clone(),
// Always store primary keys for bulk mode.
true,
)?;
match converted {
Some(p) => p,
None => return Ok(()),
}
} else {
part
};
self.write_bulk_inner(part)
}
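// Note (an editorial sketch of the intent, not part of this diff): when `bulk_schema` is set,
// callers may submit parts that are unsorted and lack encoded primary keys; `convert_bulk_part`
// normalizes them into the flat layout (sorted by primary key, timestamp, sequence) before
// `write_bulk_inner` runs.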
/// Writes a bulk part without converting.
fn write_bulk_inner(&self, part: BulkPart) -> Result<()> {
let time_type = self
.metadata
.time_index_column()

View File

@@ -18,6 +18,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use common_recordbatch::recordbatch::align_json_array;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
@@ -27,7 +28,7 @@ use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use datatypes::vectors::{Helper, VectorRef};
use mito_codec::row_converter::{
CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
build_primary_key_codec_with_fields,
@@ -38,8 +39,9 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{
CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
NewRecordBatchSnafu, Result, UnexpectedSnafu, UnsupportedOperationSnafu,
CastVectorSnafu, CompatReaderSnafu, ComputeArrowSnafu, ConvertVectorSnafu, CreateDefaultSnafu,
DecodeSnafu, EncodeSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu,
UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
@@ -150,7 +152,7 @@ impl PrimaryKeyCompatBatch {
batch = compat_pk.compat(batch)?;
}
if let Some(compat_fields) = &self.compat_fields {
batch = compat_fields.compat(batch);
batch = compat_fields.compat(batch)?;
}
Ok(batch)
@@ -351,11 +353,13 @@ impl FlatCompatBatch {
let old_column = batch.column(*pos);
if let Some(ty) = cast_type {
// Safety: We ensure type can be converted and the new batch should be valid.
// Tip: `safe` must be true in `CastOptions`, which replaces any value that cannot be converted with null.
let casted =
let casted = if let Some(json_type) = ty.as_json() {
align_json_array(old_column, &json_type.as_arrow_type())
.context(RecordBatchSnafu)?
} else {
datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
.context(ComputeArrowSnafu)?;
.context(ComputeArrowSnafu)?
};
Ok(casted)
} else {
Ok(old_column.clone())
@@ -452,8 +456,7 @@ struct CompatFields {
impl CompatFields {
/// Make fields of the `batch` compatible.
#[must_use]
fn compat(&self, batch: Batch) -> Batch {
fn compat(&self, batch: Batch) -> Result<Batch> {
debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
debug_assert!(
self.actual_fields
@@ -463,24 +466,32 @@ impl CompatFields {
);
let len = batch.num_rows();
let fields = self
.index_or_defaults
self.index_or_defaults
.iter()
.map(|index_or_default| match index_or_default {
IndexOrDefault::Index { pos, cast_type } => {
let old_column = &batch.fields()[*pos];
let data = if let Some(ty) = cast_type {
// Safety: We ensure type can be converted and the new batch should be valid.
// Tip: `safe` must be true in `CastOptions`, which replaces any value that cannot be converted with null.
old_column.data.cast(ty).unwrap()
if let Some(json_type) = ty.as_json() {
let json_array = old_column.data.to_arrow_array();
let json_array =
align_json_array(&json_array, &json_type.as_arrow_type())
.context(RecordBatchSnafu)?;
Helper::try_into_vector(&json_array).context(ConvertVectorSnafu)?
} else {
old_column.data.cast(ty).with_context(|_| CastVectorSnafu {
from: old_column.data.data_type(),
to: ty.clone(),
})?
}
} else {
old_column.data.clone()
};
BatchColumn {
Ok(BatchColumn {
column_id: old_column.column_id,
data,
}
})
}
IndexOrDefault::DefaultValue {
column_id,
@@ -488,16 +499,14 @@ impl CompatFields {
semantic_type: _,
} => {
let data = default_vector.replicate(&[len]);
BatchColumn {
Ok(BatchColumn {
column_id: *column_id,
data,
}
})
}
})
.collect();
// Safety: We ensure all columns have the same length and the new batch should be valid.
batch.with_fields(fields).unwrap()
.collect::<Result<Vec<_>>>()
.and_then(|fields| batch.with_fields(fields))
}
}
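
For context, the `CompatFields::compat` change above turns an infallible column-mapping loop into a fallible one. The idiom it relies on is collecting an iterator of `Result` items into a single `Result<Vec<_>>`, which short-circuits on the first error. A minimal stand-alone sketch of that pattern (the types below are illustrative placeholders, not the mito2 ones):

// Sketch of the `collect::<Result<Vec<_>>>()` pattern used by the new
// `CompatFields::compat`: each column conversion may fail, and the first
// error aborts the whole batch conversion.
#[derive(Debug)]
struct CastError(String);

fn cast_column(value: i64, widening: bool) -> Result<i64, CastError> {
    if widening {
        Ok(value)
    } else {
        Err(CastError(format!("cannot narrow {value}")))
    }
}

fn compat_columns(columns: &[i64]) -> Result<Vec<i64>, CastError> {
    columns
        .iter()
        .map(|c| cast_column(*c, true))
        // `Result` implements `FromIterator`, so this stops at the first `Err`.
        .collect::<Result<Vec<_>, _>>()
}

fn main() {
    assert_eq!(compat_columns(&[1, 2, 3]).unwrap(), vec![1, 2, 3]);
}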

View File

@@ -22,7 +22,7 @@ pub(crate) mod version;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
@@ -77,6 +77,8 @@ pub enum RegionLeaderState {
Writable,
/// The region is in staging mode - writable but no checkpoint/compaction.
Staging,
/// The region is entering staging mode; write requests will be stalled.
EnteringStaging,
/// The region is altering.
Altering,
/// The region is dropping.
@@ -138,6 +140,14 @@ pub struct MitoRegion {
pub(crate) topic_latest_entry_id: AtomicU64,
/// The total bytes written to the region.
pub(crate) written_bytes: Arc<AtomicU64>,
/// The partition expression of the region in staging mode.
///
/// During staging mode, the region metadata in [`VersionControlRef`] is not updated,
/// so we need to store the partition expression separately.
/// TODO(weny):
/// 1. Reload the staging partition expr during region open.
/// 2. Reject requests with a mismatching partition expr.
pub(crate) staging_partition_expr: Mutex<Option<String>>,
/// manifest stats
stats: ManifestStats,
}
@@ -326,11 +336,19 @@ impl MitoRegion {
)
}
/// Sets the entering staging state.
pub(crate) fn set_entering_staging(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Writable,
RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
)
}
/// Exits the staging state back to writable.
///
/// You should call this method in the worker loop.
/// Transitions from Staging to Writable state.
fn exit_staging(&self) -> Result<()> {
pub fn exit_staging(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Staging,
RegionRoleState::Leader(RegionLeaderState::Writable),
@@ -457,10 +475,7 @@ impl MitoRegion {
sst_format: current_version.options.sst_format.unwrap_or_default(),
});
let result = manager
.update(
RegionMetaActionList::with_action(action),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.update(RegionMetaActionList::with_action(action), false)
.await;
match result {
@@ -492,6 +507,16 @@ impl MitoRegion {
}
}
/// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
/// Otherwise, logs an error.
pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
if let Err(e) =
self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
{
error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
}
}
/// Returns the region statistic.
pub(crate) fn region_statistic(&self) -> RegionStatistic {
let version = self.version();
@@ -575,10 +600,19 @@ impl MitoRegion {
.flat_map(|level| level.files().map(|file| file.file_id().file_id()))
.collect::<HashSet<_>>();
self.manifest_ctx
.manifest()
let manifest_files = self.manifest_ctx.manifest().await.files.clone();
let staging_files = self
.manifest_ctx
.staging_manifest()
.await
.files
.map(|m| m.files.clone())
.unwrap_or_default();
let files = manifest_files
.into_iter()
.chain(staging_files.into_iter())
.collect::<HashMap<_, _>>();
files
.values()
.map(|meta| {
let region_id = self.region_id;
@@ -654,9 +688,8 @@ impl MitoRegion {
};
// Submit merged actions using the manifest manager's update method
// Pass the target state (Writable) so it saves to normal directory, not staging
let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
let new_version = manager.update(merged_actions.clone(), target_state).await?;
// Pass `false` so it saves to the normal directory, not the staging one
let new_version = manager.update(merged_actions.clone(), false).await?;
info!(
"Successfully submitted merged staged manifests for region {}, new version: {}",
@@ -731,6 +764,7 @@ impl ManifestContext {
&self,
expect_state: RegionLeaderState,
action_list: RegionMetaActionList,
is_staging: bool,
) -> Result<ManifestVersion> {
// Acquires the write lock of the manifest manager.
let mut manager = self.manifest_manager.write().await;
@@ -806,7 +840,7 @@ impl ManifestContext {
}
// Now we can update the manifest.
let version = manager.update(action_list, current_state).await.inspect_err(
let version = manager.update(action_list, is_staging).await.inspect_err(
|e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
)?;
@@ -913,9 +947,17 @@ impl ManifestContext {
}
}
/// Returns the normal manifest of the region.
pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
self.manifest_manager.read().await.manifest()
}
/// Returns the staging manifest of the region.
pub(crate) async fn staging_manifest(
&self,
) -> Option<Arc<crate::manifest::action::RegionManifest>> {
self.manifest_manager.read().await.staging_manifest()
}
}
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
@@ -1213,8 +1255,8 @@ impl ManifestStats {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::create_temp_dir;
@@ -1404,6 +1446,7 @@ mod tests {
topic_latest_entry_id: Default::default(),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: ManifestStats::default(),
staging_partition_expr: Mutex::new(None),
};
// Test initial state
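
For context, the file-listing change above unions the normal manifest's files with the staging manifest's files by chaining both maps into one `HashMap`, so a file id present in both appears only once and the staging entry, chained last, wins. A small stand-alone illustration of that collect-from-chain behavior, with strings standing in for `FileId`/`FileMeta`:

use std::collections::HashMap;

fn main() {
    // Stand-ins for the normal and staging manifests' `files` maps.
    let manifest_files: HashMap<&str, &str> =
        HashMap::from([("file-1", "normal"), ("file-2", "normal")]);
    let staging_files: HashMap<&str, &str> =
        HashMap::from([("file-2", "staging"), ("file-3", "staging")]);

    // Collecting the chained iterators keeps one entry per key; duplicates
    // take the value from the later (staging) iterator.
    let files: HashMap<_, _> = manifest_files
        .into_iter()
        .chain(staging_files)
        .collect();

    assert_eq!(files.len(), 3);
    assert_eq!(files["file-2"], "staging");
}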

View File

@@ -16,8 +16,8 @@
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use common_telemetry::{debug, error, info, warn};
@@ -334,6 +334,7 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(0),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: self.stats,
staging_partition_expr: Mutex::new(None),
}))
}
@@ -563,6 +564,8 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: self.stats.clone(),
// TODO(weny): reload the staging partition expr from the manifest.
staging_partition_expr: Mutex::new(None),
};
let region = Arc::new(region);
@@ -973,6 +976,7 @@ fn can_load_cache(state: RegionRoleState) -> bool {
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging)
| RegionRoleState::Leader(RegionLeaderState::Altering)
| RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
| RegionRoleState::Leader(RegionLeaderState::Editing)
| RegionRoleState::Follower => true,
// The region will be closed soon if it is downgrading.

View File

@@ -20,7 +20,6 @@ use std::time::Instant;
use api::helper::{
ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
to_proto_value,
};
use api::v1::column_def::options_from_column_schema;
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
@@ -36,9 +35,10 @@ use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint}
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{
AffectedRows, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest,
RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest,
RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest,
RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
RegionTruncateRequest,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -414,7 +414,7 @@ impl WriteRequest {
};
// Convert default value into proto's value.
Ok(to_proto_value(default_value))
Ok(api::helper::to_grpc_value(default_value))
}
}
@@ -726,6 +726,11 @@ impl WorkerRequest {
sender: sender.into(),
request: DdlRequest::Catchup((v, None)),
}),
RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
request: DdlRequest::EnterStaging(v),
}),
RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
metadata: region_metadata,
sender: sender.into(),
@@ -823,6 +828,7 @@ pub(crate) enum DdlRequest {
BuildIndex(RegionBuildIndexRequest),
Truncate(RegionTruncateRequest),
Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
EnterStaging(EnterStagingRequest),
}
/// Sender and Ddl request.
@@ -859,6 +865,8 @@ pub(crate) enum BackgroundNotify {
RegionChange(RegionChangeResult),
/// Region edit result.
RegionEdit(RegionEditResult),
/// Enter staging result.
EnterStaging(EnterStagingResult),
}
/// Notifies a flush job is finished.
@@ -876,6 +884,8 @@ pub(crate) struct FlushFinished {
pub(crate) edit: RegionEdit,
/// Memtables to remove.
pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
/// Whether the region is in staging mode.
pub(crate) is_staging: bool,
}
impl FlushFinished {
@@ -1000,6 +1010,19 @@ pub(crate) struct RegionChangeResult {
pub(crate) new_options: Option<RegionOptions>,
}
/// Notifies the region the result of entering staging.
#[derive(Debug)]
pub(crate) struct EnterStagingResult {
/// Region id.
pub(crate) region_id: RegionId,
/// The new partition expression to apply.
pub(crate) partition_expr: String,
/// Result sender.
pub(crate) sender: OptionOutputTx,
/// Result from the manifest manager.
pub(crate) result: Result<()>,
}
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {

View File

@@ -776,6 +776,7 @@ impl IndexBuildTask {
.update_manifest(
RegionLeaderState::Writable,
RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
false,
)
.await?;
info!(

View File

@@ -39,7 +39,7 @@ use common_meta::cache::{new_schema_cache, new_table_schema_cache};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use common_test_util::temp_dir::{TempDir, create_temp_dir};
use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
@@ -50,6 +50,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use moka::future::CacheBuilder;
use object_store::ObjectStore;
use object_store::layers::mock::MockLayer;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::services::Fs;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
@@ -228,6 +229,7 @@ pub struct TestEnv {
file_ref_manager: FileReferenceManagerRef,
kv_backend: KvBackendRef,
partition_expr_fetcher: PartitionExprFetcherRef,
object_store_mock_layer: Option<MockLayer>,
}
impl TestEnv {
@@ -264,6 +266,7 @@ impl TestEnv {
file_ref_manager: Arc::new(FileReferenceManager::new(None)),
kv_backend,
partition_expr_fetcher: noop_partition_expr_fetcher(),
object_store_mock_layer: None,
}
}
@@ -273,6 +276,12 @@ impl TestEnv {
self
}
/// Sets the `object_store_mock_layer` used when building the object store.
pub fn with_mock_layer(mut self, mock_layer: MockLayer) -> TestEnv {
self.object_store_mock_layer = Some(mock_layer);
self
}
pub fn get_object_store(&self) -> Option<ObjectStore> {
self.object_store_manager
.as_ref()
@@ -569,7 +578,16 @@ impl TestEnv {
let data_home = self.data_home.path();
let data_path = data_home.join("data").as_path().display().to_string();
let builder = Fs::default().root(&data_path);
let object_store = ObjectStore::new(builder).unwrap().finish();
let object_store = if let Some(mock_layer) = self.object_store_mock_layer.as_ref() {
debug!("create object store with mock layer");
ObjectStore::new(builder)
.unwrap()
.layer(mock_layer.clone())
.finish()
} else {
ObjectStore::new(builder).unwrap().finish()
};
ObjectStoreManager::new("default", object_store)
}

View File

@@ -21,6 +21,7 @@ mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_enter_staging;
mod handle_flush;
mod handle_manifest;
mod handle_open;
@@ -1039,8 +1040,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
continue;
}
DdlRequest::Flush(req) => {
self.handle_flush_request(ddl.region_id, req, ddl.sender)
.await;
self.handle_flush_request(ddl.region_id, req, ddl.sender);
continue;
}
DdlRequest::Compact(req) => {
@@ -1063,6 +1063,15 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
continue;
}
DdlRequest::EnterStaging(req) => {
self.handle_enter_staging_request(
ddl.region_id,
req.partition_expr,
ddl.sender,
)
.await;
continue;
}
};
ddl.sender.send(res);
@@ -1111,6 +1120,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
BackgroundNotify::RegionChange(req) => {
self.handle_manifest_region_change_result(req).await
}
BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
}
}
@@ -1272,6 +1282,13 @@ impl WorkerListener {
}
}
pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_enter_staging_result_begin(_region_id).await;
}
}
pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {

View File

@@ -113,7 +113,13 @@ impl<S> RegionWorkerLoop<S> {
info!("Flush region: {} before alteration", region_id);
// Try to submit a flush task.
let task = self.new_flush_task(&region, FlushReason::Alter, None, self.config.clone());
let task = self.new_flush_task(
&region,
FlushReason::Alter,
None,
self.config.clone(),
region.is_staging(),
);
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)

View File

@@ -0,0 +1,249 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Instant;
use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::region_request::EnterStagingRequest;
use store_api::storage::RegionId;
use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
use crate::region::{MitoRegionRef, RegionLeaderState};
use crate::request::{
BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
WorkerRequest, WorkerRequestWithTime,
};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_enter_staging_request(
&mut self,
region_id: RegionId,
partition_expr: String,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};
// If the region is already in staging mode, verify the partition expr matches.
if region.is_staging() {
let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
// If the partition exprs mismatch, return an error.
if staging_partition_expr.as_ref() != Some(&partition_expr) {
sender.send(Err(StagingPartitionExprMismatchSnafu {
manifest_expr: staging_partition_expr,
request_expr: partition_expr,
}
.build()));
return;
}
// If the partition expr matches, return success.
sender.send(Ok(0));
return;
}
let version = region.version();
if !version.memtables.is_empty() {
// If memtable is not empty, we can't enter staging directly and need to flush
// all memtables first.
info!("Flush region: {} before entering staging", region_id);
debug_assert!(!region.is_staging());
let task = self.new_flush_task(
&region,
FlushReason::EnterStaging,
None,
self.config.clone(),
region.is_staging(),
);
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)
{
// Unable to flush the region, send error to waiter.
sender.send(Err(e));
return;
}
// Safety: We have requested flush.
self.flush_scheduler
.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender,
request: DdlRequest::EnterStaging(EnterStagingRequest { partition_expr }),
});
return;
}
self.handle_enter_staging(region, partition_expr, sender);
}
async fn enter_staging(region: &MitoRegionRef, partition_expr: String) -> Result<()> {
let now = Instant::now();
// First step: clear all staging manifest files.
{
let mut manager = region.manifest_ctx.manifest_manager.write().await;
manager
.clear_staging_manifest_and_dir()
.await
.inspect_err(|e| {
error!(
e;
"Failed to clear staging manifest files for region {}",
region.region_id
);
})?;
info!(
"Cleared all staging manifest files for region {}, elapsed: {:?}",
region.region_id,
now.elapsed(),
);
}
// Second step: write new staging manifest.
let mut new_meta = (*region.metadata()).clone();
new_meta.partition_expr = Some(partition_expr.clone());
let sst_format = region.version().options.sst_format.unwrap_or_default();
let change = RegionChange {
metadata: Arc::new(new_meta),
sst_format,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
region
.manifest_ctx
.update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
.await?;
Ok(())
}
fn handle_enter_staging(
&self,
region: MitoRegionRef,
partition_expr: String,
sender: OptionOutputTx,
) {
if let Err(e) = region.set_entering_staging() {
sender.send(Err(e));
return;
}
let listener = self.listener.clone();
let request_sender = self.sender.clone();
common_runtime::spawn_global(async move {
let now = Instant::now();
let result = Self::enter_staging(&region, partition_expr.clone()).await;
match result {
Ok(_) => {
info!(
"Created staging manifest for region {}, elapsed: {:?}",
region.region_id,
now.elapsed(),
);
}
Err(ref e) => {
// Unset the staging manifest
region
.manifest_ctx
.manifest_manager
.write()
.await
.unset_staging_manifest();
error!(
"Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
region.region_id,
e,
now.elapsed(),
);
}
}
let notify = WorkerRequest::Background {
region_id: region.region_id,
notify: BackgroundNotify::EnterStaging(EnterStagingResult {
region_id: region.region_id,
sender,
result,
partition_expr,
}),
};
listener
.on_enter_staging_result_begin(region.region_id)
.await;
if let Err(res) = request_sender
.send(WorkerRequestWithTime::new(notify))
.await
{
warn!(
"Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
region.region_id, res
);
}
});
}
/// Handles enter staging result.
pub(crate) async fn handle_enter_staging_result(
&mut self,
enter_staging_result: EnterStagingResult,
) {
let region = match self.regions.get_region(enter_staging_result.region_id) {
Some(region) => region,
None => {
self.reject_region_stalled_requests(&enter_staging_result.region_id);
enter_staging_result.sender.send(
RegionNotFoundSnafu {
region_id: enter_staging_result.region_id,
}
.fail(),
);
return;
}
};
if enter_staging_result.result.is_ok() {
info!(
"Updating region {} staging partition expr to {}",
region.region_id, enter_staging_result.partition_expr
);
Self::update_region_staging_partition_expr(
&region,
enter_staging_result.partition_expr,
);
region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
} else {
region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
}
enter_staging_result
.sender
.send(enter_staging_result.result.map(|_| 0));
// Handles the stalled requests.
self.handle_region_stalled_requests(&enter_staging_result.region_id)
.await;
}
fn update_region_staging_partition_expr(region: &MitoRegionRef, partition_expr: String) {
let mut staging_partition_expr = region.staging_partition_expr.lock().unwrap();
debug_assert!(staging_partition_expr.is_none());
*staging_partition_expr = Some(partition_expr);
}
}
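
For context, the new handler above moves a region through Writable -> EnteringStaging -> Staging, flushing memtables first when they are non-empty and falling back to Writable if writing the staging manifest fails. A compact sketch of just that state decision, using a simplified enum rather than the real `RegionLeaderState`/`compare_exchange_state` machinery:

// Simplified model of the enter-staging transition; the real code flips the
// state with `compare_exchange_state` and writes the staging manifest between
// the two transitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LeaderState {
    Writable,
    EnteringStaging,
    Staging,
}

fn enter_staging(state: LeaderState, staging_manifest_written: bool) -> LeaderState {
    match state {
        // Only a writable region may start entering staging; writes issued
        // while in `EnteringStaging` are stalled until the outcome is known.
        LeaderState::Writable | LeaderState::EnteringStaging => {
            if staging_manifest_written {
                LeaderState::Staging
            } else {
                // On failure the region falls back to writable.
                LeaderState::Writable
            }
        }
        // Already staging: nothing to do in this sketch.
        LeaderState::Staging => LeaderState::Staging,
    }
}

fn main() {
    assert_eq!(enter_staging(LeaderState::Writable, true), LeaderState::Staging);
    assert_eq!(enter_staging(LeaderState::Writable, false), LeaderState::Writable);
}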

View File

@@ -76,8 +76,13 @@ impl<S> RegionWorkerLoop<S> {
if region.last_flush_millis() < min_last_flush_time {
// If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
let task =
self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
let task = self.new_flush_task(
region,
FlushReason::EngineFull,
None,
self.config.clone(),
region.is_staging(),
);
self.flush_scheduler.schedule_flush(
region.region_id,
&region.version_control,
@@ -91,8 +96,13 @@ impl<S> RegionWorkerLoop<S> {
if let Some(region) = max_mem_region
&& !self.flush_scheduler.is_flush_requested(region.region_id)
{
let task =
self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
let task = self.new_flush_task(
region,
FlushReason::EngineFull,
None,
self.config.clone(),
region.is_staging(),
);
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)?;
}
@@ -107,6 +117,7 @@ impl<S> RegionWorkerLoop<S> {
reason: FlushReason,
row_group_size: Option<usize>,
engine_config: Arc<MitoConfig>,
is_staging: bool,
) -> RegionFlushTask {
RegionFlushTask {
region_id: region.region_id,
@@ -121,13 +132,14 @@ impl<S> RegionWorkerLoop<S> {
manifest_ctx: region.manifest_ctx.clone(),
index_options: region.version().options.index_options.clone(),
flush_semaphore: self.flush_semaphore.clone(),
is_staging,
}
}
}
impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles manual flush request.
pub(crate) async fn handle_flush_request(
pub(crate) fn handle_flush_request(
&mut self,
region_id: RegionId,
request: RegionFlushRequest,
@@ -147,8 +159,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
FlushReason::Manual
};
let mut task =
self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
let mut task = self.new_flush_task(
&region,
reason,
request.row_group_size,
self.config.clone(),
region.is_staging(),
);
task.push_sender(sender);
if let Err(e) =
self.flush_scheduler
@@ -178,6 +195,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
FlushReason::Periodically,
None,
self.config.clone(),
region.is_staging(),
);
self.flush_scheduler.schedule_flush(
region.region_id,
@@ -208,11 +226,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
};
// Check if region is currently in staging mode
let is_staging = region.manifest_ctx.current_state()
== crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging);
if is_staging {
if request.is_staging {
// Skip the region metadata update.
info!(
"Skipping region metadata update for region {} in staging mode",
region_id

View File

@@ -346,6 +346,7 @@ impl<S> RegionWorkerLoop<S> {
let request_sender = self.sender.clone();
let manifest_ctx = region.manifest_ctx.clone();
let is_staging = region.is_staging();
// Updates manifest in background.
common_runtime::spawn_global(async move {
@@ -354,7 +355,7 @@ impl<S> RegionWorkerLoop<S> {
RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
let result = manifest_ctx
.update_manifest(RegionLeaderState::Truncating, action_list)
.update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
.await
.map(|_| ());
@@ -391,6 +392,7 @@ impl<S> RegionWorkerLoop<S> {
}
let listener = self.listener.clone();
let request_sender = self.sender.clone();
let is_staging = region.is_staging();
// Now the region is in altering state.
common_runtime::spawn_global(async move {
let new_meta = change.metadata.clone();
@@ -398,7 +400,7 @@ impl<S> RegionWorkerLoop<S> {
let result = region
.manifest_ctx
.update_manifest(RegionLeaderState::Altering, action_list)
.update_manifest(RegionLeaderState::Altering, action_list, is_staging)
.await
.map(|_| ());
let notify = WorkerRequest::Background {
@@ -463,6 +465,7 @@ async fn edit_region(
listener: WorkerListener,
) -> Result<()> {
let region_id = region.region_id;
let is_staging = region.is_staging();
if let Some(write_cache) = cache_manager.write_cache() {
for file_meta in &edit.files_to_add {
let write_cache = write_cache.clone();
@@ -532,7 +535,7 @@ async fn edit_region(
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
region
.manifest_ctx
.update_manifest(RegionLeaderState::Editing, action_list)
.update_manifest(RegionLeaderState::Editing, action_list, is_staging)
.await
.map(|_| ())
}

View File

@@ -241,6 +241,12 @@ impl<S> RegionWorkerLoop<S> {
// No such region.
continue;
};
#[cfg(test)]
debug!(
"Handling write request for region {}, state: {:?}",
region_id,
region.state()
);
match region.state() {
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging) => {
@@ -263,6 +269,16 @@ impl<S> RegionWorkerLoop<S> {
self.stalled_requests.push(sender_req);
continue;
}
RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
debug!(
"Region {} is entering staging, add request to pending writes",
region.region_id
);
self.stalling_count.add(1);
WRITE_STALL_TOTAL.inc();
self.stalled_requests.push(sender_req);
continue;
}
state => {
// The region is not writable.
sender_req.sender.send(
@@ -388,17 +404,14 @@ impl<S> RegionWorkerLoop<S> {
let need_fill_missing_columns = region_ctx.version().metadata.schema_version
!= bulk_req.region_metadata.schema_version;
// Only fill missing columns if primary key is dense encoded.
if need_fill_missing_columns {
// todo(hl): support filling default columns
bulk_req.sender.send(
InvalidRequestSnafu {
region_id,
reason: "Schema mismatch",
}
.fail(),
);
return;
// Fill missing columns if needed
if need_fill_missing_columns
&& let Err(e) = bulk_req
.request
.fill_missing_columns(&region_ctx.version().metadata)
{
bulk_req.sender.send(Err(e));
continue;
}
// Collect requests by region.
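
For context, the revised bulk-insert path above no longer rejects requests whose schema version differs from the region's; it calls `fill_missing_columns` against the region's current metadata and only fails if that filling fails. Conceptually, filling appends a default value for every schema column the request did not provide. The sketch below is a toy illustration with string maps, not the actual `fill_missing_columns` implementation:

use std::collections::HashMap;

// Toy illustration: add defaults for schema columns missing from a row.
// Real requests carry typed columns and per-column default constraints.
fn fill_missing_columns(row: &mut HashMap<String, String>, schema: &[(&str, &str)]) {
    for (name, default) in schema {
        row.entry((*name).to_string())
            .or_insert_with(|| (*default).to_string());
    }
}

fn main() {
    let mut row = HashMap::from([("host".to_string(), "h1".to_string())]);
    // (column name, default value) pairs standing in for the region metadata.
    let schema = [("host", ""), ("region", "unknown")];
    fill_missing_columns(&mut row, &schema);
    assert_eq!(row["region"], "unknown");
    assert_eq!(row["host"], "h1");
}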

View File

@@ -9,6 +9,7 @@ workspace = true
[features]
services-memory = ["opendal/services-memory"]
testing = ["derive_builder"]
[dependencies]
bytes.workspace = true
@@ -16,6 +17,7 @@ common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
derive_builder = { workspace = true, optional = true }
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true

View File

@@ -13,6 +13,8 @@
// limitations under the License.
mod lru_cache;
#[cfg(feature = "testing")]
pub mod mock;
pub use lru_cache::*;
pub use opendal::layers::*;

View File

@@ -0,0 +1,217 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::sync::Arc;
use derive_builder::Builder;
pub use oio::*;
pub use opendal::raw::{
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite, oio,
};
pub use opendal::{Buffer, Error, ErrorKind, Metadata, Result};
pub type MockWriterFactory = Arc<dyn Fn(&str, OpWrite, oio::Writer) -> oio::Writer + Send + Sync>;
pub type MockReaderFactory = Arc<dyn Fn(&str, OpRead, oio::Reader) -> oio::Reader + Send + Sync>;
pub type MockListerFactory = Arc<dyn Fn(&str, OpList, oio::Lister) -> oio::Lister + Send + Sync>;
pub type MockDeleterFactory = Arc<dyn Fn(oio::Deleter) -> oio::Deleter + Send + Sync>;
#[derive(Builder)]
pub struct MockLayer {
#[builder(setter(strip_option), default)]
writer_factory: Option<MockWriterFactory>,
#[builder(setter(strip_option), default)]
reader_factory: Option<MockReaderFactory>,
#[builder(setter(strip_option), default)]
lister_factory: Option<MockListerFactory>,
#[builder(setter(strip_option), default)]
deleter_factory: Option<MockDeleterFactory>,
}
impl Clone for MockLayer {
fn clone(&self) -> Self {
Self {
writer_factory: self.writer_factory.clone(),
reader_factory: self.reader_factory.clone(),
lister_factory: self.lister_factory.clone(),
deleter_factory: self.deleter_factory.clone(),
}
}
}
impl<A: Access> Layer<A> for MockLayer {
type LayeredAccess = MockAccessor<A>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
MockAccessor {
inner,
writer_factory: self.writer_factory.clone(),
reader_factory: self.reader_factory.clone(),
lister_factory: self.lister_factory.clone(),
deleter_factory: self.deleter_factory.clone(),
}
}
}
pub struct MockAccessor<A> {
inner: A,
writer_factory: Option<MockWriterFactory>,
reader_factory: Option<MockReaderFactory>,
lister_factory: Option<MockListerFactory>,
deleter_factory: Option<MockDeleterFactory>,
}
impl<A: Debug> Debug for MockAccessor<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MockAccessor")
.field("inner", &self.inner)
.finish()
}
}
pub struct MockReader {
inner: oio::Reader,
}
impl oio::Read for MockReader {
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await
}
}
pub struct MockWriter {
inner: oio::Writer,
}
impl oio::Write for MockWriter {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs).await
}
async fn close(&mut self) -> Result<Metadata> {
self.inner.close().await
}
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
}
pub struct MockLister {
inner: oio::Lister,
}
impl oio::List for MockLister {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().await
}
}
pub struct MockDeleter {
inner: oio::Deleter,
}
impl oio::Delete for MockDeleter {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}
async fn flush(&mut self) -> Result<usize> {
self.inner.flush().await
}
}
impl<A: Access> LayeredAccess for MockAccessor<A> {
type Inner = A;
type Reader = MockReader;
type Writer = MockWriter;
type Lister = MockLister;
type Deleter = MockDeleter;
fn inner(&self) -> &Self::Inner {
&self.inner
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
if let Some(reader_factory) = self.reader_factory.as_ref() {
let (rp_read, reader) = self.inner.read(path, args.clone()).await?;
let reader = reader_factory(path, args, Box::new(reader));
Ok((rp_read, MockReader { inner: reader }))
} else {
self.inner.read(path, args).await.map(|(rp_read, reader)| {
(
rp_read,
MockReader {
inner: Box::new(reader),
},
)
})
}
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
if let Some(writer_factory) = self.writer_factory.as_ref() {
let (rp_write, writer) = self.inner.write(path, args.clone()).await?;
let writer = writer_factory(path, args, Box::new(writer));
Ok((rp_write, MockWriter { inner: writer }))
} else {
self.inner
.write(path, args)
.await
.map(|(rp_write, writer)| {
(
rp_write,
MockWriter {
inner: Box::new(writer),
},
)
})
}
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
if let Some(deleter_factory) = self.deleter_factory.as_ref() {
let (rp_delete, deleter) = self.inner.delete().await?;
let deleter = deleter_factory(Box::new(deleter));
Ok((rp_delete, MockDeleter { inner: deleter }))
} else {
self.inner.delete().await.map(|(rp_delete, deleter)| {
(
rp_delete,
MockDeleter {
inner: Box::new(deleter),
},
)
})
}
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
if let Some(lister_factory) = self.lister_factory.as_ref() {
let (rp_list, lister) = self.inner.list(path, args.clone()).await?;
let lister = lister_factory(path, args, Box::new(lister));
Ok((rp_list, MockLister { inner: lister }))
} else {
self.inner.list(path, args).await.map(|(rp_list, lister)| {
(
rp_list,
MockLister {
inner: Box::new(lister),
},
)
})
}
}
}
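
For context, here is one possible way a test might use the new `MockLayer`, assuming the `testing` feature is enabled, that `#[derive(Builder)]` generates a `MockLayerBuilder` alongside the struct above, and that the module is reachable as `object_store::layers::mock` (per the `pub mod mock` export earlier in this diff). The resulting layer can then be handed to `TestEnv::with_mock_layer`, also shown earlier:

use std::sync::Arc;

use object_store::layers::mock::{MockLayer, MockLayerBuilder, OpWrite, oio};

// Build a layer whose writer factory passes writes through unchanged; a real
// test could instead return a wrapper writer that injects I/O errors.
fn passthrough_mock_layer() -> MockLayer {
    MockLayerBuilder::default()
        .writer_factory(Arc::new(
            |_path: &str, _args: OpWrite, writer: oio::Writer| writer,
        ))
        .build()
        .expect("all factories are optional, so building should not fail")
}

// Hypothetical wiring in a test:
// let env = TestEnv::new(...).with_mock_layer(passthrough_mock_layer());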

View File

@@ -66,6 +66,7 @@ impl Inserter {
return Ok(0);
}
// TODO(yingwen): Fill record batch impure default values.
// Notify the flownode to update dirty timestamps if a flow is configured.
self.maybe_update_flow_dirty_window(table_info.clone(), record_batch.clone());

View File

@@ -762,7 +762,8 @@ pub(crate) fn to_alter_table_expr(
target_type,
} => {
let target_type =
sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
sql_data_type_to_concrete_data_type(&target_type, &Default::default())
.context(ParseSqlSnafu)?;
let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
.map(|w| w.to_parts())
.context(ColumnDataTypeSnafu)?;

View File

@@ -353,10 +353,11 @@ impl Inserter {
&self,
insert: &Insert,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
let (inserts, table_info) =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert, ctx)
.convert(insert, ctx, statement_executor)
.await?;
let table_infos =

View File

@@ -36,6 +36,7 @@ pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec<ColumnSchema> {
.collect()
}
// TODO(yingwen): Support Bulk insert request.
/// Fill impure default values in the request
pub struct ImpureDefaultFiller {
impure_columns: HashMap<String, (api::v1::ColumnSchema, api::v1::Value)>,
@@ -62,7 +63,7 @@ impl ImpureDefaultFiller {
column.default_constraint()
),
})?;
let grpc_default_value = api::helper::to_proto_value(default_value);
let grpc_default_value = api::helper::to_grpc_value(default_value);
let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0);
let grpc_column_schema = api::v1::ColumnSchema {
column_name: def.name,

View File

@@ -12,13 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
use std::cell::LazyCell;
use std::collections::HashMap;
use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::options_from_column_schema;
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue};
use api::v1::{
AlterTableExpr, ColumnSchema as GrpcColumnSchema, ModifyColumnType, ModifyColumnTypes, Row,
Rows,
};
use catalog::CatalogManager;
use common_telemetry::info;
use common_time::Timezone;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::types::JsonType;
use datatypes::value::Value;
use partition::manager::PartitionRuleManager;
use session::context::{QueryContext, QueryContextRef};
use snafu::{OptionExt, ResultExt, ensure};
@@ -30,12 +41,13 @@ use table::metadata::TableInfoRef;
use crate::error::{
CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result,
SchemaReadOnlySnafu, TableNotFoundSnafu,
ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidSqlSnafu, MissingInsertBodySnafu,
ParseSqlSnafu, Result, SchemaReadOnlySnafu, TableNotFoundSnafu,
};
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::insert::semantic_type;
use crate::statement::StatementExecutor;
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
@@ -62,12 +74,12 @@ impl<'a> StatementToRegion<'a> {
&self,
stmt: &Insert,
query_ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
let name = stmt.table_name().context(ParseSqlSnafu)?;
let (catalog, schema, table_name) = self.get_full_name(name)?;
let table = self.get_table(&catalog, &schema, &table_name).await?;
let mut table = self.get_table(&catalog, &schema, &table_name).await?;
let table_schema = table.schema();
let table_info = table.table_info();
ensure!(
!common_catalog::consts::is_readonly_schema(&schema),
@@ -94,7 +106,6 @@ impl<'a> StatementToRegion<'a> {
Ok(())
})?;
let mut schema = Vec::with_capacity(column_count);
let mut rows = vec![
Row {
values: Vec::with_capacity(column_count)
@@ -102,17 +113,57 @@ impl<'a> StatementToRegion<'a> {
row_count
];
for (i, column_name) in column_names.into_iter().enumerate() {
let column_schema = table_schema
.column_schema_by_name(column_name)
.with_context(|| ColumnNotFoundSnafu {
msg: format!("Column {} not found in table {}", column_name, &table_name),
})?;
fn find_insert_columns<'a>(
table: &'a TableRef,
column_names: &[&String],
) -> Result<Vec<&'a ColumnSchema>> {
let schema = table.schema_ref();
column_names
.iter()
.map(|name| {
schema
.column_schema_by_name(name)
.context(ColumnNotFoundSnafu { msg: *name })
})
.collect::<Result<Vec<_>>>()
}
let mut insert_columns = find_insert_columns(&table, &column_names)?;
let converter = SqlRowConverter::new(&insert_columns, query_ctx);
// Convert the SQL values to GreptimeDB values, merging the "largest" JSON type of all
// values along the way via `JsonColumnTypeUpdater`.
let mut updater = JsonColumnTypeUpdater::new(statement_executor, query_ctx);
let value_rows = converter.convert(&mut updater, &sql_rows)?;
// If the JSON values have a "larger" json type than the one in the table schema, modify
// the column's json type first, by executing an "alter table" DDL.
if updater
.maybe_update_column_type(&catalog, &schema, &table_name, &insert_columns)
.await?
{
// Update with the latest schema, if changed.
table = self.get_table(&catalog, &schema, &table_name).await?;
insert_columns = find_insert_columns(&table, &column_names)?;
}
// Finally, convert the GreptimeDB values to gRPC values, ready for insertion on the Datanode.
for (i, row) in value_rows.into_iter().enumerate() {
for value in row {
let grpc_value = to_grpc_value(value);
rows[i].values.push(grpc_value);
}
}
let table_info = table.table_info();
let mut schema = Vec::with_capacity(column_count);
for column_schema in insert_columns {
let (datatype, datatype_extension) =
ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
.context(ColumnDataTypeSnafu)?
.to_parts();
let column_name = &column_schema.name;
let semantic_type = semantic_type(&table_info, column_name)?;
let grpc_column_schema = GrpcColumnSchema {
@@ -123,16 +174,6 @@ impl<'a> StatementToRegion<'a> {
options: options_from_column_schema(column_schema),
};
schema.push(grpc_column_schema);
for (sql_row, grpc_row) in sql_rows.iter().zip(rows.iter_mut()) {
let value = sql_value_to_grpc_value(
column_schema,
&sql_row[i],
Some(&query_ctx.timezone()),
query_ctx.auto_string_to_numeric(),
)?;
grpc_row.values.push(value);
}
}
let requests = Partitioner::new(self.partition_manager)
@@ -194,6 +235,147 @@ impl<'a> StatementToRegion<'a> {
}
}
struct SqlRowConverter<'a, 'b> {
insert_columns: &'a [&'a ColumnSchema],
query_context: &'b QueryContextRef,
}
impl<'a, 'b> SqlRowConverter<'a, 'b> {
fn new(insert_columns: &'a [&'a ColumnSchema], query_context: &'b QueryContextRef) -> Self {
Self {
insert_columns,
query_context,
}
}
fn convert(
&self,
updater: &mut JsonColumnTypeUpdater<'_, 'a>,
sql_rows: &[Vec<SqlValue>],
) -> Result<Vec<Vec<Value>>> {
let timezone = Some(&self.query_context.timezone());
let auto_string_to_numeric = self.query_context.auto_string_to_numeric();
let mut value_rows = Vec::with_capacity(sql_rows.len());
for sql_row in sql_rows {
let mut value_row = Vec::with_capacity(self.insert_columns.len());
for (insert_column, sql_value) in self.insert_columns.iter().zip(sql_row) {
let value =
sql_value_to_value(insert_column, sql_value, timezone, auto_string_to_numeric)?;
updater.merge_types(insert_column, &value)?;
value_row.push(value);
}
value_rows.push(value_row);
}
Ok(value_rows)
}
}
struct JsonColumnTypeUpdater<'a, 'b> {
statement_executor: &'a StatementExecutor,
query_context: &'a QueryContextRef,
merged_value_types: LazyCell<HashMap<&'b str, JsonType>>,
}
impl<'a, 'b> JsonColumnTypeUpdater<'a, 'b> {
fn new(statement_executor: &'a StatementExecutor, query_context: &'a QueryContextRef) -> Self {
Self {
statement_executor,
query_context,
merged_value_types: LazyCell::new(Default::default),
}
}
fn merge_types(&mut self, column_schema: &'b ColumnSchema, value: &Value) -> Result<()> {
if !matches!(value, Value::Json(_)) {
return Ok(());
}
if let ConcreteDataType::Json(value_type) = value.data_type() {
let merged_type = self
.merged_value_types
.entry(&column_schema.name)
.or_insert_with(|| value_type.clone());
if !merged_type.is_include(&value_type) {
merged_type.merge(&value_type).map_err(|e| {
InvalidInsertRequestSnafu {
reason: format!(r#"cannot merge "{value_type}" into "{merged_type}": {e}"#),
}
.build()
})?;
}
}
Ok(())
}
async fn maybe_update_column_type(
self,
catalog: &str,
schema: &str,
table: &str,
insert_columns: &[&ColumnSchema],
) -> Result<bool> {
let mut has_update = false;
for (column_name, merged_type) in self.merged_value_types.iter() {
let Some(column_type) = insert_columns
.iter()
.find_map(|x| (&x.name == column_name).then(|| x.data_type.as_json()))
.flatten()
else {
continue;
};
if column_type.is_include(merged_type) {
continue;
}
let new_column_type = {
let mut x = column_type.clone();
x.merge(merged_type)
.map_err(|e| {
InvalidInsertRequestSnafu {
reason: format!(
r#"cannot merge "{merged_type}" into "{column_type}": {e}"#
),
}
.build()
})
.map(|()| x)
}?;
info!(
"updating table {}.{}.{} column {} json type: {} => {}",
catalog, schema, table, column_name, column_type, new_column_type,
);
let (target_type, target_type_extension) =
ColumnDataTypeWrapper::try_from(ConcreteDataType::Json(new_column_type))
.context(ColumnDataTypeSnafu)?
.into_parts();
let alter_expr = AlterTableExpr {
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types: vec![ModifyColumnType {
column_name: column_name.to_string(),
target_type: target_type as i32,
target_type_extension,
}],
})),
};
self.statement_executor
.alter_table_inner(alter_expr, self.query_context.clone())
.await?;
has_update = true;
}
Ok(has_update)
}
}
fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
if !stmt.columns().is_empty() {
stmt.columns()
@@ -209,12 +391,12 @@ fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a St
/// Converts SQL value to gRPC value according to the column schema.
/// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values,
/// and fills the default value if the cast fails.
fn sql_value_to_grpc_value(
fn sql_value_to_value(
column_schema: &ColumnSchema,
sql_val: &SqlValue,
timezone: Option<&Timezone>,
auto_string_to_numeric: bool,
) -> Result<GrpcValue> {
) -> Result<Value> {
let column = &column_schema.name;
let value = if replace_default(sql_val) {
let default_value = column_schema
@@ -237,9 +419,25 @@ fn sql_value_to_grpc_value(
)
.context(crate::error::SqlCommonSnafu)?
};
validate(&value)?;
Ok(value)
}
let grpc_value = value_to_grpc_value(value);
Ok(grpc_value)
fn validate(value: &Value) -> Result<()> {
match value {
Value::Json(value) => {
// Json object will be stored as Arrow struct in parquet, and it has the restriction:
// "Parquet does not support writing empty structs".
ensure!(
!value.is_empty_object(),
InvalidInsertRequestSnafu {
reason: "empty json object is not supported, consider adding a dummy field"
}
);
Ok(())
}
_ => Ok(()),
}
}
fn replace_default(sql_val: &SqlValue) -> bool {
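
For context, `JsonColumnTypeUpdater` above folds the JSON type of every inserted value into one merged type per column and, when the merged type is not already included in the column's current type, widens the column through an `ALTER TABLE ... MODIFY COLUMN` before the rows are written. The toy model below illustrates only that merge-then-widen decision; `Width` is a made-up stand-in for the real `JsonType` lattice, not GreptimeDB's API:

// Toy stand-in for the `merge`/`is_include` logic on `JsonType`: the column is
// only altered when the merged type of the incoming values is wider than the
// column's current type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Width {
    Number,
    String, // "wider" in this toy model: it also accommodates numeric input
}

fn merge(a: Width, b: Width) -> Width {
    a.max(b)
}

/// Returns the widened column type if an ALTER is needed, otherwise `None`.
fn plan_alter(column: Width, incoming: &[Width]) -> Option<Width> {
    let merged = incoming.iter().copied().fold(column, merge);
    (merged != column).then_some(merged)
}

fn main() {
    // Incoming values already fit the column: no ALTER TABLE needed.
    assert_eq!(plan_alter(Width::String, &[Width::Number]), None);
    // A wider value arrived: widen the column first, then insert.
    assert_eq!(plan_alter(Width::Number, &[Width::String]), Some(Width::String));
}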

View File

@@ -46,7 +46,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_telemetry::{debug, tracing};
use common_telemetry::{debug, tracing, warn};
use common_time::Timestamp;
use common_time::range::TimestampRange;
use datafusion_expr::LogicalPlan;
@@ -88,6 +88,22 @@ use crate::insert::InserterRef;
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
use crate::statement::set::set_allow_query_fallback;
/// A configurator that customizes or enhances a [`StatementExecutor`].
#[async_trait::async_trait]
pub trait StatementExecutorConfigurator: Send + Sync {
async fn configure(
&self,
executor: StatementExecutor,
ctx: ExecutorConfigureContext,
) -> std::result::Result<StatementExecutor, BoxedError>;
}
pub type StatementExecutorConfiguratorRef = Arc<dyn StatementExecutorConfigurator>;
pub struct ExecutorConfigureContext {
pub kv_backend: KvBackendRef,
}
#[derive(Clone)]
pub struct StatementExecutor {
catalog_manager: CatalogManagerRef,
@@ -106,15 +122,6 @@ pub struct StatementExecutor {
pub type StatementExecutorRef = Arc<StatementExecutor>;
/// Trait for creating [`TriggerQuerier`] instance.
#[cfg(feature = "enterprise")]
pub trait TriggerQuerierFactory: Send + Sync {
fn create(&self, kv_backend: KvBackendRef) -> TriggerQuerierRef;
}
#[cfg(feature = "enterprise")]
pub type TriggerQuerierFactoryRef = Arc<dyn TriggerQuerierFactory>;
/// Trait for querying trigger info, such as `SHOW CREATE TRIGGER` etc.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
@@ -481,6 +488,11 @@ impl StatementExecutor {
"@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
Channel::Postgres => {
warn!(
"Unsupported set variable {} for channel {:?}",
var_name,
query_ctx.channel()
);
query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
}
_ => {
@@ -490,16 +502,23 @@ impl StatementExecutor {
.fail();
}
},
"STATEMENT_TIMEOUT" => {
if query_ctx.channel() == Channel::Postgres {
set_query_timeout(set_var.value, query_ctx)?
} else {
"STATEMENT_TIMEOUT" => match query_ctx.channel() {
Channel::Postgres => set_query_timeout(set_var.value, query_ctx)?,
Channel::Mysql => {
warn!(
"Unsupported set variable {} for channel {:?}",
var_name,
query_ctx.channel()
);
query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
}
_ => {
return NotSupportedSnafu {
feat: format!("Unsupported set variable {}", var_name),
}
.fail();
}
}
},
"SEARCH_PATH" => {
if query_ctx.channel() == Channel::Postgres {
set_search_path(set_var.value, query_ctx)?
@@ -511,14 +530,16 @@ impl StatementExecutor {
}
}
_ => {
// for postgres, we give unknown SET statements a warning with
// success, this is prevent the SET call becoming a blocker
// of connection establishment
//
if query_ctx.channel() == Channel::Postgres {
query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
} else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
// Just ignore `SET @@` commands for MySQL
if query_ctx.channel() == Channel::Postgres || query_ctx.channel() == Channel::Mysql
{
// For unknown SET statements, we give a warning with success.
// This prevents the SET call from becoming a blocker of MySQL/Postgres clients'
// connection establishment.
warn!(
"Unsupported set variable {} for channel {:?}",
var_name,
query_ctx.channel()
);
query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
} else {
return NotSupportedSnafu {

View File

@@ -28,7 +28,7 @@ impl StatementExecutor {
if insert.can_extract_values() {
// Fast path: plain insert ("insert with literal values") is executed directly
self.inserter
.handle_statement_insert(insert.as_ref(), &query_ctx)
.handle_statement_insert(insert.as_ref(), &query_ctx, self)
.await
} else {
// Slow path: insert with subquery. Execute using query engine.

View File

@@ -9,6 +9,7 @@ workspace = true
[dependencies]
auth.workspace = true
catalog.workspace = true
clap.workspace = true
cli.workspace = true
common-base.workspace = true
@@ -17,6 +18,7 @@ common-meta.workspace = true
datanode.workspace = true
flow.workspace = true
frontend.workspace = true
meta-client.workspace = true
meta-srv.workspace = true
serde.workspace = true
snafu.workspace = true

Some files were not shown because too many files have changed in this diff.