Compare commits

..

1 Commit

Author: luofucong
SHA1: e06ee80057
Message: feat: ingest jsonbench data through pipeline
Signed-off-by: luofucong <luofc@foxmail.com>
Date: 2025-11-28 20:07:26 +08:00
113 changed files with 1101 additions and 4415 deletions

71
Cargo.lock generated
View File

@@ -3274,7 +3274,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"arrow-ipc",
@@ -3329,7 +3329,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"async-trait",
@@ -3353,7 +3353,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog-listing"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"async-trait",
@@ -3375,7 +3375,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3398,7 +3398,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"futures",
"log",
@@ -3408,7 +3408,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"async-compression 0.4.19",
@@ -3442,7 +3442,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource-csv"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"async-trait",
@@ -3464,7 +3464,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource-json"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"async-trait",
@@ -3485,7 +3485,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource-parquet"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"async-trait",
@@ -3514,12 +3514,12 @@ dependencies = [
[[package]]
name = "datafusion-doc"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
[[package]]
name = "datafusion-execution"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"async-trait",
@@ -3538,7 +3538,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"async-trait",
@@ -3560,7 +3560,7 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"datafusion-common",
@@ -3572,7 +3572,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"arrow-buffer",
@@ -3600,7 +3600,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3620,7 +3620,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate-common"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3632,7 +3632,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"arrow-ord",
@@ -3654,7 +3654,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-table"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"async-trait",
@@ -3669,7 +3669,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"datafusion-common",
@@ -3686,7 +3686,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window-common"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"datafusion-common",
"datafusion-physical-expr-common",
@@ -3695,7 +3695,7 @@ dependencies = [
[[package]]
name = "datafusion-macros"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"datafusion-doc",
"quote",
@@ -3705,7 +3705,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"chrono",
@@ -3741,9 +3741,9 @@ dependencies = [
[[package]]
name = "datafusion-pg-catalog"
version = "0.12.2"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "755393864c0c2dd95575ceed4b25e348686028e1b83d06f8f39914209999f821"
checksum = "15824c98ff2009c23b0398d441499b147f7c5ac0e5ee993e7a473d79040e3626"
dependencies = [
"async-trait",
"datafusion",
@@ -3756,7 +3756,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3777,7 +3777,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-adapter"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"datafusion-common",
@@ -3791,7 +3791,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3804,7 +3804,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"datafusion-common",
@@ -3822,7 +3822,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"ahash 0.8.12",
"arrow",
@@ -3852,7 +3852,7 @@ dependencies = [
[[package]]
name = "datafusion-pruning"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"datafusion-common",
@@ -3868,7 +3868,7 @@ dependencies = [
[[package]]
name = "datafusion-session"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"async-trait",
"datafusion-common",
@@ -3881,7 +3881,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"arrow",
"bigdecimal 0.4.8",
@@ -3898,7 +3898,7 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "50.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
dependencies = [
"async-recursion",
"async-trait",
@@ -8362,7 +8362,6 @@ dependencies = [
"common-macro",
"common-telemetry",
"common-test-util",
"derive_builder 0.20.2",
"futures",
"humantime-serde",
"lazy_static",
@@ -9501,7 +9500,6 @@ name = "plugins"
version = "1.0.0-beta.2"
dependencies = [
"auth",
"catalog",
"clap 4.5.40",
"cli",
"common-base",
@@ -9510,7 +9508,6 @@ dependencies = [
"datanode",
"flow",
"frontend",
"meta-client",
"meta-srv",
"serde",
"snafu 0.8.6",
@@ -13067,7 +13064,6 @@ dependencies = [
"loki-proto",
"meta-client",
"meta-srv",
"mito2",
"moka",
"mysql_async",
"object-store",
@@ -13080,6 +13076,7 @@ dependencies = [
"prost 0.13.5",
"query",
"rand 0.9.1",
"regex",
"rstest",
"rstest_reuse",
"sea-query",

View File

@@ -131,7 +131,7 @@ datafusion-functions = "50"
datafusion-functions-aggregate-common = "50"
datafusion-optimizer = "50"
datafusion-orc = "0.5"
datafusion-pg-catalog = "0.12.2"
datafusion-pg-catalog = "0.12.1"
datafusion-physical-expr = "50"
datafusion-physical-plan = "50"
datafusion-sql = "50"
@@ -316,18 +316,18 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x"
[profile.release]

View File

@@ -894,7 +894,7 @@ pub fn is_column_type_value_eq(
.unwrap_or(false)
}
fn encode_json_value(value: JsonValue) -> v1::JsonValue {
pub fn encode_json_value(value: JsonValue) -> v1::JsonValue {
fn helper(json: JsonVariant) -> v1::JsonValue {
let value = match json {
JsonVariant::Null => None,
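This hunk only widens the visibility of encode_json_value so other crates can convert a JsonValue into its gRPC form, presumably for the pipeline ingest path. A minimal sketch of such a caller, assuming the same imports as the file above (JsonValue, v1); the wrapper name is hypothetical:

fn json_to_grpc(value: JsonValue) -> v1::JsonValue {
    // Delegate to the now-public converter.
    encode_json_value(value)
}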

View File

@@ -17,8 +17,8 @@ use std::collections::HashMap;
use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY};
use datatypes::schema::{
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY,
SkippingIndexOptions, SkippingIndexType,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
@@ -131,6 +131,31 @@ pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) ->
})
}
/// Collect the [ColumnOptions] into the [Metadata] that can be used in, for example, [ColumnSchema].
pub fn collect_column_options(column_options: Option<&ColumnOptions>) -> Metadata {
let mut metadata = Metadata::default();
let Some(ColumnOptions { options }) = column_options else {
return metadata;
};
if let Some(v) = options.get(FULLTEXT_GRPC_KEY) {
metadata.insert(FULLTEXT_KEY.to_string(), v.clone());
}
if let Some(v) = options.get(INVERTED_INDEX_GRPC_KEY) {
metadata.insert(INVERTED_INDEX_KEY.to_string(), v.clone());
}
if let Some(v) = options.get(SKIPPING_INDEX_GRPC_KEY) {
metadata.insert(SKIPPING_INDEX_KEY.to_string(), v.clone());
}
if let Some(v) = options.get(EXTENSION_TYPE_NAME_KEY) {
metadata.insert(EXTENSION_TYPE_NAME_KEY.to_string(), v.clone());
}
if let Some(v) = options.get(EXTENSION_TYPE_METADATA_KEY) {
metadata.insert(EXTENSION_TYPE_METADATA_KEY.to_string(), v.clone());
}
metadata
}
/// Constructs a `ColumnOptions` from the given `ColumnSchema`.
pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> {
let mut options = ColumnOptions::default();

View File

@@ -211,7 +211,6 @@ struct InformationSchemaPartitionsBuilder {
partition_names: StringVectorBuilder,
partition_ordinal_positions: Int64VectorBuilder,
partition_expressions: StringVectorBuilder,
partition_descriptions: StringVectorBuilder,
create_times: TimestampSecondVectorBuilder,
partition_ids: UInt64VectorBuilder,
}
@@ -232,7 +231,6 @@ impl InformationSchemaPartitionsBuilder {
partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
partition_descriptions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
create_times: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
}
@@ -321,21 +319,6 @@ impl InformationSchemaPartitionsBuilder {
return;
}
// Get partition column names (shared by all partitions)
// In MySQL, PARTITION_EXPRESSION is the partitioning function expression (e.g., column name)
let partition_columns: String = table_info
.meta
.partition_column_names()
.cloned()
.collect::<Vec<_>>()
.join(", ");
let partition_expr_str = if partition_columns.is_empty() {
None
} else {
Some(partition_columns)
};
for (index, partition) in partitions.iter().enumerate() {
let partition_name = format!("p{index}");
@@ -345,12 +328,8 @@ impl InformationSchemaPartitionsBuilder {
self.partition_names.push(Some(&partition_name));
self.partition_ordinal_positions
.push(Some((index + 1) as i64));
// PARTITION_EXPRESSION: partition column names (same for all partitions)
self.partition_expressions
.push(partition_expr_str.as_deref());
// PARTITION_DESCRIPTION: partition boundary expression (different for each partition)
let description = partition.partition_expr.as_ref().map(|e| e.to_string());
self.partition_descriptions.push(description.as_deref());
let expression = partition.partition_expr.as_ref().map(|e| e.to_string());
self.partition_expressions.push(expression.as_deref());
self.create_times.push(Some(TimestampSecond::from(
table_info.meta.created_on.timestamp(),
)));
@@ -390,7 +369,7 @@ impl InformationSchemaPartitionsBuilder {
null_string_vector.clone(),
Arc::new(self.partition_expressions.finish()),
null_string_vector.clone(),
Arc::new(self.partition_descriptions.finish()),
null_string_vector.clone(),
// TODO(dennis): rows and index statistics info
null_i64_vector.clone(),
null_i64_vector.clone(),

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::CatalogManagerRef;
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use clap::Parser;
@@ -25,12 +26,14 @@ use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::FlownodeId;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_stat::ResourceStatImpl;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
@@ -40,7 +43,6 @@ use flow::{
get_flow_auth_options,
};
use meta_client::{MetaClientOptions, MetaClientType};
use plugins::flownode::context::GrpcConfigureContext;
use servers::configurator::GrpcBuilderConfiguratorRef;
use snafu::{OptionExt, ResultExt, ensure};
use tracing_appender::non_blocking::WorkerGuard;
@@ -433,3 +435,11 @@ impl StartCommand {
Ok(Instance::new(flownode, guard))
}
}
/// The context for [`GrpcBuilderConfiguratorRef`] in flownode.
pub struct GrpcConfigureContext {
pub kv_backend: KvBackendRef,
pub fe_client: Arc<FrontendClient>,
pub flownode_id: FlownodeId,
pub catalog_manager: CatalogManagerRef,
}

View File

@@ -45,10 +45,7 @@ use frontend::frontend::Frontend;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use plugins::frontend::context::{
CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
};
use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
use servers::addrs;
use servers::grpc::GrpcOptions;
use servers::tls::{TlsMode, TlsOption};
@@ -426,11 +423,9 @@ impl StartCommand {
let builder = if let Some(configurator) =
plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
{
let ctx = DistributedCatalogManagerConfigureContext {
let ctx = CatalogManagerConfigureContext {
meta_client: meta_client.clone(),
};
let ctx = CatalogManagerConfigureContext::Distributed(ctx);
configurator
.configure(builder, ctx)
.await
@@ -487,6 +482,11 @@ impl StartCommand {
}
}
/// The context for [`CatalogManagerConfiguratorRef`] in frontend.
pub struct CatalogManagerConfigureContext {
pub meta_client: MetaClientRef,
}
#[cfg(test)]
mod tests {
use std::io::Write;

View File

@@ -32,7 +32,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -58,10 +58,6 @@ use frontend::instance::StandaloneDatanodeManager;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use plugins::frontend::context::{
CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
};
use plugins::standalone::context::DdlManagerConfigureContext;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use standalone::StandaloneInformationExtension;
@@ -418,10 +414,9 @@ impl StartCommand {
let builder = if let Some(configurator) =
plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
{
let ctx = StandaloneCatalogManagerConfigureContext {
let ctx = CatalogManagerConfigureContext {
fe_client: frontend_client.clone(),
};
let ctx = CatalogManagerConfigureContext::Standalone(ctx);
configurator
.configure(builder, ctx)
.await
@@ -511,13 +506,9 @@ impl StartCommand {
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
.context(error::InitDdlManagerSnafu)?;
let ddl_manager = if let Some(configurator) =
plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()
{
let ddl_manager = if let Some(configurator) = plugins.get::<DdlManagerConfiguratorRef>() {
let ctx = DdlManagerConfigureContext {
kv_backend: kv_backend.clone(),
fe_client: frontend_client.clone(),
catalog_manager: catalog_manager.clone(),
};
configurator
.configure(ddl_manager, ctx)
@@ -604,6 +595,11 @@ impl StartCommand {
}
}
/// The context for [`CatalogManagerConfiguratorRef`] in standalone.
pub struct CatalogManagerConfigureContext {
pub fe_client: Arc<FrontendClient>,
}
#[cfg(test)]
mod tests {
use std::default::Default;

View File

@@ -32,12 +32,7 @@ impl Plugins {
pub fn insert<T: 'static + Send + Sync>(&self, value: T) {
let last = self.write().insert(value);
if last.is_some() {
panic!(
"Plugin of type {} already exists",
std::any::type_name::<T>()
);
}
assert!(last.is_none(), "each type of plugins must be one and only");
}
pub fn get<T: 'static + Send + Sync + Clone>(&self) -> Option<T> {
@@ -145,7 +140,7 @@ mod tests {
}
#[test]
#[should_panic(expected = "Plugin of type i32 already exists")]
#[should_panic(expected = "each type of plugins must be one and only")]
fn test_plugin_uniqueness() {
let plugins = Plugins::new();
plugins.insert(1i32);

View File

@@ -14,7 +14,6 @@
mod binary;
mod ctx;
mod if_func;
mod is_null;
mod unary;
@@ -23,7 +22,6 @@ pub use ctx::EvalContext;
pub use unary::scalar_unary_op;
use crate::function_registry::FunctionRegistry;
use crate::scalars::expression::if_func::IfFunction;
use crate::scalars::expression::is_null::IsNullFunction;
pub(crate) struct ExpressionFunction;
@@ -31,6 +29,5 @@ pub(crate) struct ExpressionFunction;
impl ExpressionFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(IsNullFunction::default());
registry.register_scalar(IfFunction::default());
}
}

View File

@@ -1,404 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::fmt::Display;
use arrow::array::ArrowNativeTypeOp;
use arrow::datatypes::ArrowPrimitiveType;
use datafusion::arrow::array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray};
use datafusion::arrow::compute::kernels::zip::zip;
use datafusion::arrow::datatypes::DataType;
use datafusion_common::DataFusionError;
use datafusion_expr::type_coercion::binary::comparison_coercion;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use crate::function::Function;
const NAME: &str = "if";
/// MySQL-compatible IF function: IF(condition, true_value, false_value)
///
/// Returns true_value if condition is TRUE (not NULL and not 0),
/// otherwise returns false_value.
///
/// MySQL truthy rules:
/// - NULL -> false
/// - 0 (numeric zero) -> false
/// - Any non-zero numeric -> true
/// - Boolean true/false -> use directly
#[derive(Clone, Debug)]
pub struct IfFunction {
signature: Signature,
}
impl Default for IfFunction {
fn default() -> Self {
Self {
signature: Signature::any(3, Volatility::Immutable),
}
}
}
impl Display for IfFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
impl Function for IfFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, input_types: &[DataType]) -> datafusion_common::Result<DataType> {
// Return the common type of true_value and false_value (args[1] and args[2])
if input_types.len() < 3 {
return Err(DataFusionError::Plan(format!(
"{} requires 3 arguments, got {}",
NAME,
input_types.len()
)));
}
let true_type = &input_types[1];
let false_type = &input_types[2];
// Use comparison_coercion to find common type
comparison_coercion(true_type, false_type).ok_or_else(|| {
DataFusionError::Plan(format!(
"Cannot find common type for IF function between {:?} and {:?}",
true_type, false_type
))
})
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
if args.args.len() != 3 {
return Err(DataFusionError::Plan(format!(
"{} requires exactly 3 arguments, got {}",
NAME,
args.args.len()
)));
}
let condition = &args.args[0];
let true_value = &args.args[1];
let false_value = &args.args[2];
// Convert condition to boolean array using MySQL truthy rules
let bool_array = to_boolean_array(condition, args.number_rows)?;
// Convert true and false values to arrays
let true_array = true_value.to_array(args.number_rows)?;
let false_array = false_value.to_array(args.number_rows)?;
// Use zip to select values based on condition
// zip expects &dyn Datum, and ArrayRef (Arc<dyn Array>) implements Datum
let result = zip(&bool_array, &true_array, &false_array)?;
Ok(ColumnarValue::Array(result))
}
}
/// Convert a ColumnarValue to a BooleanArray using MySQL truthy rules:
/// - NULL -> false
/// - 0 (any numeric zero) -> false
/// - Non-zero numeric -> true
/// - Boolean -> use directly
fn to_boolean_array(
value: &ColumnarValue,
num_rows: usize,
) -> datafusion_common::Result<BooleanArray> {
let array = value.to_array(num_rows)?;
array_to_bool(array)
}
/// Convert an integer PrimitiveArray to BooleanArray using MySQL truthy rules:
/// NULL -> false, 0 -> false, non-zero -> true
fn int_array_to_bool<T>(array: &PrimitiveArray<T>) -> BooleanArray
where
T: ArrowPrimitiveType,
T::Native: ArrowNativeTypeOp,
{
BooleanArray::from_iter(
array
.iter()
.map(|opt| Some(opt.is_some_and(|v| !v.is_zero()))),
)
}
/// Convert a float PrimitiveArray to BooleanArray using MySQL truthy rules:
/// NULL -> false, 0 (including -0.0) -> false, NaN -> true, other non-zero -> true
fn float_array_to_bool<T>(array: &PrimitiveArray<T>) -> BooleanArray
where
T: ArrowPrimitiveType,
T::Native: ArrowNativeTypeOp + num_traits::Float,
{
use num_traits::Float;
BooleanArray::from_iter(
array
.iter()
.map(|opt| Some(opt.is_some_and(|v| v.is_nan() || !v.is_zero()))),
)
}
/// Convert an Array to BooleanArray using MySQL truthy rules
fn array_to_bool(array: ArrayRef) -> datafusion_common::Result<BooleanArray> {
use arrow::datatypes::*;
match array.data_type() {
DataType::Boolean => {
let bool_array = array.as_boolean();
Ok(BooleanArray::from_iter(
bool_array.iter().map(|opt| Some(opt.unwrap_or(false))),
))
}
DataType::Int8 => Ok(int_array_to_bool(array.as_primitive::<Int8Type>())),
DataType::Int16 => Ok(int_array_to_bool(array.as_primitive::<Int16Type>())),
DataType::Int32 => Ok(int_array_to_bool(array.as_primitive::<Int32Type>())),
DataType::Int64 => Ok(int_array_to_bool(array.as_primitive::<Int64Type>())),
DataType::UInt8 => Ok(int_array_to_bool(array.as_primitive::<UInt8Type>())),
DataType::UInt16 => Ok(int_array_to_bool(array.as_primitive::<UInt16Type>())),
DataType::UInt32 => Ok(int_array_to_bool(array.as_primitive::<UInt32Type>())),
DataType::UInt64 => Ok(int_array_to_bool(array.as_primitive::<UInt64Type>())),
// Float16 needs special handling since half::f16 doesn't implement num_traits::Float
DataType::Float16 => {
let typed_array = array.as_primitive::<Float16Type>();
Ok(BooleanArray::from_iter(typed_array.iter().map(|opt| {
Some(opt.is_some_and(|v| {
let f = v.to_f32();
f.is_nan() || !f.is_zero()
}))
})))
}
DataType::Float32 => Ok(float_array_to_bool(array.as_primitive::<Float32Type>())),
DataType::Float64 => Ok(float_array_to_bool(array.as_primitive::<Float64Type>())),
// Null type is always false.
// Note: NullArray::is_null() returns false (physical null), so we must handle it explicitly.
// See: https://github.com/apache/arrow-rs/issues/4840
DataType::Null => Ok(BooleanArray::from(vec![false; array.len()])),
// For other types, treat non-null as true
_ => {
let len = array.len();
Ok(BooleanArray::from_iter(
(0..len).map(|i| Some(!array.is_null(i))),
))
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow_schema::Field;
use datafusion_common::ScalarValue;
use datafusion_common::arrow::array::{AsArray, Int32Array, StringArray};
use super::*;
#[test]
fn test_if_function_basic() {
let if_func = IfFunction::default();
assert_eq!("if", if_func.name());
// Test IF(true, 'yes', 'no') -> 'yes'
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "yes");
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_if_function_false() {
let if_func = IfFunction::default();
// Test IF(false, 'yes', 'no') -> 'no'
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "no");
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_if_function_null_is_false() {
let if_func = IfFunction::default();
// Test IF(NULL, 'yes', 'no') -> 'no' (NULL is treated as false)
// Using Boolean(None) - typed null
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Boolean(None)),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "no");
} else {
panic!("Expected Array result");
}
// Test IF(NULL, 'yes', 'no') -> 'no' using ScalarValue::Null (untyped null from SQL NULL literal)
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Null),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "no");
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_if_function_numeric_truthy() {
let if_func = IfFunction::default();
// Test IF(1, 'yes', 'no') -> 'yes' (non-zero is true)
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Int32(Some(1))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "yes");
} else {
panic!("Expected Array result");
}
// Test IF(0, 'yes', 'no') -> 'no' (zero is false)
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::Int32(Some(0))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("yes".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("no".to_string()))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "no");
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_if_function_with_arrays() {
let if_func = IfFunction::default();
// Test with array condition
let condition = Int32Array::from(vec![Some(1), Some(0), None, Some(5)]);
let true_val = StringArray::from(vec!["yes", "yes", "yes", "yes"]);
let false_val = StringArray::from(vec!["no", "no", "no", "no"]);
let result = if_func
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(condition)),
ColumnarValue::Array(Arc::new(true_val)),
ColumnarValue::Array(Arc::new(false_val)),
],
arg_fields: vec![],
number_rows: 4,
return_field: Arc::new(Field::new("", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
})
.unwrap();
if let ColumnarValue::Array(arr) = result {
let str_arr = arr.as_string::<i32>();
assert_eq!(str_arr.value(0), "yes"); // 1 is true
assert_eq!(str_arr.value(1), "no"); // 0 is false
assert_eq!(str_arr.value(2), "no"); // NULL is false
assert_eq!(str_arr.value(3), "yes"); // 5 is true
} else {
panic!("Expected Array result");
}
}
}

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use common_catalog::consts::{
DEFAULT_PRIVATE_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use datafusion::arrow::array::{ArrayRef, StringArray, StringBuilder, as_boolean_array};
use datafusion::arrow::array::{ArrayRef, StringArray, as_boolean_array};
use datafusion::catalog::TableFunction;
use datafusion::common::ScalarValue;
use datafusion::common::utils::SingleRowListArrayBuilder;
@@ -34,15 +34,10 @@ const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const CURRENT_SCHEMAS_FUNCTION_NAME: &str = "current_schemas";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const CURRENT_DATABASE_FUNCTION_NAME: &str = "current_database";
const OBJ_DESCRIPTION_FUNCTION_NAME: &str = "obj_description";
const COL_DESCRIPTION_FUNCTION_NAME: &str = "col_description";
const SHOBJ_DESCRIPTION_FUNCTION_NAME: &str = "shobj_description";
const PG_MY_TEMP_SCHEMA_FUNCTION_NAME: &str = "pg_my_temp_schema";
define_nullary_udf!(CurrentSchemaFunction);
define_nullary_udf!(SessionUserFunction);
define_nullary_udf!(CurrentDatabaseFunction);
define_nullary_udf!(PgMyTempSchemaFunction);
impl Function for CurrentDatabaseFunction {
fn name(&self) -> &str {
@@ -178,175 +173,6 @@ impl Function for CurrentSchemasFunction {
}
}
/// PostgreSQL obj_description - returns NULL for compatibility
#[derive(Display, Debug, Clone)]
#[display("{}", self.name())]
pub(super) struct ObjDescriptionFunction {
signature: Signature,
}
impl ObjDescriptionFunction {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Int64]),
TypeSignature::Exact(vec![DataType::UInt32]),
],
Volatility::Stable,
),
}
}
}
impl Function for ObjDescriptionFunction {
fn name(&self) -> &str {
OBJ_DESCRIPTION_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Utf8)
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let num_rows = args.number_rows;
let mut builder = StringBuilder::with_capacity(num_rows, 0);
for _ in 0..num_rows {
builder.append_null();
}
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
/// PostgreSQL col_description - returns NULL for compatibility
#[derive(Display, Debug, Clone)]
#[display("{}", self.name())]
pub(super) struct ColDescriptionFunction {
signature: Signature,
}
impl ColDescriptionFunction {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Int64, DataType::Int32]),
TypeSignature::Exact(vec![DataType::UInt32, DataType::Int32]),
TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]),
TypeSignature::Exact(vec![DataType::UInt32, DataType::Int64]),
],
Volatility::Stable,
),
}
}
}
impl Function for ColDescriptionFunction {
fn name(&self) -> &str {
COL_DESCRIPTION_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Utf8)
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let num_rows = args.number_rows;
let mut builder = StringBuilder::with_capacity(num_rows, 0);
for _ in 0..num_rows {
builder.append_null();
}
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
/// PostgreSQL shobj_description - returns NULL for compatibility
#[derive(Display, Debug, Clone)]
#[display("{}", self.name())]
pub(super) struct ShobjDescriptionFunction {
signature: Signature,
}
impl ShobjDescriptionFunction {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::UInt64, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Int32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
],
Volatility::Stable,
),
}
}
}
impl Function for ShobjDescriptionFunction {
fn name(&self) -> &str {
SHOBJ_DESCRIPTION_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Utf8)
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let num_rows = args.number_rows;
let mut builder = StringBuilder::with_capacity(num_rows, 0);
for _ in 0..num_rows {
builder.append_null();
}
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
/// PostgreSQL pg_my_temp_schema - returns 0 (no temp schema) for compatibility
impl Function for PgMyTempSchemaFunction {
fn name(&self) -> &str {
PG_MY_TEMP_SCHEMA_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::UInt32)
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
_args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(0))))
}
}
pub(super) struct PGCatalogFunction;
impl PGCatalogFunction {
@@ -386,98 +212,5 @@ impl PGCatalogFunction {
registry.register(pg_catalog::create_pg_total_relation_size_udf());
registry.register(pg_catalog::create_pg_stat_get_numscans());
registry.register(pg_catalog::create_pg_get_constraintdef());
registry.register(pg_catalog::create_pg_get_partition_ancestors_udf());
registry.register_scalar(ObjDescriptionFunction::new());
registry.register_scalar(ColDescriptionFunction::new());
registry.register_scalar(ShobjDescriptionFunction::new());
registry.register_scalar(PgMyTempSchemaFunction::default());
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow_schema::Field;
use datafusion::arrow::array::Array;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use super::*;
fn create_test_args(args: Vec<ColumnarValue>, number_rows: usize) -> ScalarFunctionArgs {
ScalarFunctionArgs {
args,
arg_fields: vec![],
number_rows,
return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
config_options: Arc::new(Default::default()),
}
}
#[test]
fn test_obj_description_function() {
let func = ObjDescriptionFunction::new();
assert_eq!("obj_description", func.name());
assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());
let args = create_test_args(
vec![
ColumnarValue::Scalar(ScalarValue::Int64(Some(1234))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("pg_class".to_string()))),
],
1,
);
let result = func.invoke_with_args(args).unwrap();
if let ColumnarValue::Array(arr) = result {
assert_eq!(1, arr.len());
assert!(arr.is_null(0));
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_col_description_function() {
let func = ColDescriptionFunction::new();
assert_eq!("col_description", func.name());
assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());
let args = create_test_args(
vec![
ColumnarValue::Scalar(ScalarValue::Int64(Some(1234))),
ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
],
1,
);
let result = func.invoke_with_args(args).unwrap();
if let ColumnarValue::Array(arr) = result {
assert_eq!(1, arr.len());
assert!(arr.is_null(0));
} else {
panic!("Expected Array result");
}
}
#[test]
fn test_shobj_description_function() {
let func = ShobjDescriptionFunction::new();
assert_eq!("shobj_description", func.name());
assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());
let args = create_test_args(
vec![
ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("pg_database".to_string()))),
],
1,
);
let result = func.invoke_with_args(args).unwrap();
if let ColumnarValue::Array(arr) = result {
assert_eq!(1, arr.len());
assert!(arr.is_null(0));
} else {
panic!("Expected Array result");
}
}
}

View File

@@ -46,6 +46,7 @@ use crate::error::{
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::kv_backend::KvBackendRef;
use crate::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::CreateTrigger;
@@ -69,16 +70,20 @@ use crate::rpc::router::RegionRoute;
/// A configurator that customizes or enhances a [`DdlManager`].
#[async_trait::async_trait]
pub trait DdlManagerConfigurator<C>: Send + Sync {
pub trait DdlManagerConfigurator: Send + Sync {
/// Configures the given [`DdlManager`] using the provided [`DdlManagerConfigureContext`].
async fn configure(
&self,
ddl_manager: DdlManager,
ctx: C,
ctx: DdlManagerConfigureContext,
) -> std::result::Result<DdlManager, BoxedError>;
}
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;
pub type DdlManagerConfiguratorRef = Arc<dyn DdlManagerConfigurator>;
pub struct DdlManagerConfigureContext {
pub kv_backend: KvBackendRef,
}
pub type DdlManagerRef = Arc<DdlManager>;
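With the generic context parameter removed, a plugin implements DdlManagerConfigurator against the single DdlManagerConfigureContext. A sketch under that assumption (the struct and its body are hypothetical; only the trait and context shapes come from the code above):

struct NoopDdlConfigurator;

#[async_trait::async_trait]
impl DdlManagerConfigurator for NoopDdlConfigurator {
    async fn configure(
        &self,
        ddl_manager: DdlManager,
        ctx: DdlManagerConfigureContext,
    ) -> std::result::Result<DdlManager, BoxedError> {
        // The context now carries only the kv backend; a real configurator could use
        // it to wire additional DDL procedures before returning the manager.
        let _kv_backend = ctx.kv_backend;
        Ok(ddl_manager)
    }
}

Callers then fetch it without a type parameter, e.g. plugins.get::<DdlManagerConfiguratorRef>(), as the standalone and metasrv hunks later in this diff show.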

View File

@@ -339,16 +339,6 @@ pub struct FlushRegions {
pub error_strategy: FlushErrorStrategy,
}
impl Display for FlushRegions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
self.region_ids, self.strategy, self.error_strategy
)
}
}
impl FlushRegions {
/// Create synchronous single-region flush
pub fn sync_single(region_id: RegionId) -> Self {

View File

@@ -246,6 +246,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Loader for {type_name} is not implemented: {reason}"))]
ProcedureLoaderNotImplemented {
#[snafu(implicit)]
location: Location,
type_name: String,
reason: String,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -266,7 +274,8 @@ impl ErrorExt for Error {
Error::ToJson { .. }
| Error::DeleteState { .. }
| Error::FromJson { .. }
| Error::WaitWatcher { .. } => StatusCode::Internal,
| Error::WaitWatcher { .. }
| Error::ProcedureLoaderNotImplemented { .. } => StatusCode::Internal,
Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }

View File

@@ -320,15 +320,4 @@ mod tests {
assert!(flush_reply.results[0].1.is_ok());
assert!(flush_reply.results[1].1.is_err());
}
#[test]
fn test_flush_regions_display() {
let region_id = RegionId::new(1024, 1);
let flush_regions = FlushRegions::sync_single(region_id);
let display = format!("{}", flush_regions);
assert_eq!(
display,
"FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast)"
);
}
}

View File

@@ -1200,8 +1200,7 @@ impl RegionServerInner {
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_)
| RegionRequest::BuildIndex(_)
| RegionRequest::EnterStaging(_) => RegionChange::None,
| RegionRequest::BuildIndex(_) => RegionChange::None,
RegionRequest::Catchup(_) => RegionChange::Catchup,
};

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
@@ -133,28 +133,24 @@ impl From<&ConcreteDataType> for JsonNativeType {
impl Display for JsonNativeType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
JsonNativeType::Null => write!(f, "Null"),
JsonNativeType::Bool => write!(f, "Bool"),
JsonNativeType::Number(t) => {
write!(f, "Number({t:?})")
}
JsonNativeType::String => write!(f, "String"),
JsonNativeType::Array(item_type) => {
write!(f, "Array[{}]", item_type)
}
JsonNativeType::Object(object) => {
write!(
f,
"Object{{{}}}",
fn to_serde_value(t: &JsonNativeType) -> serde_json::Value {
match t {
JsonNativeType::Null => serde_json::Value::String("<Null>".to_string()),
JsonNativeType::Bool => serde_json::Value::String("<Bool>".to_string()),
JsonNativeType::Number(_) => serde_json::Value::String("<Number>".to_string()),
JsonNativeType::String => serde_json::Value::String("<String>".to_string()),
JsonNativeType::Array(item_type) => {
serde_json::Value::Array(vec![to_serde_value(item_type)])
}
JsonNativeType::Object(object) => serde_json::Value::Object(
object
.iter()
.map(|(k, v)| format!(r#""{k}": {v}"#))
.collect::<Vec<_>>()
.join(", ")
)
.map(|(k, v)| (k.clone(), to_serde_value(v)))
.collect(),
),
}
}
write!(f, "{}", to_serde_value(self))
}
}
@@ -183,7 +179,11 @@ impl JsonType {
}
}
pub(crate) fn native_type(&self) -> &JsonNativeType {
pub fn is_native_type(&self) -> bool {
matches!(self.format, JsonFormat::Native(_))
}
pub fn native_type(&self) -> &JsonNativeType {
match &self.format {
JsonFormat::Jsonb => &JsonNativeType::String,
JsonFormat::Native(x) => x.as_ref(),

View File

@@ -28,10 +28,9 @@ mod procedure;
mod scheduler;
mod tracker;
pub use options::GcSchedulerOptions;
pub use procedure::BatchGcProcedure;
pub(crate) use options::GcSchedulerOptions;
pub(crate) use scheduler::{GcScheduler, GcTickerRef};
pub type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;

View File

@@ -84,6 +84,44 @@ impl DefaultGcSchedulerCtx {
mailbox: MailboxRef,
server_addr: String,
) -> Result<Self> {
// register a noop loader for `GcRegionProcedure` to avoid error when deserializing procedure when rebooting
procedure_manager
.register_loader(
GcRegionProcedure::TYPE_NAME,
Box::new(move |json| {
common_procedure::error::ProcedureLoaderNotImplementedSnafu {
type_name: GcRegionProcedure::TYPE_NAME.to_string(),
reason:
"GC procedure should be retried by scheduler, not reloaded from storage"
.to_string(),
}
.fail()
}),
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: GcRegionProcedure::TYPE_NAME,
})?;
// register a noop loader for `BatchGcProcedure` to avoid error when deserializing procedure when rebooting
procedure_manager
.register_loader(
BatchGcProcedure::TYPE_NAME,
Box::new(move |json| {
common_procedure::error::ProcedureLoaderNotImplementedSnafu {
type_name: BatchGcProcedure::TYPE_NAME.to_string(),
reason:
"Batch GC procedure should not be reloaded from storage, as it doesn't need to be retried if interrupted"
.to_string(),
}
.fail()
}),
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: BatchGcProcedure::TYPE_NAME,
})?;
Ok(Self {
table_metadata_manager,
procedure_manager,

View File

@@ -28,7 +28,7 @@ use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocato
use common_meta::ddl::{
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext};
use common_meta::distributed_time_constants::{self};
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
@@ -405,11 +405,10 @@ impl MetasrvBuilder {
let ddl_manager = if let Some(configurator) = plugins
.as_ref()
.and_then(|p| p.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>())
.and_then(|p| p.get::<DdlManagerConfiguratorRef>())
{
let ctx = DdlManagerConfigureContext {
kv_backend: kv_backend.clone(),
meta_peer_client: meta_peer_client.clone(),
};
configurator
.configure(ddl_manager, ctx)
@@ -638,9 +637,3 @@ impl Default for MetasrvBuilder {
Self::new()
}
}
/// The context for [`DdlManagerConfiguratorRef`].
pub struct DdlManagerConfigureContext {
pub kv_backend: KvBackendRef,
pub meta_peer_client: MetaPeerClientRef,
}

View File

@@ -13,16 +13,11 @@
// limitations under the License.
pub(crate) mod repartition_start;
pub(crate) mod update_metadata;
use std::any::Any;
use std::fmt::Debug;
use common_error::ext::BoxedError;
use common_meta::DatanodeId;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::rpc::router::RegionRoute;
@@ -42,8 +37,6 @@ pub struct RepartitionGroupProcedure {}
pub struct Context {
pub persistent_ctx: PersistentContext,
pub cache_invalidator: CacheInvalidatorRef,
pub table_metadata_manager: TableMetadataManagerRef,
}
@@ -52,7 +45,6 @@ pub struct GroupPrepareResult {
pub source_routes: Vec<RegionRoute>,
pub target_routes: Vec<RegionRoute>,
pub central_region: RegionId,
pub central_region_datanode_id: DatanodeId,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -99,109 +91,6 @@ impl Context {
Ok(table_route_value)
}
/// Returns the `datanode_table_value`
///
/// Retry:
/// - Failed to retrieve the metadata of datanode table.
pub async fn get_datanode_table_value(
&self,
table_id: TableId,
datanode_id: u64,
) -> Result<DatanodeTableValue> {
let datanode_table_value = self
.table_metadata_manager
.datanode_table_manager()
.get(&DatanodeTableKey {
datanode_id,
table_id,
})
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get DatanodeTable: {table_id}"),
})?
.context(error::DatanodeTableNotFoundSnafu {
table_id,
datanode_id,
})?;
Ok(datanode_table_value)
}
/// Broadcasts the invalidate table cache message.
pub async fn invalidate_table_cache(&self) -> Result<()> {
let table_id = self.persistent_ctx.table_id;
let group_id = self.persistent_ctx.group_id;
let subject = format!(
"Invalidate table cache for repartition table, group: {}, table: {}",
group_id, table_id,
);
let ctx = common_meta::cache_invalidator::Context {
subject: Some(subject),
};
let _ = self
.cache_invalidator
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
.await;
Ok(())
}
/// Updates the table route.
///
/// Retry:
/// - Failed to retrieve the metadata of datanode table.
///
/// Abort:
/// - Table route not found.
/// - Failed to update the table route.
pub async fn update_table_route(
&self,
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
new_region_routes: Vec<RegionRoute>,
) -> Result<()> {
let table_id = self.persistent_ctx.table_id;
// Safety: prepare result is set in [RepartitionStart] state.
let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
let central_region_datanode_table_value = self
.get_datanode_table_value(table_id, prepare_result.central_region_datanode_id)
.await?;
let RegionInfo {
region_options,
region_wal_options,
..
} = &central_region_datanode_table_value.region_info;
self.table_metadata_manager
.update_table_route(
table_id,
central_region_datanode_table_value.region_info.clone(),
current_table_route_value,
new_region_routes,
region_options,
region_wal_options,
)
.await
.context(error::TableMetadataManagerSnafu)
}
}
/// Returns the region routes of the given table route value.
///
/// Abort:
/// - Table route value is not physical.
pub fn region_routes(
table_id: TableId,
table_route_value: &TableRouteValue,
) -> Result<&Vec<RegionRoute>> {
table_route_value
.region_routes()
.with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!(
"TableRoute({:?}) is a non-physical TableRouteValue.",
table_id
),
})
}
#[async_trait::async_trait]
@@ -262,23 +151,4 @@ mod tests {
let err = ctx.get_table_route_value().await.unwrap_err();
assert!(err.is_retryable());
}
#[tokio::test]
async fn test_get_datanode_table_value_retry_error() {
let kv = MockKvBackendBuilder::default()
.range_fn(Arc::new(|_| {
common_meta::error::UnexpectedSnafu {
err_msg: "mock err",
}
.fail()
}))
.build()
.unwrap();
let mut env = TestingEnv::new();
env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
let persistent_context = new_persistent_context(1024, vec![], vec![]);
let ctx = env.create_context(persistent_context);
let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
assert!(err.is_retryable());
}
}

View File

@@ -22,9 +22,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Result};
use crate::procedure::repartition::group::{
Context, GroupId, GroupPrepareResult, State, region_routes,
};
use crate::procedure::repartition::group::{Context, GroupId, GroupPrepareResult, State};
use crate::procedure::repartition::plan::RegionDescriptor;
#[derive(Debug, Serialize, Deserialize)]
@@ -69,6 +67,7 @@ impl RepartitionStart {
}
);
let central_region = sources[0].region_id;
let region_routes_map = region_routes
.iter()
.map(|r| (r.region.id, r))
@@ -94,26 +93,14 @@ impl RepartitionStart {
group_id,
region_id: t.region_id,
})
.map(|r| (*r).clone())
.and_then(|r| ensure_region_route_expr_match(r, t))
})
.collect::<Result<Vec<_>>>()?;
let central_region = sources[0].region_id;
let central_region_datanode_id = source_region_routes[0]
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
violated: format!(
"Leader peer is not set for central region: {}",
central_region
),
})?
.id;
Ok(GroupPrepareResult {
source_routes: source_region_routes,
target_routes: target_region_routes,
central_region,
central_region_datanode_id,
})
}
@@ -148,7 +135,14 @@ impl State for RepartitionStart {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let table_route_value = ctx.get_table_route_value().await?.into_inner();
let region_routes = region_routes(table_id, &table_route_value)?;
let region_routes = table_route_value.region_routes().with_context(|_| {
error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!(
"TableRoute({:?}) is a non-physical TableRouteValue.",
table_id
),
}
})?;
let group_prepare_result = Self::ensure_route_present(
group_id,
region_routes,
@@ -240,6 +234,43 @@ mod tests {
)
.unwrap_err();
assert_matches!(err, Error::PartitionExprMismatch { .. });
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
let region_routes = vec![
RegionRoute {
region: Region {
id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
];
let err = RepartitionStart::ensure_route_present(
Uuid::new_v4(),
&region_routes,
&[source_region],
&[target_region],
)
.unwrap_err();
assert_matches!(err, Error::PartitionExprMismatch { .. });
}
#[test]

View File

@@ -1,80 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod apply_staging_region;
pub(crate) mod rollback_staging_region;
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::group::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
pub enum UpdateMetadata {
/// Applies the new partition expressions for staging regions.
ApplyStaging,
/// Rolls back the new partition expressions for staging regions.
RollbackStaging,
}
impl UpdateMetadata {
#[allow(dead_code)]
fn next_state() -> (Box<dyn State>, Status) {
// TODO(weny): change it later.
(Box::new(RepartitionStart), Status::executing(true))
}
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpdateMetadata {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
match self {
UpdateMetadata::ApplyStaging => {
// TODO(weny): If all metadata have already been updated, skip applying staging regions.
self.apply_staging_regions(ctx).await?;
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
"Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}"
);
};
Ok(Self::next_state())
}
UpdateMetadata::RollbackStaging => {
self.rollback_staging_regions(ctx).await?;
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
"Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}"
);
};
Ok(Self::next_state())
}
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
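The deleted UpdateMetadata state above follows the repartition procedure's boxed state-machine shape: each state's next hands back the next boxed state plus a status, and the driver keeps stepping until told to stop. A self-contained toy version of that shape, with hypothetical states and a bool standing in for Status:

trait State {
    // Returns the next state and whether the machine is finished.
    fn next(self: Box<Self>) -> (Box<dyn State>, bool);
    fn name(&self) -> &'static str;
}

struct Start;
struct ApplyStaging;
struct Done;

impl State for Start {
    fn next(self: Box<Self>) -> (Box<dyn State>, bool) {
        (Box::new(ApplyStaging), false)
    }
    fn name(&self) -> &'static str {
        "Start"
    }
}

impl State for ApplyStaging {
    fn next(self: Box<Self>) -> (Box<dyn State>, bool) {
        (Box::new(Done), false)
    }
    fn name(&self) -> &'static str {
        "ApplyStaging"
    }
}

impl State for Done {
    fn next(self: Box<Self>) -> (Box<dyn State>, bool) {
        (self, true)
    }
    fn name(&self) -> &'static str {
        "Done"
    }
}

fn main() {
    let mut state: Box<dyn State> = Box::new(Start);
    loop {
        println!("state: {}", state.name());
        let (next, done) = state.next();
        state = next;
        if done {
            break;
        }
    }
}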

View File

@@ -1,181 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::error;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
use crate::procedure::repartition::plan::RegionDescriptor;
impl UpdateMetadata {
/// Applies the new partition expressions for staging regions.
///
/// Abort:
/// - Target region not found.
/// - Source region not found.
fn apply_staging_region_routes(
group_id: GroupId,
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
let mut region_routes_map = region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
for target in targets {
let region_route = region_routes_map.get_mut(&target.region_id).context(
error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: target.region_id,
},
)?;
region_route.region.partition_expr = target
.partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu)?;
region_route.set_leader_staging();
}
for source in sources {
let region_route = region_routes_map.get_mut(&source.region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region_id,
},
)?;
region_route.set_leader_staging();
}
Ok(region_routes)
}
/// Applies the new partition expressions for staging regions.
///
/// Abort:
/// - Table route is not physical.
/// - Target region not found.
/// - Source region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
#[allow(dead_code)]
pub(crate) async fn apply_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let current_table_route_value = ctx.get_table_route_value().await?;
let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?;
let new_region_routes = Self::apply_staging_region_routes(
group_id,
&ctx.persistent_ctx.sources,
&ctx.persistent_ctx.targets,
region_routes,
)?;
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await
{
error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}");
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"
),
});
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::test_util::range_expr;
#[test]
fn test_generate_region_routes() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let region_routes = vec![
RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 2),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
];
let source_region = RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 0, 10),
};
let new_region_routes = UpdateMetadata::apply_staging_region_routes(
group_id,
&[source_region],
&[target_region],
&region_routes,
)
.unwrap();
assert!(new_region_routes[0].is_leader_staging());
assert_eq!(
new_region_routes[0].region.partition_expr,
range_expr("x", 0, 100).as_json_str().unwrap()
);
assert_eq!(
new_region_routes[1].region.partition_expr,
range_expr("x", 0, 10).as_json_str().unwrap()
);
assert!(new_region_routes[1].is_leader_staging());
assert!(!new_region_routes[2].is_leader_staging());
}
}
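The removed apply_staging_region_routes (and its rollback twin in the next file) rely on the same in-place patching trick: clone the current routes, index the clones by region id with mutable references, then rewrite only the targeted entries. A standalone sketch of that trick with a hypothetical Route type:

use std::collections::HashMap;

#[derive(Clone, Debug)]
struct Route {
    region_id: u64,
    partition_expr: String,
    staging: bool,
}

// Returns a patched copy of `current`: targets get a new partition expr and both
// targets and sources are flipped to staging; missing ids abort with an error.
fn apply_staging(
    current: &[Route],
    targets: &[(u64, String)], // (region id, new partition expr)
    sources: &[u64],
) -> Result<Vec<Route>, String> {
    let mut routes = current.to_vec();
    let mut by_id: HashMap<u64, &mut Route> =
        routes.iter_mut().map(|r| (r.region_id, r)).collect();
    for (id, expr) in targets {
        let route = by_id
            .get_mut(id)
            .ok_or_else(|| format!("target region {id} missing"))?;
        route.partition_expr = expr.clone();
        route.staging = true;
    }
    for id in sources {
        let route = by_id
            .get_mut(id)
            .ok_or_else(|| format!("source region {id} missing"))?;
        route.staging = true;
    }
    Ok(routes)
}

fn main() {
    let current = vec![
        Route { region_id: 1, partition_expr: "x < 100".to_string(), staging: false },
        Route { region_id: 2, partition_expr: String::new(), staging: false },
    ];
    let updated = apply_staging(&current, &[(2u64, "x < 10".to_string())], &[1u64]).unwrap();
    println!("{updated:?}");
}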

View File

@@ -1,187 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::error;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
impl UpdateMetadata {
/// Rolls back the staging regions.
///
/// Abort:
/// - Source region not found.
/// - Target region not found.
#[allow(dead_code)]
fn rollback_staging_region_routes(
group_id: GroupId,
source_routes: &[RegionRoute],
target_routes: &[RegionRoute],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
let mut region_routes_map = region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
for source in source_routes {
let region_route = region_routes_map.get_mut(&source.region.id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region.id,
},
)?;
region_route.region.partition_expr = source.region.partition_expr.clone();
region_route.clear_leader_staging();
}
for target in target_routes {
let region_route = region_routes_map.get_mut(&target.region.id).context(
error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: target.region.id,
},
)?;
region_route.clear_leader_staging();
}
Ok(region_routes)
}
/// Rolls back the metadata for staging regions.
///
/// Abort:
/// - Table route is not physical.
/// - Source region not found.
/// - Target region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
#[allow(dead_code)]
pub(crate) async fn rollback_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let current_table_route_value = ctx.get_table_route_value().await?;
let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?;
// Safety: prepare result is set in [RepartitionStart] state.
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let new_region_routes = Self::rollback_staging_region_routes(
group_id,
&prepare_result.source_routes,
&prepare_result.target_routes,
region_routes,
)?;
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await
{
error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}");
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"
),
});
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use common_meta::peer::Peer;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::test_util::range_expr;
#[test]
fn test_rollback_staging_region_routes() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let region_routes = vec![
RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Staging),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 2),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Staging),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
];
let source_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 20).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let target_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 0, 20).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let new_region_routes = UpdateMetadata::rollback_staging_region_routes(
group_id,
&source_routes,
&target_routes,
&region_routes,
)
.unwrap();
assert!(!new_region_routes[0].is_leader_staging());
assert_eq!(
new_region_routes[0].region.partition_expr,
range_expr("x", 0, 20).as_json_str().unwrap(),
);
assert!(!new_region_routes[1].is_leader_staging());
assert!(new_region_routes[2].is_leader_downgrading());
}
}

View File

@@ -21,6 +21,6 @@ use store_api::storage::RegionId;
pub struct RegionDescriptor {
/// The region id of the region involved in the plan.
pub region_id: RegionId,
/// The new partition expression of the region.
/// The partition expression of the region.
pub partition_expr: PartitionExpr,
}

View File

@@ -16,22 +16,17 @@ use std::sync::Arc;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::SequenceBuilder;
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
use store_api::storage::TableId;
use uuid::Uuid;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::metasrv::MetasrvInfo;
use crate::procedure::repartition::group::{Context, PersistentContext};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::test_util::MailboxContext;
/// `TestingEnv` provides components during the tests.
pub struct TestingEnv {
pub table_metadata_manager: TableMetadataManagerRef,
pub mailbox_ctx: MailboxContext,
}
impl Default for TestingEnv {
@@ -44,28 +39,16 @@ impl TestingEnv {
pub fn new() -> Self {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
Self {
table_metadata_manager,
mailbox_ctx,
}
}
pub fn create_context(self, persistent_context: PersistentContext) -> Context {
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
self.mailbox_ctx.mailbox().clone(),
MetasrvInfo {
server_addr: String::new(),
},
));
Context {
persistent_ctx: persistent_context,
table_metadata_manager: self.table_metadata_manager.clone(),
cache_invalidator,
}
}
}

View File

@@ -23,7 +23,6 @@ mod options;
mod put;
mod read;
mod region_metadata;
mod staging;
mod state;
mod sync;
@@ -212,13 +211,6 @@ impl RegionEngine for MetricEngine {
let mut extension_return_value = HashMap::new();
let result = match request {
RegionRequest::EnterStaging(_) => {
if self.inner.is_physical_region(region_id) {
self.handle_enter_staging_request(region_id, request).await
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Create(create) => {
self.inner

View File

@@ -1,54 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::AffectedRows;
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::engine::MetricEngine;
use crate::error::{MitoEnterStagingOperationSnafu, Result};
use crate::utils;
impl MetricEngine {
/// Handles the enter staging request for the given region.
pub(crate) async fn handle_enter_staging_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows> {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
// For metadata region, it doesn't care about the partition expr, so we can just pass an empty string.
self.inner
.mito
.handle_request(
metadata_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: String::new(),
}),
)
.await
.context(MitoEnterStagingOperationSnafu)?;
self.inner
.mito
.handle_request(data_region_id, request)
.await
.context(MitoEnterStagingOperationSnafu)
.map(|response| response.affected_rows)
}
}

View File

@@ -156,13 +156,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Mito enter staging operation fails"))]
MitoEnterStagingOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect record batch stream"))]
CollectRecordBatchStream {
source: common_recordbatch::error::Error,
@@ -367,7 +360,6 @@ impl ErrorExt for Error {
| MitoWriteOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoSyncOperation { source, .. }
| MitoEnterStagingOperation { source, .. }
| BatchOpenMitoRegion { source, .. }
| BatchCatchupMitoRegion { source, .. } => source.status_code(),

View File

@@ -55,7 +55,7 @@ lazy_static = "1.4"
log-store = { workspace = true }
mito-codec.workspace = true
moka = { workspace = true, features = ["sync", "future"] }
object-store = { workspace = true, features = ["testing"] }
object-store.workspace = true
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
pin-project.workspace = true

View File

@@ -501,7 +501,7 @@ impl Compactor for DefaultCompactor {
// TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
compaction_region
.manifest_ctx
.update_manifest(RegionLeaderState::Writable, action_list, false)
.update_manifest(RegionLeaderState::Writable, action_list)
.await?;
Ok(edit)

View File

@@ -117,7 +117,7 @@ impl CompactionTaskImpl {
};
if let Err(e) = compaction_region
.manifest_ctx
.update_manifest(current_region_state, action_list, false)
.update_manifest(current_region_state, action_list)
.await
{
warn!(

View File

@@ -901,7 +901,7 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) {
check_ttl(&engine, &Duration::from_secs(500));
}
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();
@@ -952,8 +952,6 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
.await
.unwrap();
});
// Make sure the loop is handling the alter request.
tokio::time::sleep(Duration::from_millis(100)).await;
let column_schemas_cloned = column_schemas.clone();
let engine_cloned = engine.clone();
@@ -964,8 +962,6 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
};
put_rows(&engine_cloned, region_id, rows).await;
});
// Make sure the loop is handling the put request.
tokio::time::sleep(Duration::from_millis(100)).await;
listener.wake_notify();
alter_job.await.unwrap();

View File

@@ -74,9 +74,6 @@ pub trait EventListener: Send + Sync {
/// Notifies the listener that region starts to send a region change result to worker.
async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {}
    /// Notifies the listener that region starts to send an enter staging result to worker.
async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {}
/// Notifies the listener that the index build task is executed successfully.
async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {}
@@ -310,37 +307,6 @@ impl EventListener for NotifyRegionChangeResultListener {
region_id
);
self.notify.notified().await;
info!(
"Continue to sending region change result for region {}",
region_id
);
}
}
#[derive(Default)]
pub struct NotifyEnterStagingResultListener {
notify: Notify,
}
impl NotifyEnterStagingResultListener {
/// Continue to sending enter staging result.
pub fn wake_notify(&self) {
self.notify.notify_one();
}
}
#[async_trait]
impl EventListener for NotifyEnterStagingResultListener {
async fn on_enter_staging_result_begin(&self, region_id: RegionId) {
info!(
"Wait on notify to start notify enter staging result for region {}",
region_id
);
self.notify.notified().await;
info!(
"Continue to sending enter staging result for region {}",
region_id
);
}
}

View File

@@ -14,30 +14,17 @@
//! Integration tests for staging state functionality.
use std::assert_matches::assert_matches;
use std::fs;
use std::sync::Arc;
use std::time::Duration;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use object_store::Buffer;
use object_store::layers::mock::{
Entry, Error as MockError, ErrorKind, List, Lister, Metadata, MockLayerBuilder,
Result as MockResult, Write, Writer,
};
use store_api::region_engine::{RegionEngine, SettableRegionRoleState};
use store_api::region_request::{
EnterStagingRequest, RegionAlterRequest, RegionFlushRequest, RegionRequest,
RegionTruncateRequest,
RegionAlterRequest, RegionFlushRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::NotifyEnterStagingResultListener;
use crate::error::Error;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::request::WorkerRequest;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
@@ -227,8 +214,6 @@ async fn test_staging_state_validation_patterns() {
);
}
const PARTITION_EXPR: &str = "partition_expr";
#[tokio::test]
async fn test_staging_manifest_directory() {
test_staging_manifest_directory_with_format(false).await;
@@ -236,7 +221,6 @@ async fn test_staging_manifest_directory() {
}
async fn test_staging_manifest_directory_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
@@ -271,57 +255,9 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
// Now test staging mode manifest creation
// Set region to staging mode using the engine API
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
assert_eq!(staging_partition_expr.unwrap(), PARTITION_EXPR);
{
let manager = region.manifest_ctx.manifest_manager.read().await;
assert_eq!(
manager
.staging_manifest()
.unwrap()
.metadata
.partition_expr
.as_deref()
.unwrap(),
PARTITION_EXPR
);
assert!(manager.manifest().metadata.partition_expr.is_none());
}
// Should be ok to enter staging mode again with the same partition expr
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap();
// Should throw error if try to enter staging mode again with a different partition expr
let err = engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: "".to_string(),
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::StagingPartitionExprMismatch { .. }
);
// Put some data and flush in staging mode
let rows_data = Rows {
@@ -376,7 +312,6 @@ async fn test_staging_exit_success_with_manifests() {
}
async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
@@ -395,28 +330,16 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.await
.unwrap();
// Add some data and flush in staging mode to generate staging manifests
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows_data).await;
// Enter staging mode
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
// Add some data and flush in staging mode to generate staging manifests
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(3, 8),
rows: build_rows(0, 5),
};
put_rows(&engine, region_id, rows_data).await;
@@ -434,7 +357,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
// Add more data and flush again to generate multiple staging manifests
let rows_data2 = Rows {
schema: column_schemas.clone(),
rows: build_rows(8, 10),
rows: build_rows(5, 10),
};
put_rows(&engine, region_id, rows_data2).await;
@@ -459,11 +382,8 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.unwrap();
assert_eq!(
staging_files_before.len(),
// Two files for flush operation
// One file for entering staging mode
3,
"Staging manifest directory should contain 3 files before exit, got: {:?}",
staging_files_before
2,
"Staging manifest directory should contain two files before exit"
);
// Count normal manifest files before exit
@@ -474,11 +394,8 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.unwrap();
let normal_count_before = normal_files_before.len();
assert_eq!(
// One file for table creation
// One file for flush operation
normal_count_before,
2,
"Normal manifest directory should initially contain 2 files"
normal_count_before, 1,
"Normal manifest directory should initially contain one file"
);
// Try read data before exiting staging, SST files should be invisible
@@ -486,8 +403,8 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(
scanner.num_files(),
1,
"1 SST files should be scanned before exit"
0,
"No SST files should be scanned before exit"
);
assert_eq!(
scanner.num_memtables(),
@@ -498,20 +415,14 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let batches = RecordBatches::try_collect(stream).await.unwrap();
let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(
total_rows, 3,
"3 rows should be readable before exit staging mode"
total_rows, 0,
"No data should be readable before exit staging mode"
);
// Inspect SSTs from manifest
let sst_entries = engine.all_ssts_from_manifest().await;
assert_eq!(
sst_entries.len(),
3,
"sst entries should be 3, got: {:?}",
sst_entries
);
assert_eq!(sst_entries.iter().filter(|e| e.visible).count(), 1);
assert_eq!(sst_entries.iter().filter(|e| !e.visible).count(), 2);
assert_eq!(sst_entries.len(), 2);
assert!(sst_entries.iter().all(|e| !e.visible));
// Exit staging mode successfully
engine
@@ -559,7 +470,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(
scanner.num_files(),
3,
2,
"SST files should be scanned after exit"
);
@@ -571,209 +482,6 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
// Inspect SSTs from manifest
let sst_entries = engine.all_ssts_from_manifest().await;
assert_eq!(sst_entries.len(), 3);
assert_eq!(sst_entries.len(), 2);
assert!(sst_entries.iter().all(|e| e.visible));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_write_stall_on_enter_staging() {
test_write_stall_on_enter_staging_with_format(false).await;
test_write_stall_on_enter_staging_with_format(true).await;
}
async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let listener = Arc::new(NotifyEnterStagingResultListener::default());
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
},
None,
Some(listener.clone()),
None,
)
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let engine_cloned = engine.clone();
let alter_job = tokio::spawn(async move {
engine_cloned
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap();
});
// Make sure the loop is handling the alter request.
tokio::time::sleep(Duration::from_millis(100)).await;
let column_schemas_cloned = column_schemas.clone();
let engine_cloned = engine.clone();
let put_job = tokio::spawn(async move {
let rows = Rows {
schema: column_schemas_cloned,
rows: build_rows(0, 3),
};
put_rows(&engine_cloned, region_id, rows).await;
});
// Make sure the loop is handling the put request.
tokio::time::sleep(Duration::from_millis(100)).await;
listener.wake_notify();
alter_job.await.unwrap();
put_job.await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).await.unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_enter_staging_clean_staging_manifest_error() {
common_telemetry::init_default_ut_logging();
test_enter_staging_clean_staging_manifest_error_with_format(false).await;
test_enter_staging_clean_staging_manifest_error_with_format(true).await;
}
struct MockLister {
path: String,
inner: Lister,
}
impl List for MockLister {
async fn next(&mut self) -> MockResult<Option<Entry>> {
if self.path.contains("staging") {
return Err(MockError::new(ErrorKind::Unexpected, "mock error"));
}
self.inner.next().await
}
}
struct MockWriter {
path: String,
inner: Writer,
}
impl Write for MockWriter {
async fn write(&mut self, bs: Buffer) -> MockResult<()> {
self.inner.write(bs).await
}
async fn close(&mut self) -> MockResult<Metadata> {
if self.path.contains("staging") {
return Err(MockError::new(ErrorKind::Unexpected, "mock error"));
}
self.inner.close().await
}
async fn abort(&mut self) -> MockResult<()> {
self.inner.abort().await
}
}
async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) {
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1024, 0);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let err = engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
let region = engine.get_region(region_id).unwrap();
assert!(
region
.manifest_ctx
.manifest_manager
.read()
.await
.staging_manifest()
.is_none()
);
let state = region.state();
assert_eq!(state, RegionRoleState::Leader(RegionLeaderState::Writable));
}
async fn test_enter_staging_clean_staging_manifest_error_with_format(flat_format: bool) {
let mock_layer = MockLayerBuilder::default()
.lister_factory(Arc::new(|path, _args, lister| {
Box::new(MockLister {
path: path.to_string(),
inner: lister,
})
}))
.build()
.unwrap();
let mut env = TestEnv::new().await.with_mock_layer(mock_layer);
test_enter_staging_error(&mut env, flat_format).await;
}
#[tokio::test]
async fn test_enter_staging_save_staging_manifest_error() {
common_telemetry::init_default_ut_logging();
test_enter_staging_save_staging_manifest_error_with_format(false).await;
test_enter_staging_save_staging_manifest_error_with_format(true).await;
}
async fn test_enter_staging_save_staging_manifest_error_with_format(flat_format: bool) {
let mock_layer = MockLayerBuilder::default()
.writer_factory(Arc::new(|path, _args, lister| {
Box::new(MockWriter {
path: path.to_string(),
inner: lister,
})
}))
.build()
.unwrap();
let mut env = TestEnv::new().await.with_mock_layer(mock_layer);
test_enter_staging_error(&mut env, flat_format).await;
}

View File

@@ -1150,18 +1150,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Staging partition expr mismatch, manifest: {:?}, request: {}",
manifest_expr,
request_expr
))]
StagingPartitionExprMismatch {
manifest_expr: Option<String>,
request_expr: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1208,8 +1196,7 @@ impl ErrorExt for Error {
| InstallManifestTo { .. }
| Unexpected { .. }
| SerializeColumnMetadata { .. }
| SerializeManifest { .. }
| StagingPartitionExprMismatch { .. } => StatusCode::Unexpected,
| SerializeManifest { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }

View File

@@ -208,7 +208,7 @@ impl WriteBufferManager for WriteBufferManagerImpl {
}
/// Reason of a flush task.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
/// Other reasons.
Others,
@@ -222,8 +222,6 @@ pub enum FlushReason {
Periodically,
/// Flush memtable during downgrading state.
Downgrading,
/// Enter staging mode.
EnterStaging,
}
impl FlushReason {
@@ -255,8 +253,6 @@ pub(crate) struct RegionFlushTask {
pub(crate) index_options: IndexOptions,
/// Semaphore to control flush concurrency.
pub(crate) flush_semaphore: Arc<Semaphore>,
/// Whether the region is in staging mode.
pub(crate) is_staging: bool,
}
impl RegionFlushTask {
@@ -320,7 +316,6 @@ impl RegionFlushTask {
_timer: timer,
edit,
memtables_to_remove,
is_staging: self.is_staging,
};
WorkerRequest::Background {
region_id: self.region_id,
@@ -403,10 +398,7 @@ impl RegionFlushTask {
flushed_sequence: Some(version_data.committed_sequence),
committed_sequence: None,
};
info!(
"Applying {edit:?} to region {}, is_staging: {}",
self.region_id, self.is_staging
);
info!("Applying {edit:?} to region {}", self.region_id);
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
@@ -425,12 +417,11 @@ impl RegionFlushTask {
// add a cleanup job to remove them later.
let version = self
.manifest_ctx
.update_manifest(expected_state, action_list, self.is_staging)
.update_manifest(expected_state, action_list)
.await?;
info!(
"Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
"Successfully update manifest version to {version}, region: {}, reason: {}",
self.region_id,
self.is_staging,
self.reason.as_str()
);
@@ -1301,7 +1292,6 @@ mod tests {
.await,
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
@@ -1344,7 +1334,6 @@ mod tests {
manifest_ctx: manifest_ctx.clone(),
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
})
.collect();
// Schedule first task.

View File

@@ -25,6 +25,7 @@ use crate::manifest::action::{RegionCheckpoint, RegionManifest};
use crate::manifest::manager::RegionManifestOptions;
use crate::manifest::storage::ManifestObjectStore;
use crate::metrics::MANIFEST_OP_ELAPSED;
use crate::region::{RegionLeaderState, RegionRoleState};
/// [`Checkpointer`] is responsible for doing checkpoint for a region, in an asynchronous way.
#[derive(Debug)]
@@ -136,7 +137,20 @@ impl Checkpointer {
    /// Checks whether a checkpoint is needed for the region based on the checkpoint distance.
    /// If needed, and no checkpoint task is currently running, it starts a new checkpoint
    /// task in the background.
pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) {
pub(crate) fn maybe_do_checkpoint(
&self,
manifest: &RegionManifest,
region_state: RegionRoleState,
) {
// Skip checkpoint if region is in staging state
if region_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
info!(
"Skipping checkpoint for region {} in staging mode, manifest version: {}",
manifest.metadata.region_id, manifest.manifest_version
);
return;
}
if self.manifest_options.checkpoint_distance == 0 {
return;
}
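A compressed view of the gate above, with a simplified enum standing in for the engine's region state and a plain counter standing in for the real version bookkeeping: staging regions never trigger a checkpoint, and a checkpoint_distance of 0 disables checkpointing outright.

#[derive(Clone, Copy, PartialEq, Eq)]
enum RegionLeaderState {
    Writable,
    Staging,
}

fn should_checkpoint(
    state: RegionLeaderState,
    checkpoint_distance: u64,
    versions_since_last_checkpoint: u64,
) -> bool {
    // Regions in staging mode skip checkpoints, mirroring the early return above.
    if state == RegionLeaderState::Staging {
        return false;
    }
    // A distance of 0 means checkpointing is disabled.
    if checkpoint_distance == 0 {
        return false;
    }
    versions_since_last_checkpoint >= checkpoint_distance
}

fn main() {
    assert!(!should_checkpoint(RegionLeaderState::Staging, 10, 100));
    assert!(should_checkpoint(RegionLeaderState::Writable, 10, 100));
    assert!(!should_checkpoint(RegionLeaderState::Writable, 0, 100));
}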

View File

@@ -151,10 +151,6 @@ pub struct RegionManifestManager {
last_version: Arc<AtomicU64>,
checkpointer: Checkpointer,
manifest: Arc<RegionManifest>,
// Staging manifest is used to store the manifest of the staging region before it becomes available.
// It is initially inherited from the previous manifest(i.e., `self.manifest`).
// When the staging manifest becomes available, it will be used to construct the new manifest.
staging_manifest: Option<Arc<RegionManifest>>,
stats: ManifestStats,
stopped: bool,
}
@@ -233,7 +229,6 @@ impl RegionManifestManager {
last_version: manifest_version,
checkpointer,
manifest: Arc::new(manifest),
staging_manifest: None,
stats: stats.clone(),
stopped: false,
})
@@ -339,8 +334,6 @@ impl RegionManifestManager {
last_version: manifest_version,
checkpointer,
manifest: Arc::new(manifest),
// TODO(weny): open the staging manifest if exists.
staging_manifest: None,
stats: stats.clone(),
stopped: false,
}))
@@ -511,7 +504,7 @@ impl RegionManifestManager {
pub async fn update(
&mut self,
action_list: RegionMetaActionList,
is_staging: bool,
region_state: RegionRoleState,
) -> Result<ManifestVersion> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["update"])
@@ -525,19 +518,13 @@ impl RegionManifestManager {
);
let version = self.increase_version();
let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging);
self.store
.save(version, &action_list.encode()?, is_staging)
.await?;
// For a staging region, the manifest is initially inherited from the previous manifest(i.e., `self.manifest`).
// When the staging manifest becomes available, it will be used to construct the new manifest.
let mut manifest_builder =
if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
} else {
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
};
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
@@ -557,27 +544,17 @@ impl RegionManifestManager {
}
}
}
let new_manifest = manifest_builder.try_build()?;
new_manifest
.removed_files
.update_file_removed_cnt_to_stats(&self.stats);
let updated_manifest = self
.checkpointer
.update_manifest_removed_files(new_manifest)?;
self.manifest = Arc::new(updated_manifest);
if is_staging {
let new_manifest = manifest_builder.try_build()?;
self.staging_manifest = Some(Arc::new(new_manifest));
info!(
"Skipping checkpoint for region {} in staging mode, manifest version: {}",
self.manifest.metadata.region_id, self.manifest.manifest_version
);
} else {
let new_manifest = manifest_builder.try_build()?;
new_manifest
.removed_files
.update_file_removed_cnt_to_stats(&self.stats);
let updated_manifest = self
.checkpointer
.update_manifest_removed_files(new_manifest)?;
self.manifest = Arc::new(updated_manifest);
self.checkpointer
.maybe_do_checkpoint(self.manifest.as_ref());
}
self.checkpointer
.maybe_do_checkpoint(self.manifest.as_ref(), region_state);
Ok(version)
}
@@ -598,11 +575,6 @@ impl RegionManifestManager {
self.manifest.clone()
}
/// Retrieves the current [RegionManifest].
pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
self.staging_manifest.clone()
}
/// Returns total manifest size.
pub fn manifest_usage(&self) -> u64 {
self.store.total_manifest_size()
@@ -739,22 +711,6 @@ impl RegionManifestManager {
Ok(Some(RegionMetaActionList::new(merged_actions)))
}
/// Unsets the staging manifest.
pub(crate) fn unset_staging_manifest(&mut self) {
self.staging_manifest = None;
}
/// Clear all staging manifests.
pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
self.staging_manifest = None;
self.store.clear_staging_manifests().await?;
info!(
"Cleared all staging manifests for region {}",
self.manifest.metadata.region_id
);
Ok(())
}
}
#[cfg(test)]
@@ -881,7 +837,13 @@ mod test {
sst_format: FormatType::PrimaryKey,
}));
let current_version = manager.update(action_list, false).await.unwrap();
let current_version = manager
.update(
action_list,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1);
@@ -944,7 +906,13 @@ mod test {
sst_format: FormatType::PrimaryKey,
}));
let current_version = manager.update(action_list, false).await.unwrap();
let current_version = manager
.update(
action_list,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1);
@@ -965,7 +933,7 @@ mod test {
flushed_sequence: None,
committed_sequence: None,
})]),
false,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();

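One effect of threading RegionRoleState through update (visible in the updated test call sites above) instead of a bare is_staging flag is that callers name the state they expect, and the boolean is derived once next to the logic that needs it, as in the `let is_staging = region_state == ...` line earlier in this hunk. A tiny illustration of the difference, with placeholder types:

#[derive(Clone, Copy, PartialEq, Eq)]
enum RegionLeaderState {
    Writable,
    Staging,
}

#[derive(Clone, Copy, PartialEq, Eq)]
enum RegionRoleState {
    Leader(RegionLeaderState),
}

// Bool-flavored API: every call site must remember what `true` means.
fn update_with_flag(_is_staging: bool) {}

// State-flavored API: the flag is derived in one place, next to the code that uses it.
fn update_with_state(state: RegionRoleState) {
    let is_staging = state == RegionRoleState::Leader(RegionLeaderState::Staging);
    // e.g. pick the staging vs. normal manifest location based on `is_staging` here.
    let _ = is_staging;
}

fn main() {
    update_with_flag(false); // unclear at a glance
    update_with_state(RegionRoleState::Leader(RegionLeaderState::Writable)); // self-describing
}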
View File

@@ -27,6 +27,7 @@ use crate::manifest::action::{
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::storage::CheckpointMetadata;
use crate::manifest::tests::utils::basic_region_metadata;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::sst::file::FileMeta;
use crate::test_util::TestEnv;
@@ -86,7 +87,13 @@ async fn manager_without_checkpoint() {
// apply 10 actions
for _ in 0..10 {
manager.update(nop_action(), false).await.unwrap();
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
}
// no checkpoint
@@ -131,7 +138,13 @@ async fn manager_with_checkpoint_distance_1() {
// apply 10 actions
for _ in 0..10 {
manager.update(nop_action(), false).await.unwrap();
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -192,7 +205,13 @@ async fn test_corrupted_data_causing_checksum_error() {
// Apply actions
for _ in 0..10 {
manager.update(nop_action(), false).await.unwrap();
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
}
// Wait for the checkpoint to finish.
@@ -283,7 +302,10 @@ async fn generate_checkpoint_with_compression_types(
let (_env, mut manager) = build_manager(1, compress_type).await;
for action in actions {
manager.update(action, false).await.unwrap();
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -339,7 +361,10 @@ async fn manifest_install_manifest_to() {
let (env, mut manager) = build_manager(0, CompressionType::Uncompressed).await;
let (files, actions) = generate_action_lists(10);
for action in actions {
manager.update(action, false).await.unwrap();
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
}
// Nothing to install
@@ -377,7 +402,10 @@ async fn manifest_install_manifest_to_with_checkpoint() {
let (env, mut manager) = build_manager(3, CompressionType::Uncompressed).await;
let (files, actions) = generate_action_lists(10);
for action in actions {
manager.update(action, false).await.unwrap();
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -449,7 +477,13 @@ async fn test_checkpoint_bypass_in_staging_mode() {
// Apply actions in staging mode - checkpoint should be bypassed
for _ in 0..15 {
manager.update(nop_action(), true).await.unwrap();
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Staging),
)
.await
.unwrap();
}
assert!(!manager.checkpointer().is_doing_checkpoint());
@@ -464,7 +498,13 @@ async fn test_checkpoint_bypass_in_staging_mode() {
);
// Now switch to normal mode and apply one more action
manager.update(nop_action(), false).await.unwrap();
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
// Wait for potential checkpoint
while manager.checkpointer().is_doing_checkpoint() {

View File

@@ -22,7 +22,7 @@ pub(crate) mod version;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};
use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
@@ -77,8 +77,6 @@ pub enum RegionLeaderState {
Writable,
/// The region is in staging mode - writable but no checkpoint/compaction.
Staging,
/// The region is entering staging mode. - write requests will be stalled.
EnteringStaging,
/// The region is altering.
Altering,
/// The region is dropping.
@@ -140,14 +138,6 @@ pub struct MitoRegion {
pub(crate) topic_latest_entry_id: AtomicU64,
/// The total bytes written to the region.
pub(crate) written_bytes: Arc<AtomicU64>,
/// The partition expression of the region in staging mode.
///
/// During the staging mode, the region metadata in [`VersionControlRef`] is not updated,
/// so we need to store the partition expression separately.
/// TODO(weny):
/// 1. Reload the staging partition expr during region open.
/// 2. Rejects requests with mismatching partition expr.
pub(crate) staging_partition_expr: Mutex<Option<String>>,
/// manifest stats
stats: ManifestStats,
}
@@ -336,19 +326,11 @@ impl MitoRegion {
)
}
/// Sets the entering staging state.
pub(crate) fn set_entering_staging(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Writable,
RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
)
}
/// Exits the staging state back to writable.
///
/// You should call this method in the worker loop.
/// Transitions from Staging to Writable state.
pub fn exit_staging(&self) -> Result<()> {
fn exit_staging(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Staging,
RegionRoleState::Leader(RegionLeaderState::Writable),
@@ -475,7 +457,10 @@ impl MitoRegion {
sst_format: current_version.options.sst_format.unwrap_or_default(),
});
let result = manager
.update(RegionMetaActionList::with_action(action), false)
.update(
RegionMetaActionList::with_action(action),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await;
match result {
@@ -507,16 +492,6 @@ impl MitoRegion {
}
}
/// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
/// Otherwise, logs an error.
pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
if let Err(e) =
self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
{
error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
}
}
/// Returns the region statistic.
pub(crate) fn region_statistic(&self) -> RegionStatistic {
let version = self.version();
@@ -600,19 +575,10 @@ impl MitoRegion {
.flat_map(|level| level.files().map(|file| file.file_id().file_id()))
.collect::<HashSet<_>>();
let manifest_files = self.manifest_ctx.manifest().await.files.clone();
let staging_files = self
.manifest_ctx
.staging_manifest()
self.manifest_ctx
.manifest()
.await
.map(|m| m.files.clone())
.unwrap_or_default();
let files = manifest_files
.into_iter()
.chain(staging_files.into_iter())
.collect::<HashMap<_, _>>();
files
.files
.values()
.map(|meta| {
let region_id = self.region_id;
@@ -688,8 +654,9 @@ impl MitoRegion {
};
// Submit merged actions using the manifest manager's update method
// Pass the `false` so it saves to normal directory, not staging
let new_version = manager.update(merged_actions.clone(), false).await?;
        // Pass the target state (Writable) so it saves to the normal directory, not the staging one.
let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
let new_version = manager.update(merged_actions.clone(), target_state).await?;
info!(
"Successfully submitted merged staged manifests for region {}, new version: {}",
@@ -764,7 +731,6 @@ impl ManifestContext {
&self,
expect_state: RegionLeaderState,
action_list: RegionMetaActionList,
is_staging: bool,
) -> Result<ManifestVersion> {
// Acquires the write lock of the manifest manager.
let mut manager = self.manifest_manager.write().await;
@@ -840,7 +806,7 @@ impl ManifestContext {
}
// Now we can update the manifest.
let version = manager.update(action_list, is_staging).await.inspect_err(
let version = manager.update(action_list, current_state).await.inspect_err(
|e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
)?;
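The shape of this change: update no longer takes an is_staging flag; the caller passes the relevant region role state and the manifest manager decides where the action list is persisted. A hedged sketch of that signature with stand-in types (not the real RegionManifestManager API):
// Sketch: manifest update keyed on region state instead of a bare bool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RoleState { Writable, Staging }

#[derive(Debug)]
struct ActionList(Vec<String>);

struct ManifestManager { version: u64 }

impl ManifestManager {
    /// The target directory is derived from the caller's state rather than an `is_staging` flag.
    fn update(&mut self, actions: ActionList, state: RoleState) -> u64 {
        let dir = if state == RoleState::Staging { "manifest/staging" } else { "manifest" };
        println!("writing {:?} under {dir}", actions.0);
        self.version += 1;
        self.version
    }
}

fn main() {
    let mut manager = ManifestManager { version: 0 };
    let v = manager.update(ActionList(vec!["edit".into()]), RoleState::Writable);
    assert_eq!(v, 1);
    manager.update(ActionList(vec!["change".into()]), RoleState::Staging);
}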
@@ -947,17 +913,9 @@ impl ManifestContext {
}
}
/// Returns the normal manifest of the region.
pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
self.manifest_manager.read().await.manifest()
}
/// Returns the staging manifest of the region.
pub(crate) async fn staging_manifest(
&self,
) -> Option<Arc<crate::manifest::action::RegionManifest>> {
self.manifest_manager.read().await.staging_manifest()
}
}
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
@@ -1255,8 +1213,8 @@ impl ManifestStats {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::create_temp_dir;
@@ -1446,7 +1404,6 @@ mod tests {
topic_latest_entry_id: Default::default(),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: ManifestStats::default(),
staging_partition_expr: Mutex::new(None),
};
// Test initial state

View File

@@ -16,8 +16,8 @@
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use common_telemetry::{debug, error, info, warn};
@@ -334,7 +334,6 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(0),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: self.stats,
staging_partition_expr: Mutex::new(None),
}))
}
@@ -564,8 +563,6 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: self.stats.clone(),
// TODO(weny): reload the staging partition expr from the manifest.
staging_partition_expr: Mutex::new(None),
};
let region = Arc::new(region);
@@ -976,7 +973,6 @@ fn can_load_cache(state: RegionRoleState) -> bool {
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging)
| RegionRoleState::Leader(RegionLeaderState::Altering)
| RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
| RegionRoleState::Leader(RegionLeaderState::Editing)
| RegionRoleState::Follower => true,
// The region will be closed soon if it is downgrading.

View File

@@ -35,10 +35,9 @@ use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint}
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{
AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest,
RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest,
RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
RegionTruncateRequest,
AffectedRows, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest,
RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -726,11 +725,6 @@ impl WorkerRequest {
sender: sender.into(),
request: DdlRequest::Catchup((v, None)),
}),
RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
request: DdlRequest::EnterStaging(v),
}),
RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
metadata: region_metadata,
sender: sender.into(),
@@ -828,7 +822,6 @@ pub(crate) enum DdlRequest {
BuildIndex(RegionBuildIndexRequest),
Truncate(RegionTruncateRequest),
Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
EnterStaging(EnterStagingRequest),
}
/// Sender and Ddl request.
@@ -865,8 +858,6 @@ pub(crate) enum BackgroundNotify {
RegionChange(RegionChangeResult),
/// Region edit result.
RegionEdit(RegionEditResult),
/// Enter staging result.
EnterStaging(EnterStagingResult),
}
/// Notifies a flush job is finished.
@@ -884,8 +875,6 @@ pub(crate) struct FlushFinished {
pub(crate) edit: RegionEdit,
/// Memtables to remove.
pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
/// Whether the region is in staging mode.
pub(crate) is_staging: bool,
}
impl FlushFinished {
@@ -1010,19 +999,6 @@ pub(crate) struct RegionChangeResult {
pub(crate) new_options: Option<RegionOptions>,
}
/// Notifies the region the result of entering staging.
#[derive(Debug)]
pub(crate) struct EnterStagingResult {
/// Region id.
pub(crate) region_id: RegionId,
/// The new partition expression to apply.
pub(crate) partition_expr: String,
/// Result sender.
pub(crate) sender: OptionOutputTx,
/// Result from the manifest manager.
pub(crate) result: Result<()>,
}
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {

View File

@@ -776,7 +776,6 @@ impl IndexBuildTask {
.update_manifest(
RegionLeaderState::Writable,
RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
false,
)
.await?;
info!(

View File

@@ -39,7 +39,7 @@ use common_meta::cache::{new_schema_cache, new_table_schema_cache};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_telemetry::{debug, warn};
use common_telemetry::warn;
use common_test_util::temp_dir::{TempDir, create_temp_dir};
use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
@@ -50,7 +50,6 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use moka::future::CacheBuilder;
use object_store::ObjectStore;
use object_store::layers::mock::MockLayer;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::services::Fs;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
@@ -229,7 +228,6 @@ pub struct TestEnv {
file_ref_manager: FileReferenceManagerRef,
kv_backend: KvBackendRef,
partition_expr_fetcher: PartitionExprFetcherRef,
object_store_mock_layer: Option<MockLayer>,
}
impl TestEnv {
@@ -266,7 +264,6 @@ impl TestEnv {
file_ref_manager: Arc::new(FileReferenceManager::new(None)),
kv_backend,
partition_expr_fetcher: noop_partition_expr_fetcher(),
object_store_mock_layer: None,
}
}
@@ -276,12 +273,6 @@ impl TestEnv {
self
}
/// Sets the original `object_store_mock_layer`.
pub fn with_mock_layer(mut self, mock_layer: MockLayer) -> TestEnv {
self.object_store_mock_layer = Some(mock_layer);
self
}
pub fn get_object_store(&self) -> Option<ObjectStore> {
self.object_store_manager
.as_ref()
@@ -578,16 +569,7 @@ impl TestEnv {
let data_home = self.data_home.path();
let data_path = data_home.join("data").as_path().display().to_string();
let builder = Fs::default().root(&data_path);
let object_store = if let Some(mock_layer) = self.object_store_mock_layer.as_ref() {
debug!("create object store with mock layer");
ObjectStore::new(builder)
.unwrap()
.layer(mock_layer.clone())
.finish()
} else {
ObjectStore::new(builder).unwrap().finish()
};
let object_store = ObjectStore::new(builder).unwrap().finish();
ObjectStoreManager::new("default", object_store)
}

View File

@@ -21,7 +21,6 @@ mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_enter_staging;
mod handle_flush;
mod handle_manifest;
mod handle_open;
@@ -1040,7 +1039,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
continue;
}
DdlRequest::Flush(req) => {
self.handle_flush_request(ddl.region_id, req, ddl.sender);
self.handle_flush_request(ddl.region_id, req, ddl.sender)
.await;
continue;
}
DdlRequest::Compact(req) => {
@@ -1063,15 +1063,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
continue;
}
DdlRequest::EnterStaging(req) => {
self.handle_enter_staging_request(
ddl.region_id,
req.partition_expr,
ddl.sender,
)
.await;
continue;
}
};
ddl.sender.send(res);
@@ -1120,7 +1111,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
BackgroundNotify::RegionChange(req) => {
self.handle_manifest_region_change_result(req).await
}
BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
}
}
@@ -1282,13 +1272,6 @@ impl WorkerListener {
}
}
pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_enter_staging_result_begin(_region_id).await;
}
}
pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {

View File

@@ -113,13 +113,7 @@ impl<S> RegionWorkerLoop<S> {
info!("Flush region: {} before alteration", region_id);
// Try to submit a flush task.
let task = self.new_flush_task(
&region,
FlushReason::Alter,
None,
self.config.clone(),
region.is_staging(),
);
let task = self.new_flush_task(&region, FlushReason::Alter, None, self.config.clone());
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)

View File

@@ -1,249 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Instant;
use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::region_request::EnterStagingRequest;
use store_api::storage::RegionId;
use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
use crate::region::{MitoRegionRef, RegionLeaderState};
use crate::request::{
BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
WorkerRequest, WorkerRequestWithTime,
};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_enter_staging_request(
&mut self,
region_id: RegionId,
partition_expr: String,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};
// If the region is already in staging mode, verify the partition expr matches.
if region.is_staging() {
let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
// If the partition expr doesn't match, return an error.
if staging_partition_expr.as_ref() != Some(&partition_expr) {
sender.send(Err(StagingPartitionExprMismatchSnafu {
manifest_expr: staging_partition_expr,
request_expr: partition_expr,
}
.build()));
return;
}
// If the partition expr matches, return success.
sender.send(Ok(0));
return;
}
let version = region.version();
if !version.memtables.is_empty() {
// If memtable is not empty, we can't enter staging directly and need to flush
// all memtables first.
info!("Flush region: {} before entering staging", region_id);
debug_assert!(!region.is_staging());
let task = self.new_flush_task(
&region,
FlushReason::EnterStaging,
None,
self.config.clone(),
region.is_staging(),
);
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)
{
// Unable to flush the region, send error to waiter.
sender.send(Err(e));
return;
}
// Safety: We have requested flush.
self.flush_scheduler
.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender,
request: DdlRequest::EnterStaging(EnterStagingRequest { partition_expr }),
});
return;
}
self.handle_enter_staging(region, partition_expr, sender);
}
async fn enter_staging(region: &MitoRegionRef, partition_expr: String) -> Result<()> {
let now = Instant::now();
// First step: clear all staging manifest files.
{
let mut manager = region.manifest_ctx.manifest_manager.write().await;
manager
.clear_staging_manifest_and_dir()
.await
.inspect_err(|e| {
error!(
e;
"Failed to clear staging manifest files for region {}",
region.region_id
);
})?;
info!(
"Cleared all staging manifest files for region {}, elapsed: {:?}",
region.region_id,
now.elapsed(),
);
}
// Second step: write new staging manifest.
let mut new_meta = (*region.metadata()).clone();
new_meta.partition_expr = Some(partition_expr.clone());
let sst_format = region.version().options.sst_format.unwrap_or_default();
let change = RegionChange {
metadata: Arc::new(new_meta),
sst_format,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
region
.manifest_ctx
.update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
.await?;
Ok(())
}
fn handle_enter_staging(
&self,
region: MitoRegionRef,
partition_expr: String,
sender: OptionOutputTx,
) {
if let Err(e) = region.set_entering_staging() {
sender.send(Err(e));
return;
}
let listener = self.listener.clone();
let request_sender = self.sender.clone();
common_runtime::spawn_global(async move {
let now = Instant::now();
let result = Self::enter_staging(&region, partition_expr.clone()).await;
match result {
Ok(_) => {
info!(
"Created staging manifest for region {}, elapsed: {:?}",
region.region_id,
now.elapsed(),
);
}
Err(ref e) => {
// Unset the staging manifest
region
.manifest_ctx
.manifest_manager
.write()
.await
.unset_staging_manifest();
error!(
"Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
region.region_id,
e,
now.elapsed(),
);
}
}
let notify = WorkerRequest::Background {
region_id: region.region_id,
notify: BackgroundNotify::EnterStaging(EnterStagingResult {
region_id: region.region_id,
sender,
result,
partition_expr,
}),
};
listener
.on_enter_staging_result_begin(region.region_id)
.await;
if let Err(res) = request_sender
.send(WorkerRequestWithTime::new(notify))
.await
{
warn!(
"Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
region.region_id, res
);
}
});
}
/// Handles enter staging result.
pub(crate) async fn handle_enter_staging_result(
&mut self,
enter_staging_result: EnterStagingResult,
) {
let region = match self.regions.get_region(enter_staging_result.region_id) {
Some(region) => region,
None => {
self.reject_region_stalled_requests(&enter_staging_result.region_id);
enter_staging_result.sender.send(
RegionNotFoundSnafu {
region_id: enter_staging_result.region_id,
}
.fail(),
);
return;
}
};
if enter_staging_result.result.is_ok() {
info!(
"Updating region {} staging partition expr to {}",
region.region_id, enter_staging_result.partition_expr
);
Self::update_region_staging_partition_expr(
&region,
enter_staging_result.partition_expr,
);
region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
} else {
region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
}
enter_staging_result
.sender
.send(enter_staging_result.result.map(|_| 0));
// Handles the stalled requests.
self.handle_region_stalled_requests(&enter_staging_result.region_id)
.await;
}
fn update_region_staging_partition_expr(region: &MitoRegionRef, partition_expr: String) {
let mut staging_partition_expr = region.staging_partition_expr.lock().unwrap();
debug_assert!(staging_partition_expr.is_none());
*staging_partition_expr = Some(partition_expr);
}
}

View File

@@ -76,13 +76,8 @@ impl<S> RegionWorkerLoop<S> {
if region.last_flush_millis() < min_last_flush_time {
// If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
let task = self.new_flush_task(
region,
FlushReason::EngineFull,
None,
self.config.clone(),
region.is_staging(),
);
let task =
self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
self.flush_scheduler.schedule_flush(
region.region_id,
&region.version_control,
@@ -96,13 +91,8 @@ impl<S> RegionWorkerLoop<S> {
if let Some(region) = max_mem_region
&& !self.flush_scheduler.is_flush_requested(region.region_id)
{
let task = self.new_flush_task(
region,
FlushReason::EngineFull,
None,
self.config.clone(),
region.is_staging(),
);
let task =
self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)?;
}
@@ -117,7 +107,6 @@ impl<S> RegionWorkerLoop<S> {
reason: FlushReason,
row_group_size: Option<usize>,
engine_config: Arc<MitoConfig>,
is_staging: bool,
) -> RegionFlushTask {
RegionFlushTask {
region_id: region.region_id,
@@ -132,14 +121,13 @@ impl<S> RegionWorkerLoop<S> {
manifest_ctx: region.manifest_ctx.clone(),
index_options: region.version().options.index_options.clone(),
flush_semaphore: self.flush_semaphore.clone(),
is_staging,
}
}
}
impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles manual flush request.
pub(crate) fn handle_flush_request(
pub(crate) async fn handle_flush_request(
&mut self,
region_id: RegionId,
request: RegionFlushRequest,
@@ -159,13 +147,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
FlushReason::Manual
};
let mut task = self.new_flush_task(
&region,
reason,
request.row_group_size,
self.config.clone(),
region.is_staging(),
);
let mut task =
self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
task.push_sender(sender);
if let Err(e) =
self.flush_scheduler
@@ -195,7 +178,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
FlushReason::Periodically,
None,
self.config.clone(),
region.is_staging(),
);
self.flush_scheduler.schedule_flush(
region.region_id,
@@ -226,8 +208,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
};
if request.is_staging {
// Skip the region metadata update.
// Check if region is currently in staging mode
let is_staging = region.manifest_ctx.current_state()
== crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging);
if is_staging {
info!(
"Skipping region metadata update for region {} in staging mode",
region_id

View File

@@ -346,7 +346,6 @@ impl<S> RegionWorkerLoop<S> {
let request_sender = self.sender.clone();
let manifest_ctx = region.manifest_ctx.clone();
let is_staging = region.is_staging();
// Updates manifest in background.
common_runtime::spawn_global(async move {
@@ -355,7 +354,7 @@ impl<S> RegionWorkerLoop<S> {
RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
let result = manifest_ctx
.update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
.update_manifest(RegionLeaderState::Truncating, action_list)
.await
.map(|_| ());
@@ -392,7 +391,6 @@ impl<S> RegionWorkerLoop<S> {
}
let listener = self.listener.clone();
let request_sender = self.sender.clone();
let is_staging = region.is_staging();
// Now the region is in altering state.
common_runtime::spawn_global(async move {
let new_meta = change.metadata.clone();
@@ -400,7 +398,7 @@ impl<S> RegionWorkerLoop<S> {
let result = region
.manifest_ctx
.update_manifest(RegionLeaderState::Altering, action_list, is_staging)
.update_manifest(RegionLeaderState::Altering, action_list)
.await
.map(|_| ());
let notify = WorkerRequest::Background {
@@ -465,7 +463,6 @@ async fn edit_region(
listener: WorkerListener,
) -> Result<()> {
let region_id = region.region_id;
let is_staging = region.is_staging();
if let Some(write_cache) = cache_manager.write_cache() {
for file_meta in &edit.files_to_add {
let write_cache = write_cache.clone();
@@ -535,7 +532,7 @@ async fn edit_region(
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
region
.manifest_ctx
.update_manifest(RegionLeaderState::Editing, action_list, is_staging)
.update_manifest(RegionLeaderState::Editing, action_list)
.await
.map(|_| ())
}

View File

@@ -241,12 +241,6 @@ impl<S> RegionWorkerLoop<S> {
// No such region.
continue;
};
#[cfg(test)]
debug!(
"Handling write request for region {}, state: {:?}",
region_id,
region.state()
);
match region.state() {
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging) => {
@@ -269,16 +263,6 @@ impl<S> RegionWorkerLoop<S> {
self.stalled_requests.push(sender_req);
continue;
}
RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
debug!(
"Region {} is entering staging, add request to pending writes",
region.region_id
);
self.stalling_count.add(1);
WRITE_STALL_TOTAL.inc();
self.stalled_requests.push(sender_req);
continue;
}
state => {
// The region is not writable.
sender_req.sender.send(

View File

@@ -9,7 +9,6 @@ workspace = true
[features]
services-memory = ["opendal/services-memory"]
testing = ["derive_builder"]
[dependencies]
bytes.workspace = true
@@ -17,7 +16,6 @@ common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
derive_builder = { workspace = true, optional = true }
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true

View File

@@ -13,8 +13,6 @@
// limitations under the License.
mod lru_cache;
#[cfg(feature = "testing")]
pub mod mock;
pub use lru_cache::*;
pub use opendal::layers::*;

View File

@@ -1,217 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::sync::Arc;
use derive_builder::Builder;
pub use oio::*;
pub use opendal::raw::{
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite, oio,
};
pub use opendal::{Buffer, Error, ErrorKind, Metadata, Result};
pub type MockWriterFactory = Arc<dyn Fn(&str, OpWrite, oio::Writer) -> oio::Writer + Send + Sync>;
pub type MockReaderFactory = Arc<dyn Fn(&str, OpRead, oio::Reader) -> oio::Reader + Send + Sync>;
pub type MockListerFactory = Arc<dyn Fn(&str, OpList, oio::Lister) -> oio::Lister + Send + Sync>;
pub type MockDeleterFactory = Arc<dyn Fn(oio::Deleter) -> oio::Deleter + Send + Sync>;
#[derive(Builder)]
pub struct MockLayer {
#[builder(setter(strip_option), default)]
writer_factory: Option<MockWriterFactory>,
#[builder(setter(strip_option), default)]
reader_factory: Option<MockReaderFactory>,
#[builder(setter(strip_option), default)]
lister_factory: Option<MockListerFactory>,
#[builder(setter(strip_option), default)]
deleter_factory: Option<MockDeleterFactory>,
}
impl Clone for MockLayer {
fn clone(&self) -> Self {
Self {
writer_factory: self.writer_factory.clone(),
reader_factory: self.reader_factory.clone(),
lister_factory: self.lister_factory.clone(),
deleter_factory: self.deleter_factory.clone(),
}
}
}
impl<A: Access> Layer<A> for MockLayer {
type LayeredAccess = MockAccessor<A>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
MockAccessor {
inner,
writer_factory: self.writer_factory.clone(),
reader_factory: self.reader_factory.clone(),
lister_factory: self.lister_factory.clone(),
deleter_factory: self.deleter_factory.clone(),
}
}
}
pub struct MockAccessor<A> {
inner: A,
writer_factory: Option<MockWriterFactory>,
reader_factory: Option<MockReaderFactory>,
lister_factory: Option<MockListerFactory>,
deleter_factory: Option<MockDeleterFactory>,
}
impl<A: Debug> Debug for MockAccessor<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MockAccessor")
.field("inner", &self.inner)
.finish()
}
}
pub struct MockReader {
inner: oio::Reader,
}
impl oio::Read for MockReader {
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await
}
}
pub struct MockWriter {
inner: oio::Writer,
}
impl oio::Write for MockWriter {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs).await
}
async fn close(&mut self) -> Result<Metadata> {
self.inner.close().await
}
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
}
pub struct MockLister {
inner: oio::Lister,
}
impl oio::List for MockLister {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().await
}
}
pub struct MockDeleter {
inner: oio::Deleter,
}
impl oio::Delete for MockDeleter {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}
async fn flush(&mut self) -> Result<usize> {
self.inner.flush().await
}
}
impl<A: Access> LayeredAccess for MockAccessor<A> {
type Inner = A;
type Reader = MockReader;
type Writer = MockWriter;
type Lister = MockLister;
type Deleter = MockDeleter;
fn inner(&self) -> &Self::Inner {
&self.inner
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
if let Some(reader_factory) = self.reader_factory.as_ref() {
let (rp_read, reader) = self.inner.read(path, args.clone()).await?;
let reader = reader_factory(path, args, Box::new(reader));
Ok((rp_read, MockReader { inner: reader }))
} else {
self.inner.read(path, args).await.map(|(rp_read, reader)| {
(
rp_read,
MockReader {
inner: Box::new(reader),
},
)
})
}
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
if let Some(writer_factory) = self.writer_factory.as_ref() {
let (rp_write, writer) = self.inner.write(path, args.clone()).await?;
let writer = writer_factory(path, args, Box::new(writer));
Ok((rp_write, MockWriter { inner: writer }))
} else {
self.inner
.write(path, args)
.await
.map(|(rp_write, writer)| {
(
rp_write,
MockWriter {
inner: Box::new(writer),
},
)
})
}
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
if let Some(deleter_factory) = self.deleter_factory.as_ref() {
let (rp_delete, deleter) = self.inner.delete().await?;
let deleter = deleter_factory(Box::new(deleter));
Ok((rp_delete, MockDeleter { inner: deleter }))
} else {
self.inner.delete().await.map(|(rp_delete, deleter)| {
(
rp_delete,
MockDeleter {
inner: Box::new(deleter),
},
)
})
}
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
if let Some(lister_factory) = self.lister_factory.as_ref() {
let (rp_list, lister) = self.inner.list(path, args.clone()).await?;
let lister = lister_factory(path, args, Box::new(lister));
Ok((rp_list, MockLister { inner: lister }))
} else {
self.inner.list(path, args).await.map(|(rp_list, lister)| {
(
rp_list,
MockLister {
inner: Box::new(lister),
},
)
})
}
}
}

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::options_from_skipping;
use api::v1::region::{
@@ -23,7 +24,7 @@ use api::v1::region::{
};
use api::v1::{
AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
RowInsertRequest, RowInsertRequests, SemanticType,
ModifyColumnType, ModifyColumnTypes, RowInsertRequest, RowInsertRequests, SemanticType,
};
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
@@ -40,6 +41,7 @@ use common_query::Output;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::SkippingIndexOptions;
use futures_util::future;
use meter_macros::write_meter;
@@ -67,8 +69,9 @@ use table::requests::{
use table::table_reference::TableReference;
use crate::error::{
CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
CatalogSnafu, ColumnDataTypeSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu,
FindRegionLeaderSnafu, InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result,
TableNotFoundSnafu,
};
use crate::expr_helper;
use crate::region_req_factory::RegionRequestFactory;
@@ -475,6 +478,7 @@ impl Inserter {
/// Creates or alters tables on demand:
/// - if the table does not exist, create it from the inferred CreateExpr
/// - if the table exists, check whether the schema matches; if any new column is found, alter the table with the inferred `AlterExpr`
///   or align the json columns' datatypes with the insert values.
///
/// Returns a mapping from table name to table id for the tables involved in the requests.
/// This mapping is used in the conversion of RowToRegion.
@@ -559,6 +563,10 @@ impl Inserter {
)? {
alter_tables.push(alter_expr);
}
if let Some(expr) = maybe_alter_json_column_type(ctx, &table, req)? {
alter_tables.push(expr);
}
}
None => {
let create_expr =
@@ -981,6 +989,86 @@ impl Inserter {
}
}
fn maybe_alter_json_column_type(
query_context: &QueryContextRef,
table: &TableRef,
request: &RowInsertRequest,
) -> Result<Option<AlterTableExpr>> {
let Some(rows) = request.rows.as_ref() else {
return Ok(None);
};
// Fast path: skip altering json column type if insert request doesn't contain any json values.
let row_schema = &rows.schema;
if row_schema
.iter()
.all(|x| x.datatype() != ColumnDataType::Json)
{
return Ok(None);
}
let table_schema = table.schema_ref();
let mut modify_column_types = vec![];
for value_schema in row_schema {
if let Some(column_schema) = table_schema.column_schema_by_name(&value_schema.column_name)
&& let Some(column_type) = column_schema.data_type.as_json()
{
let value_type: ConcreteDataType = ColumnDataTypeWrapper::new(
value_schema.datatype(),
value_schema.datatype_extension.clone(),
)
.into();
let Some(value_type) = value_type.as_json() else {
return InvalidInsertRequestSnafu {
reason: format!(
"expecting json value for json column '{}', but found type {}",
column_schema.name, value_type
),
}
.fail();
};
if column_type.is_include(value_type) {
continue;
}
let merged = {
let mut column_type = column_type.clone();
column_type.merge(value_type).map_err(|e| {
InvalidInsertRequestSnafu {
reason: format!("insert json value is conflicting with column type: {e}"),
}
.build()
})?;
column_type
};
let (target_type, target_type_extension) =
ColumnDataTypeWrapper::try_from(ConcreteDataType::Json(merged))
.map(|x| x.into_parts())
.context(ColumnDataTypeSnafu)?;
modify_column_types.push(ModifyColumnType {
column_name: column_schema.name.clone(),
target_type: target_type as i32,
target_type_extension,
});
}
}
if modify_column_types.is_empty() {
Ok(None)
} else {
Ok(Some(AlterTableExpr {
catalog_name: query_context.current_catalog().to_string(),
schema_name: query_context.current_schema(),
table_name: table.table_info().name.clone(),
kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types,
})),
}))
}
}
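The core of maybe_alter_json_column_type is an include-or-merge decision on the JSON column type: if the column already covers the incoming structure, nothing happens; otherwise the types are merged and a ModifyColumnType alter is emitted. A simplified sketch with a hypothetical JsonType standing in for the real structured JSON type (is_include and merge here are illustrative):
// Sketch: widen a JSON column type only when the incoming value is not already covered.
use std::collections::BTreeMap;

/// Hypothetical stand-in: a JSON object type described by its field names and scalar kinds.
#[derive(Debug, Clone, PartialEq, Eq)]
struct JsonType { fields: BTreeMap<String, String> }

impl JsonType {
    /// True if `other` adds no new fields and no conflicting kinds.
    fn is_include(&self, other: &JsonType) -> bool {
        other.fields.iter().all(|(k, v)| self.fields.get(k) == Some(v))
    }

    /// Widens `self` with `other`; errors on a kind conflict for the same field.
    fn merge(&mut self, other: &JsonType) -> Result<(), String> {
        for (k, v) in &other.fields {
            match self.fields.get(k) {
                Some(existing) if existing != v => {
                    return Err(format!("field {k:?}: {existing} vs {v}"));
                }
                Some(_) => {}
                None => { self.fields.insert(k.clone(), v.clone()); }
            }
        }
        Ok(())
    }
}

fn main() {
    let mut column = JsonType { fields: BTreeMap::from([("a".into(), "i64".into())]) };
    let value = JsonType { fields: BTreeMap::from([("b".into(), "string".into())]) };
    if !column.is_include(&value) {
        column.merge(&value).unwrap(); // would translate into a ModifyColumnType alter
    }
    assert_eq!(column.fields.len(), 2);
}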
fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
for request in &requests.inserts {
let rows = request.rows.as_ref().unwrap();

View File

@@ -46,7 +46,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_telemetry::{debug, tracing, warn};
use common_telemetry::{debug, tracing};
use common_time::Timestamp;
use common_time::range::TimestampRange;
use datafusion_expr::LogicalPlan;
@@ -488,11 +488,6 @@ impl StatementExecutor {
"@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
Channel::Postgres => {
warn!(
"Unsupported set variable {} for channel {:?}",
var_name,
query_ctx.channel()
);
query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
}
_ => {
@@ -502,23 +497,16 @@ impl StatementExecutor {
.fail();
}
},
"STATEMENT_TIMEOUT" => match query_ctx.channel() {
Channel::Postgres => set_query_timeout(set_var.value, query_ctx)?,
Channel::Mysql => {
warn!(
"Unsupported set variable {} for channel {:?}",
var_name,
query_ctx.channel()
);
query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
}
_ => {
"STATEMENT_TIMEOUT" => {
if query_ctx.channel() == Channel::Postgres {
set_query_timeout(set_var.value, query_ctx)?
} else {
return NotSupportedSnafu {
feat: format!("Unsupported set variable {}", var_name),
}
.fail();
}
},
}
"SEARCH_PATH" => {
if query_ctx.channel() == Channel::Postgres {
set_search_path(set_var.value, query_ctx)?
@@ -530,16 +518,14 @@ impl StatementExecutor {
}
}
_ => {
if query_ctx.channel() == Channel::Postgres || query_ctx.channel() == Channel::Mysql
{
// For unknown SET statements, we give a warning with success.
// This prevents the SET call from becoming a blocker of MySQL/Postgres clients'
// connection establishment.
warn!(
"Unsupported set variable {} for channel {:?}",
var_name,
query_ctx.channel()
);
// For Postgres, we give unknown SET statements a warning with
// success; this prevents the SET call from becoming a blocker
// of connection establishment.
if query_ctx.channel() == Channel::Postgres {
query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
} else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
// Just ignore `SET @@` commands for MySQL
query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
} else {
return NotSupportedSnafu {

View File

@@ -776,6 +776,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
GreptimeProto {
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
Datatypes {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -893,6 +907,9 @@ impl ErrorExt for Error {
FloatIsNan { .. }
| InvalidEpochForResolution { .. }
| UnsupportedTypeInPipeline { .. } => StatusCode::InvalidArguments,
GreptimeProto { source, .. } => source.status_code(),
Datatypes { source, .. } => source.status_code(),
}
}
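The two added variants rely on snafu's transparent attribute, which forwards the wrapped source's display (and, here, its status code) instead of adding another context layer. A minimal, standalone illustration using std::io::Error and assuming snafu's transparent/implicit attributes as used above:
// Sketch: a snafu error enum with one contextual and one transparent variant.
use snafu::{Location, ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("failed to read config {path}"))]
    ReadConfig {
        path: String,
        source: std::io::Error,
        #[snafu(implicit)]
        location: Location,
    },
    /// Message and source pass straight through from the wrapped error.
    #[snafu(transparent)]
    Io {
        source: std::io::Error,
        #[snafu(implicit)]
        location: Location,
    },
}

fn read_with_context(path: &str) -> Result<String, Error> {
    std::fs::read_to_string(path).context(ReadConfigSnafu { path })
}

fn read_transparent(path: &str) -> Result<String, Error> {
    // `transparent` implies `context(false)`, so `?` converts std::io::Error directly.
    Ok(std::fs::read_to_string(path)?)
}

fn main() {
    println!("{}", read_with_context("/definitely/missing").unwrap_err());
    println!("{}", read_transparent("/definitely/missing").unwrap_err());
}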

View File

@@ -19,13 +19,16 @@ use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::helper::{ColumnDataTypeWrapper, encode_json_value};
use api::v1::column_def::{collect_column_options, options_from_column_schema};
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use api::v1::{ColumnDataType, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::warn;
use datatypes::data_type::ConcreteDataType;
use datatypes::json::JsonStructureSettings;
use datatypes::value::Value;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
@@ -33,12 +36,14 @@ use once_cell::sync::OnceCell;
use serde_json as serde_json_crate;
use session::context::Channel;
use snafu::OptionExt;
use table::Table;
use vrl::prelude::{Bytes, VrlValueConvert};
use vrl::value::value::StdError;
use vrl::value::{KeyString, Value as VrlValue};
use crate::error::{
IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
CoerceIncompatibleTypesSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu,
Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
@@ -269,15 +274,75 @@ impl GreptimeTransformer {
}
}
#[derive(Clone)]
pub struct ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema,
semantic_type: SemanticType,
}
impl From<ColumnSchema> for ColumnMetadata {
fn from(value: ColumnSchema) -> Self {
let datatype = value.datatype();
let semantic_type = value.semantic_type();
let ColumnSchema {
column_name,
datatype: _,
semantic_type: _,
datatype_extension,
options,
} = value;
let column_schema = datatypes::schema::ColumnSchema::new(
column_name,
ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
semantic_type != SemanticType::Timestamp,
);
let metadata = collect_column_options(options.as_ref());
let column_schema = column_schema.with_metadata(metadata);
Self {
column_schema,
semantic_type,
}
}
}
impl TryFrom<ColumnMetadata> for ColumnSchema {
type Error = api::error::Error;
fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
let ColumnMetadata {
column_schema,
semantic_type,
} = value;
let options = options_from_column_schema(&column_schema);
let (datatype, datatype_extension) =
ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
Ok(ColumnSchema {
column_name: column_schema.name,
datatype: datatype as _,
semantic_type: semantic_type as _,
datatype_extension,
options,
})
}
}
/// This is used to record the current state schema information and a sequential cache of field names.
/// As you traverse the user input JSON, this will change.
/// It will record a superset of all user input schemas.
#[derive(Debug, Default)]
#[derive(Default)]
pub struct SchemaInfo {
/// schema info
pub schema: Vec<ColumnSchema>,
pub schema: Vec<ColumnMetadata>,
/// index of the column name
pub index: HashMap<String, usize>,
/// The pipeline's corresponding table (if already created). Useful to retrieve column schemas.
table: Option<Arc<Table>>,
}
impl SchemaInfo {
@@ -285,6 +350,7 @@ impl SchemaInfo {
Self {
schema: Vec::with_capacity(capacity),
index: HashMap::with_capacity(capacity),
table: None,
}
}
@@ -294,46 +360,91 @@ impl SchemaInfo {
index.insert(schema.column_name.clone(), i);
}
Self {
schema: schema_list,
schema: schema_list.into_iter().map(Into::into).collect(),
index,
table: None,
}
}
pub fn set_table(&mut self, table: Option<Arc<Table>>) {
self.table = table;
}
fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
if let Some(table) = &self.table
&& let Some(i) = table.schema_ref().column_index_by_name(column_name)
{
let column_schema = table.schema_ref().column_schemas()[i].clone();
let semantic_type = if column_schema.is_time_index() {
SemanticType::Timestamp
} else if table.table_info().meta.primary_key_indices.contains(&i) {
SemanticType::Tag
} else {
SemanticType::Field
};
Some(ColumnMetadata {
column_schema,
semantic_type,
})
} else {
None
}
}
pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
self.schema
.iter()
.map(|x| x.clone().try_into())
.collect::<api::error::Result<Vec<_>>>()
}
}
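SchemaInfo now keeps richer column metadata plus an optional handle to the already-created table, so a column first seen in the input can reuse the table's definition before a new one is synthesized. A stripped-down sketch of that lookup-then-append flow with hypothetical types:
// Sketch: name->index schema cache that prefers the table's definition for unseen columns.
use std::collections::HashMap;

#[derive(Debug, Clone)]
struct ColumnMeta { name: String, datatype: String, is_tag: bool }

#[derive(Default)]
struct SchemaInfo {
    schema: Vec<ColumnMeta>,
    index: HashMap<String, usize>,
    table_columns: HashMap<String, ColumnMeta>, // stands in for the optional Table handle
}

impl SchemaInfo {
    /// Returns the column index, reusing the table's definition when the column is new here.
    fn get_or_insert(&mut self, name: &str, default_type: &str) -> usize {
        if let Some(i) = self.index.get(name) {
            return *i;
        }
        let meta = self.table_columns.get(name).cloned().unwrap_or(ColumnMeta {
            name: name.to_string(),
            datatype: default_type.to_string(),
            is_tag: false,
        });
        self.schema.push(meta);
        self.index.insert(name.to_string(), self.schema.len() - 1);
        self.schema.len() - 1
    }
}

fn main() {
    let mut info = SchemaInfo::default();
    info.table_columns.insert("host".into(), ColumnMeta { name: "host".into(), datatype: "string".into(), is_tag: true });
    assert_eq!(info.get_or_insert("host", "string"), 0);
    assert_eq!(info.get_or_insert("latency", "f64"), 1);
    assert!(info.schema[0].is_tag); // definition reused from the table
}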
fn resolve_schema(
index: Option<usize>,
value_data: ValueData,
column_schema: ColumnSchema,
row: &mut Vec<GreptimeValue>,
pipeline_context: &PipelineContext,
column: &str,
value_type: &ConcreteDataType,
schema_info: &mut SchemaInfo,
) -> Result<()> {
if let Some(index) = index {
let api_value = GreptimeValue {
value_data: Some(value_data),
};
// Safety unwrap is fine here because api_value is always valid
let value_column_data_type = proto_value_type(&api_value).unwrap();
// Safety unwrap is fine here because index is always valid
let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
if value_column_data_type != schema_column_data_type {
IdentifyPipelineColumnTypeMismatchSnafu {
column: column_schema.column_name,
expected: schema_column_data_type.as_str_name(),
actual: value_column_data_type.as_str_name(),
let column_type = &mut schema_info.schema[index].column_schema.data_type;
if value_type != column_type {
if let ConcreteDataType::Json(value_type) = value_type
&& let ConcreteDataType::Json(column_type) = column_type
{
if !column_type.is_include(value_type) {
column_type.merge(value_type)?;
}
} else {
return IdentifyPipelineColumnTypeMismatchSnafu {
column,
expected: column_type.to_string(),
actual: value_type.to_string(),
}
.fail();
}
.fail()
} else {
row[index] = api_value;
Ok(())
}
Ok(())
} else {
let key = column_schema.column_name.clone();
let column_schema = schema_info
.find_column_schema_in_table(column)
.unwrap_or_else(|| {
let semantic_type = decide_semantic(pipeline_context, column);
let column_schema = datatypes::schema::ColumnSchema::new(
column,
value_type.clone(),
semantic_type != SemanticType::Timestamp,
);
ColumnMetadata {
column_schema,
semantic_type,
}
});
let key = column.to_string();
schema_info.schema.push(column_schema);
schema_info.index.insert(key, schema_info.schema.len() - 1);
let api_value = GreptimeValue {
value_data: Some(value_data),
};
row.push(api_value);
Ok(())
}
}
@@ -411,11 +522,11 @@ pub(crate) fn values_to_row(
Ok(Row { values: row })
}
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
SemanticType::Tag as i32
SemanticType::Tag
} else {
SemanticType::Field as i32
SemanticType::Field
}
}
@@ -427,55 +538,54 @@ fn resolve_value(
p_ctx: &PipelineContext,
) -> Result<()> {
let index = schema_info.index.get(&column_name).copied();
let mut resolve_simple_type =
|value_data: ValueData, column_name: String, data_type: ColumnDataType| {
let semantic_type = decide_semantic(p_ctx, &column_name);
resolve_schema(
index,
value_data,
ColumnSchema {
column_name,
datatype: data_type as i32,
semantic_type,
datatype_extension: None,
options: None,
},
row,
schema_info,
)
};
match value {
VrlValue::Null => {}
let value_data = match value {
VrlValue::Null => None,
VrlValue::Integer(v) => {
// safe unwrap after type matched
resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::int64_datatype(),
schema_info,
)?;
Some(ValueData::I64Value(v))
}
VrlValue::Float(v) => {
// safe unwrap after type matched
resolve_simple_type(
ValueData::F64Value(v.into()),
column_name,
ColumnDataType::Float64,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::float64_datatype(),
schema_info,
)?;
Some(ValueData::F64Value(v.into()))
}
VrlValue::Boolean(v) => {
resolve_simple_type(
ValueData::BoolValue(v),
column_name,
ColumnDataType::Boolean,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::boolean_datatype(),
schema_info,
)?;
Some(ValueData::BoolValue(v))
}
VrlValue::Bytes(v) => {
resolve_simple_type(
ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
column_name,
ColumnDataType::String,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::binary_datatype(),
schema_info,
)?;
Some(ValueData::BinaryValue(v.into()))
}
VrlValue::Regex(v) => {
@@ -483,42 +593,75 @@ fn resolve_value(
"Persisting regex value in the table, this should not happen, column_name: {}",
column_name
);
resolve_simple_type(
ValueData::StringValue(v.to_string()),
column_name,
ColumnDataType::String,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::string_datatype(),
schema_info,
)?;
Some(ValueData::StringValue(v.to_string()))
}
VrlValue::Timestamp(ts) => {
let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
input: ts.to_rfc3339(),
})?;
resolve_simple_type(
ValueData::TimestampNanosecondValue(ns),
column_name,
ColumnDataType::TimestampNanosecond,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::timestamp_nanosecond_datatype(),
schema_info,
)?;
Some(ValueData::TimestampNanosecondValue(ns))
}
VrlValue::Array(_) | VrlValue::Object(_) => {
let data = vrl_value_to_jsonb_value(&value);
resolve_schema(
index,
ValueData::BinaryValue(data.to_vec()),
ColumnSchema {
column_name,
datatype: ColumnDataType::Binary as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
options: None,
},
row,
schema_info,
)?;
let is_json_native_type = schema_info
.find_column_schema_in_table(&column_name)
.is_some_and(|x| {
if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
column_type.is_native_type()
} else {
false
}
});
let value = if is_json_native_type {
let settings = JsonStructureSettings::Structured(None);
let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
})?;
let value = settings.encode(value)?;
resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
let Value::Json(value) = value else {
unreachable!()
};
ValueData::JsonValue(encode_json_value(*value))
} else {
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::json_datatype(),
schema_info,
)?;
let value = vrl_value_to_jsonb_value(&value);
ValueData::BinaryValue(value.to_vec())
};
Some(value)
}
};
let value = GreptimeValue { value_data };
if let Some(index) = index {
row[index] = value;
} else {
row.push(value);
}
Ok(())
}
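For object and array values, the branch above picks the encoding from the destination column: a native structured JSON column gets the value converted through serde_json and stored as typed JSON, anything else falls back to the binary jsonb payload. A small sketch of that choice, with a hypothetical encoder in place of JsonStructureSettings and encode_json_value:
// Sketch: structured vs. binary encoding chosen by the target column's JSON flavor.
use serde_json::json;

#[derive(Debug)]
enum ValueData { JsonValue(String), BinaryValue(Vec<u8>) }

/// `column_is_native_json` is hypothetical: whether the column stores structured JSON natively.
fn encode_object(value: serde_json::Value, column_is_native_json: bool) -> ValueData {
    if column_is_native_json {
        // Structured path: keep the value as typed JSON (stringified here for brevity).
        ValueData::JsonValue(value.to_string())
    } else {
        // Legacy path: serialize to a binary payload (jsonb in the real code).
        ValueData::BinaryValue(serde_json::to_vec(&value).unwrap())
    }
}

fn main() {
    let v = json!({"latency_ms": 12, "tags": ["a", "b"]});
    println!("{:?}", encode_object(v.clone(), true));
    println!("{:?}", encode_object(v, false));
}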
@@ -556,20 +699,24 @@ fn identity_pipeline_inner(
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
// set time index column schema first
schema_info.schema.push(ColumnSchema {
column_name: custom_ts
let column_schema = datatypes::schema::ColumnSchema::new(
custom_ts
.map(|ts| ts.get_column_name().to_string())
.unwrap_or_else(|| greptime_timestamp().to_string()),
datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
if pipeline_ctx.channel == Channel::Prometheus {
ColumnDataType::TimestampMillisecond
} else {
ColumnDataType::TimestampNanosecond
}
}) as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
options: None,
custom_ts
.map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
.unwrap_or_else(|| {
if pipeline_ctx.channel == Channel::Prometheus {
ConcreteDataType::timestamp_millisecond_datatype()
} else {
ConcreteDataType::timestamp_nanosecond_datatype()
}
}),
false,
);
schema_info.schema.push(ColumnMetadata {
column_schema,
semantic_type: SemanticType::Timestamp,
});
let mut opt_map = HashMap::new();
@@ -627,28 +774,29 @@ pub fn identity_pipeline(
input.push(result);
}
identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
if let Some(table) = table {
let table_info = table.table_info();
for tag_name in table_info.meta.row_key_column_names() {
if let Some(index) = schema.index.get(tag_name) {
schema.schema[*index].semantic_type = SemanticType::Tag as i32;
schema.schema[*index].semantic_type = SemanticType::Tag;
}
}
}
opt_map
let column_schemas = schema.column_schemas()?;
Ok(opt_map
.into_iter()
.map(|(opt, rows)| {
(
opt,
Rows {
schema: schema.schema.clone(),
schema: column_schemas.clone(),
rows,
},
)
})
.collect::<HashMap<ContextOpt, Rows>>()
.collect::<HashMap<ContextOpt, Rows>>())
})
}
@@ -872,7 +1020,7 @@ mod tests {
.map(|(mut schema, mut rows)| {
for name in tag_column_names {
if let Some(index) = schema.index.get(&name) {
schema.schema[*index].semantic_type = SemanticType::Tag as i32;
schema.schema[*index].semantic_type = SemanticType::Tag;
}
}
@@ -880,7 +1028,7 @@ mod tests {
let rows = rows.remove(&ContextOpt::default()).unwrap();
Rows {
schema: schema.schema,
schema: schema.column_schemas().unwrap(),
rows,
}
});

View File

@@ -57,7 +57,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
}
Rows {
schema: schema_info.schema.clone(),
schema: schema_info.column_schemas().unwrap(),
rows,
}
}

View File

@@ -9,7 +9,6 @@ workspace = true
[dependencies]
auth.workspace = true
catalog.workspace = true
clap.workspace = true
cli.workspace = true
common-base.workspace = true
@@ -18,7 +17,6 @@ common-meta.workspace = true
datanode.workspace = true
flow.workspace = true
frontend.workspace = true
meta-client.workspace = true
meta-srv.workspace = true
serde.workspace = true
snafu.workspace = true

View File

@@ -30,20 +30,3 @@ pub async fn setup_flownode_plugins(
pub async fn start_flownode_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}
pub mod context {
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_meta::FlownodeId;
use common_meta::kv_backend::KvBackendRef;
use flow::FrontendClient;
/// The context for `GrpcBuilderConfiguratorRef` in flownode.
pub struct GrpcConfigureContext {
pub kv_backend: KvBackendRef,
pub fe_client: Arc<FrontendClient>,
pub flownode_id: FlownodeId,
pub catalog_manager: CatalogManagerRef,
}
}

View File

@@ -40,25 +40,3 @@ pub async fn setup_frontend_plugins(
pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}
pub mod context {
use std::sync::Arc;
use flow::FrontendClient;
use meta_client::MetaClientRef;
/// The context for [`catalog::kvbackend::CatalogManagerConfiguratorRef`] in standalone or
/// distributed.
pub enum CatalogManagerConfigureContext {
Distributed(DistributedCatalogManagerConfigureContext),
Standalone(StandaloneCatalogManagerConfigureContext),
}
pub struct DistributedCatalogManagerConfigureContext {
pub meta_client: MetaClientRef,
}
pub struct StandaloneCatalogManagerConfigureContext {
pub fe_client: Arc<FrontendClient>,
}
}

View File

@@ -13,12 +13,12 @@
// limitations under the License.
mod cli;
pub mod datanode;
pub mod flownode;
pub mod frontend;
mod datanode;
mod flownode;
mod frontend;
mod meta_srv;
mod options;
pub mod standalone;
mod standalone;
pub use cli::SubCommand;
pub use datanode::{setup_datanode_plugins, start_datanode_plugins};

View File

@@ -33,18 +33,3 @@ pub async fn setup_standalone_plugins(
pub async fn start_standalone_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}
pub mod context {
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_meta::kv_backend::KvBackendRef;
use flow::FrontendClient;
/// The context for [`common_meta::ddl_manager::DdlManagerConfiguratorRef`] in standalone.
pub struct DdlManagerConfigureContext {
pub kv_backend: KvBackendRef,
pub fe_client: Arc<FrontendClient>,
pub catalog_manager: CatalogManagerRef,
}
}

View File

@@ -15,7 +15,6 @@
mod show_create_table;
use std::collections::HashMap;
use std::ops::ControlFlow;
use std::sync::Arc;
use catalog::CatalogManagerRef;
@@ -53,7 +52,7 @@ use regex::Regex;
use session::context::{Channel, QueryContextRef};
pub use show_create_table::create_table_stmt;
use snafu::{OptionExt, ResultExt, ensure};
use sql::ast::{Ident, visit_expressions_mut};
use sql::ast::Ident;
use sql::parser::ParserContext;
use sql::statements::OptionMap;
use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
@@ -74,6 +73,7 @@ use crate::planner::DfLogicalPlanner;
const SCHEMAS_COLUMN: &str = "Database";
const OPTIONS_COLUMN: &str = "Options";
const TABLES_COLUMN: &str = "Tables";
const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
@@ -210,29 +210,6 @@ pub async fn show_databases(
.await
}
/// Replaces column identifier references in a SQL expression.
/// Used for backward compatibility where old column names should work with new ones.
fn replace_column_in_expr(expr: &mut sqlparser::ast::Expr, from_column: &str, to_column: &str) {
let _ = visit_expressions_mut(expr, |e| {
match e {
sqlparser::ast::Expr::Identifier(ident) => {
if ident.value.eq_ignore_ascii_case(from_column) {
ident.value = to_column.to_string();
}
}
sqlparser::ast::Expr::CompoundIdentifier(idents) => {
if let Some(last) = idents.last_mut()
&& last.value.eq_ignore_ascii_case(from_column)
{
last.value = to_column.to_string();
}
}
_ => {}
}
ControlFlow::<()>::Continue(())
});
}
/// Cast a `show` statement execution into a query from tables in `information_schema`.
/// - `table_name`: the table name in `information_schema`,
/// - `projects`: query projection, a list of `(column, renamed_column)`,
@@ -563,15 +540,15 @@ pub async fn show_tables(
query_ctx.current_schema()
};
// MySQL renames `table_name` to `Tables_in_{schema}` for protocol compatibility
let tables_column = format!("Tables_in_{}", schema_name);
// (dennis): MySQL renames `table_name` to `Tables_in_{schema}`, but we use `Tables` instead.
// I don't want to change this for now; our dashboard may depend on it.
let projects = if stmt.full {
vec![
(tables::TABLE_NAME, tables_column.as_str()),
(tables::TABLE_NAME, TABLES_COLUMN),
(tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
]
} else {
vec![(tables::TABLE_NAME, tables_column.as_str())]
vec![(tables::TABLE_NAME, TABLES_COLUMN)]
};
let filters = vec![
col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
@@ -580,16 +557,6 @@ pub async fn show_tables(
let like_field = Some(tables::TABLE_NAME);
let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
// Transform the WHERE clause for backward compatibility:
// Replace "Tables" with "Tables_in_{schema}" to support old queries
let kind = match stmt.kind {
ShowKind::Where(mut filter) => {
replace_column_in_expr(&mut filter, "Tables", &tables_column);
ShowKind::Where(filter)
}
other => other,
};
query_from_information_schema_table(
query_engine,
catalog_manager,
@@ -600,7 +567,7 @@ pub async fn show_tables(
filters,
like_field,
sort,
kind,
stmt.kind,
)
.await
}

View File

@@ -651,6 +651,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
GreptimeProto {
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -777,6 +784,8 @@ impl ErrorExt for Error {
HandleOtelArrowRequest { .. } => StatusCode::Internal,
Cancelled { .. } => StatusCode::Cancelled,
GreptimeProto { source, .. } => source.status_code(),
}
}

View File

@@ -31,7 +31,7 @@ use axum_extra::TypedHeader;
use common_catalog::consts::default_engine;
use common_error::ext::{BoxedError, ErrorExt};
use common_query::{Output, OutputData};
use common_telemetry::{debug, error, warn};
use common_telemetry::{error, warn};
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
@@ -738,11 +738,6 @@ pub async fn log_ingester(
let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
debug!(
"receiving logs: {:?}",
serde_json::to_string(&value).unwrap()
);
query_ctx.set_channel(Channel::Log);
let query_ctx = Arc::new(query_ctx);

View File

@@ -152,7 +152,7 @@ pub async fn loki_ingest(
rows.push(row);
}
let schemas = schema_info.schema;
let schemas = schema_info.column_schemas()?;
// fill Null for missing values
for row in rows.iter_mut() {
row.resize(schemas.len(), GreptimeValue::default());
@@ -746,13 +746,16 @@ fn process_labels(
} else {
// The column does not exist yet: add its schema and append the value.
schemas.push(ColumnSchema {
column_name: k.clone(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
datatype_extension: None,
options: None,
});
schemas.push(
ColumnSchema {
column_name: k.clone(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
datatype_extension: None,
options: None,
}
.into(),
);
column_indexer.insert(k, schemas.len() - 1);
row.push(GreptimeValue {

View File

@@ -37,8 +37,6 @@ static SHOW_LOWER_CASE_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new("(?i)^(SHOW VARIABLES LIKE 'lower_case_table_names'(.*))").unwrap());
static SHOW_VARIABLES_LIKE_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new("(?i)^(SHOW VARIABLES( LIKE (.*))?)").unwrap());
static SHOW_WARNINGS_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new("(?i)^(/\\* ApplicationName=.*)?SHOW WARNINGS").unwrap());
// SELECT TIMEDIFF(NOW(), UTC_TIMESTAMP());
static SELECT_TIME_DIFF_FUNC_PATTERN: Lazy<Regex> =
@@ -87,6 +85,8 @@ static OTHER_NOT_SUPPORTED_STMT: Lazy<RegexSet> = Lazy::new(|| {
"(?i)^(/\\*!40101 SET(.*) \\*/)$",
// DBeaver.
"(?i)^(SHOW WARNINGS)",
"(?i)^(/\\* ApplicationName=(.*)SHOW WARNINGS)",
"(?i)^(/\\* ApplicationName=(.*)SHOW PLUGINS)",
"(?i)^(/\\* ApplicationName=(.*)SHOW ENGINES)",
"(?i)^(/\\* ApplicationName=(.*)SELECT @@(.*))",
@@ -252,47 +252,6 @@ fn check_show_variables(query: &str) -> Option<Output> {
recordbatches.map(Output::new_with_record_batches)
}
/// Build SHOW WARNINGS result from session's warnings
fn show_warnings(session: &SessionRef) -> RecordBatches {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("Level", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("Code", ConcreteDataType::uint16_datatype(), false),
ColumnSchema::new("Message", ConcreteDataType::string_datatype(), false),
]));
let warnings = session.warnings();
let count = warnings.len();
let columns = if count > 0 {
vec![
Arc::new(StringVector::from(vec!["Warning"; count])) as _,
Arc::new(datatypes::vectors::UInt16Vector::from(vec![
Some(1000u16);
count
])) as _,
Arc::new(StringVector::from(warnings)) as _,
]
} else {
vec![
Arc::new(StringVector::from(Vec::<String>::new())) as _,
Arc::new(datatypes::vectors::UInt16Vector::from(
Vec::<Option<u16>>::new(),
)) as _,
Arc::new(StringVector::from(Vec::<String>::new())) as _,
]
};
RecordBatches::try_from_columns(schema, columns).unwrap()
}
fn check_show_warnings(query: &str, session: &SessionRef) -> Option<Output> {
if SHOW_WARNINGS_PATTERN.is_match(query) {
Some(Output::new_with_record_batches(show_warnings(session)))
} else {
None
}
}
// Check for SET or other queries; this is the final check of the federated query.
fn check_others(query: &str, _query_ctx: QueryContextRef) -> Option<Output> {
if OTHER_NOT_SUPPORTED_STMT.is_match(query.as_bytes()) {
@@ -315,7 +274,7 @@ fn check_others(query: &str, _query_ctx: QueryContextRef) -> Option<Output> {
pub(crate) fn check(
query: &str,
query_ctx: QueryContextRef,
session: SessionRef,
_session: SessionRef,
) -> Option<Output> {
// INSERT doesn't need the MySQL federated check. We assume the query doesn't contain
// federated or driver setup commands if it starts with an 'INSERT' statement.
@@ -328,8 +287,8 @@ pub(crate) fn check(
// First to check the query is like "select @@variables".
check_select_variable(query, query_ctx.clone())
// Then to check "show variables like ...".
.or_else(|| check_show_variables(query))
.or_else(|| check_show_warnings(query, &session))
// Last check
.or_else(|| check_others(query, query_ctx))
}
@@ -433,64 +392,4 @@ mod test {
+----------------------------------+";
test(query, expected);
}
#[test]
fn test_show_warnings() {
// Test SHOW WARNINGS with no warnings
let session = Arc::new(Session::new(None, Channel::Mysql, Default::default(), 0));
let output = check("SHOW WARNINGS", QueryContext::arc(), session.clone());
match output.unwrap().data {
OutputData::RecordBatches(r) => {
assert_eq!(r.iter().map(|b| b.num_rows()).sum::<usize>(), 0);
}
_ => unreachable!(),
}
// Test SHOW WARNINGS with a single warning
session.add_warning("Test warning message".to_string());
let output = check("SHOW WARNINGS", QueryContext::arc(), session.clone());
match output.unwrap().data {
OutputData::RecordBatches(r) => {
let expected = "\
+---------+------+----------------------+
| Level | Code | Message |
+---------+------+----------------------+
| Warning | 1000 | Test warning message |
+---------+------+----------------------+";
assert_eq!(&r.pretty_print().unwrap(), expected);
}
_ => unreachable!(),
}
// Test SHOW WARNINGS with multiple warnings
session.clear_warnings();
session.add_warning("First warning".to_string());
session.add_warning("Second warning".to_string());
let output = check("SHOW WARNINGS", QueryContext::arc(), session.clone());
match output.unwrap().data {
OutputData::RecordBatches(r) => {
let expected = "\
+---------+------+----------------+
| Level | Code | Message |
+---------+------+----------------+
| Warning | 1000 | First warning |
| Warning | 1000 | Second warning |
+---------+------+----------------+";
assert_eq!(&r.pretty_print().unwrap(), expected);
}
_ => unreachable!(),
}
// Test case insensitivity
let output = check("show warnings", QueryContext::arc(), session.clone());
assert!(output.is_some());
// Test with DBeaver-style comment prefix
let output = check(
"/* ApplicationName=DBeaver */SHOW WARNINGS",
QueryContext::arc(),
session.clone(),
);
assert!(output.is_some());
}
}
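With the SHOW WARNINGS handler gone, DBeaver-style probes fall through to the generic pattern set and are answered with an empty result. The federated check itself is a chain of `Option`-returning matchers; below is a trimmed-down sketch of that chain, assuming the `regex` crate, with `Output` and the pattern list heavily simplified.

```rust
// Trimmed-down sketch of the federated check chain, assuming the `regex` crate.
// `Output` and the pattern list are simplified stand-ins for the real ones.
use regex::{Regex, RegexSet};

#[derive(Debug, PartialEq)]
enum Output {
    Variable(String),
    Empty,
}

// e.g. "SELECT @@version_comment" issued by MySQL drivers on connect.
fn check_select_variable(query: &str) -> Option<Output> {
    let re = Regex::new(r"(?i)^\s*SELECT\s+@@(\w+)").ok()?;
    let caps = re.captures(query)?;
    Some(Output::Variable(caps[1].to_string()))
}

// Driver/compat statements that are recognized but answered with an empty
// result, e.g. DBeaver's SHOW WARNINGS probe. The real code keeps these
// compiled in Lazy statics instead of rebuilding the set on every call.
fn check_others(query: &str) -> Option<Output> {
    static PATTERNS: &[&str] = &[
        r"(?i)^(SHOW WARNINGS)",
        r"(?i)^(/\* ApplicationName=(.*)SHOW WARNINGS)",
        r"(?i)^(/\* ApplicationName=(.*)SHOW ENGINES)",
    ];
    let set = RegexSet::new(PATTERNS).unwrap();
    set.is_match(query).then_some(Output::Empty)
}

/// Each check returns Some(output) to short-circuit, or None to fall through.
fn check(query: &str) -> Option<Output> {
    check_select_variable(query).or_else(|| check_others(query))
}

fn main() {
    assert_eq!(
        check("SELECT @@version_comment"),
        Some(Output::Variable("version_comment".to_string()))
    );
    assert_eq!(
        check("/* ApplicationName=DBeaver */SHOW WARNINGS"),
        Some(Output::Empty)
    );
    assert_eq!(check("SELECT 1"), None);
}
```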

View File

@@ -475,8 +475,6 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
p: ParamParser<'a>,
w: QueryResultWriter<'a, W>,
) -> Result<()> {
self.session.clear_warnings();
let query_ctx = self.session.new_query_context();
let db = query_ctx.get_db_string();
let _timer = crate::metrics::METRIC_MYSQL_QUERY_TIMER
@@ -502,7 +500,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
}
};
writer::write_output(w, query_ctx, self.session.clone(), outputs).await?;
writer::write_output(w, query_ctx, outputs).await?;
Ok(())
}
@@ -527,12 +525,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
.with_label_values(&[crate::metrics::METRIC_MYSQL_TEXTQUERY, db.as_str()])
.start_timer();
// Clear warnings for non SHOW WARNINGS queries
let query_upcase = query.to_uppercase();
if !query_upcase.starts_with("SHOW WARNINGS") {
self.session.clear_warnings();
}
if query_upcase.starts_with("PREPARE ") {
match ParserContext::parse_mysql_prepare_stmt(query, query_ctx.sql_dialect()) {
Ok((stmt_name, stmt)) => {
@@ -541,8 +534,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
match prepare_results {
Ok(_) => {
let outputs = vec![Ok(Output::new_with_affected_rows(0))];
writer::write_output(writer, query_ctx, self.session.clone(), outputs)
.await?;
writer::write_output(writer, query_ctx, outputs).await?;
return Ok(());
}
Err(e) => {
@@ -578,8 +570,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
return Ok(());
}
};
writer::write_output(writer, query_ctx, self.session.clone(), outputs).await?;
writer::write_output(writer, query_ctx, outputs).await?;
return Ok(());
}
Err(e) => {
@@ -594,7 +585,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
Ok(stmt_name) => {
self.do_close(stmt_name);
let outputs = vec![Ok(Output::new_with_affected_rows(0))];
writer::write_output(writer, query_ctx, self.session.clone(), outputs).await?;
writer::write_output(writer, query_ctx, outputs).await?;
return Ok(());
}
Err(e) => {
@@ -607,8 +598,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
}
let outputs = self.do_query(query, query_ctx.clone()).await;
writer::write_output(writer, query_ctx, self.session.clone(), outputs).await?;
writer::write_output(writer, query_ctx, outputs).await?;
Ok(())
}

View File

@@ -36,7 +36,6 @@ use futures::StreamExt;
use opensrv_mysql::{
Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
};
use session::SessionRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use tokio::io::AsyncWrite;
@@ -48,18 +47,9 @@ use crate::metrics::*;
pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
w: QueryResultWriter<'_, W>,
query_context: QueryContextRef,
session: SessionRef,
outputs: Vec<Result<Output>>,
) -> Result<()> {
if let Some(warning) = query_context.warning() {
session.add_warning(warning);
}
let mut writer = Some(MysqlResultWriter::new(
w,
query_context.clone(),
session.clone(),
));
let mut writer = Some(MysqlResultWriter::new(w, query_context.clone()));
for output in outputs {
let result_writer = writer.take().context(error::InternalSnafu {
err_msg: "Sending multiple result set is unsupported",
@@ -104,19 +94,16 @@ struct QueryResult {
pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
writer: QueryResultWriter<'a, W>,
query_context: QueryContextRef,
session: SessionRef,
}
impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
pub fn new(
writer: QueryResultWriter<'a, W>,
query_context: QueryContextRef,
session: SessionRef,
) -> MysqlResultWriter<'a, W> {
MysqlResultWriter::<'a, W> {
writer,
query_context,
session,
}
}
@@ -144,12 +131,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
Self::write_query_result(query_result, self.writer, self.query_context).await?;
}
OutputData::AffectedRows(rows) => {
let next_writer =
Self::write_affected_rows(self.writer, rows, &self.session).await?;
let next_writer = Self::write_affected_rows(self.writer, rows).await?;
return Ok(Some(MysqlResultWriter::new(
next_writer,
self.query_context,
self.session,
)));
}
},
@@ -167,14 +152,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
async fn write_affected_rows(
w: QueryResultWriter<'a, W>,
rows: usize,
session: &SessionRef,
) -> Result<QueryResultWriter<'a, W>> {
let warnings = session.warnings_count() as u16;
let next_writer = w
.complete_one(OkResponse {
affected_rows: rows as u64,
warnings,
..Default::default()
})
.await?;

View File

@@ -381,6 +381,7 @@ fn extract_field_from_attr_and_combine_schema(
if let Some(index) = select_schema.index.get(key) {
let column_schema = &select_schema.schema[*index];
let column_schema: ColumnSchema = column_schema.clone().try_into()?;
// datatype of the same column name should be the same
ensure!(
column_schema.datatype == schema.datatype,
@@ -393,7 +394,7 @@ fn extract_field_from_attr_and_combine_schema(
);
extracted_values[*index] = value;
} else {
select_schema.schema.push(schema);
select_schema.schema.push(schema.into());
select_schema
.index
.insert(key.clone(), select_schema.schema.len() - 1);
@@ -480,7 +481,7 @@ fn parse_export_logs_service_request_to_rows(
let mut parse_ctx = ParseContext::new(select_info);
let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
schemas.extend(parse_ctx.select_schema.schema);
schemas.extend(parse_ctx.select_schema.column_schemas()?);
rows.iter_mut().for_each(|row| {
row.values.resize(schemas.len(), GreptimeValue::default());

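The hunk above keeps the rule that a field name seen twice must carry the same datatype, and otherwise appends a new column and records its index. A self-contained sketch of that combine step follows; all types here are simplified stand-ins for the OTLP select-schema code.

```rust
// Sketch of the attribute/schema combining step: a repeated field name must
// keep its datatype, otherwise a new column is appended and indexed.
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum DataType {
    String,
    Int64,
}

#[derive(Debug, Clone)]
struct Column {
    name: String,
    datatype: DataType,
}

#[derive(Default)]
struct SelectSchema {
    schema: Vec<Column>,
    index: HashMap<String, usize>,
}

fn combine(
    select_schema: &mut SelectSchema,
    values: &mut Vec<Option<String>>,
    key: &str,
    datatype: DataType,
    value: String,
) -> Result<(), String> {
    if let Some(&index) = select_schema.index.get(key) {
        // Datatype of the same column name must stay consistent.
        if select_schema.schema[index].datatype != datatype {
            return Err(format!("conflicting datatype for column {key}"));
        }
        values[index] = Some(value);
    } else {
        select_schema.schema.push(Column { name: key.to_string(), datatype });
        select_schema.index.insert(key.to_string(), select_schema.schema.len() - 1);
        values.push(Some(value));
    }
    Ok(())
}

fn main() {
    let mut schema = SelectSchema::default();
    let mut values = Vec::new();
    combine(&mut schema, &mut values, "service", DataType::String, "api".into()).unwrap();
    combine(&mut schema, &mut values, "service", DataType::String, "web".into()).unwrap();
    // A second appearance with a different datatype is rejected.
    assert!(combine(&mut schema, &mut values, "service", DataType::Int64, "1".into()).is_err());
    assert_eq!(schema.schema.len(), 1);
    assert_eq!(schema.schema[0].name, "service");
}
```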
View File

@@ -136,12 +136,18 @@ async fn run_custom_pipeline(
let mut schema_info = SchemaInfo::default();
schema_info
.schema
.push(time_index_column_schema(ts_name, timeunit));
.push(time_index_column_schema(ts_name, timeunit).into());
schema_info
}
};
let table = handler
.get_table(&table_name, query_ctx)
.await
.context(CatalogSnafu)?;
schema_info.set_table(table);
for pipeline_map in pipeline_maps {
let result = pipeline
.exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
@@ -186,7 +192,7 @@ async fn run_custom_pipeline(
RowInsertRequest {
rows: Some(Rows {
rows,
schema: schema_info.schema.clone(),
schema: schema_info.column_schemas()?,
}),
table_name,
},

View File

@@ -133,8 +133,6 @@ impl Drop for RequestMemoryGuard {
#[cfg(test)]
mod tests {
use tokio::sync::Barrier;
use super::*;
#[test]
@@ -190,33 +188,21 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_limiter_concurrent() {
let limiter = RequestMemoryLimiter::new(1000);
let barrier = Arc::new(Barrier::new(11)); // 10 tasks + main
let mut handles = vec![];
// Spawn 10 tasks each trying to acquire 200 bytes
for _ in 0..10 {
let limiter_clone = limiter.clone();
let barrier_clone = barrier.clone();
let handle = tokio::spawn(async move {
barrier_clone.wait().await;
limiter_clone.try_acquire(200)
});
let handle = tokio::spawn(async move { limiter_clone.try_acquire(200) });
handles.push(handle);
}
// Let all tasks start together
barrier.wait().await;
let mut success_count = 0;
let mut fail_count = 0;
let mut guards = Vec::new();
for handle in handles {
match handle.await.unwrap() {
Ok(Some(guard)) => {
success_count += 1;
guards.push(guard);
}
Ok(Some(_)) => success_count += 1,
Err(_) => fail_count += 1,
Ok(None) => unreachable!(),
}
@@ -225,6 +211,5 @@ mod tests {
// Only 5 tasks should succeed (5 * 200 = 1000)
assert_eq!(success_count, 5);
assert_eq!(fail_count, 5);
drop(guards);
}
}
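The reworked test drops the start barrier but still expects exactly five of the ten 200-byte reservations to fit under the 1000-byte budget. Below is a sketch of a limiter with that behaviour; the names and internals are assumptions (the real `try_acquire` returns a `Result<Option<_>>`, which the sketch flattens to a plain `Option`).

```rust
// Sketch of a byte-budget limiter in the spirit of RequestMemoryLimiter:
// try_acquire reserves memory atomically and the returned guard releases it
// on drop. Names and internals are assumptions, not the crate's actual code.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone)]
struct MemoryLimiter {
    inner: Arc<Inner>,
}

struct Inner {
    limit: usize,
    used: AtomicUsize,
}

struct MemoryGuard {
    inner: Arc<Inner>,
    bytes: usize,
}

impl MemoryLimiter {
    fn new(limit: usize) -> Self {
        Self { inner: Arc::new(Inner { limit, used: AtomicUsize::new(0) }) }
    }

    /// Returns a guard if `bytes` still fits under the limit, `None` otherwise.
    fn try_acquire(&self, bytes: usize) -> Option<MemoryGuard> {
        let mut current = self.inner.used.load(Ordering::Relaxed);
        loop {
            if current + bytes > self.inner.limit {
                return None;
            }
            match self.inner.used.compare_exchange_weak(
                current,
                current + bytes,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return Some(MemoryGuard { inner: self.inner.clone(), bytes }),
                Err(actual) => current = actual,
            }
        }
    }
}

impl Drop for MemoryGuard {
    fn drop(&mut self) {
        self.inner.used.fetch_sub(self.bytes, Ordering::AcqRel);
    }
}

fn main() {
    let limiter = MemoryLimiter::new(1000);
    // With a 1000-byte budget, only five 200-byte reservations can coexist.
    let guards: Vec<_> = (0..10).filter_map(|_| limiter.try_acquire(200)).collect();
    assert_eq!(guards.len(), 5);
    drop(guards);
    assert!(limiter.try_acquire(1000).is_some());
}
```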

View File

@@ -18,7 +18,7 @@ pub mod protocol_ctx;
pub mod session_config;
pub mod table_name;
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
@@ -35,9 +35,6 @@ use derive_more::Debug;
use crate::context::{Channel, ConnInfo, QueryContextRef};
/// Maximum number of warnings to store per session (similar to MySQL's max_error_count)
const MAX_WARNINGS: usize = 64;
/// Session for persistent connection such as MySQL, PostgreSQL etc.
#[derive(Debug)]
pub struct Session {
@@ -61,8 +58,6 @@ pub(crate) struct MutableInner {
read_preference: ReadPreference,
#[debug(skip)]
pub(crate) cursors: HashMap<String, Arc<RecordBatchStreamCursor>>,
/// Warning messages for MySQL SHOW WARNINGS support
warnings: VecDeque<String>,
}
impl Default for MutableInner {
@@ -74,7 +69,6 @@ impl Default for MutableInner {
query_timeout: None,
read_preference: ReadPreference::Leader,
cursors: HashMap::with_capacity(0),
warnings: VecDeque::new(),
}
}
}
@@ -162,35 +156,4 @@ impl Session {
pub fn process_id(&self) -> u32 {
self.process_id
}
pub fn warnings_count(&self) -> usize {
self.mutable_inner.read().unwrap().warnings.len()
}
pub fn warnings(&self) -> Vec<String> {
self.mutable_inner
.read()
.unwrap()
.warnings
.iter()
.cloned()
.collect()
}
/// Add a warning message. If the limit is reached, discard the oldest warning.
pub fn add_warning(&self, warning: String) {
let mut inner = self.mutable_inner.write().unwrap();
if inner.warnings.len() >= MAX_WARNINGS {
inner.warnings.pop_front();
}
inner.warnings.push_back(warning);
}
pub fn clear_warnings(&self) {
let mut inner = self.mutable_inner.write().unwrap();
if inner.warnings.is_empty() {
return;
}
inner.warnings.clear();
}
}
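The removed Session methods kept a bounded, per-connection warning buffer: at most `MAX_WARNINGS` entries, oldest dropped first, cleared on demand. A standalone sketch of that buffer and its overflow behaviour, using only std types:

```rust
// Stand-alone sketch of the bounded warning buffer the removed Session code
// kept per connection: at most MAX_WARNINGS entries, oldest discarded first.
use std::collections::VecDeque;
use std::sync::RwLock;

const MAX_WARNINGS: usize = 64;

#[derive(Default)]
struct WarningBuffer {
    warnings: RwLock<VecDeque<String>>,
}

impl WarningBuffer {
    fn add_warning(&self, warning: String) {
        let mut warnings = self.warnings.write().unwrap();
        if warnings.len() >= MAX_WARNINGS {
            // Keep the newest warnings, similar to MySQL's max_error_count behaviour.
            warnings.pop_front();
        }
        warnings.push_back(warning);
    }

    fn warnings_count(&self) -> usize {
        self.warnings.read().unwrap().len()
    }

    fn clear_warnings(&self) {
        self.warnings.write().unwrap().clear();
    }
}

fn main() {
    let buffer = WarningBuffer::default();
    for i in 0..70 {
        buffer.add_warning(format!("warning {i}"));
    }
    // Only the most recent 64 warnings are retained.
    assert_eq!(buffer.warnings_count(), 64);
    buffer.clear_warnings();
    assert_eq!(buffer.warnings_count(), 0);
}
```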

View File

@@ -151,7 +151,6 @@ pub enum RegionRequest {
Truncate(RegionTruncateRequest),
Catchup(RegionCatchupRequest),
BulkInserts(RegionBulkInsertsRequest),
EnterStaging(EnterStagingRequest),
}
impl RegionRequest {
@@ -1417,17 +1416,6 @@ impl RegionBulkInsertsRequest {
}
}
/// Request to stage a region with a new region rule (partition expression).
///
/// This request transitions a region into the staging mode.
/// It first flushes the memtable for the old region rule if it is not empty,
/// then enters the staging mode with the new region rule.
#[derive(Debug, Clone)]
pub struct EnterStagingRequest {
/// The partition expression of the staging region.
pub partition_expr: String,
}
impl fmt::Display for RegionRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@@ -1444,7 +1432,6 @@ impl fmt::Display for RegionRequest {
RegionRequest::Truncate(_) => write!(f, "Truncate"),
RegionRequest::Catchup(_) => write!(f, "Catchup"),
RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
}
}
}
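The removed doc comment describes a two-step transition: flush whatever was written under the old partition rule, then switch the region into staging mode with the new rule. A conceptual sketch of that ordering is below; `Region` and its methods are illustrative stand-ins, not store-api types.

```rust
// Conceptual sketch of the removed EnterStaging flow: flush the memtable built
// under the old partition rule first, then switch into staging mode.
#[derive(Debug, PartialEq)]
enum RegionMode {
    Normal,
    Staging { partition_expr: String },
}

struct Region {
    mode: RegionMode,
    memtable_rows: usize,
    flushed_batches: usize,
}

impl Region {
    fn flush(&mut self) {
        if self.memtable_rows > 0 {
            self.flushed_batches += 1;
            self.memtable_rows = 0;
        }
    }

    fn enter_staging(&mut self, partition_expr: String) {
        // Data written under the old rule must be persisted before the rule changes.
        self.flush();
        self.mode = RegionMode::Staging { partition_expr };
    }
}

fn main() {
    let mut region = Region { mode: RegionMode::Normal, memtable_rows: 42, flushed_batches: 0 };
    region.enter_staging("host >= 'h100'".to_string());
    assert_eq!(region.flushed_batches, 1);
    assert_eq!(region.memtable_rows, 0);
    assert!(matches!(region.mode, RegionMode::Staging { .. }));
}
```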

View File

@@ -54,7 +54,6 @@ log-query = { workspace = true }
loki-proto.workspace = true
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }
mito2.workspace = true
moka.workspace = true
mysql_async = { version = "0.35", default-features = false, features = [
"time",
@@ -106,6 +105,7 @@ paste.workspace = true
pipeline.workspace = true
prost.workspace = true
rand.workspace = true
regex.workspace = true
session = { workspace = true, features = ["testing"] }
store-api.workspace = true
tokio-postgres = { workspace = true }

View File

@@ -59,10 +59,8 @@ use hyper_util::rt::TokioIo;
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::discovery;
use meta_srv::gc::GcSchedulerOptions;
use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use mito2::gc::GcConfig;
use object_store::config::ObjectStoreConfig;
use rand::Rng;
use servers::grpc::GrpcOptions;
@@ -105,8 +103,6 @@ pub struct GreptimeDbClusterBuilder {
datanodes: Option<u32>,
datanode_wal_config: DatanodeWalConfig,
metasrv_wal_config: MetasrvWalConfig,
datanode_gc_config: GcConfig,
metasrv_gc_config: GcSchedulerOptions,
shared_home_dir: Option<Arc<TempDir>>,
meta_selector: Option<SelectorRef>,
}
@@ -138,8 +134,6 @@ impl GreptimeDbClusterBuilder {
datanodes: None,
datanode_wal_config: DatanodeWalConfig::default(),
metasrv_wal_config: MetasrvWalConfig::default(),
datanode_gc_config: GcConfig::default(),
metasrv_gc_config: GcSchedulerOptions::default(),
shared_home_dir: None,
meta_selector: None,
}
@@ -175,17 +169,6 @@ impl GreptimeDbClusterBuilder {
self
}
#[must_use]
pub fn with_datanode_gc_config(mut self, datanode_gc_config: GcConfig) -> Self {
self.datanode_gc_config = datanode_gc_config;
self
}
pub fn with_metasrv_gc_config(mut self, metasrv_gc_config: GcSchedulerOptions) -> Self {
self.metasrv_gc_config = metasrv_gc_config;
self
}
#[must_use]
pub fn with_shared_home_dir(mut self, shared_home_dir: Arc<TempDir>) -> Self {
self.shared_home_dir = Some(shared_home_dir);
@@ -222,7 +205,6 @@ impl GreptimeDbClusterBuilder {
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
gc: self.metasrv_gc_config.clone(),
..Default::default()
};
@@ -297,7 +279,6 @@ impl GreptimeDbClusterBuilder {
vec![],
home_dir,
self.datanode_wal_config.clone(),
self.datanode_gc_config.clone(),
)
} else {
let (opts, guard) = create_tmp_dir_and_datanode_opts(
@@ -305,7 +286,6 @@ impl GreptimeDbClusterBuilder {
self.store_providers.clone().unwrap_or_default(),
&format!("{}-dn-{}", self.cluster_name, datanode_id),
self.datanode_wal_config.clone(),
self.datanode_gc_config.clone(),
);
guards.push(guard);

View File

@@ -309,7 +309,6 @@ impl GreptimeDbStandaloneBuilder {
store_types,
&self.instance_name,
self.datanode_wal_config.clone(),
Default::default(),
);
let kv_backend_config = KvBackendConfig::default();

View File

@@ -32,7 +32,6 @@ use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, StorageConfig};
use frontend::instance::Instance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use mito2::gc::GcConfig;
use object_store::config::{
AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
};
@@ -146,7 +145,6 @@ fn s3_test_config() -> S3Config {
secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap().into(),
bucket: env::var("GT_S3_BUCKET").unwrap(),
region: Some(env::var("GT_S3_REGION").unwrap()),
endpoint: env::var("GT_S3_ENDPOINT_URL").ok(),
..Default::default()
},
..Default::default()
@@ -165,7 +163,7 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
scope: env::var("GT_GCS_SCOPE").unwrap(),
credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(),
credential: env::var("GT_GCS_CREDENTIAL").unwrap().into(),
endpoint: env::var("GT_GCS_ENDPOINT").unwrap_or_default(),
endpoint: env::var("GT_GCS_ENDPOINT").unwrap(),
},
..Default::default()
};
@@ -299,7 +297,6 @@ pub fn create_tmp_dir_and_datanode_opts(
store_provider_types: Vec<StorageType>,
name: &str,
wal_config: DatanodeWalConfig,
gc_config: GcConfig,
) -> (DatanodeOptions, TestGuard) {
let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
@@ -317,13 +314,7 @@ pub fn create_tmp_dir_and_datanode_opts(
store_providers.push(store);
storage_guards.push(StorageGuard(data_tmp_dir))
}
let opts = create_datanode_opts(
default_store,
store_providers,
home_dir,
wal_config,
gc_config,
);
let opts = create_datanode_opts(default_store, store_providers, home_dir, wal_config);
(
opts,
@@ -339,18 +330,7 @@ pub(crate) fn create_datanode_opts(
providers: Vec<ObjectStoreConfig>,
home_dir: String,
wal_config: DatanodeWalConfig,
gc_config: GcConfig,
) -> DatanodeOptions {
let region_engine = DatanodeOptions::default()
.region_engine
.into_iter()
.map(|mut v| {
if let datanode::config::RegionEngineConfig::Mito(mito_config) = &mut v {
mito_config.gc = gc_config.clone();
}
v
})
.collect();
DatanodeOptions {
node_id: Some(0),
require_lease_before_startup: true,
@@ -363,7 +343,6 @@ pub(crate) fn create_datanode_opts(
.with_bind_addr(PEER_PLACEHOLDER_ADDR)
.with_server_addr(PEER_PLACEHOLDER_ADDR),
wal: wal_config,
region_engine,
..Default::default()
}
}

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod gc;
mod instance_kafka_wal_test;
mod instance_noop_wal_test;
mod instance_test;

View File

@@ -1,262 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use common_meta::key::TableMetadataManagerRef;
use common_procedure::ProcedureWithId;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use futures::TryStreamExt as _;
use itertools::Itertools;
use meta_srv::gc::{BatchGcProcedure, GcSchedulerOptions, Region2Peers};
use mito2::gc::GcConfig;
use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::cluster::GreptimeDbClusterBuilder;
use crate::test_util::{StorageType, TempDirGuard, get_test_store_config};
use crate::tests::test_util::{MockInstanceBuilder, TestContext, execute_sql, wait_procedure};
/// Helper function to get table route information for GC procedure
async fn get_table_route(
table_metadata_manager: &TableMetadataManagerRef,
table_id: TableId,
) -> (Region2Peers, Vec<RegionId>) {
// Get physical table route
let (_, physical_table_route) = table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await
.unwrap();
let mut region_routes = Region2Peers::new();
let mut regions = Vec::new();
// Convert region routes to Region2Peers format
for region_route in physical_table_route.region_routes {
let region_id = region_route.region.id;
let leader_peer = region_route.leader_peer.clone().unwrap();
let follower_peers = region_route.follower_peers.clone();
region_routes.insert(region_id, (leader_peer, follower_peers));
regions.push(region_id);
}
(region_routes, regions)
}
/// Helper function to list all SST files
async fn list_sst_files(test_context: &TestContext) -> HashSet<String> {
let mut sst_files = HashSet::new();
for datanode in test_context.datanodes().values() {
let region_server = datanode.region_server();
let mito = region_server.mito_engine().unwrap();
let all_files = mito
.all_ssts_from_storage()
.try_collect::<Vec<_>>()
.await
.unwrap()
.into_iter()
.map(|e| e.file_path)
.collect_vec();
sst_files.extend(all_files);
}
sst_files
}
async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirGuard) {
common_telemetry::init_default_ut_logging();
let test_name = uuid::Uuid::new_v4().to_string();
let (store_config, guard) = get_test_store_config(store_type);
let builder = GreptimeDbClusterBuilder::new(&test_name)
.await
.with_metasrv_gc_config(GcSchedulerOptions {
enable: true,
..Default::default()
})
.with_datanode_gc_config(GcConfig {
enable: true,
// set lingering time to zero for test speedup
lingering_time: Some(Duration::ZERO),
..Default::default()
})
.with_store_config(store_config);
(
TestContext::new(MockInstanceBuilder::Distributed(builder)).await,
guard,
)
}
#[tokio::test]
async fn test_gc_basic_different_store() {
common_telemetry::init_default_ut_logging();
let store_type = StorageType::build_storage_types_based_on_env();
for store in store_type {
if store == StorageType::File {
continue; // no point in testing gc in fs storage
}
info!("Running GC test with storage type: {}", store);
test_gc_basic(&store).await;
}
}
async fn test_gc_basic(store_type: &StorageType) {
let (test_context, _guard) = distributed_with_gc(store_type).await;
let instance = test_context.frontend();
let metasrv = test_context.metasrv();
// Step 1: Create table with append_mode to easily generate multiple files
let create_table_sql = r#"
CREATE TABLE test_gc_table (
ts TIMESTAMP TIME INDEX,
val DOUBLE,
host STRING
) WITH (append_mode = 'true')
"#;
execute_sql(&instance, create_table_sql).await;
// Step 2: Generate SST files by inserting data and flushing multiple times
for i in 0..4 {
let insert_sql = format!(
r#"
INSERT INTO test_gc_table (ts, val, host) VALUES
('2023-01-0{} 10:00:00', {}, 'host{}'),
('2023-01-0{} 11:00:00', {}, 'host{}'),
('2023-01-0{} 12:00:00', {}, 'host{}')
"#,
i + 1,
10.0 + i as f64,
i,
i + 1,
20.0 + i as f64,
i,
i + 1,
30.0 + i as f64,
i
);
execute_sql(&instance, &insert_sql).await;
// Flush the table to create SST files
let flush_sql = "ADMIN FLUSH_TABLE('test_gc_table')";
execute_sql(&instance, flush_sql).await;
}
// Step 3: Get table information
let table = instance
.catalog_manager()
.table("greptime", "public", "test_gc_table", None)
.await
.unwrap()
.unwrap();
let table_id = table.table_info().table_id();
// List SST files before compaction (for verification)
let sst_files_before_compaction = list_sst_files(&test_context).await;
info!(
"SST files before compaction: {:?}",
sst_files_before_compaction
);
assert_eq!(sst_files_before_compaction.len(), 4); // 4 files from 4 flushes
// Step 4: Trigger compaction to create garbage SST files
let compact_sql = "ADMIN COMPACT_TABLE('test_gc_table')";
execute_sql(&instance, compact_sql).await;
// Wait for compaction to complete
tokio::time::sleep(Duration::from_secs(2)).await;
// List SST files after compaction (should have both old and new files)
let sst_files_after_compaction = list_sst_files(&test_context).await;
info!(
"SST files after compaction: {:?}",
sst_files_after_compaction
);
assert_eq!(sst_files_after_compaction.len(), 5); // 4 old + 1 new
// Step 5: Get table route information for GC procedure
let (region_routes, regions) =
get_table_route(metasrv.table_metadata_manager(), table_id).await;
// Step 6: Create and execute BatchGcProcedure
let procedure = BatchGcProcedure::new(
metasrv.mailbox().clone(),
metasrv.options().grpc.server_addr.clone(),
regions.clone(),
false, // full_file_listing
region_routes,
HashMap::new(), // related_regions (empty for this simple test)
Duration::from_secs(10), // timeout
);
// Submit the procedure to the procedure manager
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
let _watcher = metasrv
.procedure_manager()
.submit(procedure_with_id)
.await
.unwrap();
// Wait for the procedure to complete
wait_procedure(metasrv.procedure_manager(), procedure_id).await;
// Step 7: Verify GC results
let sst_files_after_gc = list_sst_files(&test_context).await;
info!("SST files after GC: {:?}", sst_files_after_gc);
assert_eq!(sst_files_after_gc.len(), 1); // Only the compacted file should remain after gc
// Verify that data is still accessible
let count_sql = "SELECT COUNT(*) FROM test_gc_table";
let count_output = execute_sql(&instance, count_sql).await;
let expected = r#"
+----------+
| count(*) |
+----------+
| 12 |
+----------+"#
.trim();
check_output_stream(count_output.data, expected).await;
let select_sql = "SELECT * FROM test_gc_table ORDER BY ts";
let select_output = execute_sql(&instance, select_sql).await;
let expected = r#"
+---------------------+------+-------+
| ts | val | host |
+---------------------+------+-------+
| 2023-01-01T10:00:00 | 10.0 | host0 |
| 2023-01-01T11:00:00 | 20.0 | host0 |
| 2023-01-01T12:00:00 | 30.0 | host0 |
| 2023-01-02T10:00:00 | 11.0 | host1 |
| 2023-01-02T11:00:00 | 21.0 | host1 |
| 2023-01-02T12:00:00 | 31.0 | host1 |
| 2023-01-03T10:00:00 | 12.0 | host2 |
| 2023-01-03T11:00:00 | 22.0 | host2 |
| 2023-01-03T12:00:00 | 32.0 | host2 |
| 2023-01-04T10:00:00 | 13.0 | host3 |
| 2023-01-04T11:00:00 | 23.0 | host3 |
| 2023-01-04T12:00:00 | 33.0 | host3 |
+---------------------+------+-------+"#
.trim();
check_output_stream(select_output.data, expected).await;
// TODO: Add more specific assertions once we have proper file system access
// For now, the test passes if the procedure executes without errors
info!("GC test completed successfully");
}
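The deleted test walks the full flush, compact, and GC cycle: four flushes leave four SSTs, compaction adds a fifth, and GC is expected to keep only the file that is still referenced. Conceptually that last step is a set difference between files present on storage and files referenced by the manifests; the toy illustration below shows only that idea, not mito2's actual algorithm.

```rust
// Conceptual illustration of the GC step the removed test exercises: files on
// object storage that no manifest references anymore are garbage.
use std::collections::HashSet;

fn garbage_files<'a>(
    on_storage: &'a HashSet<String>,
    referenced: &'a HashSet<String>,
) -> Vec<&'a String> {
    on_storage.difference(referenced).collect()
}

fn main() {
    // 4 flushes produced 4 SSTs; compaction wrote 1 new file and left the old
    // 4 unreferenced, so 5 files exist but only 1 is still referenced.
    let on_storage: HashSet<String> =
        ["f1", "f2", "f3", "f4", "compacted"].iter().map(|s| s.to_string()).collect();
    let referenced: HashSet<String> = ["compacted".to_string()].into_iter().collect();

    let garbage = garbage_files(&on_storage, &referenced);
    assert_eq!(garbage.len(), 4);
    assert!(garbage.iter().all(|f| f.as_str() != "compacted"));
}
```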

View File

@@ -19,7 +19,7 @@ use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use crate::cluster::GreptimeDbClusterBuilder;
use crate::tests::test_util::{
MockInstanceBuilder, RebuildableMockInstance, TestContext, execute_sql,
MockInstance, MockInstanceBuilder, RebuildableMockInstance, TestContext, execute_sql,
};
pub(crate) async fn distributed_with_noop_wal() -> TestContext {

View File

@@ -478,11 +478,11 @@ async fn test_execute_show_databases_tables(instance: Arc<dyn MockInstance>) {
check_unordered_output_stream(output, expected).await;
let expected = "\
+------------------+
| Tables_in_public |
+------------------+
| numbers |
+------------------+\
+---------+
| Tables |
+---------+
| numbers |
+---------+\
";
let output = execute_sql(&instance, "show tables").await;
check_unordered_output_stream(output, expected).await;
@@ -494,23 +494,23 @@ async fn test_execute_show_databases_tables(instance: Arc<dyn MockInstance>) {
let output = execute_sql(&instance, "show tables").await;
let expected = "\
+------------------+
| Tables_in_public |
+------------------+
| demo |
| numbers |
+------------------+\
+---------+
| Tables |
+---------+
| demo |
| numbers |
+---------+\
";
check_unordered_output_stream(output, expected).await;
let output = execute_sql(&instance, "SHOW FULL TABLES WHERE Table_type != 'VIEW'").await;
let expected = "\
+------------------+-----------------+
| Tables_in_public | Table_type |
+------------------+-----------------+
| demo | BASE TABLE |
| numbers | LOCAL TEMPORARY |
+------------------+-----------------+\
+---------+-----------------+
| Tables | Table_type |
+---------+-----------------+
| demo | BASE TABLE |
| numbers | LOCAL TEMPORARY |
+---------+-----------------+\
";
check_unordered_output_stream(output, expected).await;
@@ -520,22 +520,22 @@ async fn test_execute_show_databases_tables(instance: Arc<dyn MockInstance>) {
)
.await;
let expected = "\
+------------------+------------+
| Tables_in_public | Table_type |
+------------------+------------+
| demo | BASE TABLE |
+------------------+------------+\
+--------+------------+
| Tables | Table_type |
+--------+------------+
| demo | BASE TABLE |
+--------+------------+\
";
check_unordered_output_stream(output, expected).await;
// show tables like [string]
let output = execute_sql(&instance, "show tables like 'de%'").await;
let expected = "\
+------------------+
| Tables_in_public |
+------------------+
| demo |
+------------------+\
+--------+
| Tables |
+--------+
| demo |
+--------+\
";
check_unordered_output_stream(output, expected).await;
}
@@ -1252,11 +1252,11 @@ async fn test_rename_table(instance: Arc<dyn MockInstance>) {
.await
.data;
let expect = "\
+--------------+
| Tables_in_db |
+--------------+
| test_table |
+--------------+";
+------------+
| Tables |
+------------+
| test_table |
+------------+";
check_output_stream(output, expect).await;
let output = execute_sql_with(
@@ -1323,12 +1323,12 @@ async fn test_create_table_after_rename_table(instance: Arc<dyn MockInstance>) {
assert!(matches!(output, OutputData::AffectedRows(0)));
let expect = "\
+--------------+
| Tables_in_db |
+--------------+
| demo |
| test_table |
+--------------+";
+------------+
| Tables |
+------------+
| demo |
| test_table |
+------------+";
let output = execute_sql_with(&instance, "show tables", query_ctx)
.await
.data;
@@ -1516,11 +1516,11 @@ async fn test_use_database(instance: Arc<dyn MockInstance>) {
.await
.data;
let expected = "\
+---------------+
| Tables_in_db1 |
+---------------+
| tb1 |
+---------------+";
+--------+
| Tables |
+--------+
| tb1 |
+--------+";
check_output_stream(output, expected).await;
let output = execute_sql_with(

View File

@@ -24,8 +24,8 @@ use table::table_reference::TableReference;
use crate::cluster::GreptimeDbClusterBuilder;
use crate::tests::test_util::{
MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend, execute_sql,
restore_kvbackend, try_execute_sql, wait_procedure,
MockInstance, MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend,
execute_sql, restore_kvbackend, try_execute_sql, wait_procedure,
};
const CREATE_MONITOR_TABLE_SQL: &str = r#"
@@ -409,11 +409,11 @@ async fn test_recover_metadata_failed() {
// Only grpc_latencies table is visible.
let output = execute_sql(&test_context.frontend(), "show tables;").await;
let expected = r#"+------------------+
| Tables_in_public |
+------------------+
| numbers |
+------------------+"#;
let expected = r#"+---------+
| Tables |
+---------+
| numbers |
+---------+"#;
check_output_stream(output.data, expected).await;
// Expect table creation to fail because the region directory already exists.
@@ -474,12 +474,12 @@ async fn test_dropped_table() {
test_context.rebuild().await;
let output = execute_sql(&test_context.frontend(), "show tables;").await;
let expected = r#"+------------------+
| Tables_in_public |
+------------------+
| grpc_latencies |
| numbers |
+------------------+"#;
let expected = r#"+----------------+
| Tables |
+----------------+
| grpc_latencies |
| numbers |
+----------------+"#;
check_output_stream(output.data, expected).await;
// We can't query the table because the table is dropped.
@@ -531,12 +531,12 @@ async fn test_renamed_table() {
check_output_stream(output.data, expected).await;
let output = execute_sql(&test_context.frontend(), "show tables;").await;
let expected = r#"+------------------+
| Tables_in_public |
+------------------+
| grpc_latencies |
| numbers |
+------------------+"#;
let expected = r#"+----------------+
| Tables |
+----------------+
| grpc_latencies |
| numbers |
+----------------+"#;
check_output_stream(output.data, expected).await;
}

View File

@@ -12,13 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use async_trait::async_trait;
use client::OutputData;
use common_meta::DatanodeId;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::rpc::KeyValue;
@@ -32,7 +30,6 @@ use common_test_util::find_workspace_path;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::Datanode;
use frontend::error::Result;
use frontend::instance::Instance;
use futures::TryStreamExt;
@@ -98,13 +95,6 @@ impl MockInstanceImpl {
MockInstanceImpl::Distributed(instance) => &instance.metasrv,
}
}
pub(crate) fn datanodes(&self) -> &HashMap<DatanodeId, Datanode> {
match self {
MockInstanceImpl::Standalone(_) => unreachable!(),
MockInstanceImpl::Distributed(instance) => &instance.datanode_instances,
}
}
}
impl MockInstance for MockInstanceImpl {
@@ -195,14 +185,6 @@ impl TestContext {
pub(crate) fn metasrv(&self) -> &Arc<Metasrv> {
self.instance.as_ref().unwrap().metasrv()
}
pub(crate) fn frontend(&self) -> Arc<Instance> {
self.instance.as_ref().unwrap().frontend()
}
pub(crate) fn datanodes(&self) -> &HashMap<DatanodeId, Datanode> {
self.instance.as_ref().unwrap().datanodes()
}
}
#[async_trait::async_trait]

View File

@@ -42,6 +42,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequ
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME;
use prost::Message;
use regex::Regex;
use serde_json::{Value, json};
use servers::http::GreptimeQueryOutput;
use servers::http::handler::HealthResponse;
@@ -126,6 +127,7 @@ macro_rules! http_tests {
test_pipeline_skip_error,
test_pipeline_filter,
test_pipeline_create_table,
test_pipeline_ingest_jsonbench_data,
test_otlp_metrics_new,
test_otlp_traces_v0,
@@ -2701,6 +2703,95 @@ transform:
guard.remove_all().await;
}
pub async fn test_pipeline_ingest_jsonbench_data(store_type: StorageType) {
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_pipeline_ingest_jsonbench_data").await;
let client = TestClient::new(app).await;
// Create the pipeline for ingesting jsonbench data.
let pipeline = r#"
version: 2
processors:
- json_parse:
fields:
- message, log
ignore_missing: true
- simple_extract:
fields:
- log, time_us
key: "time_us"
ignore_missing: false
- epoch:
fields:
- time_us
resolution: microsecond
- select:
fields:
- time_us
- log
transform:
- fields:
- time_us
type: epoch, us
index: timestamp
"#;
let response = client
.post("/v1/pipelines/jsonbench")
.header("Content-Type", "application/x-yaml")
.body(pipeline)
.send()
.await;
assert_eq!(response.status(), StatusCode::OK);
let pattern =
r#"\{"pipelines":\[\{"name":"jsonbench","version":"[^"]*"}],"execution_time_ms":\d+}"#
.parse::<Regex>()
.unwrap();
assert!(pattern.is_match(&response.text().await));
// Create the table for storing jsonbench data.
let sql = r#"
CREATE TABLE jsonbench(time_us TimestampMicrosecond TIME INDEX, `log` Json())
"#;
let response = client
.post("/v1/sql")
.header("Content-Type", "application/x-www-form-urlencoded")
.body(format!("sql={sql}"))
.send()
.await;
assert_eq!(response.status(), StatusCode::OK);
let pattern = r#"\{"output":\[\{"affectedrows":0}],"execution_time_ms":\d+}"#
.parse::<Regex>()
.unwrap();
assert!(pattern.is_match(&response.text().await));
// Start ingesting jsonbench data.
// The input file only contains the first 100 lines of the whole jsonbench test dataset.
let path = common_test_util::find_workspace_path(
"/tests-integration/resources/jsonbench-head-100.ndjson",
);
// Jsonbench data does contain some malformed JSON lines that are meant to be skipped on insert.
let skip_error = true;
let response = client
.post(&format!(
"/v1/ingest?table=jsonbench&pipeline_name=jsonbench&skip_error={skip_error}"
))
.header("Content-Type", "text/plain")
.body(std::fs::read(path).unwrap())
.send()
.await;
assert_eq!(response.status(), StatusCode::OK);
// Note that this pattern also matches the inserted rows: "74".
let pattern = r#"\{"output":\[\{"affectedrows":74}],"execution_time_ms":\d+}"#
.parse::<Regex>()
.unwrap();
assert!(pattern.is_match(&response.text().await));
guard.remove_all().await;
}
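For reference, the per-line work the pipeline above performs amounts to parsing the JSON payload, pulling out `time_us`, and treating it as an epoch timestamp in microseconds, while malformed lines are skipped when `skip_error` is set. A rough, hedged illustration using serde_json follows; the plumbing of the raw line through the `message`/`log` fields is simplified away.

```rust
// Rough illustration of the per-line jsonbench handling, assuming serde_json.
use serde_json::Value;

fn extract_time_us(line: &str) -> Option<(i64, Value)> {
    let log: Value = serde_json::from_str(line).ok()?; // malformed lines are skipped
    let time_us = log.get("time_us")?.as_i64()?;
    Some((time_us, log))
}

fn main() {
    let line = r#"{"time_us": 1700000000000000, "commit": "abc", "kind": "push"}"#;
    let (time_us, log) = extract_time_us(line).unwrap();
    assert_eq!(time_us, 1_700_000_000_000_000);
    assert!(log.get("kind").is_some());

    // Malformed input (present in the jsonbench dataset) yields None and is
    // skipped when skip_error=true is passed to /v1/ingest.
    assert!(extract_time_us("{not json").is_none());
}
```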
pub async fn test_pipeline_dispatcher(storage_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =

View File

@@ -113,7 +113,7 @@ Error: 3001(EngineExecuteQuery), Invalid query: TQL query only supports one f64
SHOW TABLES;
+------------------------+
| Tables_in_public |
| Tables |
+------------------------+
| http_requests_two_vals |
| numbers |

View File

@@ -4,12 +4,12 @@ Affected Rows: 0
SHOW TABLES;
+------------------+
| Tables_in_public |
+------------------+
| numbers |
| phy |
+------------------+
+---------+
| Tables |
+---------+
| numbers |
| phy |
+---------+
DESC TABLE phy;

View File

@@ -45,19 +45,19 @@ Affected Rows: 0
SHOW TABLES FROM test_public_schema;
+------------------------------+
| Tables_in_test_public_schema |
+------------------------------+
| hello |
+------------------------------+
+--------+
| Tables |
+--------+
| hello |
+--------+
SHOW TABLES FROM public;
+------------------+
| Tables_in_public |
+------------------+
| numbers |
+------------------+
+---------+
| Tables |
+---------+
| numbers |
+---------+
INSERT INTO hello VALUES (2), (3), (4);
@@ -75,19 +75,19 @@ SELECT * FROM hello;
SHOW TABLES;
+------------------------------+
| Tables_in_test_public_schema |
+------------------------------+
| hello |
+------------------------------+
+--------+
| Tables |
+--------+
| hello |
+--------+
SHOW FULL TABLES WHERE Table_type != 'VIEW';
+------------------------------+------------+
| Tables_in_test_public_schema | Table_type |
+------------------------------+------------+
| hello | BASE TABLE |
+------------------------------+------------+
+--------+------------+
| Tables | Table_type |
+--------+------------+
| hello | BASE TABLE |
+--------+------------+
DROP TABLE hello;
@@ -104,19 +104,19 @@ SHOW TABLES FROM test_public_schema;
SHOW TABLES FROM public;
+------------------+
| Tables_in_public |
+------------------+
| numbers |
+------------------+
+---------+
| Tables |
+---------+
| numbers |
+---------+
SHOW TABLES FROM public WHERE Tables = 'numbers';
+------------------+
| Tables_in_public |
+------------------+
| numbers |
+------------------+
+---------+
| Tables |
+---------+
| numbers |
+---------+
DROP SCHEMA test_public_schema;

View File

@@ -4,12 +4,12 @@ Affected Rows: 0
SHOW TABLES;
+------------------+
| Tables_in_public |
+------------------+
| numbers |
| phy |
+------------------+
+---------+
| Tables |
+---------+
| numbers |
| phy |
+---------+
DESC TABLE phy;

View File

@@ -40,12 +40,12 @@ Error: 4001(TableNotFound), Table not found: greptime.public.bar
SHOW TABLES;
+------------------+
| Tables_in_public |
+------------------+
| foo |
| numbers |
+------------------+
+---------+
| Tables |
+---------+
| foo |
| numbers |
+---------+
DROP TABLE IF EXISTS foo, bar;

View File

@@ -991,11 +991,11 @@ ADMIN FLUSH_FLOW('temp_monitoring');
-- This table should not exist yet
SHOW TABLES LIKE 'temp_alerts';
+------------------+
| Tables_in_public |
+------------------+
| temp_alerts |
+------------------+
+-------------+
| Tables |
+-------------+
| temp_alerts |
+-------------+
INSERT INTO
temp_sensor_data
@@ -1015,11 +1015,11 @@ ADMIN FLUSH_FLOW('temp_monitoring');
SHOW TABLES LIKE 'temp_alerts';
+------------------+
| Tables_in_public |
+------------------+
| temp_alerts |
+------------------+
+-------------+
| Tables |
+-------------+
| temp_alerts |
+-------------+
SELECT
sensor_id,

View File

@@ -96,133 +96,6 @@ SELECT LAST_VALUE('a');
| a |
+-----------------------+
-- MySQL-compatible IF function tests
SELECT IF(true, 'yes', 'no');
+------------------------------------------+
| if(Boolean(true),Utf8("yes"),Utf8("no")) |
+------------------------------------------+
| yes |
+------------------------------------------+
SELECT IF(false, 'yes', 'no');
+-------------------------------------------+
| if(Boolean(false),Utf8("yes"),Utf8("no")) |
+-------------------------------------------+
| no |
+-------------------------------------------+
SELECT IF(NULL, 'yes', 'no');
+---------------------------------+
| if(NULL,Utf8("yes"),Utf8("no")) |
+---------------------------------+
| no |
+---------------------------------+
SELECT IF(1, 'yes', 'no');
+-------------------------------------+
| if(Int64(1),Utf8("yes"),Utf8("no")) |
+-------------------------------------+
| yes |
+-------------------------------------+
SELECT IF(0, 'yes', 'no');
+-------------------------------------+
| if(Int64(0),Utf8("yes"),Utf8("no")) |
+-------------------------------------+
| no |
+-------------------------------------+
SELECT IF(-1, 'yes', 'no');
+--------------------------------------+
| if(Int64(-1),Utf8("yes"),Utf8("no")) |
+--------------------------------------+
| yes |
+--------------------------------------+
SELECT IF(1.5, 'yes', 'no');
+-----------------------------------------+
| if(Float64(1.5),Utf8("yes"),Utf8("no")) |
+-----------------------------------------+
| yes |
+-----------------------------------------+
SELECT IF(0.0, 'yes', 'no');
+---------------------------------------+
| if(Float64(0),Utf8("yes"),Utf8("no")) |
+---------------------------------------+
| no |
+---------------------------------------+
-- Test with table column
SELECT IF(a > 1, 'greater', 'not greater') FROM t;
+--------------------------------------------------------+
| if(t.a > Int64(1),Utf8("greater"),Utf8("not greater")) |
+--------------------------------------------------------+
| not greater |
| not greater |
| greater |
+--------------------------------------------------------+
-- Test numeric return types
SELECT IF(true, 100, 200);
+-----------------------------------------+
| if(Boolean(true),Int64(100),Int64(200)) |
+-----------------------------------------+
| 100 |
+-----------------------------------------+
SELECT IF(false, 100, 200);
+------------------------------------------+
| if(Boolean(false),Int64(100),Int64(200)) |
+------------------------------------------+
| 200 |
+------------------------------------------+
-- Test with IFNULL (should already work via DataFusion)
SELECT IFNULL(NULL, 'default');
+------------------------------+
| ifnull(NULL,Utf8("default")) |
+------------------------------+
| default |
+------------------------------+
SELECT IFNULL('value', 'default');
+---------------------------------------+
| ifnull(Utf8("value"),Utf8("default")) |
+---------------------------------------+
| value |
+---------------------------------------+
-- Test COALESCE (should already work via DataFusion)
SELECT COALESCE(NULL, NULL, 'third');
+-----------------------------------+
| coalesce(NULL,NULL,Utf8("third")) |
+-----------------------------------+
| third |
+-----------------------------------+
SELECT COALESCE('first', 'second');
+----------------------------------------+
| coalesce(Utf8("first"),Utf8("second")) |
+----------------------------------------+
| first |
+----------------------------------------+
DROP TABLE t;
Affected Rows: 0

View File

@@ -24,39 +24,4 @@ SELECT LAST_VALUE(1);
SELECT LAST_VALUE('a');
-- MySQL-compatible IF function tests
SELECT IF(true, 'yes', 'no');
SELECT IF(false, 'yes', 'no');
SELECT IF(NULL, 'yes', 'no');
SELECT IF(1, 'yes', 'no');
SELECT IF(0, 'yes', 'no');
SELECT IF(-1, 'yes', 'no');
SELECT IF(1.5, 'yes', 'no');
SELECT IF(0.0, 'yes', 'no');
-- Test with table column
SELECT IF(a > 1, 'greater', 'not greater') FROM t;
-- Test numeric return types
SELECT IF(true, 100, 200);
SELECT IF(false, 100, 200);
-- Test with IFNULL (should already work via DataFusion)
SELECT IFNULL(NULL, 'default');
SELECT IFNULL('value', 'default');
-- Test COALESCE (should already work via DataFusion)
SELECT COALESCE(NULL, NULL, 'third');
SELECT COALESCE('first', 'second');
DROP TABLE t;

View File

@@ -27,78 +27,3 @@ SHOW DATABASES;
| public |
+--------------------+
-- ======================================================
-- MySQL compatibility tests for JDBC connectors
-- ======================================================
-- Test MySQL IF() function (issue #7278 compatibility)
-- SQLNESS PROTOCOL MYSQL
SELECT IF(1, 'yes', 'no') as result;
+--------+
| result |
+--------+
| yes |
+--------+
-- SQLNESS PROTOCOL MYSQL
SELECT IF(0, 'yes', 'no') as result;
+--------+
| result |
+--------+
| no |
+--------+
-- SQLNESS PROTOCOL MYSQL
SELECT IF(NULL, 'yes', 'no') as result;
+--------+
| result |
+--------+
| no |
+--------+
-- Test IFNULL (should work via DataFusion)
-- SQLNESS PROTOCOL MYSQL
SELECT IFNULL(NULL, 'default') as result;
+---------+
| result |
+---------+
| default |
+---------+
-- SQLNESS PROTOCOL MYSQL
SELECT IFNULL('value', 'default') as result;
+--------+
| result |
+--------+
| value |
+--------+
-- Test COALESCE
-- SQLNESS PROTOCOL MYSQL
SELECT COALESCE(NULL, NULL, 'third') as result;
+--------+
| result |
+--------+
| third |
+--------+
-- Verify SHOW TABLES column naming
-- SQLNESS PROTOCOL MYSQL
USE public;
affected_rows: 0
-- SQLNESS PROTOCOL MYSQL
SHOW TABLES;
+------------------+
| Tables_in_public |
+------------------+
| numbers |
+------------------+

View File

@@ -6,35 +6,3 @@ SELECT @@version_comment;
-- SQLNESS PROTOCOL MYSQL
SHOW DATABASES;
-- ======================================================
-- MySQL compatibility tests for JDBC connectors
-- ======================================================
-- Test MySQL IF() function (issue #7278 compatibility)
-- SQLNESS PROTOCOL MYSQL
SELECT IF(1, 'yes', 'no') as result;
-- SQLNESS PROTOCOL MYSQL
SELECT IF(0, 'yes', 'no') as result;
-- SQLNESS PROTOCOL MYSQL
SELECT IF(NULL, 'yes', 'no') as result;
-- Test IFNULL (should work via DataFusion)
-- SQLNESS PROTOCOL MYSQL
SELECT IFNULL(NULL, 'default') as result;
-- SQLNESS PROTOCOL MYSQL
SELECT IFNULL('value', 'default') as result;
-- Test COALESCE
-- SQLNESS PROTOCOL MYSQL
SELECT COALESCE(NULL, NULL, 'third') as result;
-- Verify SHOW TABLES column naming
-- SQLNESS PROTOCOL MYSQL
USE public;
-- SQLNESS PROTOCOL MYSQL
SHOW TABLES;

View File

@@ -12,15 +12,15 @@ PARTITION ON COLUMNS (a) (
Affected Rows: 0
-- SQLNESS REPLACE (\d{13}) ID
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, partition_description, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
+---------------+--------------+------------+----------------+----------------------+------------------------+-----------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | partition_description | greptime_partition_id |
+---------------+--------------+------------+----------------+----------------------+------------------------+-----------------------+
| greptime | public | my_table | p0 | a | a < 1000 | ID |
| greptime | public | my_table | p1 | a | a >= 1000 AND a < 2000 | ID |
| greptime | public | my_table | p2 | a | a >= 2000 | ID |
+---------------+--------------+------------+----------------+----------------------+------------------------+-----------------------+
+---------------+--------------+------------+----------------+------------------------+-----------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id |
+---------------+--------------+------------+----------------+------------------------+-----------------------+
| greptime | public | my_table | p0 | a < 1000 | ID |
| greptime | public | my_table | p1 | a >= 1000 AND a < 2000 | ID |
| greptime | public | my_table | p2 | a >= 2000 | ID |
+---------------+--------------+------------+----------------+------------------------+-----------------------+
-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
@@ -126,7 +126,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres
+---------------+--------------+------------+----------------+----------------------+-----------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id |
+---------------+--------------+------------+----------------+----------------------+-----------------------+
| greptime | public | my_table | p0 | a | ID |
| greptime | public | my_table | p0 | | ID |
+---------------+--------------+------------+----------------+----------------------+-----------------------+
-- SQLNESS REPLACE (\d{13}) REGION_ID

Some files were not shown because too many files have changed in this diff.