Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-23 22:49:58 +00:00)

Compare commits: v1.0.0-bet...feature/df (2 commits)
| Author | SHA1 | Date |
|---|---|---|
| | ef80503454 | |
| | 30ca2d7652 | |
Cargo.lock (generated): 64 changed lines
@@ -3274,7 +3274,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "arrow-ipc",
@@ -3329,7 +3329,7 @@ dependencies = [
 [[package]]
 name = "datafusion-catalog"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3353,7 +3353,7 @@ dependencies = [
 [[package]]
 name = "datafusion-catalog-listing"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3375,7 +3375,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3398,7 +3398,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common-runtime"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "futures",
  "log",
@@ -3408,7 +3408,7 @@ dependencies = [
 [[package]]
 name = "datafusion-datasource"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "async-compression 0.4.19",
@@ -3442,7 +3442,7 @@ dependencies = [
 [[package]]
 name = "datafusion-datasource-csv"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3464,7 +3464,7 @@ dependencies = [
 [[package]]
 name = "datafusion-datasource-json"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3485,7 +3485,7 @@ dependencies = [
 [[package]]
 name = "datafusion-datasource-parquet"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3514,12 +3514,12 @@ dependencies = [
 [[package]]
 name = "datafusion-doc"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 
 [[package]]
 name = "datafusion-execution"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3538,7 +3538,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3560,7 +3560,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr-common"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -3572,7 +3572,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "arrow-buffer",
@@ -3600,7 +3600,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-aggregate"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3620,7 +3620,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-aggregate-common"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3632,7 +3632,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-nested"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "arrow-ord",
@@ -3654,7 +3654,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-table"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3669,7 +3669,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-window"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -3686,7 +3686,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-window-common"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "datafusion-common",
  "datafusion-physical-expr-common",
@@ -3695,7 +3695,7 @@ dependencies = [
 [[package]]
 name = "datafusion-macros"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "datafusion-doc",
  "quote",
@@ -3705,7 +3705,7 @@ dependencies = [
 [[package]]
 name = "datafusion-optimizer"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "chrono",
@@ -3756,7 +3756,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3777,7 +3777,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr-adapter"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -3791,7 +3791,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr-common"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3804,7 +3804,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-optimizer"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -3822,7 +3822,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-plan"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3852,7 +3852,7 @@ dependencies = [
 [[package]]
 name = "datafusion-pruning"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -3868,7 +3868,7 @@ dependencies = [
 [[package]]
 name = "datafusion-session"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "async-trait",
  "datafusion-common",
@@ -3881,7 +3881,7 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "arrow",
  "bigdecimal 0.4.8",
@@ -3898,7 +3898,7 @@ dependencies = [
 [[package]]
 name = "datafusion-substrait"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
 dependencies = [
  "async-recursion",
  "async-trait",
@@ -7514,11 +7514,9 @@ dependencies = [
  "common-test-util",
  "common-time",
  "common-wal",
- "criterion 0.4.0",
  "datafusion",
  "datatypes",
  "futures-util",
- "fxhash",
  "humantime-serde",
  "itertools 0.14.0",
  "lazy_static",
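A note on the repeated one-line change above: for a git dependency, Cargo.lock records the requested revision after `?rev=` and the exact commit Cargo resolved it to after `#`, so bumping the fork rewrites the hash twice in each `source` line. Schematically (an illustrative entry, not one from this lock file):

```toml
[[package]]
name = "some-crate"
version = "1.0.0"
# git+<repository URL>?rev=<requested revision>#<resolved commit>
source = "git+https://github.com/example/some-crate.git?rev=<rev>#<rev>"
```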
Cargo.toml: 24 changed lines
@@ -316,18 +316,18 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
 rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
 
 [patch.crates-io]
-datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
+datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
 sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x"
 
 [profile.release]
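Both sides of this hunk pin every datafusion crate in the workspace to the same fork commit; only the revision changes. A `[patch.crates-io]` entry tells Cargo to substitute the fork for the crates.io package everywhere it is resolved, including transitive dependencies, which is why the lock file section above rewrites one `source` line per datafusion crate. A minimal sketch of the mechanism (crate name and revision here are illustrative):

```toml
# Workspace Cargo.toml: redirect a crates.io dependency to a git fork.
# Every dependency on `datafusion`, direct or transitive, now resolves
# to this repository at the pinned revision.
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
```

After changing the pinned rev, running something like `cargo update -p datafusion` refreshes the corresponding Cargo.lock entries.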
@@ -14,7 +14,6 @@ async-stream.workspace = true
 async-trait.workspace = true
 base64.workspace = true
 bytes.workspace = true
-fxhash = "0.2"
 common-base.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
@@ -32,6 +31,7 @@ lazy_static = "1.4"
 mito-codec.workspace = true
 mito2.workspace = true
 moka.workspace = true
+mur3 = "0.1"
 object-store.workspace = true
 prometheus.workspace = true
 serde.workspace = true
@@ -47,12 +47,6 @@ common-meta = { workspace = true, features = ["testing"] }
 common-test-util.workspace = true
 mito2 = { workspace = true, features = ["test"] }
 common-wal = { workspace = true }
-criterion = { version = "0.4", features = ["async", "async_tokio"] }
-mur3 = "0.1"
-
-[[bench]]
-name = "bench_tsid_generator"
-harness = false
 
 [package.metadata.cargo-udeps.ignore]
 normal = ["aquamarine"]
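The removed `[[bench]]` section is how the deleted benchmark below was wired up: with `harness = false`, Cargo skips the built-in libtest harness and Criterion supplies `main` through its macros. A minimal sketch of the pairing (the path follows Cargo's default `benches/` layout; the exact file location is not preserved in this capture):

```toml
# Cargo.toml: a Criterion bench target, as removed above.
[dev-dependencies]
criterion = { version = "0.4", features = ["async", "async_tokio"] }

[[bench]]
name = "bench_tsid_generator"   # expects benches/bench_tsid_generator.rs
harness = false                 # let criterion_main! provide fn main
```

Such a target would be run with `cargo bench --bench bench_tsid_generator`.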
@@ -1,273 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::hash::Hasher;
-
-use criterion::{Criterion, black_box, criterion_group, criterion_main};
-use fxhash::FxHasher;
-use mur3::Hasher128;
-
-// A random number (from original implementation)
-const TSID_HASH_SEED: u32 = 846793005;
-
-/// Original TSID generator using mur3::Hasher128
-/// Hashes both label name and value for each label pair
-struct OriginalTsidGenerator {
-    hasher: Hasher128,
-}
-
-impl OriginalTsidGenerator {
-    fn new() -> Self {
-        Self {
-            hasher: Hasher128::with_seed(TSID_HASH_SEED),
-        }
-    }
-
-    /// Writes a label pair (name and value) to the generator.
-    fn write_label(&mut self, name: &str, value: &str) {
-        use std::hash::Hash;
-        name.hash(&mut self.hasher);
-        value.hash(&mut self.hasher);
-    }
-
-    /// Generates a new TSID.
-    fn finish(&mut self) -> u64 {
-        // TSID is 64 bits, simply truncate the 128 bits hash
-        let (hash, _) = self.hasher.finish128();
-        hash
-    }
-}
-
-/// Current TSID generator using fxhash::FxHasher
-/// Fast path: pre-computes label name hash, only hashes values
-struct CurrentTsidGenerator {
-    hasher: FxHasher,
-}
-
-impl CurrentTsidGenerator {
-    fn new() -> Self {
-        Self {
-            hasher: FxHasher::default(),
-        }
-    }
-
-    fn new_with_label_name_hash(label_name_hash: u64) -> Self {
-        let mut hasher = FxHasher::default();
-        hasher.write_u64(label_name_hash);
-        Self { hasher }
-    }
-
-    /// Writes a label value to the generator.
-    fn write_str(&mut self, value: &str) {
-        self.hasher.write(value.as_bytes());
-        self.hasher.write_u8(0xff);
-    }
-
-    /// Generates a new TSID.
-    fn finish(&mut self) -> u64 {
-        self.hasher.finish()
-    }
-}
-
-/// Pre-computes label name hash (used in fast path)
-fn compute_label_name_hash(labels: &[(&str, &str)]) -> u64 {
-    let mut hasher = FxHasher::default();
-    for (name, _) in labels {
-        hasher.write(name.as_bytes());
-        hasher.write_u8(0xff);
-    }
-    hasher.finish()
-}
-
-fn bench_tsid_generator_small(c: &mut Criterion) {
-    let labels = vec![("namespace", "greptimedb"), ("host", "127.0.0.1")];
-
-    let mut group = c.benchmark_group("tsid_generator_small_2_labels");
-    group.bench_function("original_mur3", |b| {
-        b.iter(|| {
-            let mut tsid_gen = OriginalTsidGenerator::new();
-            for (name, value) in &labels {
-                tsid_gen.write_label(black_box(name), black_box(value));
-            }
-            black_box(tsid_gen.finish())
-        })
-    });
-
-    let label_name_hash = compute_label_name_hash(&labels);
-    group.bench_function("current_fxhash_fast_path", |b| {
-        b.iter(|| {
-            let mut tsid_gen =
-                CurrentTsidGenerator::new_with_label_name_hash(black_box(label_name_hash));
-            for (_, value) in &labels {
-                tsid_gen.write_str(black_box(value));
-            }
-            black_box(tsid_gen.finish())
-        })
-    });
-
-    group.finish();
-}
-
-fn bench_tsid_generator_medium(c: &mut Criterion) {
-    let labels = vec![
-        ("namespace", "greptimedb"),
-        ("host", "127.0.0.1"),
-        ("region", "us-west-2"),
-        ("env", "production"),
-        ("service", "api"),
-    ];
-
-    let mut group = c.benchmark_group("tsid_generator_medium_5_labels");
-    group.bench_function("original_mur3", |b| {
-        b.iter(|| {
-            let mut tsid_gen = OriginalTsidGenerator::new();
-            for (name, value) in &labels {
-                tsid_gen.write_label(black_box(name), black_box(value));
-            }
-            black_box(tsid_gen.finish())
-        })
-    });
-
-    let label_name_hash = compute_label_name_hash(&labels);
-    group.bench_function("current_fxhash_fast_path", |b| {
-        b.iter(|| {
-            let mut tsid_gen =
-                CurrentTsidGenerator::new_with_label_name_hash(black_box(label_name_hash));
-            for (_, value) in &labels {
-                tsid_gen.write_str(black_box(value));
-            }
-            black_box(tsid_gen.finish())
-        })
-    });
-
-    group.finish();
-}
-
-fn bench_tsid_generator_large(c: &mut Criterion) {
-    let labels = vec![
-        ("namespace", "greptimedb"),
-        ("host", "127.0.0.1"),
-        ("region", "us-west-2"),
-        ("env", "production"),
-        ("service", "api"),
-        ("version", "v1.0.0"),
-        ("cluster", "cluster-1"),
-        ("dc", "dc1"),
-        ("rack", "rack-1"),
-        ("pod", "pod-123"),
-    ];
-
-    let mut group = c.benchmark_group("tsid_generator_large_10_labels");
-    group.bench_function("original_mur3", |b| {
-        b.iter(|| {
-            let mut tsid_gen = OriginalTsidGenerator::new();
-            for (name, value) in &labels {
-                tsid_gen.write_label(black_box(name), black_box(value));
-            }
-            black_box(tsid_gen.finish())
-        })
-    });
-
-    let label_name_hash = compute_label_name_hash(&labels);
-    group.bench_function("current_fxhash_fast_path", |b| {
-        b.iter(|| {
-            let mut tsid_gen =
-                CurrentTsidGenerator::new_with_label_name_hash(black_box(label_name_hash));
-            for (_, value) in &labels {
-                tsid_gen.write_str(black_box(value));
-            }
-            black_box(tsid_gen.finish())
-        })
-    });
-
-    group.finish();
-}
-
-fn bench_tsid_generator_slow_path(c: &mut Criterion) {
-    // Simulate slow path: some labels have null values (empty strings)
-    let labels_with_nulls = vec![
-        ("namespace", "greptimedb"),
-        ("host", "127.0.0.1"),
-        ("region", ""), // null
-        ("env", "production"),
-    ];
-
-    let labels_all_non_null = vec![
-        ("namespace", "greptimedb"),
-        ("host", "127.0.0.1"),
-        ("env", "production"),
-    ];
-
-    let mut group = c.benchmark_group("tsid_generator_slow_path_with_nulls");
-
-    // Original: always hashes name and value
-    group.bench_function("original_mur3_with_nulls", |b| {
-        b.iter(|| {
-            let mut tsid_gen = OriginalTsidGenerator::new();
-            for (name, value) in &labels_with_nulls {
-                if !value.is_empty() {
-                    tsid_gen.write_label(black_box(name), black_box(value));
-                }
-            }
-            black_box(tsid_gen.finish())
-        })
-    });
-
-    // Current slow path: recomputes label name hash
-    group.bench_function("current_fxhash_slow_path", |b| {
-        b.iter(|| {
-            // Step 1: Compute label name hash for non-null labels
-            let mut name_hasher = CurrentTsidGenerator::new();
-            for (name, value) in &labels_with_nulls {
-                if !value.is_empty() {
-                    name_hasher.write_str(black_box(name));
-                }
-            }
-            let label_name_hash = name_hasher.finish();
-
-            // Step 2: Use label name hash and hash values
-            let mut tsid_gen = CurrentTsidGenerator::new_with_label_name_hash(label_name_hash);
-            for (_, value) in &labels_with_nulls {
-                if !value.is_empty() {
-                    tsid_gen.write_str(black_box(value));
-                }
-            }
-            black_box(tsid_gen.finish())
-        })
-    });
-
-    // Current fast path: pre-computed (for comparison)
-    let label_name_hash = compute_label_name_hash(&labels_all_non_null);
-    group.bench_function("current_fxhash_fast_path_no_nulls", |b| {
-        b.iter(|| {
-            let mut tsid_gen =
-                CurrentTsidGenerator::new_with_label_name_hash(black_box(label_name_hash));
-            for (_, value) in &labels_all_non_null {
-                tsid_gen.write_str(black_box(value));
-            }
-            black_box(tsid_gen.finish())
-        })
-    });
-
-    group.finish();
-}
-
-criterion_group!(
-    benches,
-    bench_tsid_generator_small,
-    bench_tsid_generator_medium,
-    bench_tsid_generator_large,
-    bench_tsid_generator_slow_path
-);
-criterion_main!(benches);
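The benchmark above compared the two hashing strategies head to head. A self-contained sketch of just the hashing logic, lifted from the deleted code (it assumes the `mur3` and `fxhash` crates from the manifest above; the label set is illustrative):

```rust
use std::hash::{Hash, Hasher};

use fxhash::FxHasher;
use mur3::Hasher128;

const TSID_HASH_SEED: u32 = 846793005;

fn main() {
    let labels = [("host", "127.0.0.1"), ("namespace", "greptimedb")];

    // Original scheme: murmur3 over every (name, value) pair, then
    // truncate the 128-bit hash to 64 bits.
    let mut mur = Hasher128::with_seed(TSID_HASH_SEED);
    for (name, value) in labels {
        name.hash(&mut mur);
        value.hash(&mut mur);
    }
    let (mur_tsid, _) = mur.finish128();

    // fxhash fast path: seed with a precomputed hash of the label names,
    // then stream only the values, each terminated with 0xff.
    let mut names = FxHasher::default();
    for (name, _) in labels {
        names.write(name.as_bytes());
        names.write_u8(0xff);
    }
    let mut fx = FxHasher::default();
    fx.write_u64(names.finish());
    for (_, value) in labels {
        fx.write(value.as_bytes());
        fx.write_u8(0xff);
    }

    println!("mur3 tsid:   {mur_tsid}");
    println!("fxhash tsid: {}", fx.finish());
}
```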
@@ -272,15 +272,15 @@ mod tests {
             .unwrap();
         let batches = RecordBatches::try_collect(stream).await.unwrap();
         let expected = "\
-+-------------------------+----------------+------------+---------------------+-------+
-| greptime_timestamp      | greptime_value | __table_id | __tsid              | job   |
-+-------------------------+----------------+------------+---------------------+-------+
-| 1970-01-01T00:00:00     | 0.0            | 3          | 2955007454552897459 | tag_0 |
-| 1970-01-01T00:00:00.001 | 1.0            | 3          | 2955007454552897459 | tag_0 |
-| 1970-01-01T00:00:00.002 | 2.0            | 3          | 2955007454552897459 | tag_0 |
-| 1970-01-01T00:00:00.003 | 3.0            | 3          | 2955007454552897459 | tag_0 |
-| 1970-01-01T00:00:00.004 | 4.0            | 3          | 2955007454552897459 | tag_0 |
-+-------------------------+----------------+------------+---------------------+-------+";
++-------------------------+----------------+------------+----------------------+-------+
+| greptime_timestamp      | greptime_value | __table_id | __tsid               | job   |
++-------------------------+----------------+------------+----------------------+-------+
+| 1970-01-01T00:00:00     | 0.0            | 3          | 12881218023286672757 | tag_0 |
+| 1970-01-01T00:00:00.001 | 1.0            | 3          | 12881218023286672757 | tag_0 |
+| 1970-01-01T00:00:00.002 | 2.0            | 3          | 12881218023286672757 | tag_0 |
+| 1970-01-01T00:00:00.003 | 3.0            | 3          | 12881218023286672757 | tag_0 |
+| 1970-01-01T00:00:00.004 | 4.0            | 3          | 12881218023286672757 | tag_0 |
++-------------------------+----------------+------------+----------------------+-------+";
         assert_eq!(expected, batches.pretty_print().unwrap(), "physical region");
 
         // read data from logical region
@@ -13,12 +13,11 @@
 // limitations under the License.
 
 use std::collections::{BTreeMap, HashMap};
-use std::hash::Hasher;
+use std::hash::Hash;
 
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
 use datatypes::value::ValueRef;
-use fxhash::FxHasher;
 use mito_codec::row_converter::SparsePrimaryKeyCodec;
 use smallvec::SmallVec;
 use snafu::ResultExt;
@@ -31,6 +30,9 @@ use store_api::storage::{ColumnId, TableId};
 
 use crate::error::{EncodePrimaryKeySnafu, Result};
 
+// A random number
+const TSID_HASH_SEED: u32 = 846793005;
+
 /// A row modifier modifies [`Rows`].
 ///
 /// - For [`PrimaryKeyEncoding::Sparse`] encoding,
@@ -73,7 +75,6 @@ impl RowModifier {
         let num_output_column = num_column - num_primary_key_column + 1;
 
         let mut buffer = vec![];
-
         for mut iter in iter.iter_mut() {
             let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
             let mut values = Vec::with_capacity(num_output_column);
@@ -146,72 +147,47 @@ impl RowModifier {
 
     /// Fills internal columns of a row with table name and a hash of tag values.
     pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
-        let ts_id = if !iter.has_null_labels() {
-            // No null labels in row, we can safely reuse the precomputed label name hash.
-            let mut ts_id_gen = TsidGenerator::new(iter.index.label_name_hash);
-            for (_, value) in iter.primary_keys_with_name() {
-                // The type is checked before. So only null is ignored.
-                if let Some(ValueData::StringValue(string)) = &value.value_data {
-                    ts_id_gen.write_str(string);
-                } else {
-                    unreachable!(
-                        "Should not contain null or non-string value: {:?}, table id: {}",
-                        value, table_id
-                    );
-                }
+        let mut hasher = TsidGenerator::default();
+        for (name, value) in iter.primary_keys_with_name() {
+            // The type is checked before. So only null is ignored.
+            if let Some(ValueData::StringValue(string)) = &value.value_data {
+                hasher.write_label(name, string);
             }
-            ts_id_gen.finish()
-        } else {
-            // Slow path: row contains null, recompute label hash
-            let mut hasher = TsidGenerator::default();
-            // 1. Find out label names with non-null values and get the hash.
-            for (name, value) in iter.primary_keys_with_name() {
-                // The type is checked before. So only null is ignored.
-                if let Some(ValueData::StringValue(_)) = &value.value_data {
-                    hasher.write_str(name);
-                }
-            }
-            let label_name_hash = hasher.finish();
-
-            // 2. Use label name hash as seed and continue with label values.
-            let mut final_hasher = TsidGenerator::new(label_name_hash);
-            for (_, value) in iter.primary_keys_with_name() {
-                if let Some(ValueData::StringValue(value)) = &value.value_data {
-                    final_hasher.write_str(value);
-                }
-            }
-            final_hasher.finish()
-        };
+        }
+        let hash = hasher.finish();
 
         (
             ValueData::U32Value(table_id).into(),
-            ValueData::U64Value(ts_id).into(),
+            ValueData::U64Value(hash).into(),
         )
     }
 }
 
 /// Tsid generator.
-#[derive(Default)]
 pub struct TsidGenerator {
-    hasher: FxHasher,
+    hasher: mur3::Hasher128,
+}
+
+impl Default for TsidGenerator {
+    fn default() -> Self {
+        Self {
+            hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
+        }
+    }
 }
 
 impl TsidGenerator {
-    pub fn new(label_name_hash: u64) -> Self {
-        let mut hasher = FxHasher::default();
-        hasher.write_u64(label_name_hash);
-        Self { hasher }
-    }
-
     /// Writes a label pair to the generator.
-    pub fn write_str(&mut self, value: &str) {
-        self.hasher.write(value.as_bytes());
-        self.hasher.write_u8(0xff);
+    pub fn write_label(&mut self, name: &str, value: &str) {
+        name.hash(&mut self.hasher);
+        value.hash(&mut self.hasher);
     }
 
     /// Generates a new TSID.
     pub fn finish(&mut self) -> u64 {
-        self.hasher.finish()
+        // TSID is 64 bits, simply truncate the 128 bits hash
        let (hash, _) = self.hasher.finish128();
        hash
     }
 }
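Because the reverted `write_label` feeds both the name and the value into a single murmur3 stream, a caller that simply skips null labels gets the same TSID as for a row that never had that label; the tests deleted further down checked exactly this property for the previous implementation. A self-contained sketch of the property (only the `mur3` crate is assumed; the helper mirrors `TsidGenerator` above):

```rust
use std::hash::Hash;

use mur3::Hasher128;

// Same seed as TSID_HASH_SEED above.
fn tsid(labels: &[(&str, Option<&str>)]) -> u64 {
    let mut hasher = Hasher128::with_seed(846793005);
    for (name, value) in labels {
        // A null label is skipped entirely, so it leaves no trace
        // in the hash stream.
        if let Some(value) = value {
            name.hash(&mut hasher);
            value.hash(&mut hasher);
        }
    }
    // Truncate the 128-bit hash to a 64-bit TSID.
    hasher.finish128().0
}

fn main() {
    let with_null = tsid(&[("a", Some("A")), ("b", None), ("c", Some("C"))]);
    let without = tsid(&[("a", Some("A")), ("c", Some("C"))]);
    assert_eq!(with_null, without);
}
```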
@@ -226,8 +202,6 @@ struct ValueIndex {
 struct IterIndex {
     indices: Vec<ValueIndex>,
     num_primary_key_column: usize,
-    /// Precomputed hash for label names.
-    label_name_hash: u64,
 }
 
 impl IterIndex {
@@ -278,22 +252,15 @@
             }
         }
         let num_primary_key_column = primary_key_indices.len() + reserved_indices.len();
-        let mut indices = Vec::with_capacity(num_primary_key_column + 2);
-        indices.extend(reserved_indices);
-        let mut label_name_hasher = TsidGenerator::default();
-        for (pk_name, pk_index) in primary_key_indices {
-            // primary_key_indices already sorted.
-            label_name_hasher.write_str(pk_name);
-            indices.push(pk_index);
-        }
-        let label_name_hash = label_name_hasher.finish();
-
-        indices.extend(ts_index);
-        indices.extend(field_indices);
+        let indices = reserved_indices
+            .into_iter()
+            .chain(primary_key_indices.values().cloned())
+            .chain(ts_index)
+            .chain(field_indices)
+            .collect();
         IterIndex {
             indices,
             num_primary_key_column,
-            label_name_hash,
         }
     }
 }
@@ -347,13 +314,6 @@ impl RowIter<'_> {
         })
     }
 
-    /// Returns true if any label in current row is null.
-    fn has_null_labels(&self) -> bool {
-        self.index.indices[..self.index.num_primary_key_column]
-            .iter()
-            .any(|idx| self.row.values[idx.index].value_data.is_none())
-    }
-
     /// Returns the primary keys.
     pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef<'_>)> {
         self.index.indices[..self.index.num_primary_key_column]
@@ -439,9 +399,9 @@ mod tests {
         let result = encoder.modify_rows_sparse(rows_iter, table_id).unwrap();
         assert_eq!(result.rows[0].values.len(), 1);
         let encoded_primary_key = vec![
-            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 37, 196, 242, 181, 117, 224, 7, 137, 0,
-            0, 0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
-            1, 1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
+            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 131, 9, 166, 190, 173, 37, 39, 240, 0, 0,
+            0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1,
+            1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
         ];
         assert_eq!(
             result.rows[0].values[0],
@@ -517,7 +477,7 @@ mod tests {
         assert_eq!(result.rows[0].values[2], ValueData::U32Value(1025).into());
         assert_eq!(
             result.rows[0].values[3],
-            ValueData::U64Value(2721566936019240841).into()
+            ValueData::U64Value(9442261431637846000).into()
         );
         assert_eq!(result.schema, expected_dense_schema());
     }
@@ -536,7 +496,7 @@ mod tests {
         let row_iter = rows_iter.iter_mut().next().unwrap();
         let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
         assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
-        assert_eq!(tsid, ValueData::U64Value(2721566936019240841).into());
+        assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
 
         // Change the column order
         let schema = vec![
@@ -564,264 +524,6 @@ mod tests {
         let row_iter = rows_iter.iter_mut().next().unwrap();
         let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
         assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
-        assert_eq!(tsid, ValueData::U64Value(2721566936019240841).into());
+        assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
     }
-
-    /// Helper function to create a schema with multiple label columns
-    fn create_multi_label_schema(labels: &[&str]) -> Vec<ColumnSchema> {
-        labels
-            .iter()
-            .map(|name| ColumnSchema {
-                column_name: name.to_string(),
-                datatype: ColumnDataType::String as i32,
-                semantic_type: SemanticType::Tag as _,
-                datatype_extension: None,
-                options: None,
-            })
-            .collect()
-    }
-
-    /// Helper function to create a name_to_column_id map
-    fn create_name_to_column_id(labels: &[&str]) -> HashMap<String, ColumnId> {
-        labels
-            .iter()
-            .enumerate()
-            .map(|(idx, name)| (name.to_string(), idx as ColumnId + 1))
-            .collect()
-    }
-
-    /// Helper function to create a row with string values
-    fn create_row_with_values(values: &[&str]) -> Row {
-        Row {
-            values: values
-                .iter()
-                .map(|v| ValueData::StringValue(v.to_string()).into())
-                .collect(),
-        }
-    }
-
-    /// Helper function to create a row with some null values
-    fn create_row_with_nulls(values: &[Option<&str>]) -> Row {
-        Row {
-            values: values
-                .iter()
-                .map(|v| {
-                    v.map(|s| ValueData::StringValue(s.to_string()).into())
-                        .unwrap_or(Value { value_data: None })
-                })
-                .collect(),
-        }
-    }
-
-    /// Helper function to extract TSID from a row
-    fn extract_tsid(
-        schema: Vec<ColumnSchema>,
-        row: Row,
-        name_to_column_id: &HashMap<String, ColumnId>,
-        table_id: TableId,
-    ) -> u64 {
-        let rows = Rows {
-            schema,
-            rows: vec![row],
-        };
-        let mut rows_iter = RowsIter::new(rows, name_to_column_id);
-        let row_iter = rows_iter.iter_mut().next().unwrap();
-        let (_, tsid_value) = RowModifier::fill_internal_columns(table_id, &row_iter);
-        match tsid_value.value_data {
-            Some(ValueData::U64Value(tsid)) => tsid,
-            _ => panic!("Expected U64Value for TSID"),
-        }
-    }
-
-    #[test]
-    fn test_tsid_same_for_different_label_orders() {
-        // Test that rows with the same label name-value pairs but in different orders
-        // produce the same TSID
-        let table_id = 1025;
-
-        // Schema 1: a, b, c
-        let schema1 = create_multi_label_schema(&["a", "b", "c"]);
-        let name_to_column_id1 = create_name_to_column_id(&["a", "b", "c"]);
-        let row1 = create_row_with_values(&["A", "B", "C"]);
-        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
-
-        // Schema 2: b, a, c (different order)
-        let schema2 = create_multi_label_schema(&["b", "a", "c"]);
-        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c"]);
-        let row2 = create_row_with_values(&["B", "A", "C"]);
-        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
-
-        // Schema 3: c, b, a (another different order)
-        let schema3 = create_multi_label_schema(&["c", "b", "a"]);
-        let name_to_column_id3 = create_name_to_column_id(&["a", "b", "c"]);
-        let row3 = create_row_with_values(&["C", "B", "A"]);
-        let tsid3 = extract_tsid(schema3, row3, &name_to_column_id3, table_id);
-
-        // All should have the same TSID since label names are sorted lexicographically
-        // and we're using the same label name-value pairs
-        assert_eq!(
-            tsid1, tsid2,
-            "TSID should be same for different column orders"
-        );
-        assert_eq!(
-            tsid2, tsid3,
-            "TSID should be same for different column orders"
-        );
-    }
-
-    #[test]
-    fn test_tsid_same_with_null_labels() {
-        // Test that rows that differ only by null label values produce the same TSID
-        let table_id = 1025;
-
-        // Row 1: a=A, b=B (no nulls, fast path)
-        let schema1 = create_multi_label_schema(&["a", "b"]);
-        let name_to_column_id1 = create_name_to_column_id(&["a", "b"]);
-        let row1 = create_row_with_values(&["A", "B"]);
-        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
-
-        // Row 2: a=A, b=B, c=null (has null, slow path)
-        let schema2 = create_multi_label_schema(&["a", "b", "c"]);
-        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c"]);
-        let row2 = create_row_with_nulls(&[Some("A"), Some("B"), None]);
-        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
-
-        // Both should have the same TSID since null labels are ignored
-        assert_eq!(
-            tsid1, tsid2,
-            "TSID should be same when only difference is null label values"
-        );
-    }
-
-    #[test]
-    fn test_tsid_same_with_multiple_null_labels() {
-        // Test with multiple null labels
-        let table_id = 1025;
-
-        // Row 1: a=A, b=B (no nulls)
-        let schema1 = create_multi_label_schema(&["a", "b"]);
-        let name_to_column_id1 = create_name_to_column_id(&["a", "b"]);
-        let row1 = create_row_with_values(&["A", "B"]);
-        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
-
-        // Row 2: a=A, b=B, c=null, d=null (multiple nulls)
-        let schema2 = create_multi_label_schema(&["a", "b", "c", "d"]);
-        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c", "d"]);
-        let row2 = create_row_with_nulls(&[Some("A"), Some("B"), None, None]);
-        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
-
-        assert_eq!(
-            tsid1, tsid2,
-            "TSID should be same when only difference is multiple null label values"
-        );
-    }
-
-    #[test]
-    fn test_tsid_different_with_different_non_null_values() {
-        // Test that rows with different non-null values produce different TSIDs
-        let table_id = 1025;
-
-        // Row 1: a=A, b=B
-        let schema1 = create_multi_label_schema(&["a", "b"]);
-        let name_to_column_id1 = create_name_to_column_id(&["a", "b"]);
-        let row1 = create_row_with_values(&["A", "B"]);
-        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
-
-        // Row 2: a=A, b=C (different value for b)
-        let schema2 = create_multi_label_schema(&["a", "b"]);
-        let name_to_column_id2 = create_name_to_column_id(&["a", "b"]);
-        let row2 = create_row_with_values(&["A", "C"]);
-        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
-
-        assert_ne!(
-            tsid1, tsid2,
-            "TSID should be different when label values differ"
-        );
-    }
-
-    #[test]
-    fn test_tsid_fast_path_vs_slow_path_consistency() {
-        // Test that fast path (no nulls) and slow path (with nulls) produce
-        // the same TSID for the same non-null label values
-        let table_id = 1025;
-
-        // Fast path: a=A, b=B (no nulls)
-        let schema_fast = create_multi_label_schema(&["a", "b"]);
-        let name_to_column_id_fast = create_name_to_column_id(&["a", "b"]);
-        let row_fast = create_row_with_values(&["A", "B"]);
-        let tsid_fast = extract_tsid(schema_fast, row_fast, &name_to_column_id_fast, table_id);
-
-        // Slow path: a=A, b=B, c=null (has null, triggers slow path)
-        let schema_slow = create_multi_label_schema(&["a", "b", "c"]);
-        let name_to_column_id_slow = create_name_to_column_id(&["a", "b", "c"]);
-        let row_slow = create_row_with_nulls(&[Some("A"), Some("B"), None]);
-        let tsid_slow = extract_tsid(schema_slow, row_slow, &name_to_column_id_slow, table_id);
-
-        assert_eq!(
-            tsid_fast, tsid_slow,
-            "Fast path and slow path should produce same TSID for same non-null values"
-        );
-    }
-
-    #[test]
-    fn test_tsid_with_null_in_middle() {
-        // Test with null in the middle of labels
-        let table_id = 1025;
-
-        // Row 1: a=A, b=B, c=C
-        let schema1 = create_multi_label_schema(&["a", "b", "c"]);
-        let name_to_column_id1 = create_name_to_column_id(&["a", "b", "c"]);
-        let row1 = create_row_with_values(&["A", "B", "C"]);
-        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
-
-        // Row 2: a=A, b=null, c=C (null in middle)
-        let schema2 = create_multi_label_schema(&["a", "b", "c"]);
-        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c"]);
-        let row2 = create_row_with_nulls(&[Some("A"), None, Some("C")]);
-        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
-
-        // Should be different because b is null in row2 but B in row1
-        // Actually wait, let me reconsider - if b is null, it should be ignored
-        // So row2 should be equivalent to a=A, c=C
-        // But row1 is a=A, b=B, c=C, so they should be different
-        assert_ne!(
-            tsid1, tsid2,
-            "TSID should be different when a non-null value becomes null"
-        );
-
-        // Row 3: a=A, c=C (no b at all, equivalent to row2)
-        let schema3 = create_multi_label_schema(&["a", "c"]);
-        let name_to_column_id3 = create_name_to_column_id(&["a", "c"]);
-        let row3 = create_row_with_values(&["A", "C"]);
-        let tsid3 = extract_tsid(schema3, row3, &name_to_column_id3, table_id);
-
-        // Row2 (a=A, b=null, c=C) should be same as row3 (a=A, c=C)
-        assert_eq!(
-            tsid2, tsid3,
-            "TSID should be same when null label is ignored"
-        );
-    }
-
-    #[test]
-    fn test_tsid_all_null_labels() {
-        // Test with all labels being null
-        let table_id = 1025;
-
-        // Row with all nulls
-        let schema = create_multi_label_schema(&["a", "b", "c"]);
-        let name_to_column_id = create_name_to_column_id(&["a", "b", "c"]);
-        let row = create_row_with_nulls(&[None, None, None]);
-        let tsid = extract_tsid(schema.clone(), row, &name_to_column_id, table_id);
-
-        // Should still produce a TSID (based on label names only when all values are null)
-        // This tests that the slow path handles the case where all values are null
-        // The TSID will be based on the label name hash only
-        // Test that it's consistent - same schema with all nulls should produce same TSID
-        let row2 = create_row_with_nulls(&[None, None, None]);
-        let tsid2 = extract_tsid(schema, row2, &name_to_column_id, table_id);
-        assert_eq!(
-            tsid, tsid2,
-            "TSID should be consistent when all label values are null"
-        );
-    }
 }
@@ -515,7 +515,6 @@ async fn test_flush_workers() {
 }
 
 async fn test_flush_workers_with_format(flat_format: bool) {
-    common_telemetry::init_default_ut_logging();
     let mut env = TestEnv::new().await;
    let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
    let listener = Arc::new(FlushListener::default());
@@ -575,7 +574,7 @@ async fn test_flush_workers_with_format(flat_format: bool) {
     put_rows(&engine, region_id0, rows).await;
 
     // Waits until flush is finished.
-    while listener.success_count() < 3 {
+    while listener.success_count() < 2 {
         listener.wait().await;
     }
 
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::Instant;
 
-use common_telemetry::{debug, error, info};
+use common_telemetry::{debug, error, info, trace};
 use datatypes::arrow::datatypes::SchemaRef;
 use either::Either;
 use partition::expr::PartitionExpr;
@@ -89,12 +89,6 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
 
     /// Returns the total memory used by memtables.
     fn memory_usage(&self) -> usize;
-
-    /// Returns the mutable memtable memory limit.
-    ///
-    /// The write buffer manager should flush memtables when the mutable memory usage
-    /// exceeds this limit.
-    fn flush_limit(&self) -> usize;
 }
 
 pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
@@ -151,7 +145,7 @@ impl WriteBufferManagerImpl {
 impl WriteBufferManager for WriteBufferManagerImpl {
     fn should_flush_engine(&self) -> bool {
         let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
-        if mutable_memtable_memory_usage >= self.mutable_limit {
+        if mutable_memtable_memory_usage > self.mutable_limit {
             debug!(
                 "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                 mutable_memtable_memory_usage,
@@ -163,8 +157,23 @@ impl WriteBufferManager for WriteBufferManagerImpl {
         }
 
         let memory_usage = self.memory_used.load(Ordering::Relaxed);
+        // If the memory exceeds the buffer size, we trigger more aggressive
+        // flush. But if already more than half memory is being flushed,
+        // triggering more flush may not help. We will hold it instead.
         if memory_usage >= self.global_write_buffer_size {
-            return true;
+            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
+                debug!(
+                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
+                    mutable_usage: {}.",
+                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
+                );
+                return true;
+            } else {
+                trace!(
+                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
+                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
+                );
+            }
         }
 
         false
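Restated, the restored `should_flush_engine` applies two checks: flush when mutable memtables alone exceed their own limit, and when the whole buffer is full, escalate only if at least half of the global budget is still mutable (otherwise most memory is already being flushed and another flush would not help). A standalone sketch of the decision (the struct and `main` are illustrative; the field names follow `WriteBufferManagerImpl` above):

```rust
struct FlushPolicy {
    mutable_limit: usize,            // budget for mutable memtables
    global_write_buffer_size: usize, // budget for all memtables
}

impl FlushPolicy {
    fn should_flush_engine(&self, mutable_usage: usize, total_usage: usize) -> bool {
        // Over the mutable limit (note the strict `>` after this change).
        if mutable_usage > self.mutable_limit {
            return true;
        }
        // Whole buffer full: only escalate if at least half of the global
        // budget is still mutable; otherwise hold and let running flushes finish.
        total_usage >= self.global_write_buffer_size
            && mutable_usage >= self.global_write_buffer_size / 2
    }
}

fn main() {
    let p = FlushPolicy { mutable_limit: 700, global_write_buffer_size: 1000 };
    assert!(p.should_flush_engine(800, 900));   // mutable over its own limit
    assert!(!p.should_flush_engine(300, 1000)); // full, but mostly immutable: hold
    assert!(p.should_flush_engine(600, 1000));  // full and half still mutable: flush
}
```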
@@ -196,10 +205,6 @@
     fn memory_usage(&self) -> usize {
         self.memory_used.load(Ordering::Relaxed)
     }
-
-    fn flush_limit(&self) -> usize {
-        self.mutable_limit
-    }
 }
 
 /// Reason of a flush task.
@@ -883,31 +888,6 @@
         self.region_status.contains_key(&region_id)
     }
 
-    fn schedule_flush_task(
-        &mut self,
-        version_control: &VersionControlRef,
-        task: RegionFlushTask,
-    ) -> Result<()> {
-        let region_id = task.region_id;
-
-        // If current region doesn't have flush status, we can flush the region directly.
-        if let Err(e) = version_control.freeze_mutable() {
-            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
-
-            return Err(e);
-        }
-        // Submit a flush job.
-        let job = task.into_flush_job(version_control);
-        if let Err(e) = self.scheduler.schedule(job) {
-            // If scheduler returns error, senders in the job will be dropped and waiters
-            // can get recv errors.
-            error!(e; "Failed to schedule flush job for region {}", region_id);
-
-            return Err(e);
-        }
-        Ok(())
-    }
-
     /// Schedules a flush `task` for specific `region`.
     pub(crate) fn schedule_flush(
         &mut self,
@@ -930,21 +910,46 @@ impl FlushScheduler {
|
||||
.with_label_values(&[task.reason.as_str()])
|
||||
.inc();
|
||||
|
||||
// If current region has flush status, merge the task.
|
||||
if let Some(flush_status) = self.region_status.get_mut(®ion_id) {
|
||||
// Checks whether we can flush the region now.
|
||||
debug!("Merging flush task for region {}", region_id);
|
||||
// Add this region to status map.
|
||||
let flush_status = self
|
||||
.region_status
|
||||
.entry(region_id)
|
||||
.or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
|
||||
// Checks whether we can flush the region now.
|
||||
if flush_status.flushing {
|
||||
// There is already a flush job running.
|
||||
flush_status.merge_task(task);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.schedule_flush_task(version_control, task)?;
|
||||
// TODO(yingwen): We can merge with pending and execute directly.
|
||||
// If there are pending tasks, then we should push it to pending list.
|
||||
if flush_status.pending_task.is_some() {
|
||||
flush_status.merge_task(task);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Add this region to status map.
|
||||
let _ = self.region_status.insert(
|
||||
region_id,
|
||||
FlushStatus::new(region_id, version_control.clone()),
|
||||
);
|
||||
// Now we can flush the region directly.
|
||||
if let Err(e) = version_control.freeze_mutable() {
|
||||
error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
|
||||
|
||||
// Remove from region status if we can't freeze the mutable memtable.
|
||||
self.region_status.remove(®ion_id);
|
||||
return Err(e);
|
||||
}
|
||||
// Submit a flush job.
|
||||
let job = task.into_flush_job(version_control);
|
||||
if let Err(e) = self.scheduler.schedule(job) {
|
||||
// If scheduler returns error, senders in the job will be dropped and waiters
|
||||
// can get recv errors.
|
||||
error!(e; "Failed to schedule flush job for region {}", region_id);
|
||||
|
||||
// Remove from region status if we can't submit the task.
|
||||
self.region_status.remove(®ion_id);
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
flush_status.flushing = true;
|
||||
|
||||
Ok(())
|
||||
}
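Put differently, `schedule_flush` now funnels every incoming task through a three-way decision before touching the job scheduler. A hypothetical, borrow-free sketch of just that decision (the real code mutates `FlushStatus` entries inside `region_status`):

#[derive(Debug, PartialEq)]
enum FlushDecision {
    MergeIntoRunning, // a flush job is already running for the region
    MergeIntoPending, // another task already waits; keep one pending task per region
    ScheduleNow,      // freeze the mutable memtable and submit a job
}

fn decide(flushing: bool, has_pending_task: bool) -> FlushDecision {
    if flushing {
        FlushDecision::MergeIntoRunning
    } else if has_pending_task {
        FlushDecision::MergeIntoPending
    } else {
        FlushDecision::ScheduleNow
    }
}

fn main() {
    assert_eq!(FlushDecision::ScheduleNow, decide(false, false));
    assert_eq!(FlushDecision::MergeIntoRunning, decide(true, false));
    assert_eq!(FlushDecision::MergeIntoPending, decide(false, true));
}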
@@ -961,56 +966,48 @@ impl FlushScheduler {
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;
        // If region doesn't have any pending flush task, we need to remove it from the status.
        if flush_status.pending_task.is_none() {

        // This region doesn't have running flush job.
        flush_status.flushing = false;

        let pending_requests = if flush_status.pending_task.is_none() {
            // The region doesn't have any pending flush task.
            // Safety: The flush status must exist.
            debug!(
                "Region {} doesn't have any pending flush task, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
            Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
            ))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // The region has nothing to flush, we also need to remove it from the status.
                // Safety: The pending task is not None.
                let task = flush_status.pending_task.take().unwrap();
                // The region has nothing to flush. We can notify pending task.
                task.on_success();
                // `schedule_next_flush()` may pick up the same region to flush, so we must remove
                // it from the status to avoid leaking pending requests.
                // Safety: The flush status must exist.
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((
                    flush_status.pending_ddls,
                    flush_status.pending_writes,
                    flush_status.pending_bulk_writes,
                ))
            } else {
                // We can flush the region again, keep it in the region status.
                None
            }
        };

        // Schedule next flush job.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
        }

        // If region has pending task, but has nothing to flush, we need to remove it from the status.
        let version_data = flush_status.version_control.current();
        if version_data.version.memtables.is_empty() {
            // The region has nothing to flush, we also need to remove it from the status.
            // Safety: The pending task is not None.
            let task = flush_status.pending_task.take().unwrap();
            // The region has nothing to flush. We can notify pending task.
            task.on_success();
            debug!(
                "Region {} has nothing to flush, removing it from the status",
                region_id
            );
            // Safety: The flush status must exist.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        // If region has pending task and has something to flush, we need to schedule it.
        debug!("Scheduling pending flush task for region {}", region_id);
        // Safety: The flush status must exist.
        let task = flush_status.pending_task.take().unwrap();
        let version_control = flush_status.version_control.clone();
        if let Err(err) = self.schedule_flush_task(&version_control, task) {
            error!(
                err;
                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
            );
        }
        // We can flush the region again, keep it in the region status.
        None
        pending_requests
    }

    /// Notifies the scheduler that the flush job is failed.
@@ -1026,6 +1023,11 @@ impl FlushScheduler {

        // Fast fail: cancels all pending tasks and sends error to their waiters.
        flush_status.on_failure(err);

        // Still tries to schedule a new flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
        }
    }

    /// Notifies the scheduler that the region is dropped.
@@ -1096,6 +1098,30 @@ impl FlushScheduler {
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }

    /// Schedules a new flush task when the scheduler can submit the next task.
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        debug_assert!(
            self.region_status
                .values()
                .all(|status| status.flushing || status.pending_task.is_some())
        );

        // Get the first region from status map.
        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        debug_assert!(!flush_status.flushing);
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();

        self.schedule_flush(region_id, &version_control, task)
    }
}

impl Drop for FlushScheduler {
@@ -1115,6 +1141,11 @@ struct FlushStatus {
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// There is a flush task running.
    ///
    /// It is possible that a region is not flushing but has a pending task if the scheduler
    /// doesn't schedule this region.
    flushing: bool,
    /// Task waiting for next flush.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests.
@@ -1130,6 +1161,7 @@ impl FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            flushing: false,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
@@ -1221,12 +1253,10 @@ mod tests {
        // Global usage is still 1100.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
        // More than global limit, but mutable (1100-200-450=450) is not enough (< 500).
        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());
        assert!(!manager.should_flush_engine());

        // Now mutable is enough.
        manager.reserve_mem(50);
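The arithmetic in the comment works out as follows; a runnable sketch, assuming the manager in this test was built with a 1000-byte global buffer (mutable limit 500) and had 1100 bytes reserved before this point:

fn main() {
    let reserved: usize = 1100; // global usage at this point in the test
    let freed = 200 + 450; // the two schedule_free_mem calls
    let mutable_estimate = reserved - freed; // 1100 - 650 = 450
    assert!(mutable_estimate < 500); // below the mutable limit => no engine flush
    // reserve_mem(50) lifts the estimate back to 500, re-arming the flush.
    assert!(mutable_estimate + 50 >= 500);
}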
@@ -1473,92 +1503,4 @@ mod tests {
        assert_eq!(2, total_rows, "append_mode should preserve duplicates");
    }
}

    #[tokio::test]
    async fn test_schedule_pending_request_on_flush_success() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 2 tasks.
        let mut tasks: Vec<_> = (0..2)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Schedule second task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_some()
        );

        // Check the new version.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Assumes the flush job is finished.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        write_rows_to_version(&version_data.version, "host1", 0, 10);
        scheduler.on_flush_success(builder.region_id());
        assert_eq!(2, job_scheduler.num_jobs());
        // The pending task is cleared.
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_none()
        );
    }
}

@@ -1020,15 +1020,9 @@ pub struct MockWriteBufferManager {
    should_stall: AtomicBool,
    memory_used: AtomicUsize,
    memory_active: AtomicUsize,
    flush_limit: usize,
}

impl MockWriteBufferManager {
    /// Set flush limit.
    pub fn set_flush_limit(&mut self, flush_limit: usize) {
        self.flush_limit = flush_limit;
    }

    /// Set whether to flush the engine.
    pub fn set_should_flush(&self, value: bool) {
        self.should_flush.store(value, Ordering::Relaxed);
@@ -1070,10 +1064,6 @@ impl WriteBufferManager for MockWriteBufferManager {
    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }

    fn flush_limit(&self) -> usize {
        self.flush_limit
    }
}
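A hypothetical use of the mock in a test, assuming `MockWriteBufferManager` derives `Default` as its atomic fields suggest; this is illustrative, not code from the diff:

fn main() {
    // Configure the flush limit while we still hold exclusive access.
    let mut manager = MockWriteBufferManager::default();
    manager.set_flush_limit(500); // needs &mut, so call it before sharing
    assert_eq!(500, manager.flush_limit());
}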

pub fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema {

@@ -22,7 +22,6 @@ use common_telemetry::info;
use common_telemetry::tracing::warn;
use humantime_serde::re::humantime;
use snafu::{ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::metadata::{
    InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
    RegionMetadataRef,
@@ -42,7 +41,7 @@ use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
use crate::sst::FormatType;
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
impl<S> RegionWorkerLoop<S> {
    pub(crate) async fn handle_alter_request(
        &mut self,
        region_id: RegionId,

@@ -30,26 +30,16 @@ use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, O
use crate::sst::index::IndexBuildType;
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
impl<S> RegionWorkerLoop<S> {
    /// On region flush job failed.
    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
        self.flush_scheduler.on_flush_failed(region_id, request.err);
        debug!(
            "Flush failed for region {}, handling stalled requests",
            region_id
        );
        // Maybe flush worker again.
        self.maybe_flush_worker();

        // Handle stalled requests.
        self.handle_stalled_requests().await;
    }

    /// Checks whether the engine reaches the flush threshold. If so, finds regions in this
    /// worker to flush.
    pub(crate) fn maybe_flush_worker(&mut self) {
        if !self.write_buffer_manager.should_flush_engine() {
            debug!("No need to flush worker");
            // No need to flush worker.
            return;
        }
@@ -66,7 +56,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
        let mut pending_regions = vec![];
        let mut max_mutable_size = 0;
        // Region with max mutable memtable size.
        let mut max_mem_region = None;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
@@ -75,8 +67,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
            }

            let version = region.version();
            let region_memtable_size =
                version.memtables.mutable_usage() + version.memtables.immutables_usage();
            let region_mutable_size = version.memtables.mutable_usage();
            // Tracks region with max mutable memtable size.
            if region_mutable_size > max_mutable_size {
                max_mem_region = Some(region);
                max_mutable_size = region_mutable_size;
            }

            if region.last_flush_millis() < min_last_flush_time {
                // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
@@ -92,38 +88,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                    &region.version_control,
                    task,
                )?;
            } else if region_memtable_size > 0 {
                // We should only consider regions with memtable size > 0 to flush.
                pending_regions.push((region, region_memtable_size));
            }
        }
        pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
        // The flush target is the mutable memtable limit (half of the global buffer).
        // When memory is full, we aggressively flush regions until usage drops below this target,
        // not just below the full limit.
        let target_memory_usage = self.write_buffer_manager.flush_limit();
        let mut memory_usage = self.write_buffer_manager.memory_usage();

        #[cfg(test)]
        // Flush memtable with max mutable memtable.
        // TODO(yingwen): Maybe flush more tables to reduce write buffer size.
        if let Some(region) = max_mem_region
            && !self.flush_scheduler.is_flush_requested(region.region_id)
        {
        debug!(
            "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
            target_memory_usage,
            memory_usage,
            pending_regions
                .iter()
                .map(|(region, mem_size)| (region.region_id, mem_size))
                .collect::<Vec<_>>()
        );
        }
        // Iterate over pending regions in descending order of their memory size and schedule flush tasks
        // for each region until the overall memory usage drops below the flush limit.
        for (region, region_mem_size) in pending_regions.into_iter() {
            // Make sure the first region is always flushed.
            if memory_usage < target_memory_usage {
                // Stop flushing regions if memory usage is already below the flush limit.
                break;
            }
            let task = self.new_flush_task(
                region,
                FlushReason::EngineFull,
@@ -131,12 +103,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                self.config.clone(),
                region.is_staging(),
            );
            debug!("Scheduling flush task for region {}", region.region_id);
            // Schedule a flush task for the current region.
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)?;
            // Reduce memory usage by the region's size, ensuring it doesn't go negative.
            memory_usage = memory_usage.saturating_sub(region_mem_size);
        }

        Ok(())
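The loop above reduces to a small planning routine: flush the largest regions first and stop once the estimated usage falls under the target. A stub-typed sketch of that shape (plain region ids stand in for the real region references):

fn plan_flushes(mut pending: Vec<(u64, usize)>, mut usage: usize, target: usize) -> Vec<u64> {
    // Largest memtables first, mirroring sort_unstable_by_key with Reverse.
    pending.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
    let mut to_flush = Vec::new();
    for (region_id, size) in pending {
        if usage < target {
            break; // already below the flush target
        }
        to_flush.push(region_id);
        // Assume the flush eventually releases the region's whole memtable size.
        usage = usage.saturating_sub(size);
    }
    to_flush
}

fn main() {
    // Usage 1100 against target 500: the 400- and 300-byte regions suffice.
    assert_eq!(vec![1, 2], plan_flushes(vec![(2, 300), (1, 400), (3, 50)], 1100, 500));
}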
@@ -323,9 +291,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
            .await;
    }

        // Maybe flush worker again.
        self.maybe_flush_worker();

        // Handle stalled requests.
        self.handle_stalled_requests().await;


@@ -4327,7 +4327,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
        .await;

    // select metrics data
    let expected = "[[1753780559836,2.244618,\"arm64\",\"claude-code\",\"claude-sonnet-4-20250514\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,0.0052544,\"arm64\",\"claude-code\",\"claude-3-5-haiku-20241022\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
    let expected = "[[1753780559836,0.0052544,\"arm64\",\"claude-code\",\"claude-3-5-haiku-20241022\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,2.244618,\"arm64\",\"claude-code\",\"claude-sonnet-4-20250514\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
    validate_data(
        "otlp_metrics_all_select",
        &client,
@@ -4399,7 +4399,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
        .await;

    // select metrics data
    let expected = "[[1753780559836,0.0052544,\"claude-code\",\"claude-3-5-haiku-20241022\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,2.244618,\"claude-code\",\"claude-sonnet-4-20250514\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
    let expected = "[[1753780559836,2.244618,\"claude-code\",\"claude-sonnet-4-20250514\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,0.0052544,\"claude-code\",\"claude-3-5-haiku-20241022\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
    validate_data(
        "otlp_metrics_select",
        &client,

@@ -363,7 +363,7 @@ pub async fn test_metric_table_region_migration_by_sql(
    let result = cluster
        .frontend
        .instance
        .do_query("select * from t1 order by host desc", query_ctx.clone())
        .do_query("select * from t1", query_ctx.clone())
        .await
        .remove(0);

@@ -379,7 +379,7 @@ pub async fn test_metric_table_region_migration_by_sql(
    let result = cluster
        .frontend
        .instance
        .do_query("select * from t2 order by job desc", query_ctx)
        .do_query("select * from t2", query_ctx)
        .await
        .remove(0);


@@ -37,8 +37,8 @@ SELECT * from t2;
+------+-------------------------+-----+
| job  | ts                      | val |
+------+-------------------------+-----+
| job1 | 1970-01-01T00:00:00     | 0.0 |
| job2 | 1970-01-01T00:00:00.001 | 1.0 |
| job1 | 1970-01-01T00:00:00     | 0.0 |
+------+-------------------------+-----+

DROP TABLE t1;
@@ -67,10 +67,10 @@ SELECT ts, val, __tsid, host, job FROM phy;
+-------------------------+-----+----------------------+-------+------+
| ts                      | val | __tsid               | host  | job  |
+-------------------------+-----+----------------------+-------+------+
| 1970-01-01T00:00:00.001 | 1.0 | 7947983149541006936  | host2 |      |
| 1970-01-01T00:00:00     | 0.0 | 13882403126406556045 | host1 |      |
| 1970-01-01T00:00:00     | 0.0 | 6248409809737953425  |       | job1 |
| 1970-01-01T00:00:00.001 | 1.0 | 12867770218286207316 |       | job2 |
| 1970-01-01T00:00:00.001 | 1.0 | 1128149335081630826  | host2 |      |
| 1970-01-01T00:00:00     | 0.0 | 18067404594631612786 | host1 |      |
| 1970-01-01T00:00:00.001 | 1.0 | 2176048834144407834  |       | job2 |
| 1970-01-01T00:00:00     | 0.0 | 15980333303142110493 |       | job1 |
+-------------------------+-----+----------------------+-------+------+

DROP TABLE phy;
@@ -123,8 +123,8 @@ SELECT * from t2;
+------+-------------------------+-----+
| job  | ts                      | val |
+------+-------------------------+-----+
| job1 | 1970-01-01T00:00:00     | 0.0 |
| job2 | 1970-01-01T00:00:00.001 | 1.0 |
| job1 | 1970-01-01T00:00:00     | 0.0 |
+------+-------------------------+-----+

ADMIN flush_table('phy');
@@ -154,10 +154,10 @@ SELECT * from t2;
+------+-------------------------+-----+
| job  | ts                      | val |
+------+-------------------------+-----+
| job3 | 1970-01-01T00:00:00     | 0.0 |
| job1 | 1970-01-01T00:00:00     | 0.0 |
| job4 | 1970-01-01T00:00:00.001 | 1.0 |
| job2 | 1970-01-01T00:00:00.001 | 1.0 |
| job3 | 1970-01-01T00:00:00     | 0.0 |
| job4 | 1970-01-01T00:00:00.001 | 1.0 |
| job1 | 1970-01-01T00:00:00     | 0.0 |
+------+-------------------------+-----+

DROP TABLE t1;

@@ -22,14 +22,14 @@ INSERT INTO test_ttl(ts, val, host) VALUES

Affected Rows: 3

SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

+-----+-------+
| val | host  |
+-----+-------+
| 1.0 | host1 |
| 2.0 | host2 |
| 3.0 | host3 |
| 1.0 | host1 |
+-----+-------+

-- SQLNESS SLEEP 2s
@@ -83,26 +83,26 @@ ADMIN compact_table('phy');
+----------------------------+

--- should not be expired --
SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

+-----+-------+
| val | host  |
+-----+-------+
| 1.0 | host1 |
| 2.0 | host2 |
| 3.0 | host3 |
| 1.0 | host1 |
+-----+-------+

-- restart the db, ensure everything is ok
-- SQLNESS ARG restart=true
SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

+-----+-------+
| val | host  |
+-----+-------+
| 1.0 | host1 |
| 2.0 | host2 |
| 3.0 | host3 |
| 1.0 | host1 |
+-----+-------+

DROP TABLE test_ttl;

@@ -13,7 +13,7 @@ INSERT INTO test_ttl(ts, val, host) VALUES
    (now(), 2, 'host2'),
    (now(), 3, 'host3');

SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

-- SQLNESS SLEEP 2s
ADMIN flush_table('phy');
@@ -35,11 +35,11 @@ ADMIN flush_table('phy');
ADMIN compact_table('phy');

--- should not be expired --
SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

-- restart the db, ensure everything is ok
-- SQLNESS ARG restart=true
SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

DROP TABLE test_ttl;


@@ -13,14 +13,14 @@ INSERT INTO test_ttl(ts, val, host) VALUES

Affected Rows: 3

SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

+-----+-------+
| val | host  |
+-----+-------+
| 1.0 | host1 |
| 2.0 | host2 |
| 3.0 | host3 |
| 1.0 | host1 |
+-----+-------+

-- SQLNESS SLEEP 2s
@@ -74,26 +74,26 @@ ADMIN compact_table('phy');
+----------------------------+

--- should not be expired --
SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

+-----+-------+
| val | host  |
+-----+-------+
| 1.0 | host1 |
| 2.0 | host2 |
| 3.0 | host3 |
| 1.0 | host1 |
+-----+-------+

-- restart the db, ensure everything is ok
-- SQLNESS ARG restart=true
SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

+-----+-------+
| val | host  |
+-----+-------+
| 1.0 | host1 |
| 2.0 | host2 |
| 3.0 | host3 |
| 1.0 | host1 |
+-----+-------+

DROP TABLE test_ttl;

@@ -7,7 +7,7 @@ INSERT INTO test_ttl(ts, val, host) VALUES
    (now(), 2, 'host2'),
    (now(), 3, 'host3');

SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

-- SQLNESS SLEEP 2s
ADMIN flush_table('phy');
@@ -29,11 +29,11 @@ ADMIN flush_table('phy');
ADMIN compact_table('phy');

--- should not be expired --
SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

-- restart the db, ensure everything is ok
-- SQLNESS ARG restart=true
SELECT val, host FROM test_ttl ORDER BY host;
SELECT val, host FROM test_ttl;

DROP TABLE test_ttl;