mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 07:30:02 +00:00
Compare commits
6 Commits
flow/adjus
...
basic_with
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
22c61432f6 | ||
|
|
91f373e66e | ||
|
|
1d53dd26ae | ||
|
|
01796c9cc0 | ||
|
|
9469a8f8f2 | ||
|
|
2fabe346a1 |
@@ -30,7 +30,7 @@ update_helm_charts_version() {
|
||||
|
||||
# Commit the changes.
|
||||
git add .
|
||||
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git push origin $BRANCH_NAME
|
||||
|
||||
# Create a Pull Request.
|
||||
|
||||
@@ -26,7 +26,7 @@ update_homebrew_greptime_version() {
|
||||
|
||||
# Commit the changes.
|
||||
git add .
|
||||
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git push origin $BRANCH_NAME
|
||||
|
||||
# Create a Pull Request.
|
||||
|
||||
50
Cargo.lock
generated
50
Cargo.lock
generated
@@ -3252,7 +3252,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -3303,7 +3303,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-catalog"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"async-trait",
|
||||
@@ -3323,7 +3323,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-catalog-listing"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-schema 54.3.1",
|
||||
@@ -3346,7 +3346,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3371,7 +3371,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-common-runtime"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"log",
|
||||
"tokio",
|
||||
@@ -3380,12 +3380,12 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-doc"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
|
||||
[[package]]
|
||||
name = "datafusion-execution"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"dashmap",
|
||||
@@ -3403,7 +3403,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-expr"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"chrono",
|
||||
@@ -3423,7 +3423,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-expr-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"datafusion-common",
|
||||
@@ -3434,7 +3434,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-buffer 54.3.1",
|
||||
@@ -3463,7 +3463,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-aggregate"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3484,7 +3484,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-aggregate-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3496,7 +3496,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-nested"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -3518,7 +3518,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-table"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"async-trait",
|
||||
@@ -3533,7 +3533,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-window"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"datafusion-common",
|
||||
"datafusion-doc",
|
||||
@@ -3549,7 +3549,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-window-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"datafusion-common",
|
||||
"datafusion-physical-expr-common",
|
||||
@@ -3558,7 +3558,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-macros"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"datafusion-expr",
|
||||
"quote",
|
||||
@@ -3568,7 +3568,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-optimizer"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"chrono",
|
||||
@@ -3586,7 +3586,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-expr"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3609,7 +3609,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-expr-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3622,7 +3622,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-optimizer"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-schema 54.3.1",
|
||||
@@ -3643,7 +3643,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-plan"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3673,7 +3673,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-sql"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -3691,7 +3691,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-substrait"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"async-recursion",
|
||||
"async-trait",
|
||||
@@ -5133,7 +5133,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"serde",
|
||||
|
||||
20
Cargo.toml
20
Cargo.toml
@@ -116,15 +116,15 @@ clap = { version = "4.4", features = ["derive"] }
|
||||
config = "0.13.0"
|
||||
crossbeam-utils = "0.8"
|
||||
dashmap = "6.1"
|
||||
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
deadpool = "0.12"
|
||||
deadpool-postgres = "0.14"
|
||||
derive_builder = "0.20"
|
||||
@@ -133,7 +133,7 @@ etcd-client = "0.14"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -49,6 +49,7 @@ max_send_message_size = "512MB"
|
||||
## - `transport`: only enable gRPC transport compression (zstd)
|
||||
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||
## - `all`: enable all compression.
|
||||
## Default to `none`
|
||||
flight_compression = "arrow_ipc"
|
||||
|
||||
## gRPC server TLS options, see `mysql.tls` section.
|
||||
|
||||
@@ -59,6 +59,7 @@ runtime_size = 8
|
||||
## - `transport`: only enable gRPC transport compression (zstd)
|
||||
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||
## - `all`: enable all compression.
|
||||
## Default to `none`
|
||||
flight_compression = "arrow_ipc"
|
||||
|
||||
## gRPC server TLS options, see `mysql.tls` section.
|
||||
|
||||
@@ -58,6 +58,7 @@ where
|
||||
info!("{desc}, average operation cost: {cost:.2} ms");
|
||||
}
|
||||
|
||||
/// Command to benchmark table metadata operations.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct BenchTableMetadataCommand {
|
||||
#[clap(long)]
|
||||
|
||||
@@ -244,6 +244,18 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Unsupported memory backend"))]
|
||||
UnsupportedMemoryBackend {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("File path invalid: {}", msg))]
|
||||
InvalidFilePath {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -262,6 +274,8 @@ impl ErrorExt for Error {
|
||||
| Error::ConnectEtcd { .. }
|
||||
| Error::CreateDir { .. }
|
||||
| Error::EmptyResult { .. }
|
||||
| Error::InvalidFilePath { .. }
|
||||
| Error::UnsupportedMemoryBackend { .. }
|
||||
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::StartProcedureManager { source, .. }
|
||||
|
||||
@@ -50,6 +50,7 @@ enum ExportTarget {
|
||||
All,
|
||||
}
|
||||
|
||||
/// Command for exporting data from the GreptimeDB.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct ExportCommand {
|
||||
/// Server address to connect
|
||||
|
||||
@@ -40,6 +40,7 @@ enum ImportTarget {
|
||||
All,
|
||||
}
|
||||
|
||||
/// Command to import data from a directory into a GreptimeDB instance.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct ImportCommand {
|
||||
/// Server address to connect
|
||||
|
||||
@@ -20,7 +20,7 @@ mod import;
|
||||
mod meta_snapshot;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
use clap::{Parser, Subcommand};
|
||||
use common_error::ext::BoxedError;
|
||||
pub use database::DatabaseClient;
|
||||
use error::Result;
|
||||
@@ -28,7 +28,7 @@ use error::Result;
|
||||
pub use crate::bench::BenchTableMetadataCommand;
|
||||
pub use crate::export::ExportCommand;
|
||||
pub use crate::import::ImportCommand;
|
||||
pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
|
||||
pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand};
|
||||
|
||||
#[async_trait]
|
||||
pub trait Tool: Send + Sync {
|
||||
@@ -51,3 +51,19 @@ impl AttachCommand {
|
||||
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
|
||||
}
|
||||
}
|
||||
|
||||
/// Subcommand for data operations like export and import.
|
||||
#[derive(Subcommand)]
|
||||
pub enum DataCommand {
|
||||
Export(ExportCommand),
|
||||
Import(ImportCommand),
|
||||
}
|
||||
|
||||
impl DataCommand {
|
||||
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
DataCommand::Export(cmd) => cmd.build().await,
|
||||
DataCommand::Import(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,10 +12,11 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
use clap::{Parser, Subcommand};
|
||||
use common_base::secrets::{ExposeSecret, SecretString};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::kv_backend::chroot::ChrootKvBackend;
|
||||
@@ -26,10 +27,50 @@ use meta_srv::bootstrap::create_etcd_client;
|
||||
use meta_srv::metasrv::BackendImpl;
|
||||
use object_store::services::{Fs, S3};
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
|
||||
use crate::error::{
|
||||
InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu,
|
||||
UnsupportedMemoryBackendSnafu,
|
||||
};
|
||||
use crate::Tool;
|
||||
|
||||
/// Subcommand for metadata snapshot management.
|
||||
#[derive(Subcommand)]
|
||||
pub enum MetaCommand {
|
||||
#[clap(subcommand)]
|
||||
Snapshot(MetaSnapshotCommand),
|
||||
}
|
||||
|
||||
impl MetaCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
MetaCommand::Snapshot(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Subcommand for metadata snapshot operations. such as save, restore and info.
|
||||
#[derive(Subcommand)]
|
||||
pub enum MetaSnapshotCommand {
|
||||
/// Export metadata snapshot tool.
|
||||
Save(MetaSaveCommand),
|
||||
/// Restore metadata snapshot tool.
|
||||
Restore(MetaRestoreCommand),
|
||||
/// Explore metadata from metadata snapshot.
|
||||
Info(MetaInfoCommand),
|
||||
}
|
||||
|
||||
impl MetaSnapshotCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
MetaSnapshotCommand::Save(cmd) => cmd.build().await,
|
||||
MetaSnapshotCommand::Restore(cmd) => cmd.build().await,
|
||||
MetaSnapshotCommand::Info(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Parser)]
|
||||
struct MetaConnection {
|
||||
/// The endpoint of store. one of etcd, pg or mysql.
|
||||
@@ -91,6 +132,9 @@ impl MetaConnection {
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
}
|
||||
Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu
|
||||
.fail()
|
||||
.map_err(BoxedError::new),
|
||||
_ => KvBackendNotSetSnafu { backend: "all" }
|
||||
.fail()
|
||||
.map_err(BoxedError::new),
|
||||
@@ -170,7 +214,7 @@ impl S3Config {
|
||||
/// It will dump the metadata snapshot to local file or s3 bucket.
|
||||
/// The snapshot file will be in binary format.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaSnapshotCommand {
|
||||
pub struct MetaSaveCommand {
|
||||
/// The connection to the metadata store.
|
||||
#[clap(flatten)]
|
||||
connection: MetaConnection,
|
||||
@@ -196,7 +240,7 @@ fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError>
|
||||
Ok(object_store)
|
||||
}
|
||||
|
||||
impl MetaSnapshotCommand {
|
||||
impl MetaSaveCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let kvbackend = self.connection.build().await?;
|
||||
let output_dir = &self.output_dir;
|
||||
@@ -327,3 +371,89 @@ impl Tool for MetaRestoreTool {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Explore metadata from metadata snapshot.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaInfoCommand {
|
||||
/// The s3 config.
|
||||
#[clap(flatten)]
|
||||
s3_config: S3Config,
|
||||
/// The name of the target snapshot file. we will add the file extension automatically.
|
||||
#[clap(long, default_value = "metadata_snapshot")]
|
||||
file_name: String,
|
||||
/// The query string to filter the metadata.
|
||||
#[clap(long, default_value = "*")]
|
||||
inspect_key: String,
|
||||
/// The limit of the metadata to query.
|
||||
#[clap(long)]
|
||||
limit: Option<usize>,
|
||||
}
|
||||
|
||||
pub struct MetaInfoTool {
|
||||
inner: ObjectStore,
|
||||
source_file: String,
|
||||
inspect_key: String,
|
||||
limit: Option<usize>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for MetaInfoTool {
|
||||
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
|
||||
let result = MetadataSnapshotManager::info(
|
||||
&self.inner,
|
||||
&self.source_file,
|
||||
&self.inspect_key,
|
||||
self.limit,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
for item in result {
|
||||
println!("{}", item);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl MetaInfoCommand {
|
||||
fn decide_object_store_root_for_local_store(
|
||||
file_path: &str,
|
||||
) -> Result<(&str, &str), BoxedError> {
|
||||
let path = Path::new(file_path);
|
||||
let parent = path
|
||||
.parent()
|
||||
.and_then(|p| p.to_str())
|
||||
.context(InvalidFilePathSnafu { msg: file_path })
|
||||
.map_err(BoxedError::new)?;
|
||||
let file_name = path
|
||||
.file_name()
|
||||
.and_then(|f| f.to_str())
|
||||
.context(InvalidFilePathSnafu { msg: file_path })
|
||||
.map_err(BoxedError::new)?;
|
||||
let root = if parent.is_empty() { "." } else { parent };
|
||||
Ok((root, file_name))
|
||||
}
|
||||
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
|
||||
if let Some(store) = object_store {
|
||||
let tool = MetaInfoTool {
|
||||
inner: store,
|
||||
source_file: self.file_name.clone(),
|
||||
inspect_key: self.inspect_key.clone(),
|
||||
limit: self.limit,
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
} else {
|
||||
let (root, file_name) =
|
||||
Self::decide_object_store_root_for_local_store(&self.file_name)?;
|
||||
let object_store = create_local_file_object_store(root)?;
|
||||
let tool = MetaInfoTool {
|
||||
inner: object_store,
|
||||
source_file: file_name.to_string(),
|
||||
inspect_key: self.inspect_key.clone(),
|
||||
limit: self.limit,
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,12 +162,23 @@ impl Client {
|
||||
.as_bytes() as usize
|
||||
}
|
||||
|
||||
pub fn make_flight_client(&self) -> Result<FlightClient> {
|
||||
pub fn make_flight_client(
|
||||
&self,
|
||||
send_compression: bool,
|
||||
accept_compression: bool,
|
||||
) -> Result<FlightClient> {
|
||||
let (addr, channel) = self.find_channel()?;
|
||||
|
||||
let client = FlightServiceClient::new(channel)
|
||||
let mut client = FlightServiceClient::new(channel)
|
||||
.max_decoding_message_size(self.max_grpc_recv_message_size())
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size());
|
||||
// todo(hl): support compression methods.
|
||||
if send_compression {
|
||||
client = client.send_compressed(CompressionEncoding::Zstd);
|
||||
}
|
||||
if accept_compression {
|
||||
client = client.accept_compressed(CompressionEncoding::Zstd);
|
||||
}
|
||||
|
||||
Ok(FlightClient { addr, client })
|
||||
}
|
||||
|
||||
@@ -49,7 +49,16 @@ impl NodeManager for NodeClients {
|
||||
async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
|
||||
let client = self.get_client(datanode).await;
|
||||
|
||||
Arc::new(RegionRequester::new(client))
|
||||
let ChannelConfig {
|
||||
send_compression,
|
||||
accept_compression,
|
||||
..
|
||||
} = self.channel_manager.config();
|
||||
Arc::new(RegionRequester::new(
|
||||
client,
|
||||
*send_compression,
|
||||
*accept_compression,
|
||||
))
|
||||
}
|
||||
|
||||
async fn flownode(&self, flownode: &Peer) -> FlownodeRef {
|
||||
|
||||
@@ -287,7 +287,7 @@ impl Database {
|
||||
let mut request = tonic::Request::new(request);
|
||||
Self::put_hints(request.metadata_mut(), hints)?;
|
||||
|
||||
let mut client = self.client.make_flight_client()?;
|
||||
let mut client = self.client.make_flight_client(false, false)?;
|
||||
|
||||
let response = client.mut_inner().do_get(request).await.or_else(|e| {
|
||||
let tonic_code = e.code();
|
||||
@@ -409,7 +409,7 @@ impl Database {
|
||||
MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?,
|
||||
);
|
||||
|
||||
let mut client = self.client.make_flight_client()?;
|
||||
let mut client = self.client.make_flight_client(false, false)?;
|
||||
let response = client.mut_inner().do_put(request).await?;
|
||||
let response = response
|
||||
.into_inner()
|
||||
|
||||
@@ -46,6 +46,8 @@ use crate::{metrics, Client, Error};
|
||||
#[derive(Debug)]
|
||||
pub struct RegionRequester {
|
||||
client: Client,
|
||||
send_compression: bool,
|
||||
accept_compression: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -89,12 +91,18 @@ impl Datanode for RegionRequester {
|
||||
}
|
||||
|
||||
impl RegionRequester {
|
||||
pub fn new(client: Client) -> Self {
|
||||
Self { client }
|
||||
pub fn new(client: Client, send_compression: bool, accept_compression: bool) -> Self {
|
||||
Self {
|
||||
client,
|
||||
send_compression,
|
||||
accept_compression,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
|
||||
let mut flight_client = self.client.make_flight_client()?;
|
||||
let mut flight_client = self
|
||||
.client
|
||||
.make_flight_client(self.send_compression, self.accept_compression)?;
|
||||
let response = flight_client
|
||||
.mut_inner()
|
||||
.do_get(ticket)
|
||||
|
||||
@@ -146,6 +146,7 @@ mod tests {
|
||||
let output_dir = tempfile::tempdir().unwrap();
|
||||
let cli = cli::Command::parse_from([
|
||||
"cli",
|
||||
"data",
|
||||
"export",
|
||||
"--addr",
|
||||
"127.0.0.1:4000",
|
||||
|
||||
@@ -364,12 +364,16 @@ impl StartCommand {
|
||||
|
||||
// frontend to datanode need not timeout.
|
||||
// Some queries are expected to take long time.
|
||||
let channel_config = ChannelConfig {
|
||||
let mut channel_config = ChannelConfig {
|
||||
timeout: None,
|
||||
tcp_nodelay: opts.datanode.client.tcp_nodelay,
|
||||
connect_timeout: Some(opts.datanode.client.connect_timeout),
|
||||
..Default::default()
|
||||
};
|
||||
if opts.grpc.flight_compression.transport_compression() {
|
||||
channel_config.accept_compression = true;
|
||||
channel_config.send_compression = true;
|
||||
}
|
||||
let client = NodeClients::new(channel_config);
|
||||
|
||||
let instance = FrontendBuilder::new(
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ensure;
|
||||
use store_api::storage::ConcreteDataType;
|
||||
|
||||
use crate::handlers::FlowServiceHandlerRef;
|
||||
use crate::helper::parse_catalog_flow;
|
||||
|
||||
fn adjust_signature() -> Signature {
|
||||
Signature::exact(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(), // flow name
|
||||
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
|
||||
ConcreteDataType::uint64_datatype(), // max filter number per query
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
#[admin_fn(
|
||||
name = AdjustFlowFunction,
|
||||
display_name = adjust_flow,
|
||||
sig_fn = adjust_signature,
|
||||
ret = uint64
|
||||
)]
|
||||
pub(crate) async fn adjust_flow(
|
||||
flow_service_handler: &FlowServiceHandlerRef,
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 3, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
|
||||
(
|
||||
ValueRef::String(flow_name),
|
||||
ValueRef::UInt64(min_run_interval),
|
||||
ValueRef::UInt64(max_filter_num),
|
||||
) => (flow_name, min_run_interval, max_filter_num),
|
||||
_ => {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "adjust_flow",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.adjust(
|
||||
&catalog_name,
|
||||
&flow_name,
|
||||
min_run_interval,
|
||||
max_filter_num as usize,
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
@@ -26,7 +26,6 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
||||
use migrate_region::MigrateRegionFunction;
|
||||
use remove_region_follower::RemoveRegionFollowerFunction;
|
||||
|
||||
use crate::adjust_flow::AdjustFlowFunction;
|
||||
use crate::flush_flow::FlushFlowFunction;
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
@@ -44,6 +43,5 @@ impl AdminFunction {
|
||||
registry.register_async(Arc::new(FlushTableFunction));
|
||||
registry.register_async(Arc::new(CompactTableFunction));
|
||||
registry.register_async(Arc::new(FlushFlowFunction));
|
||||
registry.register_async(Arc::new(AdjustFlowFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,19 +12,21 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ensure;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use sql::parser::ParserContext;
|
||||
use store_api::storage::ConcreteDataType;
|
||||
|
||||
use crate::handlers::FlowServiceHandlerRef;
|
||||
use crate::helper::parse_catalog_flow;
|
||||
|
||||
fn flush_signature() -> Signature {
|
||||
Signature::uniform(
|
||||
@@ -45,6 +47,20 @@ pub(crate) async fn flush_flow(
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
|
||||
fn parse_flush_flow(
|
||||
params: &[ValueRef<'_>],
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
ensure!(
|
||||
params.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
@@ -54,6 +70,7 @@ pub(crate) async fn flush_flow(
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let ValueRef::String(flow_name) = params[0] else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "flush_flow",
|
||||
@@ -61,14 +78,27 @@ pub(crate) async fn flush_flow(
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExecuteSnafu)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||
[flow_name] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
flow_name.value.clone(),
|
||||
),
|
||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
Ok((catalog_name, flow_name))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -124,7 +154,10 @@ mod test {
|
||||
("catalog.flow_name", ("catalog", "flow_name")),
|
||||
];
|
||||
for (input, expected) in testcases.iter() {
|
||||
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
|
||||
let args = vec![*input];
|
||||
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
|
||||
|
||||
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
|
||||
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,15 +87,6 @@ pub trait FlowServiceHandler: Send + Sync {
|
||||
flow: &str,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
}
|
||||
|
||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||
|
||||
@@ -12,15 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
|
||||
use common_query::error::{InvalidInputTypeSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::cast::cast;
|
||||
use datatypes::value::ValueRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use sql::parser::ParserContext;
|
||||
|
||||
/// Create a function signature with oneof signatures of interleaving two arguments.
|
||||
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
|
||||
@@ -46,30 +43,3 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
|
||||
})
|
||||
.map(|v| v.as_u64())
|
||||
}
|
||||
|
||||
pub fn parse_catalog_flow(
|
||||
flow_name: &str,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExecuteSnafu)?;
|
||||
|
||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||
[flow_name] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
flow_name.value.clone(),
|
||||
),
|
||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
Ok((catalog_name, flow_name))
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
#![feature(let_chains)]
|
||||
#![feature(try_blocks)]
|
||||
|
||||
mod adjust_flow;
|
||||
mod admin;
|
||||
mod flush_flow;
|
||||
mod macros;
|
||||
|
||||
@@ -148,17 +148,6 @@ impl FunctionState {
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
_catalog: &str,
|
||||
_flow: &str,
|
||||
_min_run_interval_secs: u64,
|
||||
_max_filter_num_per_query: usize,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
|
||||
@@ -296,6 +296,8 @@ pub struct ChannelConfig {
|
||||
pub max_recv_message_size: ReadableSize,
|
||||
// Max gRPC sending(encoding) message size
|
||||
pub max_send_message_size: ReadableSize,
|
||||
pub send_compression: bool,
|
||||
pub accept_compression: bool,
|
||||
}
|
||||
|
||||
impl Default for ChannelConfig {
|
||||
@@ -316,6 +318,8 @@ impl Default for ChannelConfig {
|
||||
client_tls: None,
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
send_compression: false,
|
||||
accept_compression: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -566,6 +570,8 @@ mod tests {
|
||||
client_tls: None,
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
send_compression: false,
|
||||
accept_compression: false,
|
||||
},
|
||||
default_cfg
|
||||
);
|
||||
@@ -610,6 +616,8 @@ mod tests {
|
||||
}),
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
send_compression: false,
|
||||
accept_compression: false,
|
||||
},
|
||||
cfg
|
||||
);
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
pub mod file;
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Instant;
|
||||
@@ -271,6 +272,49 @@ impl MetadataSnapshotManager {
|
||||
|
||||
Ok((filename.to_string(), num_keyvalues as u64))
|
||||
}
|
||||
|
||||
fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
|
||||
format!("{} => {}", key, value)
|
||||
}
|
||||
|
||||
pub async fn info(
|
||||
object_store: &ObjectStore,
|
||||
file_path: &str,
|
||||
query_str: &str,
|
||||
limit: Option<usize>,
|
||||
) -> Result<Vec<String>> {
|
||||
let path = Path::new(file_path);
|
||||
|
||||
let file_name = path
|
||||
.file_name()
|
||||
.and_then(|s| s.to_str())
|
||||
.context(InvalidFilePathSnafu { file_path })?;
|
||||
|
||||
let filename = FileName::try_from(file_name)?;
|
||||
let data = object_store
|
||||
.read(file_path)
|
||||
.await
|
||||
.context(ReadObjectSnafu { file_path })?;
|
||||
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
|
||||
let metadata_content = document.into_metadata_content()?.values();
|
||||
let mut results = Vec::with_capacity(limit.unwrap_or(256));
|
||||
for kv in metadata_content {
|
||||
let key_str = String::from_utf8_lossy(&kv.key);
|
||||
if let Some(prefix) = query_str.strip_suffix('*') {
|
||||
if key_str.starts_with(prefix) {
|
||||
let value_str = String::from_utf8_lossy(&kv.value);
|
||||
results.push(Self::format_output(key_str, value_str));
|
||||
}
|
||||
} else if key_str == query_str {
|
||||
let value_str = String::from_utf8_lossy(&kv.value);
|
||||
results.push(Self::format_output(key_str, value_str));
|
||||
}
|
||||
if results.len() == limit.unwrap_or(usize::MAX) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(results)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -111,6 +111,11 @@ impl MetadataContent {
|
||||
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
|
||||
self.values.into_iter()
|
||||
}
|
||||
|
||||
/// Returns the key-value pairs as a vector.
|
||||
pub fn values(self) -> Vec<KeyValue> {
|
||||
self.values
|
||||
}
|
||||
}
|
||||
|
||||
/// The key-value pair of the backup file.
|
||||
|
||||
@@ -61,7 +61,6 @@ prost.workspace = true
|
||||
query.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
servers.workspace = true
|
||||
session.workspace = true
|
||||
smallvec.workspace = true
|
||||
|
||||
@@ -316,7 +316,7 @@ impl StreamingEngine {
|
||||
);
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["out"])
|
||||
.with_label_values(&["out-streaming"])
|
||||
.inc_by(total_rows as u64);
|
||||
|
||||
let now = self.tick_manager.tick();
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{
|
||||
flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
};
|
||||
use api::v1::region::InsertRequests;
|
||||
use catalog::CatalogManager;
|
||||
@@ -32,7 +32,6 @@ use common_telemetry::{error, info, trace, warn};
|
||||
use datatypes::value::Value;
|
||||
use futures::TryStreamExt;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
@@ -47,7 +46,7 @@ use crate::error::{
|
||||
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
||||
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
||||
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
|
||||
use crate::repr::{self, DiffRow};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -690,6 +689,9 @@ impl FlowEngine for FlowDualEngine {
|
||||
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
||||
let mut to_batch_engine = request.requests;
|
||||
|
||||
let mut batching_row_cnt = 0;
|
||||
let mut streaming_row_cnt = 0;
|
||||
|
||||
{
|
||||
// not locking this, or recover flows will be starved when also handling flow inserts
|
||||
let src_table2flow = self.src_table2flow.read().await;
|
||||
@@ -699,9 +701,11 @@ impl FlowEngine for FlowDualEngine {
|
||||
let is_in_stream = src_table2flow.in_stream(table_id);
|
||||
let is_in_batch = src_table2flow.in_batch(table_id);
|
||||
if is_in_stream {
|
||||
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||
to_stream_engine.push(req.clone());
|
||||
}
|
||||
if is_in_batch {
|
||||
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||
return true;
|
||||
}
|
||||
if !is_in_batch && !is_in_stream {
|
||||
@@ -714,6 +718,14 @@ impl FlowEngine for FlowDualEngine {
|
||||
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
||||
}
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["in-streaming"])
|
||||
.inc_by(streaming_row_cnt as u64);
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["in-batching"])
|
||||
.inc_by(batching_row_cnt as u64);
|
||||
|
||||
let streaming_engine = self.streaming_engine.clone();
|
||||
let stream_handler: JoinHandle<Result<(), Error>> =
|
||||
common_runtime::spawn_global(async move {
|
||||
@@ -810,25 +822,6 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct Options {
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
}
|
||||
let options: Options = serde_json::from_str(&options).with_context(|_| {
|
||||
common_meta::error::DeserializeFromJsonSnafu { input: options }
|
||||
})?;
|
||||
self.batching_engine
|
||||
.adjust_flow(
|
||||
flow_id.unwrap().id as u64,
|
||||
options.min_run_interval_secs,
|
||||
options.max_filter_num_per_query,
|
||||
)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(Default::default())
|
||||
}
|
||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||
}
|
||||
}
|
||||
@@ -861,6 +854,93 @@ fn to_meta_err(
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl common_meta::node_manager::Flownode for StreamingEngine {
|
||||
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
|
||||
let query_ctx = request
|
||||
.header
|
||||
.and_then(|h| h.query_context)
|
||||
.map(|ctx| ctx.into());
|
||||
match request.body {
|
||||
Some(flow_request::Body::Create(CreateRequest {
|
||||
flow_id: Some(task_id),
|
||||
source_table_ids,
|
||||
sink_table_name: Some(sink_table_name),
|
||||
create_if_not_exists,
|
||||
expire_after,
|
||||
comment,
|
||||
sql,
|
||||
flow_options,
|
||||
or_replace,
|
||||
})) => {
|
||||
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
|
||||
let sink_table_name = [
|
||||
sink_table_name.catalog_name,
|
||||
sink_table_name.schema_name,
|
||||
sink_table_name.table_name,
|
||||
];
|
||||
let expire_after = expire_after.map(|e| e.value);
|
||||
let args = CreateFlowArgs {
|
||||
flow_id: task_id.id as u64,
|
||||
sink_table_name,
|
||||
source_table_ids,
|
||||
create_if_not_exists,
|
||||
or_replace,
|
||||
expire_after,
|
||||
comment: Some(comment),
|
||||
sql: sql.clone(),
|
||||
flow_options,
|
||||
query_ctx,
|
||||
};
|
||||
let ret = self
|
||||
.create_flow(args)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.inc();
|
||||
Ok(FlowResponse {
|
||||
affected_flows: ret
|
||||
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
|
||||
.into_iter()
|
||||
.collect_vec(),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Drop(DropRequest {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
self.remove_flow(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.dec();
|
||||
Ok(Default::default())
|
||||
}
|
||||
Some(flow_request::Body::Flush(FlushFlow {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
let row = self
|
||||
.flush_flow_inner(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(FlowResponse {
|
||||
affected_flows: vec![flow_id],
|
||||
affected_rows: row as u64,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
|
||||
self.handle_inserts_inner(request)
|
||||
.await
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for StreamingEngine {
|
||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
self.create_flow_inner(args).await
|
||||
|
||||
@@ -42,6 +42,7 @@ use crate::error::{
|
||||
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
|
||||
UnexpectedSnafu, UnsupportedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_ROWS;
|
||||
use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
||||
|
||||
/// Batching mode Engine, responsible for driving all the batching mode tasks
|
||||
@@ -155,6 +156,10 @@ impl BatchingEngine {
|
||||
let Some(expr) = &task.config.time_window_expr else {
|
||||
continue;
|
||||
};
|
||||
let row_cnt: usize = entry.iter().map(|rows| rows.rows.len()).sum();
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&[&format!("{}-batching-in", task.config.flow_id)])
|
||||
.inc_by(row_cnt as u64);
|
||||
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
|
||||
let mut state = task.state.write().unwrap();
|
||||
state
|
||||
@@ -388,20 +393,6 @@ impl BatchingEngine {
|
||||
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
||||
self.tasks.read().await.contains_key(&flow_id)
|
||||
}
|
||||
|
||||
pub async fn adjust_flow(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
) -> Result<(), Error> {
|
||||
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
|
||||
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
|
||||
task.adjust(min_run_interval_secs, max_filter_num_per_query);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for BatchingEngine {
|
||||
|
||||
@@ -14,9 +14,8 @@
|
||||
|
||||
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::CreateTableExpr;
|
||||
@@ -27,21 +26,20 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use common_query::Output;
|
||||
use common_telemetry::{debug, warn};
|
||||
use itertools::Itertools;
|
||||
use common_telemetry::warn;
|
||||
use meta_client::client::MetaClient;
|
||||
use rand::rng;
|
||||
use rand::seq::SliceRandom;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::batching_mode::task::BatchingTask;
|
||||
use crate::batching_mode::{
|
||||
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
|
||||
GRPC_MAX_RETRIES,
|
||||
};
|
||||
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
|
||||
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
|
||||
use crate::{Error, FlowAuthHeader, FlowId};
|
||||
use crate::{Error, FlowAuthHeader};
|
||||
|
||||
/// Just like [`GrpcQueryHandler`] but use BoxedError
|
||||
///
|
||||
@@ -76,105 +74,6 @@ impl<
|
||||
|
||||
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
|
||||
|
||||
/// Statistics about running query on this frontend from flownode
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct FrontendStat {
|
||||
/// The query for flow id has been running since this timestamp
|
||||
since: HashMap<FlowId, Instant>,
|
||||
/// The average query time for each flow id
|
||||
/// This is used to calculate the average query time for each flow id
|
||||
past_query_avg: HashMap<FlowId, (usize, Duration)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct FrontendStats {
|
||||
/// The statistics for each flow id
|
||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||
}
|
||||
|
||||
impl FrontendStats {
|
||||
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
|
||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
let stat = stats.entry(frontend_addr.to_string()).or_default();
|
||||
stat.since.insert(flow_id, Instant::now());
|
||||
|
||||
FrontendStatsGuard {
|
||||
stats: self.stats.clone(),
|
||||
frontend_addr: frontend_addr.to_string(),
|
||||
cur: flow_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// return frontend addrs sorted by load, from lightest to heaviest
|
||||
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
|
||||
pub fn sort_by_load(&self) -> Vec<String> {
|
||||
let stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
let fe_load_factor = stats
|
||||
.iter()
|
||||
.map(|(node_addr, stat)| {
|
||||
// total expected avg running time for all currently running queries
|
||||
let total_expect_avg_run_time = stat
|
||||
.since
|
||||
.keys()
|
||||
.map(|f| {
|
||||
let (count, total_duration) =
|
||||
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
|
||||
if *count == 0 {
|
||||
0.0
|
||||
} else {
|
||||
total_duration.as_secs_f64() / *count as f64
|
||||
}
|
||||
})
|
||||
.sum::<f64>();
|
||||
let total_cur_running_time = stat
|
||||
.since
|
||||
.values()
|
||||
.map(|since| since.elapsed().as_secs_f64())
|
||||
.sum::<f64>();
|
||||
(
|
||||
node_addr.to_string(),
|
||||
total_expect_avg_run_time + total_cur_running_time,
|
||||
)
|
||||
})
|
||||
.sorted_by(|(_, load_a), (_, load_b)| {
|
||||
load_a
|
||||
.partial_cmp(load_b)
|
||||
.unwrap_or(std::cmp::Ordering::Equal)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
debug!("Frontend load factor: {:?}", fe_load_factor);
|
||||
for (node_addr, load) in &fe_load_factor {
|
||||
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
|
||||
.with_label_values(&[&node_addr.to_string()])
|
||||
.observe(*load);
|
||||
}
|
||||
fe_load_factor
|
||||
.into_iter()
|
||||
.map(|(addr, _)| addr)
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FrontendStatsGuard {
|
||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||
frontend_addr: String,
|
||||
cur: FlowId,
|
||||
}
|
||||
|
||||
impl Drop for FrontendStatsGuard {
|
||||
fn drop(&mut self) {
|
||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
|
||||
if let Some(since) = stat.since.remove(&self.cur) {
|
||||
let elapsed = since.elapsed();
|
||||
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
|
||||
*count += 1;
|
||||
*total_duration += elapsed;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A simple frontend client able to execute sql using grpc protocol
|
||||
///
|
||||
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
|
||||
@@ -184,7 +83,6 @@ pub enum FrontendClient {
|
||||
meta_client: Arc<MetaClient>,
|
||||
chnl_mgr: ChannelManager,
|
||||
auth: Option<FlowAuthHeader>,
|
||||
fe_stats: FrontendStats,
|
||||
},
|
||||
Standalone {
|
||||
/// for the sake of simplicity still use grpc even in standalone mode
|
||||
@@ -216,7 +114,6 @@ impl FrontendClient {
|
||||
ChannelManager::with_config(cfg)
|
||||
},
|
||||
auth,
|
||||
fe_stats: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -286,7 +183,7 @@ impl FrontendClient {
|
||||
|
||||
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
|
||||
/// and is able to process query
|
||||
pub(crate) async fn get_random_active_frontend(
|
||||
async fn get_random_active_frontend(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
@@ -295,7 +192,6 @@ impl FrontendClient {
|
||||
meta_client: _,
|
||||
chnl_mgr,
|
||||
auth,
|
||||
fe_stats,
|
||||
} = self
|
||||
else {
|
||||
return UnexpectedSnafu {
|
||||
@@ -312,21 +208,8 @@ impl FrontendClient {
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64;
|
||||
let node_addrs_by_load = fe_stats.sort_by_load();
|
||||
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
|
||||
let addr2load = node_addrs_by_load
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, id)| (id.clone(), i + 1))
|
||||
.collect::<HashMap<_, _>>();
|
||||
// sort frontends by load, from lightest to heaviest
|
||||
frontends.sort_by(|(_, a), (_, b)| {
|
||||
// if not even in stats, treat as 0 load since never been queried
|
||||
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
|
||||
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
|
||||
load_a.cmp(load_b)
|
||||
});
|
||||
debug!("Frontend nodes sorted by load: {:?}", frontends);
|
||||
// shuffle the frontends to avoid always pick the same one
|
||||
frontends.shuffle(&mut rng());
|
||||
|
||||
// found node with maximum last_activity_ts
|
||||
for (_, node_info) in frontends
|
||||
@@ -374,7 +257,6 @@ impl FrontendClient {
|
||||
create: CreateTableExpr,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
task: Option<&BatchingTask>,
|
||||
) -> Result<u32, Error> {
|
||||
self.handle(
|
||||
Request::Ddl(api::v1::DdlRequest {
|
||||
@@ -382,8 +264,7 @@ impl FrontendClient {
|
||||
}),
|
||||
catalog,
|
||||
schema,
|
||||
None,
|
||||
task,
|
||||
&mut None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -394,31 +275,15 @@ impl FrontendClient {
|
||||
req: api::v1::greptime_request::Request,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
use_peer: Option<Peer>,
|
||||
task: Option<&BatchingTask>,
|
||||
peer_desc: &mut Option<PeerDesc>,
|
||||
) -> Result<u32, Error> {
|
||||
match self {
|
||||
FrontendClient::Distributed {
|
||||
fe_stats, chnl_mgr, ..
|
||||
} => {
|
||||
let db = if let Some(peer) = use_peer {
|
||||
DatabaseWithPeer::new(
|
||||
Database::new(
|
||||
catalog,
|
||||
schema,
|
||||
Client::with_manager_and_urls(
|
||||
chnl_mgr.clone(),
|
||||
vec![peer.addr.clone()],
|
||||
),
|
||||
),
|
||||
peer,
|
||||
)
|
||||
} else {
|
||||
self.get_random_active_frontend(catalog, schema).await?
|
||||
};
|
||||
FrontendClient::Distributed { .. } => {
|
||||
let db = self.get_random_active_frontend(catalog, schema).await?;
|
||||
|
||||
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
|
||||
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
|
||||
*peer_desc = Some(PeerDesc::Dist {
|
||||
peer: db.peer.clone(),
|
||||
});
|
||||
|
||||
db.database
|
||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
||||
|
||||
@@ -32,7 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
|
||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -53,13 +53,6 @@ pub struct TaskState {
|
||||
pub(crate) shutdown_rx: oneshot::Receiver<()>,
|
||||
/// Task handle
|
||||
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
/// Slow Query metrics update task handle
|
||||
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
|
||||
|
||||
/// min run interval in seconds
|
||||
pub(crate) min_run_interval: Option<u64>,
|
||||
/// max filter number per query
|
||||
pub(crate) max_filter_num: Option<usize>,
|
||||
}
|
||||
impl TaskState {
|
||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||
@@ -71,9 +64,6 @@ impl TaskState {
|
||||
exec_state: ExecState::Idle,
|
||||
shutdown_rx,
|
||||
task_handle: None,
|
||||
slow_query_metric_task: None,
|
||||
min_run_interval: None,
|
||||
max_filter_num: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,20 +88,34 @@ impl TaskState {
|
||||
pub fn get_next_start_query_time(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
_time_window_size: &Option<Duration>,
|
||||
time_window_size: &Option<Duration>,
|
||||
max_timeout: Option<Duration>,
|
||||
) -> Instant {
|
||||
let next_duration = max_timeout
|
||||
let last_duration = max_timeout
|
||||
.unwrap_or(self.last_query_duration)
|
||||
.min(self.last_query_duration)
|
||||
.max(
|
||||
self.min_run_interval
|
||||
.map(Duration::from_secs)
|
||||
.unwrap_or(MIN_REFRESH_DURATION),
|
||||
);
|
||||
.max(MIN_REFRESH_DURATION);
|
||||
|
||||
let next_duration = time_window_size
|
||||
.map(|t| {
|
||||
let half = t / 2;
|
||||
half.max(last_duration)
|
||||
})
|
||||
.unwrap_or(last_duration);
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
time_window_size
|
||||
.unwrap_or_default()
|
||||
.as_secs_f64()
|
||||
.to_string()
|
||||
.as_str(),
|
||||
])
|
||||
.observe(next_duration.as_secs_f64());
|
||||
|
||||
// if have dirty time window, execute immediately to clean dirty time window
|
||||
if self.dirty_time_windows.windows.is_empty() {
|
||||
/*if self.dirty_time_windows.windows.is_empty() {
|
||||
self.last_update_time + next_duration
|
||||
} else {
|
||||
debug!(
|
||||
@@ -121,7 +125,9 @@ impl TaskState {
|
||||
self.dirty_time_windows.windows
|
||||
);
|
||||
Instant::now()
|
||||
}
|
||||
}*/
|
||||
|
||||
self.last_update_time + next_duration
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,69 +220,47 @@ impl DirtyTimeWindows {
|
||||
|
||||
// get the first `window_cnt` time windows
|
||||
let max_time_range = window_size * window_cnt as i32;
|
||||
|
||||
let mut to_be_query = BTreeMap::new();
|
||||
let mut new_windows = self.windows.clone();
|
||||
let mut cur_time_range = chrono::Duration::zero();
|
||||
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
||||
let first_end = start
|
||||
.add_duration(window_size.to_std().unwrap())
|
||||
.context(TimeSnafu)?;
|
||||
let end = end.unwrap_or(first_end);
|
||||
|
||||
// if time range is too long, stop
|
||||
if cur_time_range >= max_time_range {
|
||||
break;
|
||||
}
|
||||
|
||||
// if we have enough time windows, stop
|
||||
if idx >= window_cnt {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(x) = end.sub(start) {
|
||||
if cur_time_range + x <= max_time_range {
|
||||
to_be_query.insert(*start, Some(end));
|
||||
new_windows.remove(start);
|
||||
cur_time_range += x;
|
||||
} else {
|
||||
// too large a window, split it
|
||||
// split at window_size * times
|
||||
let surplus = max_time_range - cur_time_range;
|
||||
let times = surplus.num_seconds() / window_size.num_seconds();
|
||||
|
||||
let split_offset = window_size * times as i32;
|
||||
let split_at = start
|
||||
.add_duration(split_offset.to_std().unwrap())
|
||||
.context(TimeSnafu)?;
|
||||
to_be_query.insert(*start, Some(split_at));
|
||||
|
||||
// remove the original window
|
||||
new_windows.remove(start);
|
||||
new_windows.insert(split_at, Some(end));
|
||||
cur_time_range += split_offset;
|
||||
let nth = {
|
||||
let mut cur_time_range = chrono::Duration::zero();
|
||||
let mut nth_key = None;
|
||||
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
||||
// if time range is too long, stop
|
||||
if cur_time_range > max_time_range {
|
||||
nth_key = Some(*start);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.windows = new_windows;
|
||||
// if we have enough time windows, stop
|
||||
if idx >= window_cnt {
|
||||
nth_key = Some(*start);
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(end) = end {
|
||||
if let Some(x) = end.sub(start) {
|
||||
cur_time_range += x;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nth_key
|
||||
};
|
||||
let first_nth = {
|
||||
if let Some(nth) = nth {
|
||||
let mut after = self.windows.split_off(&nth);
|
||||
std::mem::swap(&mut self.windows, &mut after);
|
||||
|
||||
after
|
||||
} else {
|
||||
std::mem::take(&mut self.windows)
|
||||
}
|
||||
};
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!("{}", window_size).as_str(),
|
||||
])
|
||||
.observe(to_be_query.len() as f64);
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(first_nth.len() as f64);
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!("{}", window_size).as_str(),
|
||||
])
|
||||
.observe(self.windows.len() as f64);
|
||||
|
||||
let full_time_range = to_be_query
|
||||
let full_time_range = first_nth
|
||||
.iter()
|
||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||
if let Some(end) = end {
|
||||
@@ -287,14 +271,11 @@ impl DirtyTimeWindows {
|
||||
})
|
||||
.num_seconds() as f64;
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!("{}", window_size).as_str(),
|
||||
])
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(full_time_range);
|
||||
|
||||
let mut expr_lst = vec![];
|
||||
for (start, end) in to_be_query.into_iter() {
|
||||
for (start, end) in first_nth.into_iter() {
|
||||
// align using time window exprs
|
||||
let (start, end) = if let Some(ctx) = task_ctx {
|
||||
let Some(time_window_expr) = &ctx.config.time_window_expr else {
|
||||
@@ -528,64 +509,6 @@ mod test {
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split range
|
||||
(
|
||||
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
|
||||
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||
))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
60
|
||||
)),
|
||||
),
|
||||
(
|
||||
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||
Some(Timestamp::new_second(
|
||||
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split 2 min into 1 min
|
||||
(
|
||||
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
40 * 3
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split 3s + 1min into 3s + 57s
|
||||
(
|
||||
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
3
|
||||
)),
|
||||
),(
|
||||
Timestamp::new_second(20),
|
||||
Some(Timestamp::new_second(
|
||||
140
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
|
||||
)
|
||||
),
|
||||
// expired
|
||||
(
|
||||
vec![
|
||||
@@ -602,8 +525,6 @@ mod test {
|
||||
None
|
||||
),
|
||||
];
|
||||
// let len = testcases.len();
|
||||
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
|
||||
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
|
||||
testcases
|
||||
{
|
||||
|
||||
@@ -61,8 +61,9 @@ use crate::error::{
|
||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
|
||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
|
||||
METRIC_FLOW_ROWS,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -82,14 +83,6 @@ pub struct TaskConfig {
|
||||
query_type: QueryType,
|
||||
}
|
||||
|
||||
impl TaskConfig {
|
||||
pub fn time_window_size(&self) -> Option<Duration> {
|
||||
self.time_window_expr
|
||||
.as_ref()
|
||||
.and_then(|expr| *expr.time_window_size())
|
||||
}
|
||||
}
|
||||
|
||||
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
|
||||
let stmts =
|
||||
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
|
||||
@@ -153,12 +146,6 @@ impl BatchingTask {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
|
||||
let mut state = self.state.write().unwrap();
|
||||
state.min_run_interval = Some(min_run_interval_secs);
|
||||
state.max_filter_num = Some(max_filter_num_per_query);
|
||||
}
|
||||
|
||||
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
|
||||
///
|
||||
/// useful for flush_flow to flush dirty time windows range
|
||||
@@ -295,7 +282,7 @@ impl BatchingTask {
|
||||
let catalog = &self.config.sink_table_name[0];
|
||||
let schema = &self.config.sink_table_name[1];
|
||||
frontend_client
|
||||
.create(expr.clone(), catalog, schema, Some(self))
|
||||
.create(expr.clone(), catalog, schema)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -343,53 +330,11 @@ impl BatchingTask {
|
||||
})?;
|
||||
|
||||
let plan = expanded_plan;
|
||||
|
||||
let db = frontend_client
|
||||
.get_random_active_frontend(catalog, schema)
|
||||
.await?;
|
||||
let peer_desc = db.peer.clone();
|
||||
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
let peer_inner = peer_desc.clone();
|
||||
let window_size_pretty = format!(
|
||||
"{}s",
|
||||
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||
);
|
||||
let inner_window_size_pretty = window_size_pretty.clone();
|
||||
let flow_id = self.config.flow_id;
|
||||
let slow_query_metric_task = tokio::task::spawn(async move {
|
||||
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
|
||||
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
&peer_inner.to_string(),
|
||||
inner_window_size_pretty.as_str(),
|
||||
])
|
||||
.add(1.0);
|
||||
while rx.try_recv() == Err(TryRecvError::Empty) {
|
||||
// sleep for a while before next update
|
||||
tokio::time::sleep(MIN_REFRESH_DURATION).await;
|
||||
}
|
||||
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
&peer_inner.to_string(),
|
||||
inner_window_size_pretty.as_str(),
|
||||
])
|
||||
.sub(1.0);
|
||||
});
|
||||
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
|
||||
let mut peer_desc = None;
|
||||
|
||||
let res = {
|
||||
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!(
|
||||
"{}s",
|
||||
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||
)
|
||||
.as_str(),
|
||||
])
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.start_timer();
|
||||
|
||||
// hack and special handling the insert logical plan
|
||||
@@ -418,18 +363,19 @@ impl BatchingTask {
|
||||
};
|
||||
|
||||
frontend_client
|
||||
.handle(req, catalog, schema, Some(db.peer), Some(self))
|
||||
.handle(req, catalog, schema, &mut peer_desc)
|
||||
.await
|
||||
};
|
||||
|
||||
// signaling the slow query metric task to stop
|
||||
let _ = tx.send(());
|
||||
let elapsed = instant.elapsed();
|
||||
if let Ok(affected_rows) = &res {
|
||||
debug!(
|
||||
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
|
||||
elapsed
|
||||
);
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
|
||||
.inc_by(*affected_rows as _);
|
||||
} else if let Err(err) = &res {
|
||||
warn!(
|
||||
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
|
||||
@@ -446,12 +392,7 @@ impl BatchingTask {
|
||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
&peer_desc.to_string(),
|
||||
format!(
|
||||
"{}s",
|
||||
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||
)
|
||||
.as_str(),
|
||||
&peer_desc.unwrap_or_default().to_string(),
|
||||
])
|
||||
.observe(elapsed.as_secs_f64());
|
||||
}
|
||||
@@ -474,6 +415,7 @@ impl BatchingTask {
|
||||
engine: QueryEngineRef,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
) {
|
||||
let flow_id_str = self.config.flow_id.to_string();
|
||||
loop {
|
||||
// first check if shutdown signal is received
|
||||
// if so, break the loop
|
||||
@@ -491,6 +433,9 @@ impl BatchingTask {
|
||||
Err(TryRecvError::Empty) => (),
|
||||
}
|
||||
}
|
||||
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
|
||||
let new_query = match self.gen_insert_plan(&engine).await {
|
||||
Ok(new_query) => new_query,
|
||||
@@ -537,6 +482,9 @@ impl BatchingTask {
|
||||
}
|
||||
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
|
||||
Err(err) => {
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
match new_query {
|
||||
Some(query) => {
|
||||
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
||||
@@ -644,20 +592,19 @@ impl BatchingTask {
|
||||
),
|
||||
})?;
|
||||
|
||||
let expr = {
|
||||
let mut state = self.state.write().unwrap();
|
||||
let max_window_cnt = state
|
||||
.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
|
||||
state.dirty_time_windows.gen_filter_exprs(
|
||||
let expr = self
|
||||
.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.gen_filter_exprs(
|
||||
&col_name,
|
||||
Some(l),
|
||||
window_size,
|
||||
max_window_cnt,
|
||||
DirtyTimeWindows::MAX_FILTER_NUM,
|
||||
self.config.flow_id,
|
||||
Some(self),
|
||||
)?
|
||||
};
|
||||
)?;
|
||||
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
|
||||
@@ -31,37 +31,29 @@ lazy_static! {
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_time_secs",
|
||||
"flow batching engine query time(seconds)",
|
||||
&["flow_id", "time_window_granularity"],
|
||||
&["flow_id"],
|
||||
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_wait_time_secs",
|
||||
"flow batching engine wait time between query(seconds)",
|
||||
&["flow_id", "time_window_size"],
|
||||
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_slow_query_secs",
|
||||
"flow batching engine slow query(seconds), updated after query finished",
|
||||
&["flow_id", "peer", "time_window_granularity"],
|
||||
"flow batching engine slow query(seconds)",
|
||||
&["flow_id", "peer"],
|
||||
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
|
||||
register_gauge_vec!(
|
||||
"greptime_flow_batching_engine_real_time_slow_query_number",
|
||||
"flow batching engine real time slow query number, updated in real time",
|
||||
&["flow_id", "peer", "time_window_granularity"],
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_stalled_query_window_cnt",
|
||||
"flow batching engine stalled query time window count",
|
||||
&["flow_id", "time_window_granularity"],
|
||||
vec![0.0, 5., 10., 20., 40.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_window_cnt",
|
||||
"flow batching engine query time window count",
|
||||
&["flow_id", "time_window_granularity"],
|
||||
&["flow_id"],
|
||||
vec![0.0, 5., 10., 20., 40.]
|
||||
)
|
||||
.unwrap();
|
||||
@@ -69,16 +61,22 @@ lazy_static! {
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_time_range_secs",
|
||||
"flow batching engine query time range(seconds)",
|
||||
&["flow_id", "time_window_granularity"],
|
||||
&["flow_id"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_guess_fe_load",
|
||||
"flow batching engine guessed frontend load",
|
||||
&["fe_addr"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
|
||||
register_int_counter_vec!(
|
||||
"greptime_flow_batching_start_query_count",
|
||||
"flow batching engine started query count",
|
||||
&["flow_id"],
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
|
||||
register_int_counter_vec!(
|
||||
"greptime_flow_batching_error_count",
|
||||
"flow batching engine error count per flow id",
|
||||
&["flow_id"],
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::flow::{AdjustFlow, FlowRequestHeader};
|
||||
use api::v1::flow::FlowRequestHeader;
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::handlers::FlowServiceHandler;
|
||||
@@ -22,7 +22,6 @@ use common_query::error::Result;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use serde_json::json;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
@@ -58,96 +57,9 @@ impl FlowServiceHandler for FlowServiceOperator {
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
self.flush_inner(catalog, flow, ctx).await
|
||||
}
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
self.adjust_inner(
|
||||
catalog,
|
||||
flow,
|
||||
min_run_interval_secs,
|
||||
max_filter_num_per_query,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowServiceOperator {
|
||||
async fn adjust_inner(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
let id = self
|
||||
.flow_metadata_manager
|
||||
.flow_name_manager()
|
||||
.get(catalog, flow)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
.context(common_meta::error::FlowNotFoundSnafu {
|
||||
flow_name: format!("{}.{}", catalog, flow),
|
||||
})
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
.flow_id();
|
||||
|
||||
let all_flownode_peers = self
|
||||
.flow_metadata_manager
|
||||
.flow_route_manager()
|
||||
.routes(id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?;
|
||||
|
||||
// order of flownodes doesn't matter here
|
||||
let all_flow_nodes = FuturesUnordered::from_iter(
|
||||
all_flownode_peers
|
||||
.iter()
|
||||
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
|
||||
)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
// TODO(discord9): use proper type for flow options
|
||||
let options = json!({
|
||||
"min_run_interval_secs": min_run_interval_secs,
|
||||
"max_filter_num_per_query": max_filter_num_per_query,
|
||||
});
|
||||
|
||||
for node in all_flow_nodes {
|
||||
let _res = {
|
||||
use api::v1::flow::{flow_request, FlowRequest};
|
||||
let flush_req = FlowRequest {
|
||||
header: Some(FlowRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
query_context: Some(
|
||||
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
|
||||
),
|
||||
}),
|
||||
body: Some(flow_request::Body::Adjust(AdjustFlow {
|
||||
flow_id: Some(api::v1::FlowId { id }),
|
||||
options: options.to_string(),
|
||||
})),
|
||||
};
|
||||
node.handle(flush_req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
};
|
||||
}
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
/// Flush the flownodes according to the flow id.
|
||||
async fn flush_inner(
|
||||
&self,
|
||||
|
||||
@@ -13,20 +13,17 @@
|
||||
// limitations under the License.
|
||||
|
||||
use clap::Parser;
|
||||
use cli::{
|
||||
BenchTableMetadataCommand, ExportCommand, ImportCommand, MetaRestoreCommand,
|
||||
MetaSnapshotCommand, Tool,
|
||||
};
|
||||
use cli::{BenchTableMetadataCommand, DataCommand, MetaCommand, Tool};
|
||||
use common_error::ext::BoxedError;
|
||||
|
||||
#[derive(Parser)]
|
||||
pub enum SubCommand {
|
||||
// Attach(AttachCommand),
|
||||
Bench(BenchTableMetadataCommand),
|
||||
Export(ExportCommand),
|
||||
Import(ImportCommand),
|
||||
MetaSnapshot(MetaSnapshotCommand),
|
||||
MetaRestore(MetaRestoreCommand),
|
||||
#[clap(subcommand)]
|
||||
Data(DataCommand),
|
||||
#[clap(subcommand)]
|
||||
Meta(MetaCommand),
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
@@ -34,10 +31,8 @@ impl SubCommand {
|
||||
match self {
|
||||
// SubCommand::Attach(cmd) => cmd.build().await,
|
||||
SubCommand::Bench(cmd) => cmd.build().await,
|
||||
SubCommand::Export(cmd) => cmd.build().await,
|
||||
SubCommand::Import(cmd) => cmd.build().await,
|
||||
SubCommand::MetaSnapshot(cmd) => cmd.build().await,
|
||||
SubCommand::MetaRestore(cmd) => cmd.build().await,
|
||||
SubCommand::Data(cmd) => cmd.build().await,
|
||||
SubCommand::Meta(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2444,7 +2444,7 @@ impl PromPlanner {
|
||||
LogicalPlanBuilder::from(left)
|
||||
.alias(left_table_ref)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.join(
|
||||
.join_detailed(
|
||||
right,
|
||||
JoinType::Inner,
|
||||
(
|
||||
@@ -2458,6 +2458,7 @@ impl PromPlanner {
|
||||
.collect::<Vec<_>>(),
|
||||
),
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
|
||||
@@ -139,11 +139,11 @@ impl GrpcOptions {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum FlightCompression {
|
||||
/// Disable all compression in Arrow Flight service.
|
||||
#[default]
|
||||
None,
|
||||
/// Enable only transport layer compression (zstd).
|
||||
Transport,
|
||||
/// Enable only payload compression (lz4)
|
||||
#[default]
|
||||
ArrowIpc,
|
||||
/// Enable all compression.
|
||||
All,
|
||||
|
||||
@@ -675,11 +675,16 @@ insert into cache_miss_with_null_label values
|
||||
Affected Rows: 4
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
-- null!=null, so it will returns the empty set.
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
|
||||
|
||||
++
|
||||
++
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
| read | | 1970-01-01T00:00:03 | 0.5 |
|
||||
| read | | 1970-01-01T00:00:04 | 0.75 |
|
||||
| write | | 1970-01-01T00:00:03 | 0.5 |
|
||||
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
|
||||
|
||||
@@ -325,7 +325,6 @@ insert into cache_miss_with_null_label values
|
||||
(4000, "write", null, 2.0);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
-- null!=null, so it will returns the empty set.
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
|
||||
Reference in New Issue
Block a user