Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-07 05:42:57 +00:00.

Compare commits: feature/df ... v1.0.0-bet (2 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 0177f244e9 | |
| | 931556dbd3 | |
Cargo.lock (generated), 64 lines changed

@@ -3274,7 +3274,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "arrow-ipc",
@@ -3329,7 +3329,7 @@ dependencies = [
 [[package]]
 name = "datafusion-catalog"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3353,7 +3353,7 @@ dependencies = [
 [[package]]
 name = "datafusion-catalog-listing"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3375,7 +3375,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3398,7 +3398,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common-runtime"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "futures",
  "log",
@@ -3408,7 +3408,7 @@ dependencies = [
 [[package]]
 name = "datafusion-datasource"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "async-compression 0.4.19",
@@ -3442,7 +3442,7 @@ dependencies = [
 [[package]]
 name = "datafusion-datasource-csv"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3464,7 +3464,7 @@ dependencies = [
 [[package]]
 name = "datafusion-datasource-json"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3485,7 +3485,7 @@ dependencies = [
 [[package]]
 name = "datafusion-datasource-parquet"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3514,12 +3514,12 @@ dependencies = [
 [[package]]
 name = "datafusion-doc"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 
 [[package]]
 name = "datafusion-execution"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3538,7 +3538,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3560,7 +3560,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr-common"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -3572,7 +3572,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "arrow-buffer",
@@ -3600,7 +3600,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-aggregate"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3620,7 +3620,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-aggregate-common"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3632,7 +3632,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-nested"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "arrow-ord",
@@ -3654,7 +3654,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-table"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "async-trait",
@@ -3669,7 +3669,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-window"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -3686,7 +3686,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-window-common"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "datafusion-common",
  "datafusion-physical-expr-common",
@@ -3695,7 +3695,7 @@ dependencies = [
 [[package]]
 name = "datafusion-macros"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "datafusion-doc",
  "quote",
@@ -3705,7 +3705,7 @@ dependencies = [
 [[package]]
 name = "datafusion-optimizer"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "chrono",
@@ -3756,7 +3756,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3777,7 +3777,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr-adapter"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -3791,7 +3791,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr-common"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3804,7 +3804,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-optimizer"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -3822,7 +3822,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-plan"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "ahash 0.8.12",
  "arrow",
@@ -3852,7 +3852,7 @@ dependencies = [
 [[package]]
 name = "datafusion-pruning"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -3868,7 +3868,7 @@ dependencies = [
 [[package]]
 name = "datafusion-session"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "async-trait",
  "datafusion-common",
@@ -3881,7 +3881,7 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "arrow",
  "bigdecimal 0.4.8",
@@ -3898,7 +3898,7 @@ dependencies = [
 [[package]]
 name = "datafusion-substrait"
 version = "50.1.0"
-source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=7f8ea0a45748ed32695757368f847ab9ac7b6c82#7f8ea0a45748ed32695757368f847ab9ac7b6c82"
+source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=fd4b2abcf3c3e43e94951bda452c9fd35243aab0#fd4b2abcf3c3e43e94951bda452c9fd35243aab0"
 dependencies = [
  "async-recursion",
  "async-trait",
@@ -7514,9 +7514,11 @@ dependencies = [
  "common-test-util",
  "common-time",
  "common-wal",
+ "criterion 0.4.0",
  "datafusion",
  "datatypes",
  "futures-util",
+ "fxhash",
  "humantime-serde",
  "itertools 0.14.0",
  "lazy_static",
Cargo.toml, 24 lines changed

@@ -316,18 +316,18 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
 rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
 
 [patch.crates-io]
-datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
-datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7f8ea0a45748ed32695757368f847ab9ac7b6c82" }
+datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
 sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x"
 
 [profile.release]
@@ -14,6 +14,7 @@ async-stream.workspace = true
 async-trait.workspace = true
 base64.workspace = true
 bytes.workspace = true
+fxhash = "0.2"
 common-base.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
@@ -31,7 +32,6 @@ lazy_static = "1.4"
 mito-codec.workspace = true
 mito2.workspace = true
 moka.workspace = true
-mur3 = "0.1"
 object-store.workspace = true
 prometheus.workspace = true
 serde.workspace = true
@@ -47,6 +47,12 @@ common-meta = { workspace = true, features = ["testing"] }
 common-test-util.workspace = true
 mito2 = { workspace = true, features = ["test"] }
 common-wal = { workspace = true }
+criterion = { version = "0.4", features = ["async", "async_tokio"] }
+mur3 = "0.1"
+
+[[bench]]
+name = "bench_tsid_generator"
+harness = false
 
 [package.metadata.cargo-udeps.ignore]
 normal = ["aquamarine"]
src/metric-engine/benches/bench_tsid_generator.rs (new file, 273 lines)

@@ -0,0 +1,273 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::Hasher;

use criterion::{Criterion, black_box, criterion_group, criterion_main};
use fxhash::FxHasher;
use mur3::Hasher128;

// A random number (from original implementation)
const TSID_HASH_SEED: u32 = 846793005;

/// Original TSID generator using mur3::Hasher128.
/// Hashes both label name and value for each label pair.
struct OriginalTsidGenerator {
    hasher: Hasher128,
}

impl OriginalTsidGenerator {
    fn new() -> Self {
        Self {
            hasher: Hasher128::with_seed(TSID_HASH_SEED),
        }
    }

    /// Writes a label pair (name and value) to the generator.
    fn write_label(&mut self, name: &str, value: &str) {
        use std::hash::Hash;
        name.hash(&mut self.hasher);
        value.hash(&mut self.hasher);
    }

    /// Generates a new TSID.
    fn finish(&mut self) -> u64 {
        // TSID is 64 bits, simply truncate the 128 bits hash
        let (hash, _) = self.hasher.finish128();
        hash
    }
}

/// Current TSID generator using fxhash::FxHasher.
/// Fast path: pre-computes label name hash, only hashes values.
struct CurrentTsidGenerator {
    hasher: FxHasher,
}

impl CurrentTsidGenerator {
    fn new() -> Self {
        Self {
            hasher: FxHasher::default(),
        }
    }

    fn new_with_label_name_hash(label_name_hash: u64) -> Self {
        let mut hasher = FxHasher::default();
        hasher.write_u64(label_name_hash);
        Self { hasher }
    }

    /// Writes a label value to the generator.
    fn write_str(&mut self, value: &str) {
        self.hasher.write(value.as_bytes());
        self.hasher.write_u8(0xff);
    }

    /// Generates a new TSID.
    fn finish(&mut self) -> u64 {
        self.hasher.finish()
    }
}

/// Pre-computes label name hash (used in fast path).
fn compute_label_name_hash(labels: &[(&str, &str)]) -> u64 {
    let mut hasher = FxHasher::default();
    for (name, _) in labels {
        hasher.write(name.as_bytes());
        hasher.write_u8(0xff);
    }
    hasher.finish()
}

fn bench_tsid_generator_small(c: &mut Criterion) {
    let labels = vec![("namespace", "greptimedb"), ("host", "127.0.0.1")];

    let mut group = c.benchmark_group("tsid_generator_small_2_labels");
    group.bench_function("original_mur3", |b| {
        b.iter(|| {
            let mut tsid_gen = OriginalTsidGenerator::new();
            for (name, value) in &labels {
                tsid_gen.write_label(black_box(name), black_box(value));
            }
            black_box(tsid_gen.finish())
        })
    });

    let label_name_hash = compute_label_name_hash(&labels);
    group.bench_function("current_fxhash_fast_path", |b| {
        b.iter(|| {
            let mut tsid_gen =
                CurrentTsidGenerator::new_with_label_name_hash(black_box(label_name_hash));
            for (_, value) in &labels {
                tsid_gen.write_str(black_box(value));
            }
            black_box(tsid_gen.finish())
        })
    });

    group.finish();
}

fn bench_tsid_generator_medium(c: &mut Criterion) {
    let labels = vec![
        ("namespace", "greptimedb"),
        ("host", "127.0.0.1"),
        ("region", "us-west-2"),
        ("env", "production"),
        ("service", "api"),
    ];

    let mut group = c.benchmark_group("tsid_generator_medium_5_labels");
    group.bench_function("original_mur3", |b| {
        b.iter(|| {
            let mut tsid_gen = OriginalTsidGenerator::new();
            for (name, value) in &labels {
                tsid_gen.write_label(black_box(name), black_box(value));
            }
            black_box(tsid_gen.finish())
        })
    });

    let label_name_hash = compute_label_name_hash(&labels);
    group.bench_function("current_fxhash_fast_path", |b| {
        b.iter(|| {
            let mut tsid_gen =
                CurrentTsidGenerator::new_with_label_name_hash(black_box(label_name_hash));
            for (_, value) in &labels {
                tsid_gen.write_str(black_box(value));
            }
            black_box(tsid_gen.finish())
        })
    });

    group.finish();
}

fn bench_tsid_generator_large(c: &mut Criterion) {
    let labels = vec![
        ("namespace", "greptimedb"),
        ("host", "127.0.0.1"),
        ("region", "us-west-2"),
        ("env", "production"),
        ("service", "api"),
        ("version", "v1.0.0"),
        ("cluster", "cluster-1"),
        ("dc", "dc1"),
        ("rack", "rack-1"),
        ("pod", "pod-123"),
    ];

    let mut group = c.benchmark_group("tsid_generator_large_10_labels");
    group.bench_function("original_mur3", |b| {
        b.iter(|| {
            let mut tsid_gen = OriginalTsidGenerator::new();
            for (name, value) in &labels {
                tsid_gen.write_label(black_box(name), black_box(value));
            }
            black_box(tsid_gen.finish())
        })
    });

    let label_name_hash = compute_label_name_hash(&labels);
    group.bench_function("current_fxhash_fast_path", |b| {
        b.iter(|| {
            let mut tsid_gen =
                CurrentTsidGenerator::new_with_label_name_hash(black_box(label_name_hash));
            for (_, value) in &labels {
                tsid_gen.write_str(black_box(value));
            }
            black_box(tsid_gen.finish())
        })
    });

    group.finish();
}

fn bench_tsid_generator_slow_path(c: &mut Criterion) {
    // Simulate slow path: some labels have null values (empty strings)
    let labels_with_nulls = vec![
        ("namespace", "greptimedb"),
        ("host", "127.0.0.1"),
        ("region", ""), // null
        ("env", "production"),
    ];

    let labels_all_non_null = vec![
        ("namespace", "greptimedb"),
        ("host", "127.0.0.1"),
        ("env", "production"),
    ];

    let mut group = c.benchmark_group("tsid_generator_slow_path_with_nulls");

    // Original: always hashes name and value
    group.bench_function("original_mur3_with_nulls", |b| {
        b.iter(|| {
            let mut tsid_gen = OriginalTsidGenerator::new();
            for (name, value) in &labels_with_nulls {
                if !value.is_empty() {
                    tsid_gen.write_label(black_box(name), black_box(value));
                }
            }
            black_box(tsid_gen.finish())
        })
    });

    // Current slow path: recomputes label name hash
    group.bench_function("current_fxhash_slow_path", |b| {
        b.iter(|| {
            // Step 1: Compute label name hash for non-null labels
            let mut name_hasher = CurrentTsidGenerator::new();
            for (name, value) in &labels_with_nulls {
                if !value.is_empty() {
                    name_hasher.write_str(black_box(name));
                }
            }
            let label_name_hash = name_hasher.finish();

            // Step 2: Use label name hash and hash values
            let mut tsid_gen = CurrentTsidGenerator::new_with_label_name_hash(label_name_hash);
            for (_, value) in &labels_with_nulls {
                if !value.is_empty() {
                    tsid_gen.write_str(black_box(value));
                }
            }
            black_box(tsid_gen.finish())
        })
    });

    // Current fast path: pre-computed (for comparison)
    let label_name_hash = compute_label_name_hash(&labels_all_non_null);
    group.bench_function("current_fxhash_fast_path_no_nulls", |b| {
        b.iter(|| {
            let mut tsid_gen =
                CurrentTsidGenerator::new_with_label_name_hash(black_box(label_name_hash));
            for (_, value) in &labels_all_non_null {
                tsid_gen.write_str(black_box(value));
            }
            black_box(tsid_gen.finish())
        })
    });

    group.finish();
}

criterion_group!(
    benches,
    bench_tsid_generator_small,
    bench_tsid_generator_medium,
    bench_tsid_generator_large,
    bench_tsid_generator_slow_path
);
criterion_main!(benches);
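As a side note (not part of the commit), the correctness argument behind the fast path benchmarked above can be checked in isolation: an FxHasher seeded with a label-name hash that was computed ahead of time yields the same 64-bit TSID as recomputing that name hash inline, because the result depends only on the sequence of writes. A minimal sketch, with `write_str` as a hypothetical helper mirroring the generator above:

    use std::hash::Hasher;

    use fxhash::FxHasher;

    // Hypothetical helper mirroring `CurrentTsidGenerator::write_str` above.
    fn write_str(hasher: &mut FxHasher, s: &str) {
        hasher.write(s.as_bytes());
        hasher.write_u8(0xff);
    }

    fn main() {
        let labels = [("host", "127.0.0.1"), ("namespace", "greptimedb")];

        // Fast path: the label-name hash is computed once up front...
        let mut names = FxHasher::default();
        for (name, _) in &labels {
            write_str(&mut names, name);
        }
        let precomputed = names.finish();

        // ...and a fresh hasher is seeded with it before the values are fed in.
        let mut fast = FxHasher::default();
        fast.write_u64(precomputed);
        for (_, value) in &labels {
            write_str(&mut fast, value);
        }

        // Slow path: recompute the name hash inline, then hash the values.
        let mut names_again = FxHasher::default();
        for (name, _) in &labels {
            write_str(&mut names_again, name);
        }
        let mut slow = FxHasher::default();
        slow.write_u64(names_again.finish());
        for (_, value) in &labels {
            write_str(&mut slow, value);
        }

        // Identical write sequences give identical TSIDs, so both paths agree.
        assert_eq!(fast.finish(), slow.finish());
    }

Assuming the [[bench]] entry added to Cargo.toml above, the benchmark itself should be runnable with `cargo bench --bench bench_tsid_generator` from the metric-engine crate.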
@@ -272,15 +272,15 @@ mod tests {
             .unwrap();
         let batches = RecordBatches::try_collect(stream).await.unwrap();
         let expected = "\
-+-------------------------+----------------+------------+----------------------+-------+
++-------------------------+----------------+------------+---------------------+-------+
 | greptime_timestamp | greptime_value | __table_id | __tsid | job |
-+-------------------------+----------------+------------+----------------------+-------+
++-------------------------+----------------+------------+---------------------+-------+
-| 1970-01-01T00:00:00 | 0.0 | 3 | 12881218023286672757 | tag_0 |
+| 1970-01-01T00:00:00 | 0.0 | 3 | 2955007454552897459 | tag_0 |
-| 1970-01-01T00:00:00.001 | 1.0 | 3 | 12881218023286672757 | tag_0 |
+| 1970-01-01T00:00:00.001 | 1.0 | 3 | 2955007454552897459 | tag_0 |
-| 1970-01-01T00:00:00.002 | 2.0 | 3 | 12881218023286672757 | tag_0 |
+| 1970-01-01T00:00:00.002 | 2.0 | 3 | 2955007454552897459 | tag_0 |
-| 1970-01-01T00:00:00.003 | 3.0 | 3 | 12881218023286672757 | tag_0 |
+| 1970-01-01T00:00:00.003 | 3.0 | 3 | 2955007454552897459 | tag_0 |
-| 1970-01-01T00:00:00.004 | 4.0 | 3 | 12881218023286672757 | tag_0 |
+| 1970-01-01T00:00:00.004 | 4.0 | 3 | 2955007454552897459 | tag_0 |
-+-------------------------+----------------+------------+----------------------+-------+";
++-------------------------+----------------+------------+---------------------+-------+";
         assert_eq!(expected, batches.pretty_print().unwrap(), "physical region");
 
         // read data from logical region
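The only substantive changes in this expected output are the __tsid values and the width of the table border around that column: the TSID is now produced by the FxHash-based generator rather than the truncated 128-bit murmur3 hash, so the same rows map to a different (and here shorter) 64-bit identifier, while the timestamps, values, and table ids stay the same.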
@@ -13,11 +13,12 @@
 // limitations under the License.
 
 use std::collections::{BTreeMap, HashMap};
-use std::hash::Hash;
+use std::hash::Hasher;
 
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
 use datatypes::value::ValueRef;
+use fxhash::FxHasher;
 use mito_codec::row_converter::SparsePrimaryKeyCodec;
 use smallvec::SmallVec;
 use snafu::ResultExt;
@@ -30,9 +31,6 @@ use store_api::storage::{ColumnId, TableId};
 
 use crate::error::{EncodePrimaryKeySnafu, Result};
 
-// A random number
-const TSID_HASH_SEED: u32 = 846793005;
-
 /// A row modifier modifies [`Rows`].
 ///
 /// - For [`PrimaryKeyEncoding::Sparse`] encoding,
@@ -75,6 +73,7 @@ impl RowModifier {
         let num_output_column = num_column - num_primary_key_column + 1;
 
         let mut buffer = vec![];
+
         for mut iter in iter.iter_mut() {
             let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
             let mut values = Vec::with_capacity(num_output_column);
@@ -147,47 +146,72 @@ impl RowModifier {
 
     /// Fills internal columns of a row with table name and a hash of tag values.
     pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
-        let mut hasher = TsidGenerator::default();
-        for (name, value) in iter.primary_keys_with_name() {
-            // The type is checked before. So only null is ignored.
-            if let Some(ValueData::StringValue(string)) = &value.value_data {
-                hasher.write_label(name, string);
+        let ts_id = if !iter.has_null_labels() {
+            // No null labels in row, we can safely reuse the precomputed label name hash.
+            let mut ts_id_gen = TsidGenerator::new(iter.index.label_name_hash);
+            for (_, value) in iter.primary_keys_with_name() {
+                // The type is checked before. So only null is ignored.
+                if let Some(ValueData::StringValue(string)) = &value.value_data {
+                    ts_id_gen.write_str(string);
+                } else {
+                    unreachable!(
+                        "Should not contain null or non-string value: {:?}, table id: {}",
+                        value, table_id
+                    );
+                }
             }
-        }
-        let hash = hasher.finish();
+            ts_id_gen.finish()
+        } else {
+            // Slow path: row contains null, recompute label hash
+            let mut hasher = TsidGenerator::default();
+            // 1. Find out label names with non-null values and get the hash.
+            for (name, value) in iter.primary_keys_with_name() {
+                // The type is checked before. So only null is ignored.
+                if let Some(ValueData::StringValue(_)) = &value.value_data {
+                    hasher.write_str(name);
+                }
+            }
+            let label_name_hash = hasher.finish();
+
+            // 2. Use label name hash as seed and continue with label values.
+            let mut final_hasher = TsidGenerator::new(label_name_hash);
+            for (_, value) in iter.primary_keys_with_name() {
+                if let Some(ValueData::StringValue(value)) = &value.value_data {
+                    final_hasher.write_str(value);
+                }
+            }
+            final_hasher.finish()
+        };
 
         (
             ValueData::U32Value(table_id).into(),
-            ValueData::U64Value(hash).into(),
+            ValueData::U64Value(ts_id).into(),
         )
     }
 }
 
 /// Tsid generator.
+#[derive(Default)]
 pub struct TsidGenerator {
-    hasher: mur3::Hasher128,
-}
-
-impl Default for TsidGenerator {
-    fn default() -> Self {
-        Self {
-            hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
-        }
-    }
+    hasher: FxHasher,
 }
 
 impl TsidGenerator {
+    pub fn new(label_name_hash: u64) -> Self {
+        let mut hasher = FxHasher::default();
+        hasher.write_u64(label_name_hash);
+        Self { hasher }
+    }
+
     /// Writes a label pair to the generator.
-    pub fn write_label(&mut self, name: &str, value: &str) {
-        name.hash(&mut self.hasher);
-        value.hash(&mut self.hasher);
+    pub fn write_str(&mut self, value: &str) {
+        self.hasher.write(value.as_bytes());
+        self.hasher.write_u8(0xff);
    }
 
     /// Generates a new TSID.
     pub fn finish(&mut self) -> u64 {
-        // TSID is 64 bits, simply truncate the 128 bits hash
-        let (hash, _) = self.hasher.finish128();
-        hash
+        self.hasher.finish()
     }
 }
 
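One design note on the 0xff byte written by `write_str` (an observation, not part of the commit): the standard library's `Hash` impl for `str` feeds the UTF-8 bytes followed by a 0xff terminator so that consecutive strings stay prefix-free inside one hasher, and the new `write_str` reproduces that framing by hand, matching how the old `name.hash(...)` / `value.hash(...)` calls fed the murmur3 hasher. A minimal sketch, assuming current std behavior:

    use std::hash::{Hash, Hasher};

    use fxhash::FxHasher;

    fn main() {
        let label = "greptimedb";

        // `str::hash` writes the UTF-8 bytes and then a 0xff terminator.
        let mut via_hash = FxHasher::default();
        label.hash(&mut via_hash);

        // `write_str` in the generator produces the same byte stream by hand.
        let mut via_write = FxHasher::default();
        via_write.write(label.as_bytes());
        via_write.write_u8(0xff);

        assert_eq!(via_hash.finish(), via_write.finish());
    }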
@@ -202,6 +226,8 @@ struct ValueIndex {
 struct IterIndex {
     indices: Vec<ValueIndex>,
     num_primary_key_column: usize,
+    /// Precomputed hash for label names.
+    label_name_hash: u64,
 }
 
 impl IterIndex {
@@ -252,15 +278,22 @@ impl IterIndex {
             }
         }
         let num_primary_key_column = primary_key_indices.len() + reserved_indices.len();
-        let indices = reserved_indices
-            .into_iter()
-            .chain(primary_key_indices.values().cloned())
-            .chain(ts_index)
-            .chain(field_indices)
-            .collect();
+        let mut indices = Vec::with_capacity(num_primary_key_column + 2);
+        indices.extend(reserved_indices);
+        let mut label_name_hasher = TsidGenerator::default();
+        for (pk_name, pk_index) in primary_key_indices {
+            // primary_key_indices already sorted.
+            label_name_hasher.write_str(pk_name);
+            indices.push(pk_index);
+        }
+        let label_name_hash = label_name_hasher.finish();
+
+        indices.extend(ts_index);
+        indices.extend(field_indices);
         IterIndex {
             indices,
             num_primary_key_column,
+            label_name_hash,
         }
     }
 }
@@ -314,6 +347,13 @@ impl RowIter<'_> {
         })
     }
 
+    /// Returns true if any label in current row is null.
+    fn has_null_labels(&self) -> bool {
+        self.index.indices[..self.index.num_primary_key_column]
+            .iter()
+            .any(|idx| self.row.values[idx.index].value_data.is_none())
+    }
+
     /// Returns the primary keys.
     pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef<'_>)> {
         self.index.indices[..self.index.num_primary_key_column]
@@ -399,9 +439,9 @@ mod tests {
         let result = encoder.modify_rows_sparse(rows_iter, table_id).unwrap();
         assert_eq!(result.rows[0].values.len(), 1);
         let encoded_primary_key = vec![
-            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 131, 9, 166, 190, 173, 37, 39, 240, 0, 0,
-            0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1,
-            1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
+            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 37, 196, 242, 181, 117, 224, 7, 137, 0,
+            0, 0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
+            1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
         ];
         assert_eq!(
             result.rows[0].values[0],
@@ -477,7 +517,7 @@ mod tests {
         assert_eq!(result.rows[0].values[2], ValueData::U32Value(1025).into());
         assert_eq!(
             result.rows[0].values[3],
-            ValueData::U64Value(9442261431637846000).into()
+            ValueData::U64Value(2721566936019240841).into()
         );
         assert_eq!(result.schema, expected_dense_schema());
     }
@@ -496,7 +536,7 @@ mod tests {
         let row_iter = rows_iter.iter_mut().next().unwrap();
         let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
         assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
-        assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
+        assert_eq!(tsid, ValueData::U64Value(2721566936019240841).into());
 
         // Change the column order
         let schema = vec![
@@ -524,6 +564,264 @@ mod tests {
         let row_iter = rows_iter.iter_mut().next().unwrap();
         let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
         assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
-        assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
+        assert_eq!(tsid, ValueData::U64Value(2721566936019240841).into());
+    }
+
+    /// Helper function to create a schema with multiple label columns
+    fn create_multi_label_schema(labels: &[&str]) -> Vec<ColumnSchema> {
+        labels
+            .iter()
+            .map(|name| ColumnSchema {
+                column_name: name.to_string(),
+                datatype: ColumnDataType::String as i32,
+                semantic_type: SemanticType::Tag as _,
+                datatype_extension: None,
+                options: None,
+            })
+            .collect()
+    }
+
+    /// Helper function to create a name_to_column_id map
+    fn create_name_to_column_id(labels: &[&str]) -> HashMap<String, ColumnId> {
+        labels
+            .iter()
+            .enumerate()
+            .map(|(idx, name)| (name.to_string(), idx as ColumnId + 1))
+            .collect()
+    }
+
+    /// Helper function to create a row with string values
+    fn create_row_with_values(values: &[&str]) -> Row {
+        Row {
+            values: values
+                .iter()
+                .map(|v| ValueData::StringValue(v.to_string()).into())
+                .collect(),
+        }
+    }
+
+    /// Helper function to create a row with some null values
+    fn create_row_with_nulls(values: &[Option<&str>]) -> Row {
+        Row {
+            values: values
+                .iter()
+                .map(|v| {
+                    v.map(|s| ValueData::StringValue(s.to_string()).into())
+                        .unwrap_or(Value { value_data: None })
+                })
+                .collect(),
+        }
+    }
+
+    /// Helper function to extract TSID from a row
+    fn extract_tsid(
+        schema: Vec<ColumnSchema>,
+        row: Row,
+        name_to_column_id: &HashMap<String, ColumnId>,
+        table_id: TableId,
+    ) -> u64 {
+        let rows = Rows {
+            schema,
+            rows: vec![row],
+        };
+        let mut rows_iter = RowsIter::new(rows, name_to_column_id);
+        let row_iter = rows_iter.iter_mut().next().unwrap();
+        let (_, tsid_value) = RowModifier::fill_internal_columns(table_id, &row_iter);
+        match tsid_value.value_data {
+            Some(ValueData::U64Value(tsid)) => tsid,
+            _ => panic!("Expected U64Value for TSID"),
+        }
+    }
+
+    #[test]
+    fn test_tsid_same_for_different_label_orders() {
+        // Test that rows with the same label name-value pairs but in different orders
+        // produce the same TSID
+        let table_id = 1025;
+
+        // Schema 1: a, b, c
+        let schema1 = create_multi_label_schema(&["a", "b", "c"]);
+        let name_to_column_id1 = create_name_to_column_id(&["a", "b", "c"]);
+        let row1 = create_row_with_values(&["A", "B", "C"]);
+        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
+
+        // Schema 2: b, a, c (different order)
+        let schema2 = create_multi_label_schema(&["b", "a", "c"]);
+        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c"]);
+        let row2 = create_row_with_values(&["B", "A", "C"]);
+        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
+
+        // Schema 3: c, b, a (another different order)
+        let schema3 = create_multi_label_schema(&["c", "b", "a"]);
+        let name_to_column_id3 = create_name_to_column_id(&["a", "b", "c"]);
+        let row3 = create_row_with_values(&["C", "B", "A"]);
+        let tsid3 = extract_tsid(schema3, row3, &name_to_column_id3, table_id);
+
+        // All should have the same TSID since label names are sorted lexicographically
+        // and we're using the same label name-value pairs
+        assert_eq!(
+            tsid1, tsid2,
+            "TSID should be same for different column orders"
+        );
+        assert_eq!(
+            tsid2, tsid3,
+            "TSID should be same for different column orders"
+        );
+    }
+
+    #[test]
+    fn test_tsid_same_with_null_labels() {
+        // Test that rows that differ only by null label values produce the same TSID
+        let table_id = 1025;
+
+        // Row 1: a=A, b=B (no nulls, fast path)
+        let schema1 = create_multi_label_schema(&["a", "b"]);
+        let name_to_column_id1 = create_name_to_column_id(&["a", "b"]);
+        let row1 = create_row_with_values(&["A", "B"]);
+        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
+
+        // Row 2: a=A, b=B, c=null (has null, slow path)
+        let schema2 = create_multi_label_schema(&["a", "b", "c"]);
+        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c"]);
+        let row2 = create_row_with_nulls(&[Some("A"), Some("B"), None]);
+        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
+
+        // Both should have the same TSID since null labels are ignored
+        assert_eq!(
+            tsid1, tsid2,
+            "TSID should be same when only difference is null label values"
+        );
+    }
+
+    #[test]
+    fn test_tsid_same_with_multiple_null_labels() {
+        // Test with multiple null labels
+        let table_id = 1025;
+
+        // Row 1: a=A, b=B (no nulls)
+        let schema1 = create_multi_label_schema(&["a", "b"]);
+        let name_to_column_id1 = create_name_to_column_id(&["a", "b"]);
+        let row1 = create_row_with_values(&["A", "B"]);
+        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
+
+        // Row 2: a=A, b=B, c=null, d=null (multiple nulls)
+        let schema2 = create_multi_label_schema(&["a", "b", "c", "d"]);
+        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c", "d"]);
+        let row2 = create_row_with_nulls(&[Some("A"), Some("B"), None, None]);
+        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
+
+        assert_eq!(
+            tsid1, tsid2,
+            "TSID should be same when only difference is multiple null label values"
+        );
+    }
+
+    #[test]
+    fn test_tsid_different_with_different_non_null_values() {
+        // Test that rows with different non-null values produce different TSIDs
+        let table_id = 1025;
+
+        // Row 1: a=A, b=B
+        let schema1 = create_multi_label_schema(&["a", "b"]);
+        let name_to_column_id1 = create_name_to_column_id(&["a", "b"]);
+        let row1 = create_row_with_values(&["A", "B"]);
+        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
+
+        // Row 2: a=A, b=C (different value for b)
+        let schema2 = create_multi_label_schema(&["a", "b"]);
+        let name_to_column_id2 = create_name_to_column_id(&["a", "b"]);
+        let row2 = create_row_with_values(&["A", "C"]);
+        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
+
+        assert_ne!(
+            tsid1, tsid2,
+            "TSID should be different when label values differ"
+        );
+    }
+
+    #[test]
+    fn test_tsid_fast_path_vs_slow_path_consistency() {
+        // Test that fast path (no nulls) and slow path (with nulls) produce
+        // the same TSID for the same non-null label values
+        let table_id = 1025;
+
+        // Fast path: a=A, b=B (no nulls)
+        let schema_fast = create_multi_label_schema(&["a", "b"]);
+        let name_to_column_id_fast = create_name_to_column_id(&["a", "b"]);
+        let row_fast = create_row_with_values(&["A", "B"]);
+        let tsid_fast = extract_tsid(schema_fast, row_fast, &name_to_column_id_fast, table_id);
+
+        // Slow path: a=A, b=B, c=null (has null, triggers slow path)
+        let schema_slow = create_multi_label_schema(&["a", "b", "c"]);
+        let name_to_column_id_slow = create_name_to_column_id(&["a", "b", "c"]);
+        let row_slow = create_row_with_nulls(&[Some("A"), Some("B"), None]);
+        let tsid_slow = extract_tsid(schema_slow, row_slow, &name_to_column_id_slow, table_id);
+
+        assert_eq!(
+            tsid_fast, tsid_slow,
+            "Fast path and slow path should produce same TSID for same non-null values"
+        );
+    }
+
+    #[test]
+    fn test_tsid_with_null_in_middle() {
+        // Test with null in the middle of labels
+        let table_id = 1025;
+
+        // Row 1: a=A, b=B, c=C
+        let schema1 = create_multi_label_schema(&["a", "b", "c"]);
+        let name_to_column_id1 = create_name_to_column_id(&["a", "b", "c"]);
+        let row1 = create_row_with_values(&["A", "B", "C"]);
+        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
+
+        // Row 2: a=A, b=null, c=C (null in middle)
+        let schema2 = create_multi_label_schema(&["a", "b", "c"]);
+        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c"]);
+        let row2 = create_row_with_nulls(&[Some("A"), None, Some("C")]);
+        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
+
+        // Row1 is a=A, b=B, c=C while row2, with b null and therefore ignored,
+        // is equivalent to a=A, c=C, so the two TSIDs should differ.
+        assert_ne!(
+            tsid1, tsid2,
+            "TSID should be different when a non-null value becomes null"
+        );
+
+        // Row 3: a=A, c=C (no b at all, equivalent to row2)
+        let schema3 = create_multi_label_schema(&["a", "c"]);
+        let name_to_column_id3 = create_name_to_column_id(&["a", "c"]);
+        let row3 = create_row_with_values(&["A", "C"]);
+        let tsid3 = extract_tsid(schema3, row3, &name_to_column_id3, table_id);
+
+        // Row2 (a=A, b=null, c=C) should be same as row3 (a=A, c=C)
+        assert_eq!(
+            tsid2, tsid3,
+            "TSID should be same when null label is ignored"
+        );
+    }
+
+    #[test]
+    fn test_tsid_all_null_labels() {
+        // Test with all labels being null
+        let table_id = 1025;
+
+        // Row with all nulls
+        let schema = create_multi_label_schema(&["a", "b", "c"]);
+        let name_to_column_id = create_name_to_column_id(&["a", "b", "c"]);
+        let row = create_row_with_nulls(&[None, None, None]);
+        let tsid = extract_tsid(schema.clone(), row, &name_to_column_id, table_id);
+
+        // Should still produce a TSID (based on label names only when all values are null)
+        // This tests that the slow path handles the case where all values are null
+        // The TSID will be based on the label name hash only
|
// Test that it's consistent - same schema with all nulls should produce same TSID
|
||||||
|
let row2 = create_row_with_nulls(&[None, None, None]);
|
||||||
|
let tsid2 = extract_tsid(schema, row2, &name_to_column_id, table_id);
|
||||||
|
assert_eq!(
|
||||||
|
tsid, tsid2,
|
||||||
|
"TSID should be consistent when all label values are null"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
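All of the tests above hinge on one property of TSID generation: label columns whose value is null must not contribute to the hash, so the fast path (no nulls) and the slow path (some nulls) agree. A minimal, self-contained sketch of that property, using a toy hash and hypothetical helpers rather than GreptimeDB's actual TSID routine, could look like this:

// Sketch only: `label_hash` is a stand-in, not the real extract_tsid implementation.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn label_hash(table_id: u32, labels: &[(&str, Option<&str>)]) -> u64 {
    let mut hasher = DefaultHasher::new();
    table_id.hash(&mut hasher);
    for (name, value) in labels {
        if let Some(value) = value {
            // Null label values are skipped, so `a=A, b=B, c=NULL`
            // hashes identically to `a=A, b=B`.
            name.hash(&mut hasher);
            value.hash(&mut hasher);
        }
    }
    hasher.finish()
}

fn main() {
    let no_nulls = label_hash(1025, &[("a", Some("A")), ("b", Some("B"))]);
    let with_null = label_hash(1025, &[("a", Some("A")), ("b", Some("B")), ("c", None)]);
    assert_eq!(no_nulls, with_null);
}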
@@ -515,6 +515,7 @@ async fn test_flush_workers() {
 }
 
 async fn test_flush_workers_with_format(flat_format: bool) {
+    common_telemetry::init_default_ut_logging();
     let mut env = TestEnv::new().await;
     let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
     let listener = Arc::new(FlushListener::default());
@@ -574,7 +575,7 @@ async fn test_flush_workers_with_format(flat_format: bool) {
     put_rows(&engine, region_id0, rows).await;
 
     // Waits until flush is finished.
-    while listener.success_count() < 2 {
+    while listener.success_count() < 3 {
         listener.wait().await;
     }
 
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::Instant;
 
-use common_telemetry::{debug, error, info, trace};
+use common_telemetry::{debug, error, info};
 use datatypes::arrow::datatypes::SchemaRef;
 use either::Either;
 use partition::expr::PartitionExpr;
@@ -89,6 +89,12 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
 
     /// Returns the total memory used by memtables.
     fn memory_usage(&self) -> usize;
+
+    /// Returns the mutable memtable memory limit.
+    ///
+    /// The write buffer manager should flush memtables when the mutable memory usage
+    /// exceeds this limit.
+    fn flush_limit(&self) -> usize;
 }
 
 pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
@@ -145,7 +151,7 @@ impl WriteBufferManagerImpl {
 impl WriteBufferManager for WriteBufferManagerImpl {
     fn should_flush_engine(&self) -> bool {
         let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
-        if mutable_memtable_memory_usage > self.mutable_limit {
+        if mutable_memtable_memory_usage >= self.mutable_limit {
             debug!(
                 "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                 mutable_memtable_memory_usage,
@@ -157,23 +163,8 @@ impl WriteBufferManager for WriteBufferManagerImpl {
         }
 
         let memory_usage = self.memory_used.load(Ordering::Relaxed);
-        // If the memory exceeds the buffer size, we trigger more aggressive
-        // flush. But if already more than half memory is being flushed,
-        // triggering more flush may not help. We will hold it instead.
         if memory_usage >= self.global_write_buffer_size {
-            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
-                debug!(
-                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
-                    mutable_usage: {}.",
-                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
-                );
-                return true;
-            } else {
-                trace!(
-                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
-                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
-                );
-            }
+            return true;
         }
 
         false
@@ -205,6 +196,10 @@ impl WriteBufferManager for WriteBufferManagerImpl {
     fn memory_usage(&self) -> usize {
         self.memory_used.load(Ordering::Relaxed)
     }
+
+    fn flush_limit(&self) -> usize {
+        self.mutable_limit
+    }
 }
 
 /// Reason of a flush task.
@@ -888,6 +883,31 @@ impl FlushScheduler {
         self.region_status.contains_key(&region_id)
     }
 
+    fn schedule_flush_task(
+        &mut self,
+        version_control: &VersionControlRef,
+        task: RegionFlushTask,
+    ) -> Result<()> {
+        let region_id = task.region_id;
+
+        // If current region doesn't have flush status, we can flush the region directly.
+        if let Err(e) = version_control.freeze_mutable() {
+            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
+
+            return Err(e);
+        }
+        // Submit a flush job.
+        let job = task.into_flush_job(version_control);
+        if let Err(e) = self.scheduler.schedule(job) {
+            // If scheduler returns error, senders in the job will be dropped and waiters
+            // can get recv errors.
+            error!(e; "Failed to schedule flush job for region {}", region_id);
+
+            return Err(e);
+        }
+        Ok(())
+    }
+
     /// Schedules a flush `task` for specific `region`.
     pub(crate) fn schedule_flush(
         &mut self,
@@ -910,46 +930,21 @@ impl FlushScheduler {
             .with_label_values(&[task.reason.as_str()])
             .inc();
 
+        // If current region has flush status, merge the task.
+        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
+            // Checks whether we can flush the region now.
+            debug!("Merging flush task for region {}", region_id);
+            flush_status.merge_task(task);
+            return Ok(());
+        }
+
+        self.schedule_flush_task(version_control, task)?;
+
         // Add this region to status map.
-        let flush_status = self
-            .region_status
-            .entry(region_id)
-            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
-        // Checks whether we can flush the region now.
-        if flush_status.flushing {
-            // There is already a flush job running.
-            flush_status.merge_task(task);
-            return Ok(());
-        }
-
-        // TODO(yingwen): We can merge with pending and execute directly.
-        // If there are pending tasks, then we should push it to pending list.
-        if flush_status.pending_task.is_some() {
-            flush_status.merge_task(task);
-            return Ok(());
-        }
-
-        // Now we can flush the region directly.
-        if let Err(e) = version_control.freeze_mutable() {
-            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
-
-            // Remove from region status if we can't freeze the mutable memtable.
-            self.region_status.remove(&region_id);
-            return Err(e);
-        }
-        // Submit a flush job.
-        let job = task.into_flush_job(version_control);
-        if let Err(e) = self.scheduler.schedule(job) {
-            // If scheduler returns error, senders in the job will be dropped and waiters
-            // can get recv errors.
-            error!(e; "Failed to schedule flush job for region {}", region_id);
-
-            // Remove from region status if we can't submit the task.
-            self.region_status.remove(&region_id);
-            return Err(e);
-        }
-
-        flush_status.flushing = true;
+        let _ = self.region_status.insert(
+            region_id,
+            FlushStatus::new(region_id, version_control.clone()),
+        );
 
         Ok(())
     }
@@ -966,48 +961,56 @@ impl FlushScheduler {
         Vec<SenderBulkRequest>,
     )> {
         let flush_status = self.region_status.get_mut(&region_id)?;
+        // If region doesn't have any pending flush task, we need to remove it from the status.
-        // This region doesn't have running flush job.
-        flush_status.flushing = false;
-
-        let pending_requests = if flush_status.pending_task.is_none() {
+        if flush_status.pending_task.is_none() {
            // The region doesn't have any pending flush task.
            // Safety: The flush status must exist.
+            debug!(
+                "Region {} doesn't have any pending flush task, removing it from the status",
+                region_id
+            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
-            Some((
+            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
-            ))
-        } else {
-            let version_data = flush_status.version_control.current();
-            if version_data.version.memtables.is_empty() {
-                // The region has nothing to flush, we also need to remove it from the status.
-                // Safety: The pending task is not None.
-                let task = flush_status.pending_task.take().unwrap();
-                // The region has nothing to flush. We can notify pending task.
-                task.on_success();
-                // `schedule_next_flush()` may pick up the same region to flush, so we must remove
-                // it from the status to avoid leaking pending requests.
-                // Safety: The flush status must exist.
-                let flush_status = self.region_status.remove(&region_id).unwrap();
-                Some((
-                    flush_status.pending_ddls,
-                    flush_status.pending_writes,
-                    flush_status.pending_bulk_writes,
-                ))
-            } else {
-                // We can flush the region again, keep it in the region status.
-                None
-            }
-        };
-
-        // Schedule next flush job.
-        if let Err(e) = self.schedule_next_flush() {
-            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
+            ));
         }
 
-        pending_requests
+        // If region has pending task, but has nothing to flush, we need to remove it from the status.
+        let version_data = flush_status.version_control.current();
+        if version_data.version.memtables.is_empty() {
+            // The region has nothing to flush, we also need to remove it from the status.
+            // Safety: The pending task is not None.
+            let task = flush_status.pending_task.take().unwrap();
+            // The region has nothing to flush. We can notify pending task.
+            task.on_success();
+            debug!(
+                "Region {} has nothing to flush, removing it from the status",
+                region_id
+            );
+            // Safety: The flush status must exist.
+            let flush_status = self.region_status.remove(&region_id).unwrap();
+            return Some((
+                flush_status.pending_ddls,
+                flush_status.pending_writes,
+                flush_status.pending_bulk_writes,
+            ));
+        }
+
+        // If region has pending task and has something to flush, we need to schedule it.
+        debug!("Scheduling pending flush task for region {}", region_id);
+        // Safety: The flush status must exist.
+        let task = flush_status.pending_task.take().unwrap();
+        let version_control = flush_status.version_control.clone();
+        if let Err(err) = self.schedule_flush_task(&version_control, task) {
+            error!(
+                err;
+                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
+            );
+        }
+        // We can flush the region again, keep it in the region status.
+        None
     }
 
     /// Notifies the scheduler that the flush job is failed.
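Taken together, these hunks drop the old `flushing` flag and the `schedule_next_flush` round-robin in favor of a simpler rule: a region that already has a status entry only merges new tasks; otherwise the task is scheduled immediately and an entry is inserted, and on success any pending task for that region is rescheduled directly. A rough standalone model of that bookkeeping, with plain integers standing in for regions and tasks (not the real mito2 types), is sketched below:

use std::collections::HashMap;

// Hypothetical stand-ins: region ids are u64, tasks are u32, "submitting" is pushing to a Vec.
fn on_flush_success(
    region_status: &mut HashMap<u64, Vec<u32>>,
    submitted: &mut Vec<(u64, u32)>,
    region_id: u64,
) {
    match region_status.get_mut(&region_id).map(|pending| pending.pop()) {
        // A task was waiting: submit it immediately and keep the status entry.
        Some(Some(task)) => submitted.push((region_id, task)),
        // Nothing pending: remove the region from the status map.
        Some(None) => {
            region_status.remove(&region_id);
        }
        // Unknown region: nothing to do.
        None => {}
    }
}

fn main() {
    let mut status = HashMap::from([(1_u64, vec![7_u32])]);
    let mut submitted = Vec::new();
    on_flush_success(&mut status, &mut submitted, 1);
    assert_eq!(submitted, vec![(1, 7)]); // pending task rescheduled
    on_flush_success(&mut status, &mut submitted, 1);
    assert!(!status.contains_key(&1)); // entry dropped once nothing is pending
}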
@@ -1023,11 +1026,6 @@ impl FlushScheduler {
 
         // Fast fail: cancels all pending tasks and sends error to their waiters.
         flush_status.on_failure(err);
-
-        // Still tries to schedule a new flush.
-        if let Err(e) = self.schedule_next_flush() {
-            error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
-        }
     }
 
     /// Notifies the scheduler that the region is dropped.
@@ -1098,30 +1096,6 @@ impl FlushScheduler {
             .map(|status| !status.pending_ddls.is_empty())
             .unwrap_or(false)
     }
-
-    /// Schedules a new flush task when the scheduler can submit next task.
-    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
-        debug_assert!(
-            self.region_status
-                .values()
-                .all(|status| status.flushing || status.pending_task.is_some())
-        );
-
-        // Get the first region from status map.
-        let Some(flush_status) = self
-            .region_status
-            .values_mut()
-            .find(|status| status.pending_task.is_some())
-        else {
-            return Ok(());
-        };
-        debug_assert!(!flush_status.flushing);
-        let task = flush_status.pending_task.take().unwrap();
-        let region_id = flush_status.region_id;
-        let version_control = flush_status.version_control.clone();
-
-        self.schedule_flush(region_id, &version_control, task)
-    }
 }
 
 impl Drop for FlushScheduler {
@@ -1141,11 +1115,6 @@ struct FlushStatus {
     region_id: RegionId,
     /// Version control of the region.
     version_control: VersionControlRef,
-    /// There is a flush task running.
-    ///
-    /// It is possible that a region is not flushing but has pending task if the scheduler
-    /// doesn't schedules this region.
-    flushing: bool,
     /// Task waiting for next flush.
     pending_task: Option<RegionFlushTask>,
     /// Pending ddl requests.
@@ -1161,7 +1130,6 @@ impl FlushStatus {
         FlushStatus {
             region_id,
             version_control,
-            flushing: false,
             pending_task: None,
             pending_ddls: Vec::new(),
             pending_writes: Vec::new(),
@@ -1253,10 +1221,12 @@ mod tests {
         // Global usage is still 1100.
         manager.schedule_free_mem(200);
         assert!(manager.should_flush_engine());
+        assert!(manager.should_stall());
 
-        // More than global limit, but mutable (1100-200-450=450) is not enough (< 500).
+        // More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
         manager.schedule_free_mem(450);
-        assert!(!manager.should_flush_engine());
+        assert!(manager.should_flush_engine());
+        assert!(manager.should_stall());
 
         // Now mutable is enough.
         manager.reserve_mem(50);
@@ -1503,4 +1473,92 @@ mod tests {
         assert_eq!(2, total_rows, "append_mode should preserve duplicates");
     }
 }
+
+    #[tokio::test]
+    async fn test_schedule_pending_request_on_flush_success() {
+        common_telemetry::init_default_ut_logging();
+        let job_scheduler = Arc::new(VecScheduler::default());
+        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
+        let (tx, _rx) = mpsc::channel(4);
+        let mut scheduler = env.mock_flush_scheduler();
+        let mut builder = VersionControlBuilder::new();
+        // Overwrites the empty memtable builder.
+        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
+        let version_control = Arc::new(builder.build());
+        // Writes data to the memtable so it is not empty.
+        let version_data = version_control.current();
+        write_rows_to_version(&version_data.version, "host0", 0, 10);
+        let manifest_ctx = env
+            .mock_manifest_context(version_data.version.metadata.clone())
+            .await;
+        // Creates 2 tasks.
+        let mut tasks: Vec<_> = (0..2)
+            .map(|_| RegionFlushTask {
+                region_id: builder.region_id(),
+                reason: FlushReason::Others,
+                senders: Vec::new(),
+                request_sender: tx.clone(),
+                access_layer: env.access_layer.clone(),
+                listener: WorkerListener::default(),
+                engine_config: Arc::new(MitoConfig::default()),
+                row_group_size: None,
+                cache_manager: Arc::new(CacheManager::default()),
+                manifest_ctx: manifest_ctx.clone(),
+                index_options: IndexOptions::default(),
+                flush_semaphore: Arc::new(Semaphore::new(2)),
+                is_staging: false,
+            })
+            .collect();
+        // Schedule first task.
+        let task = tasks.pop().unwrap();
+        scheduler
+            .schedule_flush(builder.region_id(), &version_control, task)
+            .unwrap();
+        // Should schedule 1 flush.
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(1, job_scheduler.num_jobs());
+        // Schedule second task.
+        let task = tasks.pop().unwrap();
+        scheduler
+            .schedule_flush(builder.region_id(), &version_control, task)
+            .unwrap();
+        assert!(
+            scheduler
+                .region_status
+                .get(&builder.region_id())
+                .unwrap()
+                .pending_task
+                .is_some()
+        );
+
+        // Check the new version.
+        let version_data = version_control.current();
+        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
+        // Assumes the flush job is finished.
+        version_control.apply_edit(
+            Some(RegionEdit {
+                files_to_add: Vec::new(),
+                files_to_remove: Vec::new(),
+                timestamp_ms: None,
+                compaction_time_window: None,
+                flushed_entry_id: None,
+                flushed_sequence: None,
+                committed_sequence: None,
+            }),
+            &[0],
+            builder.file_purger(),
+        );
+        write_rows_to_version(&version_data.version, "host1", 0, 10);
+        scheduler.on_flush_success(builder.region_id());
+        assert_eq!(2, job_scheduler.num_jobs());
+        // The pending task is cleared.
+        assert!(
+            scheduler
+                .region_status
+                .get(&builder.region_id())
+                .unwrap()
+                .pending_task
+                .is_none()
+        );
+    }
 }
@@ -1020,9 +1020,15 @@ pub struct MockWriteBufferManager {
     should_stall: AtomicBool,
     memory_used: AtomicUsize,
     memory_active: AtomicUsize,
+    flush_limit: usize,
 }
 
 impl MockWriteBufferManager {
+    /// Set flush limit.
+    pub fn set_flush_limit(&mut self, flush_limit: usize) {
+        self.flush_limit = flush_limit;
+    }
+
     /// Set whether to flush the engine.
     pub fn set_should_flush(&self, value: bool) {
         self.should_flush.store(value, Ordering::Relaxed);
@@ -1064,6 +1070,10 @@ impl WriteBufferManager for MockWriteBufferManager {
     fn memory_usage(&self) -> usize {
         self.memory_used.load(Ordering::Relaxed)
    }
+
+    fn flush_limit(&self) -> usize {
+        self.flush_limit
+    }
 }
 
 pub fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema {
@@ -22,6 +22,7 @@ use common_telemetry::info;
 use common_telemetry::tracing::warn;
 use humantime_serde::re::humantime;
 use snafu::{ResultExt, ensure};
+use store_api::logstore::LogStore;
 use store_api::metadata::{
     InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
     RegionMetadataRef,
@@ -41,7 +42,7 @@ use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
 use crate::sst::FormatType;
 use crate::worker::RegionWorkerLoop;
 
-impl<S> RegionWorkerLoop<S> {
+impl<S: LogStore> RegionWorkerLoop<S> {
     pub(crate) async fn handle_alter_request(
         &mut self,
         region_id: RegionId,
@@ -30,16 +30,26 @@ use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, O
 use crate::sst::index::IndexBuildType;
 use crate::worker::RegionWorkerLoop;
 
-impl<S> RegionWorkerLoop<S> {
+impl<S: LogStore> RegionWorkerLoop<S> {
     /// On region flush job failed.
     pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
         self.flush_scheduler.on_flush_failed(region_id, request.err);
+        debug!(
+            "Flush failed for region {}, handling stalled requests",
+            region_id
+        );
+        // Maybe flush worker again.
+        self.maybe_flush_worker();
+
+        // Handle stalled requests.
+        self.handle_stalled_requests().await;
     }
 
     /// Checks whether the engine reaches flush threshold. If so, finds regions in this
     /// worker to flush.
     pub(crate) fn maybe_flush_worker(&mut self) {
         if !self.write_buffer_manager.should_flush_engine() {
+            debug!("No need to flush worker");
             // No need to flush worker.
             return;
         }
@@ -56,9 +66,7 @@ impl<S> RegionWorkerLoop<S> {
         let regions = self.regions.list_regions();
         let now = self.time_provider.current_time_millis();
         let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
-        let mut max_mutable_size = 0;
-        // Region with max mutable memtable size.
-        let mut max_mem_region = None;
+        let mut pending_regions = vec![];
 
         for region in &regions {
             if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
@@ -67,12 +75,8 @@ impl<S> RegionWorkerLoop<S> {
             }
 
             let version = region.version();
-            let region_mutable_size = version.memtables.mutable_usage();
-            // Tracks region with max mutable memtable size.
-            if region_mutable_size > max_mutable_size {
-                max_mem_region = Some(region);
-                max_mutable_size = region_mutable_size;
-            }
+            let region_memtable_size =
+                version.memtables.mutable_usage() + version.memtables.immutables_usage();
 
             if region.last_flush_millis() < min_last_flush_time {
                 // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
@@ -88,14 +92,38 @@ impl<S> RegionWorkerLoop<S> {
                     &region.version_control,
                     task,
                 )?;
+            } else if region_memtable_size > 0 {
+                // We should only consider regions with memtable size > 0 to flush.
+                pending_regions.push((region, region_memtable_size));
             }
         }
+        pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
+        // The flush target is the mutable memtable limit (half of the global buffer).
+        // When memory is full, we aggressively flush regions until usage drops below this target,
+        // not just below the full limit.
+        let target_memory_usage = self.write_buffer_manager.flush_limit();
+        let mut memory_usage = self.write_buffer_manager.memory_usage();
 
-        // Flush memtable with max mutable memtable.
-        // TODO(yingwen): Maybe flush more tables to reduce write buffer size.
-        if let Some(region) = max_mem_region
-            && !self.flush_scheduler.is_flush_requested(region.region_id)
+        #[cfg(test)]
         {
+            debug!(
+                "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
+                target_memory_usage,
+                memory_usage,
+                pending_regions
+                    .iter()
+                    .map(|(region, mem_size)| (region.region_id, mem_size))
+                    .collect::<Vec<_>>()
+            );
+        }
+        // Iterate over pending regions in descending order of their memory size and schedule flush tasks
+        // for each region until the overall memory usage drops below the flush limit.
+        for (region, region_mem_size) in pending_regions.into_iter() {
+            // Make sure the first region is always flushed.
+            if memory_usage < target_memory_usage {
+                // Stop flushing regions if memory usage is already below the flush limit
+                break;
+            }
             let task = self.new_flush_task(
                 region,
                 FlushReason::EngineFull,
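The loop above flushes the largest memtables first and stops once engine-wide usage falls back under `flush_limit()`. The following standalone sketch models just that policy with plain numbers; it assumes nothing about the real `RegionWorkerLoop` API:

// Illustrative only: region ids and memtable sizes are bare integers.
fn pick_regions_to_flush(mut pending: Vec<(u64, usize)>, mut usage: usize, target: usize) -> Vec<u64> {
    // Largest memtables first, mirroring sort_unstable_by_key(Reverse(size)).
    pending.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
    let mut picked = Vec::new();
    for (region_id, size) in pending {
        if usage < target {
            break; // already under the flush target
        }
        picked.push(region_id);
        usage = usage.saturating_sub(size);
    }
    picked
}

fn main() {
    // Usage 1100 with target 500: flush the two largest regions (600 + 300),
    // which brings usage down to 200 and leaves the smallest untouched.
    assert_eq!(pick_regions_to_flush(vec![(1, 300), (2, 600), (3, 50)], 1100, 500), vec![2, 1]);
}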
@@ -103,8 +131,12 @@ impl<S> RegionWorkerLoop<S> {
                 self.config.clone(),
                 region.is_staging(),
             );
+            debug!("Scheduling flush task for region {}", region.region_id);
+            // Schedule a flush task for the current region
             self.flush_scheduler
                 .schedule_flush(region.region_id, &region.version_control, task)?;
+            // Reduce memory usage by the region's size, ensuring it doesn't go negative
+            memory_usage = memory_usage.saturating_sub(region_mem_size);
         }
 
         Ok(())
@@ -291,6 +323,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             .await;
         }
 
+        // Maybe flush worker again.
+        self.maybe_flush_worker();
+
         // Handle stalled requests.
         self.handle_stalled_requests().await;
 
@@ -4327,7 +4327,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
     .await;
 
     // select metrics data
-    let expected = "[[1753780559836,0.0052544,\"arm64\",\"claude-code\",\"claude-3-5-haiku-20241022\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,2.244618,\"arm64\",\"claude-code\",\"claude-sonnet-4-20250514\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
+    let expected = "[[1753780559836,2.244618,\"arm64\",\"claude-code\",\"claude-sonnet-4-20250514\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,0.0052544,\"arm64\",\"claude-code\",\"claude-3-5-haiku-20241022\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
     validate_data(
         "otlp_metrics_all_select",
         &client,
@@ -4399,7 +4399,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
     .await;
 
     // select metrics data
-    let expected = "[[1753780559836,2.244618,\"claude-code\",\"claude-sonnet-4-20250514\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,0.0052544,\"claude-code\",\"claude-3-5-haiku-20241022\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
+    let expected = "[[1753780559836,0.0052544,\"claude-code\",\"claude-3-5-haiku-20241022\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,2.244618,\"claude-code\",\"claude-sonnet-4-20250514\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
     validate_data(
         "otlp_metrics_select",
         &client,
@@ -363,7 +363,7 @@ pub async fn test_metric_table_region_migration_by_sql(
     let result = cluster
         .frontend
         .instance
-        .do_query("select * from t1", query_ctx.clone())
+        .do_query("select * from t1 order by host desc", query_ctx.clone())
         .await
         .remove(0);
 
@@ -379,7 +379,7 @@ pub async fn test_metric_table_region_migration_by_sql(
     let result = cluster
         .frontend
         .instance
-        .do_query("select * from t2", query_ctx)
+        .do_query("select * from t2 order by job desc", query_ctx)
         .await
         .remove(0);
 
@@ -37,8 +37,8 @@ SELECT * from t2;
 +------+-------------------------+-----+
 | job  | ts                      | val |
 +------+-------------------------+-----+
-| job2 | 1970-01-01T00:00:00.001 | 1.0 |
 | job1 | 1970-01-01T00:00:00     | 0.0 |
+| job2 | 1970-01-01T00:00:00.001 | 1.0 |
 +------+-------------------------+-----+
 
 DROP TABLE t1;
@@ -67,10 +67,10 @@ SELECT ts, val, __tsid, host, job FROM phy;
 +-------------------------+-----+----------------------+-------+------+
 | ts                      | val | __tsid               | host  | job  |
 +-------------------------+-----+----------------------+-------+------+
-| 1970-01-01T00:00:00.001 | 1.0 | 1128149335081630826  | host2 |      |
+| 1970-01-01T00:00:00.001 | 1.0 | 7947983149541006936  | host2 |      |
-| 1970-01-01T00:00:00     | 0.0 | 18067404594631612786 | host1 |      |
+| 1970-01-01T00:00:00     | 0.0 | 13882403126406556045 | host1 |      |
-| 1970-01-01T00:00:00.001 | 1.0 | 2176048834144407834  |       | job2 |
+| 1970-01-01T00:00:00     | 0.0 | 6248409809737953425  |       | job1 |
-| 1970-01-01T00:00:00     | 0.0 | 15980333303142110493 |       | job1 |
+| 1970-01-01T00:00:00.001 | 1.0 | 12867770218286207316 |       | job2 |
 +-------------------------+-----+----------------------+-------+------+
 
 DROP TABLE phy;
@@ -123,8 +123,8 @@ SELECT * from t2;
 +------+-------------------------+-----+
 | job  | ts                      | val |
 +------+-------------------------+-----+
-| job2 | 1970-01-01T00:00:00.001 | 1.0 |
 | job1 | 1970-01-01T00:00:00     | 0.0 |
+| job2 | 1970-01-01T00:00:00.001 | 1.0 |
 +------+-------------------------+-----+
 
 ADMIN flush_table('phy');
@@ -154,10 +154,10 @@ SELECT * from t2;
 +------+-------------------------+-----+
 | job  | ts                      | val |
 +------+-------------------------+-----+
-| job2 | 1970-01-01T00:00:00.001 | 1.0 |
 | job3 | 1970-01-01T00:00:00     | 0.0 |
-| job4 | 1970-01-01T00:00:00.001 | 1.0 |
 | job1 | 1970-01-01T00:00:00     | 0.0 |
+| job4 | 1970-01-01T00:00:00.001 | 1.0 |
+| job2 | 1970-01-01T00:00:00.001 | 1.0 |
 +------+-------------------------+-----+
 
 DROP TABLE t1;
@@ -22,14 +22,14 @@ INSERT INTO test_ttl(ts, val, host) VALUES
 
 Affected Rows: 3
 
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 +-----+-------+
 | val | host  |
 +-----+-------+
+| 1.0 | host1 |
 | 2.0 | host2 |
 | 3.0 | host3 |
-| 1.0 | host1 |
 +-----+-------+
 
 -- SQLNESS SLEEP 2s
@@ -83,26 +83,26 @@ ADMIN compact_table('phy');
 +----------------------------+
 
 --- should not be expired --
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 +-----+-------+
 | val | host  |
 +-----+-------+
+| 1.0 | host1 |
 | 2.0 | host2 |
 | 3.0 | host3 |
-| 1.0 | host1 |
 +-----+-------+
 
 -- restart the db, ensure everything is ok
 -- SQLNESS ARG restart=true
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 +-----+-------+
 | val | host  |
 +-----+-------+
+| 1.0 | host1 |
 | 2.0 | host2 |
 | 3.0 | host3 |
-| 1.0 | host1 |
 +-----+-------+
 
 DROP TABLE test_ttl;
@@ -13,7 +13,7 @@ INSERT INTO test_ttl(ts, val, host) VALUES
     (now(), 2, 'host2'),
     (now(), 3, 'host3');
 
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 -- SQLNESS SLEEP 2s
 ADMIN flush_table('phy');
@@ -35,11 +35,11 @@ ADMIN flush_table('phy');
 ADMIN compact_table('phy');
 
 --- should not be expired --
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 -- restart the db, ensure everything is ok
 -- SQLNESS ARG restart=true
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 DROP TABLE test_ttl;
 
@@ -13,14 +13,14 @@ INSERT INTO test_ttl(ts, val, host) VALUES
 
 Affected Rows: 3
 
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 +-----+-------+
 | val | host  |
 +-----+-------+
+| 1.0 | host1 |
 | 2.0 | host2 |
 | 3.0 | host3 |
-| 1.0 | host1 |
 +-----+-------+
 
 -- SQLNESS SLEEP 2s
@@ -74,26 +74,26 @@ ADMIN compact_table('phy');
 +----------------------------+
 
 --- should not be expired --
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 +-----+-------+
 | val | host  |
 +-----+-------+
+| 1.0 | host1 |
 | 2.0 | host2 |
 | 3.0 | host3 |
-| 1.0 | host1 |
 +-----+-------+
 
 -- restart the db, ensure everything is ok
 -- SQLNESS ARG restart=true
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 +-----+-------+
 | val | host  |
 +-----+-------+
+| 1.0 | host1 |
 | 2.0 | host2 |
 | 3.0 | host3 |
-| 1.0 | host1 |
 +-----+-------+
 
 DROP TABLE test_ttl;
@@ -7,7 +7,7 @@ INSERT INTO test_ttl(ts, val, host) VALUES
     (now(), 2, 'host2'),
     (now(), 3, 'host3');
 
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 -- SQLNESS SLEEP 2s
 ADMIN flush_table('phy');
@@ -29,11 +29,11 @@ ADMIN flush_table('phy');
 ADMIN compact_table('phy');
 
 --- should not be expired --
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 -- restart the db, ensure everything is ok
 -- SQLNESS ARG restart=true
-SELECT val, host FROM test_ttl;
+SELECT val, host FROM test_ttl ORDER BY host;
 
 DROP TABLE test_ttl;
 