Compare commits

..

6 Commits

Author SHA1 Message Date
discord9
c8fde8112a metrics: more generic metrics batching mode 2025-06-11 02:10:25 +08:00
Lei, HUANG
4534e4c31d chore/enable-flight-encoder:
### Add Flight Compression Support

 - **Configuration Updates**:
   - Added `grpc.flight_compression` option to `config/config.md`, `config/datanode.example.toml`, and `config/frontend.example.toml` to specify compression modes for Arrow IPC service.

 - **Code Enhancements**:
   - Updated `FlightEncoder` in `src/common/grpc/src/flight.rs` to support compression modes.
   - Modified `RegionServer` and `DatanodeBuilder` in `src/datanode/src/datanode.rs` and `src/datanode/src/region_server.rs` to handle `FlightCompression`.
   - Integrated `FlightCompression` in `src/servers/src/grpc.rs` and `src/servers/src/grpc/flight.rs` to manage compression settings.

 - **Testing and Integration**:
   - Updated test utilities and integration tests in `tests-integration/src/grpc/flight.rs` and `tests-integration/src/test_util.rs` to include `FlightCompression`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-10 23:15:01 +08:00
discord9
3d8278dc4c feat: truly limit time range by split window 2025-06-10 23:15:01 +08:00
discord9
e4328380b2 feat: better metrics 2025-06-10 23:15:01 +08:00
discord9
e962076207 feat(exp): adjust_flow admin function 2025-06-10 23:15:01 +08:00
discord9
9ef8ba6460 feat: flownode to frontend load balance with guess 2025-06-10 23:15:01 +08:00
43 changed files with 239 additions and 821 deletions

View File

@@ -30,7 +30,7 @@ update_helm_charts_version() {
# Commit the changes. # Commit the changes.
git add . git add .
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}" git commit -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME git push origin $BRANCH_NAME
# Create a Pull Request. # Create a Pull Request.

View File

@@ -26,7 +26,7 @@ update_homebrew_greptime_version() {
# Commit the changes. # Commit the changes.
git add . git add .
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}" git commit -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME git push origin $BRANCH_NAME
# Create a Pull Request. # Create a Pull Request.

50
Cargo.lock generated
View File

@@ -3252,7 +3252,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion" name = "datafusion"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"arrow-array 54.2.1", "arrow-array 54.2.1",
@@ -3303,7 +3303,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-catalog" name = "datafusion-catalog"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"async-trait", "async-trait",
@@ -3323,7 +3323,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-catalog-listing" name = "datafusion-catalog-listing"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"arrow-schema 54.3.1", "arrow-schema 54.3.1",
@@ -3346,7 +3346,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-common" name = "datafusion-common"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"ahash 0.8.11", "ahash 0.8.11",
"arrow 54.2.1", "arrow 54.2.1",
@@ -3371,7 +3371,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-common-runtime" name = "datafusion-common-runtime"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"log", "log",
"tokio", "tokio",
@@ -3380,12 +3380,12 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-doc" name = "datafusion-doc"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
[[package]] [[package]]
name = "datafusion-execution" name = "datafusion-execution"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"dashmap", "dashmap",
@@ -3403,7 +3403,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-expr" name = "datafusion-expr"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"chrono", "chrono",
@@ -3423,7 +3423,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-expr-common" name = "datafusion-expr-common"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"datafusion-common", "datafusion-common",
@@ -3434,7 +3434,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-functions" name = "datafusion-functions"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"arrow-buffer 54.3.1", "arrow-buffer 54.3.1",
@@ -3463,7 +3463,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-functions-aggregate" name = "datafusion-functions-aggregate"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"ahash 0.8.11", "ahash 0.8.11",
"arrow 54.2.1", "arrow 54.2.1",
@@ -3484,7 +3484,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-functions-aggregate-common" name = "datafusion-functions-aggregate-common"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"ahash 0.8.11", "ahash 0.8.11",
"arrow 54.2.1", "arrow 54.2.1",
@@ -3496,7 +3496,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-functions-nested" name = "datafusion-functions-nested"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"arrow-array 54.2.1", "arrow-array 54.2.1",
@@ -3518,7 +3518,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-functions-table" name = "datafusion-functions-table"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"async-trait", "async-trait",
@@ -3533,7 +3533,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-functions-window" name = "datafusion-functions-window"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"datafusion-common", "datafusion-common",
"datafusion-doc", "datafusion-doc",
@@ -3549,7 +3549,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-functions-window-common" name = "datafusion-functions-window-common"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"datafusion-common", "datafusion-common",
"datafusion-physical-expr-common", "datafusion-physical-expr-common",
@@ -3558,7 +3558,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-macros" name = "datafusion-macros"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"datafusion-expr", "datafusion-expr",
"quote", "quote",
@@ -3568,7 +3568,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-optimizer" name = "datafusion-optimizer"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"chrono", "chrono",
@@ -3586,7 +3586,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-physical-expr" name = "datafusion-physical-expr"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"ahash 0.8.11", "ahash 0.8.11",
"arrow 54.2.1", "arrow 54.2.1",
@@ -3609,7 +3609,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-physical-expr-common" name = "datafusion-physical-expr-common"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"ahash 0.8.11", "ahash 0.8.11",
"arrow 54.2.1", "arrow 54.2.1",
@@ -3622,7 +3622,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-physical-optimizer" name = "datafusion-physical-optimizer"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"arrow-schema 54.3.1", "arrow-schema 54.3.1",
@@ -3643,7 +3643,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-physical-plan" name = "datafusion-physical-plan"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"ahash 0.8.11", "ahash 0.8.11",
"arrow 54.2.1", "arrow 54.2.1",
@@ -3673,7 +3673,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-sql" name = "datafusion-sql"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"arrow 54.2.1", "arrow 54.2.1",
"arrow-array 54.2.1", "arrow-array 54.2.1",
@@ -3691,7 +3691,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion-substrait" name = "datafusion-substrait"
version = "45.0.0" version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646" source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [ dependencies = [
"async-recursion", "async-recursion",
"async-trait", "async-trait",
@@ -5133,7 +5133,7 @@ dependencies = [
[[package]] [[package]]
name = "greptime-proto" name = "greptime-proto"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=52083925a15d741c259800a9a54eba3467939180#52083925a15d741c259800a9a54eba3467939180" source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
dependencies = [ dependencies = [
"prost 0.13.5", "prost 0.13.5",
"serde", "serde",

View File

@@ -116,15 +116,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0" config = "0.13.0"
crossbeam-utils = "0.8" crossbeam-utils = "0.8"
dashmap = "6.1" dashmap = "6.1"
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" } datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" } datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" } datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" } datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" } datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" } datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" } datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" } datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" } datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
deadpool = "0.12" deadpool = "0.12"
deadpool-postgres = "0.14" deadpool-postgres = "0.14"
derive_builder = "0.20" derive_builder = "0.20"
@@ -133,7 +133,7 @@ etcd-client = "0.14"
fst = "0.4.7" fst = "0.4.7"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "52083925a15d741c259800a9a54eba3467939180" } greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
hex = "0.4" hex = "0.4"
http = "1" http = "1"
humantime = "2.1" humantime = "2.1"

View File

@@ -49,7 +49,6 @@ max_send_message_size = "512MB"
## - `transport`: only enable gRPC transport compression (zstd) ## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4) ## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression. ## - `all`: enable all compression.
## Default to `none`
flight_compression = "arrow_ipc" flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section. ## gRPC server TLS options, see `mysql.tls` section.

View File

@@ -59,7 +59,6 @@ runtime_size = 8
## - `transport`: only enable gRPC transport compression (zstd) ## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4) ## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression. ## - `all`: enable all compression.
## Default to `none`
flight_compression = "arrow_ipc" flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section. ## gRPC server TLS options, see `mysql.tls` section.

View File

@@ -58,7 +58,6 @@ where
info!("{desc}, average operation cost: {cost:.2} ms"); info!("{desc}, average operation cost: {cost:.2} ms");
} }
/// Command to benchmark table metadata operations.
#[derive(Debug, Default, Parser)] #[derive(Debug, Default, Parser)]
pub struct BenchTableMetadataCommand { pub struct BenchTableMetadataCommand {
#[clap(long)] #[clap(long)]

View File

@@ -244,18 +244,6 @@ pub enum Error {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
}, },
#[snafu(display("Unsupported memory backend"))]
UnsupportedMemoryBackend {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("File path invalid: {}", msg))]
InvalidFilePath {
msg: String,
#[snafu(implicit)]
location: Location,
},
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@@ -274,8 +262,6 @@ impl ErrorExt for Error {
| Error::ConnectEtcd { .. } | Error::ConnectEtcd { .. }
| Error::CreateDir { .. } | Error::CreateDir { .. }
| Error::EmptyResult { .. } | Error::EmptyResult { .. }
| Error::InvalidFilePath { .. }
| Error::UnsupportedMemoryBackend { .. }
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments, | Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. } Error::StartProcedureManager { source, .. }

View File

@@ -50,7 +50,6 @@ enum ExportTarget {
All, All,
} }
/// Command for exporting data from the GreptimeDB.
#[derive(Debug, Default, Parser)] #[derive(Debug, Default, Parser)]
pub struct ExportCommand { pub struct ExportCommand {
/// Server address to connect /// Server address to connect

View File

@@ -40,7 +40,6 @@ enum ImportTarget {
All, All,
} }
/// Command to import data from a directory into a GreptimeDB instance.
#[derive(Debug, Default, Parser)] #[derive(Debug, Default, Parser)]
pub struct ImportCommand { pub struct ImportCommand {
/// Server address to connect /// Server address to connect

View File

@@ -20,7 +20,7 @@ mod import;
mod meta_snapshot; mod meta_snapshot;
use async_trait::async_trait; use async_trait::async_trait;
use clap::{Parser, Subcommand}; use clap::Parser;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
pub use database::DatabaseClient; pub use database::DatabaseClient;
use error::Result; use error::Result;
@@ -28,7 +28,7 @@ use error::Result;
pub use crate::bench::BenchTableMetadataCommand; pub use crate::bench::BenchTableMetadataCommand;
pub use crate::export::ExportCommand; pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand; pub use crate::import::ImportCommand;
pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand}; pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
#[async_trait] #[async_trait]
pub trait Tool: Send + Sync { pub trait Tool: Send + Sync {
@@ -51,19 +51,3 @@ impl AttachCommand {
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373") unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
} }
} }
/// Subcommand for data operations like export and import.
#[derive(Subcommand)]
pub enum DataCommand {
Export(ExportCommand),
Import(ImportCommand),
}
impl DataCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
DataCommand::Export(cmd) => cmd.build().await,
DataCommand::Import(cmd) => cmd.build().await,
}
}
}

View File

@@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use clap::{Parser, Subcommand}; use clap::Parser;
use common_base::secrets::{ExposeSecret, SecretString}; use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::chroot::ChrootKvBackend;
@@ -27,50 +26,10 @@ use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl; use meta_srv::metasrv::BackendImpl;
use object_store::services::{Fs, S3}; use object_store::services::{Fs, S3};
use object_store::ObjectStore; use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt}; use snafu::ResultExt;
use crate::error::{ use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu,
UnsupportedMemoryBackendSnafu,
};
use crate::Tool; use crate::Tool;
/// Subcommand for metadata snapshot management.
#[derive(Subcommand)]
pub enum MetaCommand {
#[clap(subcommand)]
Snapshot(MetaSnapshotCommand),
}
impl MetaCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetaCommand::Snapshot(cmd) => cmd.build().await,
}
}
}
/// Subcommand for metadata snapshot operations. such as save, restore and info.
#[derive(Subcommand)]
pub enum MetaSnapshotCommand {
/// Export metadata snapshot tool.
Save(MetaSaveCommand),
/// Restore metadata snapshot tool.
Restore(MetaRestoreCommand),
/// Explore metadata from metadata snapshot.
Info(MetaInfoCommand),
}
impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetaSnapshotCommand::Save(cmd) => cmd.build().await,
MetaSnapshotCommand::Restore(cmd) => cmd.build().await,
MetaSnapshotCommand::Info(cmd) => cmd.build().await,
}
}
}
#[derive(Debug, Default, Parser)] #[derive(Debug, Default, Parser)]
struct MetaConnection { struct MetaConnection {
/// The endpoint of store. one of etcd, pg or mysql. /// The endpoint of store. one of etcd, pg or mysql.
@@ -132,9 +91,6 @@ impl MetaConnection {
.await .await
.map_err(BoxedError::new)?) .map_err(BoxedError::new)?)
} }
Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu
.fail()
.map_err(BoxedError::new),
_ => KvBackendNotSetSnafu { backend: "all" } _ => KvBackendNotSetSnafu { backend: "all" }
.fail() .fail()
.map_err(BoxedError::new), .map_err(BoxedError::new),
@@ -214,7 +170,7 @@ impl S3Config {
/// It will dump the metadata snapshot to local file or s3 bucket. /// It will dump the metadata snapshot to local file or s3 bucket.
/// The snapshot file will be in binary format. /// The snapshot file will be in binary format.
#[derive(Debug, Default, Parser)] #[derive(Debug, Default, Parser)]
pub struct MetaSaveCommand { pub struct MetaSnapshotCommand {
/// The connection to the metadata store. /// The connection to the metadata store.
#[clap(flatten)] #[clap(flatten)]
connection: MetaConnection, connection: MetaConnection,
@@ -240,7 +196,7 @@ fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError>
Ok(object_store) Ok(object_store)
} }
impl MetaSaveCommand { impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> { pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.connection.build().await?; let kvbackend = self.connection.build().await?;
let output_dir = &self.output_dir; let output_dir = &self.output_dir;
@@ -371,89 +327,3 @@ impl Tool for MetaRestoreTool {
} }
} }
} }
/// Explore metadata from metadata snapshot.
#[derive(Debug, Default, Parser)]
pub struct MetaInfoCommand {
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
/// The name of the target snapshot file. we will add the file extension automatically.
#[clap(long, default_value = "metadata_snapshot")]
file_name: String,
/// The query string to filter the metadata.
#[clap(long, default_value = "*")]
inspect_key: String,
/// The limit of the metadata to query.
#[clap(long)]
limit: Option<usize>,
}
pub struct MetaInfoTool {
inner: ObjectStore,
source_file: String,
inspect_key: String,
limit: Option<usize>,
}
#[async_trait]
impl Tool for MetaInfoTool {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let result = MetadataSnapshotManager::info(
&self.inner,
&self.source_file,
&self.inspect_key,
self.limit,
)
.await
.map_err(BoxedError::new)?;
for item in result {
println!("{}", item);
}
Ok(())
}
}
impl MetaInfoCommand {
fn decide_object_store_root_for_local_store(
file_path: &str,
) -> Result<(&str, &str), BoxedError> {
let path = Path::new(file_path);
let parent = path
.parent()
.and_then(|p| p.to_str())
.context(InvalidFilePathSnafu { msg: file_path })
.map_err(BoxedError::new)?;
let file_name = path
.file_name()
.and_then(|f| f.to_str())
.context(InvalidFilePathSnafu { msg: file_path })
.map_err(BoxedError::new)?;
let root = if parent.is_empty() { "." } else { parent };
Ok((root, file_name))
}
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
if let Some(store) = object_store {
let tool = MetaInfoTool {
inner: store,
source_file: self.file_name.clone(),
inspect_key: self.inspect_key.clone(),
limit: self.limit,
};
Ok(Box::new(tool))
} else {
let (root, file_name) =
Self::decide_object_store_root_for_local_store(&self.file_name)?;
let object_store = create_local_file_object_store(root)?;
let tool = MetaInfoTool {
inner: object_store,
source_file: file_name.to_string(),
inspect_key: self.inspect_key.clone(),
limit: self.limit,
};
Ok(Box::new(tool))
}
}
}

View File

@@ -162,23 +162,12 @@ impl Client {
.as_bytes() as usize .as_bytes() as usize
} }
pub fn make_flight_client( pub fn make_flight_client(&self) -> Result<FlightClient> {
&self,
send_compression: bool,
accept_compression: bool,
) -> Result<FlightClient> {
let (addr, channel) = self.find_channel()?; let (addr, channel) = self.find_channel()?;
let mut client = FlightServiceClient::new(channel) let client = FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size()) .max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()); .max_encoding_message_size(self.max_grpc_send_message_size());
// todo(hl): support compression methods.
if send_compression {
client = client.send_compressed(CompressionEncoding::Zstd);
}
if accept_compression {
client = client.accept_compressed(CompressionEncoding::Zstd);
}
Ok(FlightClient { addr, client }) Ok(FlightClient { addr, client })
} }

View File

@@ -49,16 +49,7 @@ impl NodeManager for NodeClients {
async fn datanode(&self, datanode: &Peer) -> DatanodeRef { async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
let client = self.get_client(datanode).await; let client = self.get_client(datanode).await;
let ChannelConfig { Arc::new(RegionRequester::new(client))
send_compression,
accept_compression,
..
} = self.channel_manager.config();
Arc::new(RegionRequester::new(
client,
*send_compression,
*accept_compression,
))
} }
async fn flownode(&self, flownode: &Peer) -> FlownodeRef { async fn flownode(&self, flownode: &Peer) -> FlownodeRef {

View File

@@ -287,7 +287,7 @@ impl Database {
let mut request = tonic::Request::new(request); let mut request = tonic::Request::new(request);
Self::put_hints(request.metadata_mut(), hints)?; Self::put_hints(request.metadata_mut(), hints)?;
let mut client = self.client.make_flight_client(false, false)?; let mut client = self.client.make_flight_client()?;
let response = client.mut_inner().do_get(request).await.or_else(|e| { let response = client.mut_inner().do_get(request).await.or_else(|e| {
let tonic_code = e.code(); let tonic_code = e.code();
@@ -409,7 +409,7 @@ impl Database {
MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?, MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?,
); );
let mut client = self.client.make_flight_client(false, false)?; let mut client = self.client.make_flight_client()?;
let response = client.mut_inner().do_put(request).await?; let response = client.mut_inner().do_put(request).await?;
let response = response let response = response
.into_inner() .into_inner()

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse}; use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::InsertRequests; use api::v1::region::InsertRequests;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::node_manager::Flownode; use common_meta::node_manager::Flownode;
@@ -44,16 +44,6 @@ impl Flownode for FlowRequester {
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu) .context(common_meta::error::ExternalSnafu)
} }
async fn handle_mark_window_dirty(
&self,
req: DirtyWindowRequest,
) -> common_meta::error::Result<FlowResponse> {
self.handle_mark_window_dirty(req)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
} }
impl FlowRequester { impl FlowRequester {
@@ -101,20 +91,4 @@ impl FlowRequester {
.into_inner(); .into_inner();
Ok(response) Ok(response)
} }
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
let (addr, mut client) = self.client.raw_flow_client()?;
let response = client
.handle_mark_dirty_time_window(DirtyWindowRequests {
requests: vec![req],
})
.await
.or_else(|e| {
let code = e.code();
let err: crate::error::Error = e.into();
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
})?
.into_inner();
Ok(response)
}
} }

View File

@@ -46,8 +46,6 @@ use crate::{metrics, Client, Error};
#[derive(Debug)] #[derive(Debug)]
pub struct RegionRequester { pub struct RegionRequester {
client: Client, client: Client,
send_compression: bool,
accept_compression: bool,
} }
#[async_trait] #[async_trait]
@@ -91,18 +89,12 @@ impl Datanode for RegionRequester {
} }
impl RegionRequester { impl RegionRequester {
pub fn new(client: Client, send_compression: bool, accept_compression: bool) -> Self { pub fn new(client: Client) -> Self {
Self { Self { client }
client,
send_compression,
accept_compression,
}
} }
pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> { pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
let mut flight_client = self let mut flight_client = self.client.make_flight_client()?;
.client
.make_flight_client(self.send_compression, self.accept_compression)?;
let response = flight_client let response = flight_client
.mut_inner() .mut_inner()
.do_get(ticket) .do_get(ticket)

View File

@@ -146,7 +146,6 @@ mod tests {
let output_dir = tempfile::tempdir().unwrap(); let output_dir = tempfile::tempdir().unwrap();
let cli = cli::Command::parse_from([ let cli = cli::Command::parse_from([
"cli", "cli",
"data",
"export", "export",
"--addr", "--addr",
"127.0.0.1:4000", "127.0.0.1:4000",

View File

@@ -364,16 +364,12 @@ impl StartCommand {
// frontend to datanode need not timeout. // frontend to datanode need not timeout.
// Some queries are expected to take long time. // Some queries are expected to take long time.
let mut channel_config = ChannelConfig { let channel_config = ChannelConfig {
timeout: None, timeout: None,
tcp_nodelay: opts.datanode.client.tcp_nodelay, tcp_nodelay: opts.datanode.client.tcp_nodelay,
connect_timeout: Some(opts.datanode.client.connect_timeout), connect_timeout: Some(opts.datanode.client.connect_timeout),
..Default::default() ..Default::default()
}; };
if opts.grpc.flight_compression.transport_compression() {
channel_config.accept_compression = true;
channel_config.send_compression = true;
}
let client = NodeClients::new(channel_config); let client = NodeClients::new(channel_config);
let instance = FrontendBuilder::new( let instance = FrontendBuilder::new(

View File

@@ -296,8 +296,6 @@ pub struct ChannelConfig {
pub max_recv_message_size: ReadableSize, pub max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size // Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize, pub max_send_message_size: ReadableSize,
pub send_compression: bool,
pub accept_compression: bool,
} }
impl Default for ChannelConfig { impl Default for ChannelConfig {
@@ -318,8 +316,6 @@ impl Default for ChannelConfig {
client_tls: None, client_tls: None,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
send_compression: false,
accept_compression: false,
} }
} }
} }
@@ -570,8 +566,6 @@ mod tests {
client_tls: None, client_tls: None,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
send_compression: false,
accept_compression: false,
}, },
default_cfg default_cfg
); );
@@ -616,8 +610,6 @@ mod tests {
}), }),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
send_compression: false,
accept_compression: false,
}, },
cfg cfg
); );

View File

@@ -15,7 +15,7 @@
use std::sync::Arc; use std::sync::Arc;
use api::region::RegionResponse; use api::region::RegionResponse;
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse}; use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest}; use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows; pub use common_base::AffectedRows;
use common_query::request::QueryRequest; use common_query::request::QueryRequest;
@@ -42,9 +42,6 @@ pub trait Flownode: Send + Sync {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>; async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>; async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
/// Handles requests to mark time window as dirty.
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
} }
pub type FlownodeRef = Arc<dyn Flownode>; pub type FlownodeRef = Arc<dyn Flownode>;

View File

@@ -14,7 +14,6 @@
pub mod file; pub mod file;
use std::borrow::Cow;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::Instant; use std::time::Instant;
@@ -272,49 +271,6 @@ impl MetadataSnapshotManager {
Ok((filename.to_string(), num_keyvalues as u64)) Ok((filename.to_string(), num_keyvalues as u64))
} }
fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
format!("{} => {}", key, value)
}
pub async fn info(
object_store: &ObjectStore,
file_path: &str,
query_str: &str,
limit: Option<usize>,
) -> Result<Vec<String>> {
let path = Path::new(file_path);
let file_name = path
.file_name()
.and_then(|s| s.to_str())
.context(InvalidFilePathSnafu { file_path })?;
let filename = FileName::try_from(file_name)?;
let data = object_store
.read(file_path)
.await
.context(ReadObjectSnafu { file_path })?;
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
let metadata_content = document.into_metadata_content()?.values();
let mut results = Vec::with_capacity(limit.unwrap_or(256));
for kv in metadata_content {
let key_str = String::from_utf8_lossy(&kv.key);
if let Some(prefix) = query_str.strip_suffix('*') {
if key_str.starts_with(prefix) {
let value_str = String::from_utf8_lossy(&kv.value);
results.push(Self::format_output(key_str, value_str));
}
} else if key_str == query_str {
let value_str = String::from_utf8_lossy(&kv.value);
results.push(Self::format_output(key_str, value_str));
}
if results.len() == limit.unwrap_or(usize::MAX) {
break;
}
}
Ok(results)
}
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -111,11 +111,6 @@ impl MetadataContent {
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> { pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
self.values.into_iter() self.values.into_iter()
} }
/// Returns the key-value pairs as a vector.
pub fn values(self) -> Vec<KeyValue> {
self.values
}
} }
/// The key-value pair of the backup file. /// The key-value pair of the backup file.

View File

@@ -15,7 +15,7 @@
use std::sync::Arc; use std::sync::Arc;
use api::region::RegionResponse; use api::region::RegionResponse;
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse}; use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest}; use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows; pub use common_base::AffectedRows;
use common_query::request::QueryRequest; use common_query::request::QueryRequest;
@@ -67,14 +67,6 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
) -> Result<FlowResponse> { ) -> Result<FlowResponse> {
unimplemented!() unimplemented!()
} }
async fn handle_mark_window_dirty(
&self,
_peer: &Peer,
_req: DirtyWindowRequest,
) -> Result<FlowResponse> {
unimplemented!()
}
} }
/// A mock struct implements [NodeManager] only implement the `datanode` method. /// A mock struct implements [NodeManager] only implement the `datanode` method.
@@ -142,10 +134,6 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> { async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
self.handler.handle_inserts(&self.peer, requests).await self.handler.handle_inserts(&self.peer, requests).await
} }
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
self.handler.handle_mark_window_dirty(&self.peer, req).await
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]

View File

@@ -31,7 +31,6 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn}; use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value; use datatypes::value::Value;
use futures::TryStreamExt; use futures::TryStreamExt;
use greptime_proto::v1::flow::DirtyWindowRequest;
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder; use session::context::QueryContextBuilder;
@@ -853,11 +852,6 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
.map(|_| Default::default()) .map(|_| Default::default())
.map_err(to_meta_err(snafu::location!())) .map_err(to_meta_err(snafu::location!()))
} }
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
// todo: implement
unimplemented!()
}
} }
/// return a function to convert `crate::error::Error` to `common_meta::error::Error` /// return a function to convert `crate::error::Error` to `common_meta::error::Error`
@@ -880,98 +874,6 @@ fn to_meta_err(
} }
} }
#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for StreamingEngine {
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
let query_ctx = request
.header
.and_then(|h| h.query_context)
.map(|ctx| ctx.into());
match request.body {
Some(flow_request::Body::Create(CreateRequest {
flow_id: Some(task_id),
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_after,
comment,
sql,
flow_options,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
sink_table_name.catalog_name,
sink_table_name.schema_name,
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: Some(comment),
sql: sql.clone(),
flow_options,
query_ctx,
};
let ret = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
.into_iter()
.collect_vec(),
..Default::default()
})
}
Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(flow_id),
})) => {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
flow_id: Some(flow_id),
})) => {
let row = self
.flush_flow_inner(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(FlowResponse {
affected_flows: vec![flow_id],
affected_rows: row as u64,
..Default::default()
})
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
self.handle_inserts_inner(request)
.await
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
// todo: implement
unimplemented!()
}
}
impl FlowEngine for StreamingEngine { impl FlowEngine for StreamingEngine {
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> { async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
self.create_flow_inner(args).await self.create_flow_inner(args).await

View File

@@ -17,7 +17,6 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::sync::Arc; use std::sync::Arc;
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
use catalog::CatalogManagerRef; use catalog::CatalogManagerRef;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType; use common_meta::ddl::create_flow::FlowType;
@@ -30,7 +29,8 @@ use common_telemetry::{debug, info};
use common_time::TimeToLive; use common_time::TimeToLive;
use query::QueryEngineRef; use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId}; use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::{oneshot, RwLock}; use tokio::sync::{oneshot, RwLock};
use crate::batching_mode::frontend_client::FrontendClient; use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,7 +42,6 @@ use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu, UnexpectedSnafu, UnsupportedSnafu,
}; };
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE;
use crate::{CreateFlowArgs, Error, FlowId, TableName}; use crate::{CreateFlowArgs, Error, FlowId, TableName};
/// Batching mode Engine, responsible for driving all the batching mode tasks /// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -78,122 +77,6 @@ impl BatchingEngine {
} }
} }
pub async fn handle_mark_dirty_time_window(
&self,
reqs: DirtyWindowRequests,
) -> Result<FlowResponse, Error> {
let table_info_mgr = self.table_meta.table_info_manager();
let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
for r in reqs.requests {
let tid = TableId::from(r.table_id);
let entry = group_by_table_id.entry(tid).or_default();
entry.extend(r.dirty_time_ranges);
}
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
let table_infos =
table_info_mgr
.batch_get(&tids)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Failed to get table info for table ids: {:?}", tids),
})?;
let group_by_table_name = group_by_table_id
.into_iter()
.filter_map(|(id, rows)| {
let table_name = table_infos.get(&id).map(|info| info.table_name());
let Some(table_name) = table_name else {
warn!("Failed to get table infos for table id: {:?}", id);
return None;
};
let table_name = [
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
.data_type
.as_timestamp()
.unwrap()
.unit();
Some((table_name, (rows, time_index_unit)))
})
.collect::<HashMap<_, _>>();
let group_by_table_name = Arc::new(group_by_table_name);
let mut handles = Vec::new();
let tasks = self.tasks.read().await;
for (_flow_id, task) in tasks.iter() {
let src_table_names = &task.config.source_table_names;
if src_table_names
.iter()
.all(|name| !group_by_table_name.contains_key(name))
{
continue;
}
let group_by_table_name = group_by_table_name.clone();
let task = task.clone();
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = vec![];
for src_table_name in src_table_names {
if let Some((window_ranges, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
for window in window_ranges {
let align_start = expr
.eval(common_time::Timestamp::new(window.start_value, *unit))?
.0
.context(UnexpectedSnafu {
reason: "Failed to eval start value",
})?;
let align_end = expr
.eval(common_time::Timestamp::new(window.end_value, *unit))?
.1
.context(UnexpectedSnafu {
reason: "Failed to eval end value",
})?;
all_dirty_windows.push((align_start, align_end));
}
}
}
let mut state = task.state.write().unwrap();
let flow_id_label = task.config.flow_id.to_string();
for (s, e) in all_dirty_windows {
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE
.with_label_values(&[&flow_id_label])
.observe(e.sub(&s).unwrap_or_default().num_seconds() as f64);
state.dirty_time_windows.add_window(s, Some(e));
}
Ok(())
});
handles.push(handle);
}
drop(tasks);
for handle in handles {
match handle.await {
Err(e) => {
warn!("Failed to handle inserts: {e}");
}
Ok(Ok(())) => (),
Ok(Err(e)) => {
warn!("Failed to handle inserts: {e}");
}
}
}
Ok(Default::default())
}
pub async fn handle_inserts_inner( pub async fn handle_inserts_inner(
&self, &self,
request: api::v1::region::InsertRequests, request: api::v1::region::InsertRequests,

View File

@@ -286,7 +286,7 @@ impl FrontendClient {
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts` /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
/// and is able to process query /// and is able to process query
async fn get_random_active_frontend( pub(crate) async fn get_random_active_frontend(
&self, &self,
catalog: &str, catalog: &str,
schema: &str, schema: &str,
@@ -382,7 +382,7 @@ impl FrontendClient {
}), }),
catalog, catalog,
schema, schema,
&mut None, None,
task, task,
) )
.await .await
@@ -394,16 +394,28 @@ impl FrontendClient {
req: api::v1::greptime_request::Request, req: api::v1::greptime_request::Request,
catalog: &str, catalog: &str,
schema: &str, schema: &str,
peer_desc: &mut Option<PeerDesc>, use_peer: Option<Peer>,
task: Option<&BatchingTask>, task: Option<&BatchingTask>,
) -> Result<u32, Error> { ) -> Result<u32, Error> {
match self { match self {
FrontendClient::Distributed { fe_stats, .. } => { FrontendClient::Distributed {
let db = self.get_random_active_frontend(catalog, schema).await?; fe_stats, chnl_mgr, ..
} => {
*peer_desc = Some(PeerDesc::Dist { let db = if let Some(peer) = use_peer {
peer: db.peer.clone(), DatabaseWithPeer::new(
}); Database::new(
catalog,
schema,
Client::with_manager_and_urls(
chnl_mgr.clone(),
vec![peer.addr.clone()],
),
),
peer,
)
} else {
self.get_random_active_frontend(catalog, schema).await?
};
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default(); let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id); let _guard = fe_stats.observe(&db.peer.addr, flow_id);

View File

@@ -31,9 +31,8 @@ use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::MIN_REFRESH_DURATION; use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu}; use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{ use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
}; };
use crate::{Error, FlowId}; use crate::{Error, FlowId};
@@ -54,6 +53,8 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>, pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle /// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>, pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// Slow Query metrics update task handle
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds /// min run interval in seconds
pub(crate) min_run_interval: Option<u64>, pub(crate) min_run_interval: Option<u64>,
@@ -70,6 +71,7 @@ impl TaskState {
exec_state: ExecState::Idle, exec_state: ExecState::Idle,
shutdown_rx, shutdown_rx,
task_handle: None, task_handle: None,
slow_query_metric_task: None,
min_run_interval: None, min_run_interval: None,
max_filter_num: None, max_filter_num: None,
} }
@@ -95,7 +97,7 @@ impl TaskState {
/// TODO: Make this behavior configurable. /// TODO: Make this behavior configurable.
pub fn get_next_start_query_time( pub fn get_next_start_query_time(
&self, &self,
_flow_id: FlowId, flow_id: FlowId,
_time_window_size: &Option<Duration>, _time_window_size: &Option<Duration>,
max_timeout: Option<Duration>, max_timeout: Option<Duration>,
) -> Instant { ) -> Instant {
@@ -109,7 +111,7 @@ impl TaskState {
); );
// if have dirty time window, execute immediately to clean dirty time window // if have dirty time window, execute immediately to clean dirty time window
/*if self.dirty_time_windows.windows.is_empty() { if self.dirty_time_windows.windows.is_empty() {
self.last_update_time + next_duration self.last_update_time + next_duration
} else { } else {
debug!( debug!(
@@ -119,10 +121,7 @@ impl TaskState {
self.dirty_time_windows.windows self.dirty_time_windows.windows
); );
Instant::now() Instant::now()
}*/ }
// wait for next duration anyway
self.last_update_time + next_duration
} }
} }
@@ -264,11 +263,17 @@ impl DirtyTimeWindows {
self.windows = new_windows; self.windows = new_windows;
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(to_be_query.len() as f64); .observe(to_be_query.len() as f64);
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(self.windows.len() as f64); .observe(self.windows.len() as f64);
let full_time_range = to_be_query let full_time_range = to_be_query
@@ -281,25 +286,13 @@ impl DirtyTimeWindows {
} }
}) })
.num_seconds() as f64; .num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(full_time_range); .observe(full_time_range);
let stalled_time_range =
self.windows
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end {
acc + end.sub(start).unwrap_or(chrono::Duration::zero())
} else {
acc
}
});
METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
.with_label_values(&[flow_id.to_string().as_str()])
.observe(stalled_time_range.num_seconds() as f64);
let mut expr_lst = vec![]; let mut expr_lst = vec![];
for (start, end) in to_be_query.into_iter() { for (start, end) in to_be_query.into_iter() {
// align using time window exprs // align using time window exprs

View File

@@ -62,8 +62,8 @@ use crate::error::{
}; };
use crate::metrics::{ use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_ROWS, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, METRIC_FLOW_ROWS,
}; };
use crate::{Error, FlowId}; use crate::{Error, FlowId};
@@ -83,6 +83,14 @@ pub struct TaskConfig {
query_type: QueryType, query_type: QueryType,
} }
impl TaskConfig {
pub fn time_window_size(&self) -> Option<Duration> {
self.time_window_expr
.as_ref()
.and_then(|expr| *expr.time_window_size())
}
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> { fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts = let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default()) ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
@@ -336,11 +344,53 @@ impl BatchingTask {
})?; })?;
let plan = expanded_plan; let plan = expanded_plan;
let mut peer_desc = None;
let db = frontend_client
.get_random_active_frontend(catalog, schema)
.await?;
let peer_desc = db.peer.clone();
let (tx, mut rx) = oneshot::channel();
let peer_inner = peer_desc.clone();
let window_size_pretty = format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
);
let inner_window_size_pretty = window_size_pretty.clone();
let flow_id = self.config.flow_id;
let slow_query_metric_task = tokio::task::spawn(async move {
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.add(1.0);
while rx.try_recv() == Err(TryRecvError::Empty) {
// sleep for a while before next update
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.sub(1.0);
});
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
let res = { let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.start_timer(); .start_timer();
// hack and special handling the insert logical plan // hack and special handling the insert logical plan
@@ -369,10 +419,12 @@ impl BatchingTask {
}; };
frontend_client frontend_client
.handle(req, catalog, schema, &mut peer_desc, Some(self)) .handle(req, catalog, schema, Some(db.peer), Some(self))
.await .await
}; };
// signaling the slow query metric task to stop
let _ = tx.send(());
let elapsed = instant.elapsed(); let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res { if let Ok(affected_rows) = &res {
debug!( debug!(
@@ -398,7 +450,12 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[ .with_label_values(&[
flow_id.to_string().as_str(), flow_id.to_string().as_str(),
&peer_desc.unwrap_or_default().to_string(), &peer_desc.to_string(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
]) ])
.observe(elapsed.as_secs_f64()); .observe(elapsed.as_secs_f64());
} }

View File

@@ -31,22 +31,29 @@ lazy_static! {
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!( pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_query_time_secs", "greptime_flow_batching_engine_query_time_secs",
"flow batching engine query time(seconds)", "flow batching engine query time(seconds)",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,] vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!( pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs", "greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)", "flow batching engine slow query(seconds), updated after query finished",
&["flow_id", "peer"], &["flow_id", "peer", "time_window_granularity"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.] vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_real_time_slow_query_number",
"flow batching engine real time slow query number, updated in real time",
&["flow_id", "peer", "time_window_granularity"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt", "greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count", "flow batching engine stalled query time window count",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.] vec![0.0, 5., 10., 20., 40.]
) )
.unwrap(); .unwrap();
@@ -54,34 +61,26 @@ lazy_static! {
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt", "greptime_flow_batching_engine_query_window_cnt",
"flow batching engine query time window count", "flow batching engine query time window count",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.] vec![0.0, 5., 10., 20., 40.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE: HistogramVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE: HistogramVec =
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_query_window_size_secs", "greptime_flow_batching_engine_query_time_range_secs",
"flow batching engine query window size(seconds)", "flow batching engine query time range(seconds)",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE: HistogramVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_stalled_window_size_secs", "greptime_flow_batching_engine_guess_fe_load",
"flow batching engine stalled window size(seconds)", "flow batching engine guessed frontend load",
&["flow_id"], &["fe_addr"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_bulk_mark_time_window_range_secs",
"flow batching engine query time window range marked by bulk memtable in seconds",
&["flow_id"],
vec![0.0, 60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
register_int_counter_vec!( register_int_counter_vec!(
"greptime_flow_batching_start_query_count", "greptime_flow_batching_start_query_count",
@@ -93,22 +92,14 @@ lazy_static! {
register_int_counter_vec!( register_int_counter_vec!(
"greptime_flow_batching_error_count", "greptime_flow_batching_error_count",
"flow batching engine error count per flow id", "flow batching engine error count per flow id",
&["flow_id"] &["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_guess_fe_load",
"flow batching engine guessed frontend load",
&["fe_addr"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!( pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
"greptime_flow_processed_rows", "greptime_flow_processed_rows",
"Count of rows flowing through the system.", "Count of rows flowing through the system",
&["direction"] &["direction"]
) )
.unwrap(); .unwrap();

View File

@@ -17,7 +17,6 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use api::v1::flow::DirtyWindowRequests;
use api::v1::{RowDeleteRequests, RowInsertRequests}; use api::v1::{RowDeleteRequests, RowInsertRequests};
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME}; use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef; use catalog::CatalogManagerRef;
@@ -137,18 +136,6 @@ impl flow_server::Flow for FlowService {
.map(Response::new) .map(Response::new)
.map_err(to_status_with_last_err) .map_err(to_status_with_last_err)
} }
async fn handle_mark_dirty_time_window(
&self,
reqs: Request<DirtyWindowRequests>,
) -> Result<Response<FlowResponse>, Status> {
self.dual_engine
.batching_engine()
.handle_mark_dirty_time_window(reqs.into_inner())
.await
.map(Response::new)
.map_err(to_status_with_last_err)
}
} }
#[derive(Clone)] #[derive(Clone)]

View File

@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::table_name::TableName; use table::table_name::TableName;
use table::TableRef;
use crate::error::{ use crate::error::{
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu, CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
@@ -235,33 +235,34 @@ impl GrpcQueryHandler for Instance {
async fn put_record_batch( async fn put_record_batch(
&self, &self,
table_name: &TableName, table: &TableName,
table_ref: &mut Option<TableRef>, table_id: &mut Option<TableId>,
decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
data: FlightData, data: FlightData,
) -> Result<AffectedRows> { ) -> Result<AffectedRows> {
let table = if let Some(table) = table_ref { let table_id = if let Some(table_id) = table_id {
table.clone() *table_id
} else { } else {
let table = self let table = self
.catalog_manager() .catalog_manager()
.table( .table(
&table_name.catalog_name, &table.catalog_name,
&table_name.schema_name, &table.schema_name,
&table_name.table_name, &table.table_name,
None, None,
) )
.await .await
.context(CatalogSnafu)? .context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu { .with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(), table_name: table.to_string(),
})?; })?;
*table_ref = Some(table.clone()); let id = table.table_info().table_id();
table *table_id = Some(id);
id
}; };
self.inserter self.inserter
.handle_bulk_insert(table, decoder, data) .handle_bulk_insert(table_id, decoder, data)
.await .await
.context(TableOperationSnafu) .context(TableOperationSnafu)
} }

View File

@@ -12,28 +12,18 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::collections::HashSet;
use ahash::{HashMap, HashMapExt}; use ahash::{HashMap, HashMapExt};
use api::v1::flow::{DirtyWindowRequest, WindowRange};
use api::v1::region::{ use api::v1::region::{
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
}; };
use api::v1::ArrowIpc; use api::v1::ArrowIpc;
use arrow::array::{
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
use arrow::record_batch::RecordBatch;
use common_base::AffectedRows; use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData; use common_grpc::FlightData;
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext; use common_telemetry::tracing_context::TracingContext;
use snafu::{OptionExt, ResultExt}; use snafu::ResultExt;
use store_api::storage::{RegionId, TableId}; use store_api::storage::RegionId;
use table::TableRef; use table::metadata::TableId;
use crate::insert::Inserter; use crate::insert::Inserter;
use crate::{error, metrics}; use crate::{error, metrics};
@@ -42,11 +32,10 @@ impl Inserter {
/// Handle bulk insert request. /// Handle bulk insert request.
pub async fn handle_bulk_insert( pub async fn handle_bulk_insert(
&self, &self,
table: TableRef, table_id: TableId,
decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
data: FlightData, data: FlightData,
) -> error::Result<AffectedRows> { ) -> error::Result<AffectedRows> {
let table_id = table.table_info().table_id();
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["decode_request"]) .with_label_values(&["decode_request"])
.start_timer(); .start_timer();
@@ -59,20 +48,6 @@ impl Inserter {
return Ok(0); return Ok(0);
}; };
decode_timer.observe_duration(); decode_timer.observe_duration();
if let Some((min, max)) = compute_timestamp_range(
&record_batch,
&table
.table_info()
.meta
.schema
.timestamp_column()
.as_ref()
.unwrap()
.name,
)? {
// notify flownode to update dirty time windows.
self.update_flow_dirty_window(table_id, min, max);
}
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64); metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
metrics::BULK_REQUEST_ROWS metrics::BULK_REQUEST_ROWS
.with_label_values(&["raw"]) .with_label_values(&["raw"])
@@ -241,88 +216,4 @@ impl Inserter {
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64); crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
Ok(rows_inserted) Ok(rows_inserted)
} }
fn update_flow_dirty_window(&self, table_id: TableId, min: i64, max: i64) {
let table_flownode_set_cache = self.table_flownode_set_cache.clone();
let node_manager = self.node_manager.clone();
common_runtime::spawn_global(async move {
let result = table_flownode_set_cache
.get(table_id)
.await
.context(error::RequestInsertsSnafu);
let flownodes = match result {
Ok(flownodes) => flownodes.unwrap_or_default(),
Err(e) => {
error!(e; "Failed to get flownodes for table id: {}", table_id);
return;
}
};
let peers: HashSet<_> = flownodes.values().cloned().collect();
for peer in peers {
let node_manager = node_manager.clone();
common_runtime::spawn_global(async move {
if let Err(e) = node_manager
.flownode(&peer)
.await
.handle_mark_window_dirty(DirtyWindowRequest {
table_id,
dirty_time_ranges: vec![WindowRange {
start_value: min,
end_value: max,
}],
})
.await
.context(error::RequestInsertsSnafu)
{
error!(e; "Failed to mark time window as dirty, table: {}, min: {}, max: {}", table_id, min, max);
}
});
}
});
}
}
/// Calculate the timestamp range of record batch. Return `None` if record batch is empty.
fn compute_timestamp_range(
rb: &RecordBatch,
timestamp_index_name: &str,
) -> error::Result<Option<(i64, i64)>> {
let ts_col = rb
.column_by_name(timestamp_index_name)
.context(error::ColumnNotFoundSnafu {
msg: timestamp_index_name,
})?;
if rb.num_rows() == 0 {
return Ok(None);
}
let primitive = match ts_col.data_type() {
DataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => ts_col
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Millisecond => ts_col
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Microsecond => ts_col
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Nanosecond => ts_col
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
},
t => {
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
}
};
Ok(arrow::compute::min(&primitive).zip(arrow::compute::max(&primitive)))
} }

View File

@@ -837,13 +837,6 @@ pub enum Error {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
}, },
#[snafu(display("Invalid time index type: {}", ty))]
InvalidTimeIndexType {
ty: arrow::datatypes::DataType,
#[snafu(implicit)]
location: Location,
},
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@@ -971,7 +964,6 @@ impl ErrorExt for Error {
Error::ColumnOptions { source, .. } => source.status_code(), Error::ColumnOptions { source, .. } => source.status_code(),
Error::DecodeFlightData { source, .. } => source.status_code(), Error::DecodeFlightData { source, .. } => source.status_code(),
Error::ComputeArrow { .. } => StatusCode::Internal, Error::ComputeArrow { .. } => StatusCode::Internal,
Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
} }
} }

View File

@@ -78,7 +78,7 @@ pub struct Inserter {
catalog_manager: CatalogManagerRef, catalog_manager: CatalogManagerRef,
pub(crate) partition_manager: PartitionRuleManagerRef, pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef, pub(crate) node_manager: NodeManagerRef,
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef, table_flownode_set_cache: TableFlownodeSetCacheRef,
} }
pub type InserterRef = Arc<Inserter>; pub type InserterRef = Arc<Inserter>;

View File

@@ -13,17 +13,20 @@
// limitations under the License. // limitations under the License.
use clap::Parser; use clap::Parser;
use cli::{BenchTableMetadataCommand, DataCommand, MetaCommand, Tool}; use cli::{
BenchTableMetadataCommand, ExportCommand, ImportCommand, MetaRestoreCommand,
MetaSnapshotCommand, Tool,
};
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
#[derive(Parser)] #[derive(Parser)]
pub enum SubCommand { pub enum SubCommand {
// Attach(AttachCommand), // Attach(AttachCommand),
Bench(BenchTableMetadataCommand), Bench(BenchTableMetadataCommand),
#[clap(subcommand)] Export(ExportCommand),
Data(DataCommand), Import(ImportCommand),
#[clap(subcommand)] MetaSnapshot(MetaSnapshotCommand),
Meta(MetaCommand), MetaRestore(MetaRestoreCommand),
} }
impl SubCommand { impl SubCommand {
@@ -31,8 +34,10 @@ impl SubCommand {
match self { match self {
// SubCommand::Attach(cmd) => cmd.build().await, // SubCommand::Attach(cmd) => cmd.build().await,
SubCommand::Bench(cmd) => cmd.build().await, SubCommand::Bench(cmd) => cmd.build().await,
SubCommand::Data(cmd) => cmd.build().await, SubCommand::Export(cmd) => cmd.build().await,
SubCommand::Meta(cmd) => cmd.build().await, SubCommand::Import(cmd) => cmd.build().await,
SubCommand::MetaSnapshot(cmd) => cmd.build().await,
SubCommand::MetaRestore(cmd) => cmd.build().await,
} }
} }
} }

View File

@@ -2444,7 +2444,7 @@ impl PromPlanner {
LogicalPlanBuilder::from(left) LogicalPlanBuilder::from(left)
.alias(left_table_ref) .alias(left_table_ref)
.context(DataFusionPlanningSnafu)? .context(DataFusionPlanningSnafu)?
.join_detailed( .join(
right, right,
JoinType::Inner, JoinType::Inner,
( (
@@ -2458,7 +2458,6 @@ impl PromPlanner {
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
), ),
None, None,
true,
) )
.context(DataFusionPlanningSnafu)? .context(DataFusionPlanningSnafu)?
.build() .build()

View File

@@ -139,11 +139,11 @@ impl GrpcOptions {
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum FlightCompression { pub enum FlightCompression {
/// Disable all compression in Arrow Flight service. /// Disable all compression in Arrow Flight service.
#[default]
None, None,
/// Enable only transport layer compression (zstd). /// Enable only transport layer compression (zstd).
Transport, Transport,
/// Enable only payload compression (lz4) /// Enable only payload compression (lz4)
#[default]
ArrowIpc, ArrowIpc,
/// Enable all compression. /// Enable all compression.
All, All,

View File

@@ -40,7 +40,7 @@ use futures_util::StreamExt;
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef}; use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT; use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use table::TableRef; use table::metadata::TableId;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::error::Error::UnsupportedAuthScheme; use crate::error::Error::UnsupportedAuthScheme;
@@ -149,8 +149,8 @@ impl GreptimeRequestHandler {
.clone() .clone()
.unwrap_or_else(common_runtime::global_runtime); .unwrap_or_else(common_runtime::global_runtime);
runtime.spawn(async move { runtime.spawn(async move {
// Cached table ref // Cached table id
let mut table_ref: Option<TableRef> = None; let mut table_id: Option<TableId> = None;
let mut decoder = FlightDecoder::default(); let mut decoder = FlightDecoder::default();
while let Some(request) = stream.next().await { while let Some(request) = stream.next().await {
@@ -169,7 +169,7 @@ impl GreptimeRequestHandler {
let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer(); let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
let result = handler let result = handler
.put_record_batch(&table_name, &mut table_ref, &mut decoder, data) .put_record_batch(&table_name, &mut table_id, &mut decoder, data)
.await .await
.inspect_err(|e| error!(e; "Failed to handle flight record batches")); .inspect_err(|e| error!(e; "Failed to handle flight record batches"));
timer.observe_duration(); timer.observe_duration();

View File

@@ -23,8 +23,8 @@ use common_grpc::flight::FlightDecoder;
use common_query::Output; use common_query::Output;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::ResultExt; use snafu::ResultExt;
use table::metadata::TableId;
use table::table_name::TableName; use table::table_name::TableName;
use table::TableRef;
use crate::error::{self, Result}; use crate::error::{self, Result};
@@ -45,8 +45,8 @@ pub trait GrpcQueryHandler {
async fn put_record_batch( async fn put_record_batch(
&self, &self,
table_name: &TableName, table: &TableName,
table_ref: &mut Option<TableRef>, table_id: &mut Option<TableId>,
decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
flight_data: FlightData, flight_data: FlightData,
) -> std::result::Result<AffectedRows, Self::Error>; ) -> std::result::Result<AffectedRows, Self::Error>;
@@ -77,13 +77,13 @@ where
async fn put_record_batch( async fn put_record_batch(
&self, &self,
table_name: &TableName, table: &TableName,
table_ref: &mut Option<TableRef>, table_id: &mut Option<TableId>,
decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
data: FlightData, data: FlightData,
) -> Result<AffectedRows> { ) -> Result<AffectedRows> {
self.0 self.0
.put_record_batch(table_name, table_ref, decoder, data) .put_record_batch(table, table_id, decoder, data)
.await .await
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(error::ExecuteGrpcRequestSnafu) .context(error::ExecuteGrpcRequestSnafu)

View File

@@ -34,6 +34,7 @@ use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler};
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::ensure; use snafu::ensure;
use sql::statements::statement::Statement; use sql::statements::statement::Statement;
use table::metadata::TableId;
use table::table_name::TableName; use table::table_name::TableName;
use table::TableRef; use table::TableRef;
@@ -159,11 +160,15 @@ impl GrpcQueryHandler for DummyInstance {
async fn put_record_batch( async fn put_record_batch(
&self, &self,
_table_name: &TableName, table: &TableName,
_table_ref: &mut Option<TableRef>, table_id: &mut Option<TableId>,
_decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
_data: FlightData, data: FlightData,
) -> std::result::Result<AffectedRows, Self::Error> { ) -> std::result::Result<AffectedRows, Self::Error> {
let _ = table;
let _ = data;
let _ = table_id;
let _ = decoder;
unimplemented!() unimplemented!()
} }
} }

View File

@@ -675,16 +675,11 @@ insert into cache_miss_with_null_label values
Affected Rows: 4 Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1 -- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label); tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+ ++
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value | ++
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1 -- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label); tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);

View File

@@ -325,6 +325,7 @@ insert into cache_miss_with_null_label values
(4000, "write", null, 2.0); (4000, "write", null, 2.0);
-- SQLNESS SORT_RESULT 3 1 -- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label); tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
-- SQLNESS SORT_RESULT 3 1 -- SQLNESS SORT_RESULT 3 1