Compare commits


3 Commits

Author     SHA1         Message                                              Date
discord9   0443042f14   feat: better metrics                                 2025-06-09 20:28:29 +08:00
discord9   1786190235   feat(exp): adjust_flow admin function                2025-06-09 17:02:42 +08:00
discord9   705b2007cf   feat: flownode to frontend load balance with guess   2025-06-09 16:34:19 +08:00
113 changed files with 1010 additions and 2297 deletions

View File

@@ -59,7 +59,7 @@ runs:
--set base.podTemplate.main.resources.requests.cpu=50m \
--set base.podTemplate.main.resources.requests.memory=256Mi \
--set base.podTemplate.main.resources.limits.cpu=2000m \
- --set base.podTemplate.main.resources.limits.memory=3Gi \
+ --set base.podTemplate.main.resources.limits.memory=2Gi \
--set frontend.replicas=${{ inputs.frontend-replicas }} \
--set datanode.replicas=${{ inputs.datanode-replicas }} \
--set meta.replicas=${{ inputs.meta-replicas }} \

View File

@@ -30,7 +30,7 @@ update_helm_charts_version() {
# Commit the changes.
git add .
- git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
+ git commit -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME
# Create a Pull Request.

View File

@@ -26,7 +26,7 @@ update_homebrew_greptime_version() {
# Commit the changes.
git add .
- git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
+ git commit -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME
# Create a Pull Request.

View File

@@ -250,11 +250,6 @@ jobs:
name: unstable-fuzz-logs
path: /tmp/unstable-greptime/
retention-days: 3
- - name: Describe pods
-   if: failure()
-   shell: bash
-   run: |
-     kubectl describe pod -n my-greptimedb
build-greptime-ci:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
@@ -410,11 +405,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- - name: Describe pod
-   if: failure()
-   shell: bash
-   run: |
-     kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash
@@ -564,11 +554,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- - name: Describe pods
-   if: failure()
-   shell: bash
-   run: |
-     kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash

Cargo.lock generated
View File

@@ -3252,7 +3252,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3303,7 +3303,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"async-trait",
@@ -3323,7 +3323,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog-listing"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -3346,7 +3346,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3371,7 +3371,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"log",
"tokio",
@@ -3380,12 +3380,12 @@ dependencies = [
[[package]]
name = "datafusion-doc"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
[[package]]
name = "datafusion-execution"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"dashmap",
@@ -3403,7 +3403,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -3423,7 +3423,7 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"datafusion-common",
@@ -3434,7 +3434,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-buffer 54.3.1",
@@ -3463,7 +3463,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3484,7 +3484,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate-common"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3496,7 +3496,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3518,7 +3518,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-table"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"async-trait",
@@ -3533,7 +3533,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-common",
"datafusion-doc",
@@ -3549,7 +3549,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window-common"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-common",
"datafusion-physical-expr-common",
@@ -3558,7 +3558,7 @@ dependencies = [
[[package]]
name = "datafusion-macros"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-expr",
"quote",
@@ -3568,7 +3568,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -3586,7 +3586,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3609,7 +3609,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3622,7 +3622,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -3643,7 +3643,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3673,7 +3673,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3691,7 +3691,7 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "45.0.0"
- source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
+ source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"async-recursion",
"async-trait",
@@ -4511,9 +4511,9 @@ dependencies = [
[[package]]
name = "flate2"
- version = "1.1.2"
+ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d"
+ checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece"
dependencies = [
"crc32fast",
"libz-rs-sys",
@@ -5133,7 +5133,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
- source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=52083925a15d741c259800a9a54eba3467939180#52083925a15d741c259800a9a54eba3467939180"
+ source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
dependencies = [
"prost 0.13.5",
"serde",
@@ -5146,9 +5146,9 @@ dependencies = [
[[package]]
name = "grok"
- version = "2.1.0"
+ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "6c52724b609896f661a3f4641dd3a44dc602958ef615857c12d00756b4e9355b"
+ checksum = "273797968160270573071022613fc4aa28b91fe68f3eef6c96a1b2a1947ddfbd"
dependencies = [
"glob",
"onig",
@@ -6716,9 +6716,9 @@ dependencies = [
[[package]]
name = "libz-rs-sys"
- version = "0.5.1"
+ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "172a788537a2221661b480fee8dc5f96c580eb34fa88764d3205dc356c7e4221"
+ checksum = "6489ca9bd760fe9642d7644e827b0c9add07df89857b0416ee15c1cc1a3b8c5a"
dependencies = [
"zlib-rs",
]
@@ -9664,9 +9664,9 @@ dependencies = [
[[package]]
name = "psl"
- version = "2.1.119"
+ version = "2.1.112"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "d0e49aa528239f2ca13ad87387977c208e59c3fb8c437609f95f1b3898ec6ef1"
+ checksum = "1c6b4c497a0c6bfb466f75167c728b1a861b0cdc39de9c35b877208a270a9590"
dependencies = [
"psl-types",
]
@@ -14701,9 +14701,9 @@ dependencies = [
[[package]]
name = "zlib-rs"
- version = "0.5.1"
+ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "626bd9fa9734751fc50d6060752170984d7053f5a39061f524cda68023d4db8a"
+ checksum = "868b928d7949e09af2f6086dfc1e01936064cc7a819253bce650d4e2a2d63ba8"
[[package]]
name = "zstd"

View File

@@ -116,15 +116,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
- datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
+ datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
- datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
+ datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
- datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
+ datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
- datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
+ datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
- datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
+ datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
- datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
+ datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
- datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
+ datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
- datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
+ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
- datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
+ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
@@ -133,7 +133,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
- greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "52083925a15d741c259800a9a54eba3467939180" }
+ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -195,13 +195,13 @@
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
| `slow_query.threshold` | String | Unset | The threshold of slow query. |
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
- | `export_metrics` | -- | -- | The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
+ | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommended to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
- | `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
+ | `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -232,7 +232,6 @@
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
- | `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -299,11 +298,13 @@
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
- | `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
+ | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
+ | `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
+ | `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
- | `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
+ | `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -314,9 +315,11 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `data_home` | String | `./greptimedb_data` | The working home directory. |
+ | `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
+ | `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
- | `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` |
+ | `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store` |
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
@@ -328,12 +331,6 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
- | `grpc` | -- | -- | The gRPC server options. |
- | `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
- | `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
- | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
- | `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
- | `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
@@ -375,11 +372,13 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
- | `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
+ | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
+ | `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
+ | `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
- | `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
+ | `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -405,7 +404,6 @@
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
- | `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -538,11 +536,13 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
- | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
+ | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
+ | `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
+ | `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
- | `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
+ | `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |

View File

@@ -44,13 +44,6 @@ runtime_size = 8
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
- ## Compression mode for datanode side Arrow IPC service. Available options:
- ## - `none`: disable all compression
- ## - `transport`: only enable gRPC transport compression (zstd)
- ## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
- ## - `all`: enable all compression.
- ## Default to `none`
- flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]
@@ -642,16 +635,24 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
- ## The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
+ ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
+ ## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
+ ## You must create the database before enabling it.
+ [export_metrics.self_import]
+ ## @toml2docs:none-default
+ db = "greptime_metrics"
[export_metrics.remote_write]
- ## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
+ ## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -54,13 +54,6 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
- ## Compression mode for frontend side Arrow IPC service. Available options:
- ## - `none`: disable all compression
- ## - `transport`: only enable gRPC transport compression (zstd)
- ## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
- ## - `all`: enable all compression.
- ## Default to `none`
- flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]
@@ -254,16 +247,24 @@ sample_ratio = 1.0
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
ttl = "30d"
- ## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
+ ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
+ ## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
+ ## You must create the database before enabling it.
+ [export_metrics.self_import]
+ ## @toml2docs:none-default
+ db = "greptime_metrics"
[export_metrics.remote_write]
- ## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
+ ## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -1,6 +1,14 @@
## The working home directory.
data_home = "./greptimedb_data"
+ ## The bind address of metasrv.
+ bind_addr = "127.0.0.1:3002"
+ ## The communication server address for the frontend and datanode to connect to metasrv.
+ ## If left empty or unset, the server will automatically use the IP address of the first network interface
+ ## on the host, with the same port number as the one specified in `bind_addr`.
+ server_addr = "127.0.0.1:3002"
## Store server address default to etcd store.
## For postgres store, the format is:
## "password=password dbname=postgres user=postgres host=localhost port=5432"
@@ -16,7 +24,6 @@ store_key_prefix = ""
## - `etcd_store` (default value)
## - `memory_store`
## - `postgres_store`
- ## - `mysql_store`
backend = "etcd_store"
## Table name in RDS to store metadata. Effect when using a RDS kvbackend.
@@ -60,21 +67,6 @@ node_max_idle_time = "24hours"
## The number of threads to execute the runtime for global write operations.
#+ compact_rt_size = 4
- ## The gRPC server options.
- [grpc]
- ## The address to bind the gRPC server.
- bind_addr = "127.0.0.1:3002"
- ## The communication server address for the frontend and datanode to connect to metasrv.
- ## If left empty or unset, the server will automatically use the IP address of the first network interface
- ## on the host, with the same port number as the one specified in `bind_addr`.
- server_addr = "127.0.0.1:3002"
- ## The number of server worker threads.
- runtime_size = 8
- ## The maximum receive message size for gRPC server.
- max_recv_message_size = "512MB"
- ## The maximum send message size for gRPC server.
- max_send_message_size = "512MB"
## The HTTP server options.
[http]
## The address to bind the HTTP server.
@@ -237,16 +229,24 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
- ## The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
+ ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
+ ## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
+ ## You must create the database before enabling it.
+ [export_metrics.self_import]
+ ## @toml2docs:none-default
+ db = "greptime_metrics"
[export_metrics.remote_write]
- ## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
+ ## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -750,11 +750,13 @@ default_ratio = 1.0
## @toml2docs:none-default
#+ sample_ratio = 1.0
- ## The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.
+ ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
@@ -765,7 +767,7 @@ write_interval = "30s"
db = "greptime_metrics"
[export_metrics.remote_write]
- ## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
+ ## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -58,7 +58,6 @@ where
info!("{desc}, average operation cost: {cost:.2} ms");
}
- /// Command to benchmark table metadata operations.
#[derive(Debug, Default, Parser)]
pub struct BenchTableMetadataCommand {
#[clap(long)]

View File

@@ -244,18 +244,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
- #[snafu(display("Unsupported memory backend"))]
- UnsupportedMemoryBackend {
-     #[snafu(implicit)]
-     location: Location,
- },
- #[snafu(display("File path invalid: {}", msg))]
- InvalidFilePath {
-     msg: String,
-     #[snafu(implicit)]
-     location: Location,
- },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -274,8 +262,6 @@ impl ErrorExt for Error {
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. }
- | Error::InvalidFilePath { .. }
- | Error::UnsupportedMemoryBackend { .. }
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. }

View File

@@ -50,7 +50,6 @@ enum ExportTarget {
All,
}
- /// Command for exporting data from the GreptimeDB.
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
/// Server address to connect

View File

@@ -40,7 +40,6 @@ enum ImportTarget {
All,
}
- /// Command to import data from a directory into a GreptimeDB instance.
#[derive(Debug, Default, Parser)]
pub struct ImportCommand {
/// Server address to connect

View File

@@ -20,7 +20,7 @@ mod import;
mod meta_snapshot;
use async_trait::async_trait;
- use clap::{Parser, Subcommand};
+ use clap::Parser;
use common_error::ext::BoxedError;
pub use database::DatabaseClient;
use error::Result;
@@ -28,7 +28,7 @@ use error::Result;
pub use crate::bench::BenchTableMetadataCommand;
pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand;
- pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand};
+ pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
#[async_trait]
pub trait Tool: Send + Sync {
@@ -51,19 +51,3 @@ impl AttachCommand {
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
}
}
- /// Subcommand for data operations like export and import.
- #[derive(Subcommand)]
- pub enum DataCommand {
-     Export(ExportCommand),
-     Import(ImportCommand),
- }
- impl DataCommand {
-     pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
-         match self {
-             DataCommand::Export(cmd) => cmd.build().await,
-             DataCommand::Import(cmd) => cmd.build().await,
-         }
-     }
- }

View File

@@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use clap::{Parser, Subcommand}; use clap::Parser;
use common_base::secrets::{ExposeSecret, SecretString}; use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::chroot::ChrootKvBackend;
@@ -27,50 +26,10 @@ use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl; use meta_srv::metasrv::BackendImpl;
use object_store::services::{Fs, S3}; use object_store::services::{Fs, S3};
use object_store::ObjectStore; use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt}; use snafu::ResultExt;
use crate::error::{ use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu,
UnsupportedMemoryBackendSnafu,
};
use crate::Tool; use crate::Tool;
/// Subcommand for metadata snapshot management.
#[derive(Subcommand)]
pub enum MetaCommand {
#[clap(subcommand)]
Snapshot(MetaSnapshotCommand),
}
impl MetaCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetaCommand::Snapshot(cmd) => cmd.build().await,
}
}
}
/// Subcommand for metadata snapshot operations, such as save, restore and info.
#[derive(Subcommand)]
pub enum MetaSnapshotCommand {
/// Export metadata snapshot tool.
Save(MetaSaveCommand),
/// Restore metadata snapshot tool.
Restore(MetaRestoreCommand),
/// Explore metadata from a metadata snapshot.
Info(MetaInfoCommand),
}
impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetaSnapshotCommand::Save(cmd) => cmd.build().await,
MetaSnapshotCommand::Restore(cmd) => cmd.build().await,
MetaSnapshotCommand::Info(cmd) => cmd.build().await,
}
}
}
#[derive(Debug, Default, Parser)] #[derive(Debug, Default, Parser)]
struct MetaConnection { struct MetaConnection {
/// The endpoint of store. one of etcd, pg or mysql. /// The endpoint of store. one of etcd, pg or mysql.
@@ -132,9 +91,6 @@ impl MetaConnection {
.await .await
.map_err(BoxedError::new)?) .map_err(BoxedError::new)?)
} }
Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu
.fail()
.map_err(BoxedError::new),
_ => KvBackendNotSetSnafu { backend: "all" } _ => KvBackendNotSetSnafu { backend: "all" }
.fail() .fail()
.map_err(BoxedError::new), .map_err(BoxedError::new),
@@ -214,7 +170,7 @@ impl S3Config {
/// It will dump the metadata snapshot to a local file or an S3 bucket. /// It will dump the metadata snapshot to a local file or an S3 bucket.
/// The snapshot file will be in binary format. /// The snapshot file will be in binary format.
#[derive(Debug, Default, Parser)] #[derive(Debug, Default, Parser)]
pub struct MetaSaveCommand { pub struct MetaSnapshotCommand {
/// The connection to the metadata store. /// The connection to the metadata store.
#[clap(flatten)] #[clap(flatten)]
connection: MetaConnection, connection: MetaConnection,
@@ -240,7 +196,7 @@ fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError>
Ok(object_store) Ok(object_store)
} }
impl MetaSaveCommand { impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> { pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.connection.build().await?; let kvbackend = self.connection.build().await?;
let output_dir = &self.output_dir; let output_dir = &self.output_dir;
@@ -371,89 +327,3 @@ impl Tool for MetaRestoreTool {
} }
} }
} }
/// Explore metadata from a metadata snapshot.
#[derive(Debug, Default, Parser)]
pub struct MetaInfoCommand {
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
/// The name of the target snapshot file. We will add the file extension automatically.
#[clap(long, default_value = "metadata_snapshot")]
file_name: String,
/// The query string to filter the metadata.
#[clap(long, default_value = "*")]
inspect_key: String,
/// The limit of the metadata to query.
#[clap(long)]
limit: Option<usize>,
}
pub struct MetaInfoTool {
inner: ObjectStore,
source_file: String,
inspect_key: String,
limit: Option<usize>,
}
#[async_trait]
impl Tool for MetaInfoTool {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let result = MetadataSnapshotManager::info(
&self.inner,
&self.source_file,
&self.inspect_key,
self.limit,
)
.await
.map_err(BoxedError::new)?;
for item in result {
println!("{}", item);
}
Ok(())
}
}
impl MetaInfoCommand {
fn decide_object_store_root_for_local_store(
file_path: &str,
) -> Result<(&str, &str), BoxedError> {
let path = Path::new(file_path);
let parent = path
.parent()
.and_then(|p| p.to_str())
.context(InvalidFilePathSnafu { msg: file_path })
.map_err(BoxedError::new)?;
let file_name = path
.file_name()
.and_then(|f| f.to_str())
.context(InvalidFilePathSnafu { msg: file_path })
.map_err(BoxedError::new)?;
let root = if parent.is_empty() { "." } else { parent };
Ok((root, file_name))
}
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
if let Some(store) = object_store {
let tool = MetaInfoTool {
inner: store,
source_file: self.file_name.clone(),
inspect_key: self.inspect_key.clone(),
limit: self.limit,
};
Ok(Box::new(tool))
} else {
let (root, file_name) =
Self::decide_object_store_root_for_local_store(&self.file_name)?;
let object_store = create_local_file_object_store(root)?;
let tool = MetaInfoTool {
inner: object_store,
source_file: file_name.to_string(),
inspect_key: self.inspect_key.clone(),
limit: self.limit,
};
Ok(Box::new(tool))
}
}
}
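
The removed MetaInfoCommand above derives the object-store root and file name from a single path argument. Below is a minimal, std-only sketch of that split, Option-based instead of the crate's snafu errors; the function name is illustrative, not the crate's API.

use std::path::Path;

/// Splits `file_path` into (root, file_name); an empty parent maps to ".",
/// mirroring the parent/file_name handling in the removed command above.
fn split_root_and_file(file_path: &str) -> Option<(&str, &str)> {
    let path = Path::new(file_path);
    let parent = path.parent().and_then(|p| p.to_str())?;
    let file_name = path.file_name().and_then(|f| f.to_str())?;
    let root = if parent.is_empty() { "." } else { parent };
    Some((root, file_name))
}

fn main() {
    assert_eq!(split_root_and_file("metadata_snapshot"), Some((".", "metadata_snapshot")));
    assert_eq!(split_root_and_file("/tmp/backup/metadata_snapshot"), Some(("/tmp/backup", "metadata_snapshot")));
}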

View File

@@ -162,23 +162,12 @@ impl Client {
.as_bytes() as usize .as_bytes() as usize
} }
pub fn make_flight_client( pub fn make_flight_client(&self) -> Result<FlightClient> {
&self,
send_compression: bool,
accept_compression: bool,
) -> Result<FlightClient> {
let (addr, channel) = self.find_channel()?; let (addr, channel) = self.find_channel()?;
let mut client = FlightServiceClient::new(channel) let client = FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size()) .max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()); .max_encoding_message_size(self.max_grpc_send_message_size());
// todo(hl): support compression methods.
if send_compression {
client = client.send_compressed(CompressionEncoding::Zstd);
}
if accept_compression {
client = client.accept_compressed(CompressionEncoding::Zstd);
}
Ok(FlightClient { addr, client }) Ok(FlightClient { addr, client })
} }
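
The dropped send_compression/accept_compression arguments used to toggle zstd on the generated flight client. Here is a toy, self-contained sketch of that conditional builder-toggle pattern; the Codec type is a stand-in for illustration, not tonic's API.

#[derive(Debug, Default)]
struct Codec {
    send_compressed: bool,
    accept_compressed: bool,
}

impl Codec {
    // Builder-style toggles, analogous to send_compressed/accept_compressed on a gRPC client.
    fn send_compressed(mut self) -> Self { self.send_compressed = true; self }
    fn accept_compressed(mut self) -> Self { self.accept_compressed = true; self }
}

fn make_codec(send: bool, accept: bool) -> Codec {
    let mut codec = Codec::default();
    if send { codec = codec.send_compressed(); }
    if accept { codec = codec.accept_compressed(); }
    codec
}

fn main() {
    // With the flags removed, every caller effectively gets make_codec(false, false).
    println!("{:?}", make_codec(false, false));
}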

View File

@@ -49,16 +49,7 @@ impl NodeManager for NodeClients {
async fn datanode(&self, datanode: &Peer) -> DatanodeRef { async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
let client = self.get_client(datanode).await; let client = self.get_client(datanode).await;
let ChannelConfig { Arc::new(RegionRequester::new(client))
send_compression,
accept_compression,
..
} = self.channel_manager.config();
Arc::new(RegionRequester::new(
client,
*send_compression,
*accept_compression,
))
} }
async fn flownode(&self, flownode: &Peer) -> FlownodeRef { async fn flownode(&self, flownode: &Peer) -> FlownodeRef {

View File

@@ -287,7 +287,7 @@ impl Database {
let mut request = tonic::Request::new(request); let mut request = tonic::Request::new(request);
Self::put_hints(request.metadata_mut(), hints)?; Self::put_hints(request.metadata_mut(), hints)?;
let mut client = self.client.make_flight_client(false, false)?; let mut client = self.client.make_flight_client()?;
let response = client.mut_inner().do_get(request).await.or_else(|e| { let response = client.mut_inner().do_get(request).await.or_else(|e| {
let tonic_code = e.code(); let tonic_code = e.code();
@@ -409,7 +409,7 @@ impl Database {
MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?, MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?,
); );
let mut client = self.client.make_flight_client(false, false)?; let mut client = self.client.make_flight_client()?;
let response = client.mut_inner().do_put(request).await?; let response = client.mut_inner().do_put(request).await?;
let response = response let response = response
.into_inner() .into_inner()

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse}; use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::InsertRequests; use api::v1::region::InsertRequests;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::node_manager::Flownode; use common_meta::node_manager::Flownode;
@@ -44,16 +44,6 @@ impl Flownode for FlowRequester {
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu) .context(common_meta::error::ExternalSnafu)
} }
async fn handle_mark_window_dirty(
&self,
req: DirtyWindowRequest,
) -> common_meta::error::Result<FlowResponse> {
self.handle_mark_window_dirty(req)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
} }
impl FlowRequester { impl FlowRequester {
@@ -101,20 +91,4 @@ impl FlowRequester {
.into_inner(); .into_inner();
Ok(response) Ok(response)
} }
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
let (addr, mut client) = self.client.raw_flow_client()?;
let response = client
.handle_mark_dirty_time_window(DirtyWindowRequests {
requests: vec![req],
})
.await
.or_else(|e| {
let code = e.code();
let err: crate::error::Error = e.into();
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
})?
.into_inner();
Ok(response)
}
} }

View File

@@ -46,8 +46,6 @@ use crate::{metrics, Client, Error};
#[derive(Debug)] #[derive(Debug)]
pub struct RegionRequester { pub struct RegionRequester {
client: Client, client: Client,
send_compression: bool,
accept_compression: bool,
} }
#[async_trait] #[async_trait]
@@ -91,18 +89,12 @@ impl Datanode for RegionRequester {
} }
impl RegionRequester { impl RegionRequester {
pub fn new(client: Client, send_compression: bool, accept_compression: bool) -> Self { pub fn new(client: Client) -> Self {
Self { Self { client }
client,
send_compression,
accept_compression,
}
} }
pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> { pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
let mut flight_client = self let mut flight_client = self.client.make_flight_client()?;
.client
.make_flight_client(self.send_compression, self.accept_compression)?;
let response = flight_client let response = flight_client
.mut_inner() .mut_inner()
.do_get(ticket) .do_get(ticket)

View File

@@ -146,7 +146,6 @@ mod tests {
let output_dir = tempfile::tempdir().unwrap(); let output_dir = tempfile::tempdir().unwrap();
let cli = cli::Command::parse_from([ let cli = cli::Command::parse_from([
"cli", "cli",
"data",
"export", "export",
"--addr", "--addr",
"127.0.0.1:4000", "127.0.0.1:4000",

View File

@@ -364,16 +364,12 @@ impl StartCommand {
// frontend to datanode need not timeout. // frontend to datanode need not timeout.
// Some queries are expected to take long time. // Some queries are expected to take long time.
let mut channel_config = ChannelConfig { let channel_config = ChannelConfig {
timeout: None, timeout: None,
tcp_nodelay: opts.datanode.client.tcp_nodelay, tcp_nodelay: opts.datanode.client.tcp_nodelay,
connect_timeout: Some(opts.datanode.client.connect_timeout), connect_timeout: Some(opts.datanode.client.connect_timeout),
..Default::default() ..Default::default()
}; };
if opts.grpc.flight_compression.transport_compression() {
channel_config.accept_compression = true;
channel_config.send_compression = true;
}
let client = NodeClients::new(channel_config); let client = NodeClients::new(channel_config);
let instance = FrontendBuilder::new( let instance = FrontendBuilder::new(
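
The hunk above replaces a `let mut` plus conditional field writes with a single struct-update expression. A self-contained sketch of the `..Default::default()` pattern, using an illustrative config type rather than the real ChannelConfig:

#[derive(Debug, Default)]
struct ClientConfig {
    timeout_secs: Option<u64>,
    tcp_nodelay: bool,
    connect_timeout_secs: Option<u64>,
    max_retries: u32,
}

fn main() {
    // Only fields that differ from the defaults are spelled out;
    // everything else (max_retries here) comes from Default::default().
    let config = ClientConfig {
        timeout_secs: None,
        tcp_nodelay: true,
        connect_timeout_secs: Some(10),
        ..Default::default()
    };
    println!("{:?}", config);
}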

View File

@@ -237,20 +237,12 @@ impl StartCommand {
tokio_console_addr: global_options.tokio_console_addr.clone(), tokio_console_addr: global_options.tokio_console_addr.clone(),
}; };
#[allow(deprecated)]
if let Some(addr) = &self.rpc_bind_addr { if let Some(addr) = &self.rpc_bind_addr {
opts.bind_addr.clone_from(addr); opts.bind_addr.clone_from(addr);
opts.grpc.bind_addr.clone_from(addr);
} else if !opts.bind_addr.is_empty() {
opts.grpc.bind_addr.clone_from(&opts.bind_addr);
} }
#[allow(deprecated)]
if let Some(addr) = &self.rpc_server_addr { if let Some(addr) = &self.rpc_server_addr {
opts.server_addr.clone_from(addr); opts.server_addr.clone_from(addr);
opts.grpc.server_addr.clone_from(addr);
} else if !opts.server_addr.is_empty() {
opts.grpc.server_addr.clone_from(&opts.server_addr);
} }
if let Some(addrs) = &self.store_addrs { if let Some(addrs) = &self.store_addrs {
@@ -327,7 +319,7 @@ impl StartCommand {
let plugin_opts = opts.plugins; let plugin_opts = opts.plugins;
let mut opts = opts.component; let mut opts = opts.component;
opts.grpc.detect_server_addr(); opts.detect_server_addr();
info!("Metasrv options: {:#?}", opts); info!("Metasrv options: {:#?}", opts);
@@ -371,7 +363,7 @@ mod tests {
}; };
let options = cmd.load_options(&Default::default()).unwrap().component; let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs); assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
assert_eq!(SelectorType::LoadBased, options.selector); assert_eq!(SelectorType::LoadBased, options.selector);
} }
@@ -404,8 +396,8 @@ mod tests {
}; };
let options = cmd.load_options(&Default::default()).unwrap().component; let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr); assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs); assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
assert_eq!(SelectorType::LeaseBased, options.selector); assert_eq!(SelectorType::LeaseBased, options.selector);
assert_eq!("debug", options.logging.level.as_ref().unwrap()); assert_eq!("debug", options.logging.level.as_ref().unwrap());
@@ -517,10 +509,10 @@ mod tests {
let opts = command.load_options(&Default::default()).unwrap().component; let opts = command.load_options(&Default::default()).unwrap().component;
// Should be read from env, env > default values. // Should be read from env, env > default values.
assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002"); assert_eq!(opts.bind_addr, "127.0.0.1:14002");
// Should be read from config file, config file > env > default values. // Should be read from config file, config file > env > default values.
assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002"); assert_eq!(opts.server_addr, "127.0.0.1:3002");
// Should be read from cli, cli > config file > env > default values. // Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.http.addr, "127.0.0.1:14000"); assert_eq!(opts.http.addr, "127.0.0.1:14000");
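
The assertions above encode the precedence cli > config file > env > default. A minimal sketch of resolving one option that way, with plain Options standing in for the real config sources:

fn resolve(cli: Option<&str>, file: Option<&str>, env: Option<&str>, default: &str) -> String {
    // The first Some() wins: cli > config file > env > default.
    cli.or(file).or(env).unwrap_or(default).to_string()
}

fn main() {
    assert_eq!(resolve(None, None, Some("127.0.0.1:14002"), "127.0.0.1:3002"), "127.0.0.1:14002");
    assert_eq!(resolve(None, Some("127.0.0.1:3002"), Some("127.0.0.1:14002"), "0.0.0.0:3002"), "127.0.0.1:3002");
    assert_eq!(resolve(Some("127.0.0.1:14000"), Some("127.0.0.1:4000"), None, "0.0.0.0:4000"), "127.0.0.1:14000");
}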

View File

@@ -95,7 +95,7 @@ fn test_load_datanode_example_config() {
..Default::default() ..Default::default()
}, },
export_metrics: ExportMetricsOption { export_metrics: ExportMetricsOption {
self_import: None, self_import: Some(Default::default()),
remote_write: Some(Default::default()), remote_write: Some(Default::default()),
..Default::default() ..Default::default()
}, },
@@ -148,7 +148,7 @@ fn test_load_frontend_example_config() {
}, },
}, },
export_metrics: ExportMetricsOption { export_metrics: ExportMetricsOption {
self_import: None, self_import: Some(Default::default()),
remote_write: Some(Default::default()), remote_write: Some(Default::default()),
..Default::default() ..Default::default()
}, },
@@ -176,11 +176,7 @@ fn test_load_metasrv_example_config() {
component: MetasrvOptions { component: MetasrvOptions {
selector: SelectorType::default(), selector: SelectorType::default(),
data_home: DEFAULT_DATA_HOME.to_string(), data_home: DEFAULT_DATA_HOME.to_string(),
grpc: GrpcOptions { server_addr: "127.0.0.1:3002".to_string(),
bind_addr: "127.0.0.1:3002".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
logging: LoggingOptions { logging: LoggingOptions {
dir: Path::new(DEFAULT_DATA_HOME) dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR) .join(DEFAULT_LOGGING_DIR)
@@ -199,7 +195,7 @@ fn test_load_metasrv_example_config() {
}, },
}, },
export_metrics: ExportMetricsOption { export_metrics: ExportMetricsOption {
self_import: None, self_import: Some(Default::default()),
remote_write: Some(Default::default()), remote_write: Some(Default::default()),
..Default::default() ..Default::default()
}, },

View File

@@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
pub mod approximate; mod geo_path;
#[cfg(feature = "geo")] mod hll;
pub mod geo; mod uddsketch_state;
pub mod vector;
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};

View File

@@ -47,7 +47,7 @@ impl GeoPathAccumulator {
Self::default() Self::default()
} }
pub fn uadf_impl() -> AggregateUDF { pub fn udf_impl() -> AggregateUDF {
create_udaf( create_udaf(
GEO_PATH_NAME, GEO_PATH_NAME,
// Input types: lat, lng, timestamp // Input types: lat, lng, timestamp

View File

@@ -1,32 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::function_registry::FunctionRegistry;
pub(crate) mod hll;
mod uddsketch;
pub(crate) struct ApproximateFunction;
impl ApproximateFunction {
pub fn register(registry: &FunctionRegistry) {
// uddsketch
registry.register_aggr(uddsketch::UddSketchState::state_udf_impl());
registry.register_aggr(uddsketch::UddSketchState::merge_udf_impl());
// hll
registry.register_aggr(hll::HllState::state_udf_impl());
registry.register_aggr(hll::HllState::merge_udf_impl());
}
}

View File

@@ -1,27 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::function_registry::FunctionRegistry;
mod encoding;
mod geo_path;
pub(crate) struct GeoFunction;
impl GeoFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(geo_path::GeoPathAccumulator::uadf_impl());
registry.register_aggr(encoding::JsonPathAccumulator::uadf_impl());
}
}

View File

@@ -1,29 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::aggrs::vector::product::VectorProduct;
use crate::aggrs::vector::sum::VectorSum;
use crate::function_registry::FunctionRegistry;
mod product;
mod sum;
pub(crate) struct VectorFunction;
impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(VectorSum::uadf_impl());
registry.register_aggr(VectorProduct::uadf_impl());
}
}

View File

@@ -1,63 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion_expr::ScalarUDF;
use crate::function::{FunctionContext, FunctionRef};
use crate::scalars::udf::create_udf;
/// A factory for creating `ScalarUDF`s that require a function context.
#[derive(Clone)]
pub struct ScalarFunctionFactory {
name: String,
factory: Arc<dyn Fn(FunctionContext) -> ScalarUDF + Send + Sync>,
}
impl ScalarFunctionFactory {
/// Returns the name of the function.
pub fn name(&self) -> &str {
&self.name
}
/// Returns a `ScalarUDF` when given a function context.
pub fn provide(&self, ctx: FunctionContext) -> ScalarUDF {
(self.factory)(ctx)
}
}
impl From<ScalarUDF> for ScalarFunctionFactory {
fn from(df_udf: ScalarUDF) -> Self {
let name = df_udf.name().to_string();
let func = Arc::new(move |_ctx| df_udf.clone());
Self {
name,
factory: func,
}
}
}
impl From<FunctionRef> for ScalarFunctionFactory {
fn from(func: FunctionRef) -> Self {
let name = func.name().to_string();
let func = Arc::new(move |ctx: FunctionContext| {
create_udf(func.clone(), ctx.query_ctx, ctx.state)
});
Self {
name,
factory: func,
}
}
}
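
The factory removed above wraps a closure so the UDF is only built once its context is known. A self-contained sketch of that closure-factory shape follows; Ctx and Udf are stand-in types for illustration, not the crate's.

use std::sync::Arc;

#[derive(Clone, Default)]
struct Ctx {
    timezone: String,
}

#[derive(Debug, Clone)]
struct Udf {
    name: String,
    timezone: String,
}

#[derive(Clone)]
struct UdfFactory {
    name: String,
    // Deferred construction: the context is only supplied at provide() time.
    factory: Arc<dyn Fn(Ctx) -> Udf + Send + Sync>,
}

impl UdfFactory {
    fn provide(&self, ctx: Ctx) -> Udf {
        (self.factory)(ctx)
    }
}

fn main() {
    let f = UdfFactory {
        name: "now".to_string(),
        factory: Arc::new(|ctx: Ctx| Udf { name: "now".to_string(), timezone: ctx.timezone }),
    };
    println!("{} -> {:?}", f.name, f.provide(Ctx { timezone: "UTC".to_string() }));
}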

View File

@@ -16,14 +16,11 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use datafusion_expr::AggregateUDF;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use crate::admin::AdminFunction; use crate::admin::AdminFunction;
use crate::aggrs::approximate::ApproximateFunction; use crate::function::{AsyncFunctionRef, FunctionRef};
use crate::aggrs::vector::VectorFunction as VectorAggrFunction; use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
use crate::function_factory::ScalarFunctionFactory;
use crate::scalars::date::DateFunction; use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction; use crate::scalars::expression::ExpressionFunction;
use crate::scalars::hll_count::HllCalcFunction; use crate::scalars::hll_count::HllCalcFunction;
@@ -34,19 +31,18 @@ use crate::scalars::matches_term::MatchesTermFunction;
use crate::scalars::math::MathFunction; use crate::scalars::math::MathFunction;
use crate::scalars::timestamp::TimestampFunction; use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::uddsketch_calc::UddSketchCalcFunction; use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
use crate::scalars::vector::VectorFunction as VectorScalarFunction; use crate::scalars::vector::VectorFunction;
use crate::system::SystemFunction; use crate::system::SystemFunction;
#[derive(Default)] #[derive(Default)]
pub struct FunctionRegistry { pub struct FunctionRegistry {
functions: RwLock<HashMap<String, ScalarFunctionFactory>>, functions: RwLock<HashMap<String, FunctionRef>>,
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>, async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>, aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
} }
impl FunctionRegistry { impl FunctionRegistry {
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) { pub fn register(&self, func: FunctionRef) {
let func = func.into();
let _ = self let _ = self
.functions .functions
.write() .write()
@@ -54,10 +50,6 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func); .insert(func.name().to_string(), func);
} }
pub fn register_scalar(&self, func: impl Function + 'static) {
self.register(Arc::new(func) as FunctionRef);
}
pub fn register_async(&self, func: AsyncFunctionRef) { pub fn register_async(&self, func: AsyncFunctionRef) {
let _ = self let _ = self
.async_functions .async_functions
@@ -66,14 +58,6 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func); .insert(func.name().to_string(), func);
} }
pub fn register_aggr(&self, func: AggregateUDF) {
let _ = self
.aggregate_functions
.write()
.unwrap()
.insert(func.name().to_string(), func);
}
pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> { pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> {
self.async_functions.read().unwrap().get(name).cloned() self.async_functions.read().unwrap().get(name).cloned()
} }
@@ -87,16 +71,27 @@ impl FunctionRegistry {
.collect() .collect()
} }
#[cfg(test)] pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> { let _ = self
.aggregate_functions
.write()
.unwrap()
.insert(func.name(), func);
}
pub fn get_aggr_function(&self, name: &str) -> Option<AggregateFunctionMetaRef> {
self.aggregate_functions.read().unwrap().get(name).cloned()
}
pub fn get_function(&self, name: &str) -> Option<FunctionRef> {
self.functions.read().unwrap().get(name).cloned() self.functions.read().unwrap().get(name).cloned()
} }
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> { pub fn functions(&self) -> Vec<FunctionRef> {
self.functions.read().unwrap().values().cloned().collect() self.functions.read().unwrap().values().cloned().collect()
} }
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> { pub fn aggregate_functions(&self) -> Vec<AggregateFunctionMetaRef> {
self.aggregate_functions self.aggregate_functions
.read() .read()
.unwrap() .unwrap()
@@ -117,6 +112,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
UddSketchCalcFunction::register(&function_registry); UddSketchCalcFunction::register(&function_registry);
HllCalcFunction::register(&function_registry); HllCalcFunction::register(&function_registry);
// Aggregate functions
AggregateFunctions::register(&function_registry);
// Full text search function // Full text search function
MatchesFunction::register(&function_registry); MatchesFunction::register(&function_registry);
MatchesTermFunction::register(&function_registry); MatchesTermFunction::register(&function_registry);
@@ -129,21 +127,15 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
JsonFunction::register(&function_registry); JsonFunction::register(&function_registry);
// Vector related functions // Vector related functions
VectorScalarFunction::register(&function_registry); VectorFunction::register(&function_registry);
VectorAggrFunction::register(&function_registry);
// Geo functions // Geo functions
#[cfg(feature = "geo")] #[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry); crate::scalars::geo::GeoFunctions::register(&function_registry);
#[cfg(feature = "geo")]
crate::aggrs::geo::GeoFunction::register(&function_registry);
// Ip functions // Ip functions
IpFunctions::register(&function_registry); IpFunctions::register(&function_registry);
// Approximate functions
ApproximateFunction::register(&function_registry);
Arc::new(function_registry) Arc::new(function_registry)
}); });
@@ -155,11 +147,12 @@ mod tests {
#[test] #[test]
fn test_function_registry() { fn test_function_registry() {
let registry = FunctionRegistry::default(); let registry = FunctionRegistry::default();
let func = Arc::new(TestAndFunction);
assert!(registry.get_function("test_and").is_none()); assert!(registry.get_function("test_and").is_none());
assert!(registry.scalar_functions().is_empty()); assert!(registry.functions().is_empty());
registry.register_scalar(TestAndFunction); registry.register(func);
let _ = registry.get_function("test_and").unwrap(); let _ = registry.get_function("test_and").unwrap();
assert_eq!(1, registry.scalar_functions().len()); assert_eq!(1, registry.functions().len());
} }
} }
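
Both sides of this hunk reduce to a name-keyed registry behind a RwLock. A minimal, std-only model of that register/lookup shape, with an illustrative Function trait rather than the crate's:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

trait Function: Send + Sync {
    fn name(&self) -> &str;
}

#[derive(Default)]
struct Registry {
    functions: RwLock<HashMap<String, Arc<dyn Function>>>,
}

impl Registry {
    fn register(&self, func: Arc<dyn Function>) {
        // Last registration for a given name wins, matching HashMap::insert semantics.
        let _ = self.functions.write().unwrap().insert(func.name().to_string(), func);
    }

    fn get(&self, name: &str) -> Option<Arc<dyn Function>> {
        self.functions.read().unwrap().get(name).cloned()
    }
}

struct IsNull;
impl Function for IsNull {
    fn name(&self) -> &str { "is_null" }
}

fn main() {
    let registry = Registry::default();
    registry.register(Arc::new(IsNull));
    assert!(registry.get("is_null").is_some());
}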

View File

@@ -19,14 +19,13 @@ mod adjust_flow;
mod admin; mod admin;
mod flush_flow; mod flush_flow;
mod macros; mod macros;
pub mod scalars;
mod system; mod system;
pub mod aggrs; pub mod aggr;
pub mod function; pub mod function;
pub mod function_factory;
pub mod function_registry; pub mod function_registry;
pub mod handlers; pub mod handlers;
pub mod helper; pub mod helper;
pub mod scalars;
pub mod state; pub mod state;
pub mod utils; pub mod utils;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
pub mod aggregate;
pub(crate) mod date; pub(crate) mod date;
pub mod expression; pub mod expression;
#[cfg(feature = "geo")] #[cfg(feature = "geo")]

View File

@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # Deprecation Warning:
//!
//! This module is deprecated and will be removed in the future.
//! The UDAF implementations here are unmaintained and should not
//! be used until they are refactored into the `src/aggr` version.
use std::sync::Arc;
use common_query::logical_plan::AggregateFunctionCreatorRef;
use crate::function_registry::FunctionRegistry;
use crate::scalars::vector::product::VectorProductCreator;
use crate::scalars::vector::sum::VectorSumCreator;
/// A function that creates an `AggregateFunctionCreator`.
/// "Aggregator" *is* AggregatorFunction. Since the latter is long, we use this short alias for it.
/// The two names might be used interchangeably.
type AggregatorCreatorFunction = Arc<dyn Fn() -> AggregateFunctionCreatorRef + Send + Sync>;
/// `AggregateFunctionMeta` dynamically creates AggregateFunctionCreator.
#[derive(Clone)]
pub struct AggregateFunctionMeta {
name: String,
args_count: u8,
creator: AggregatorCreatorFunction,
}
pub type AggregateFunctionMetaRef = Arc<AggregateFunctionMeta>;
impl AggregateFunctionMeta {
pub fn new(name: &str, args_count: u8, creator: AggregatorCreatorFunction) -> Self {
Self {
name: name.to_string(),
args_count,
creator,
}
}
pub fn name(&self) -> String {
self.name.to_string()
}
pub fn args_count(&self) -> u8 {
self.args_count
}
pub fn create(&self) -> AggregateFunctionCreatorRef {
(self.creator)()
}
}
pub(crate) struct AggregateFunctions;
impl AggregateFunctions {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_sum",
1,
Arc::new(|| Arc::new(VectorSumCreator::default())),
)));
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_product",
1,
Arc::new(|| Arc::new(VectorProductCreator::default())),
)));
#[cfg(feature = "geo")]
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"json_encode_path",
3,
Arc::new(|| Arc::new(super::geo::encoding::JsonPathEncodeFunctionCreator::default())),
)));
}
}
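
AggregatorCreatorFunction above hides construction behind a shared closure. A toy sketch of that Arc<dyn Fn() -> Arc<dyn Trait>> pattern with made-up accumulator types (not the crate's real creators):

use std::sync::Arc;

trait Accumulator: Send + Sync {
    fn describe(&self) -> String;
}

struct SumAccumulator;
impl Accumulator for SumAccumulator {
    fn describe(&self) -> String { "sum".to_string() }
}

type AccumulatorCreator = Arc<dyn Fn() -> Arc<dyn Accumulator> + Send + Sync>;

struct AggregateMeta {
    name: String,
    args_count: u8,
    creator: AccumulatorCreator,
}

impl AggregateMeta {
    // Builds a fresh accumulator per call, the same shape as AggregateFunctionMeta::create().
    fn create(&self) -> Arc<dyn Accumulator> {
        (self.creator)()
    }
}

fn main() {
    let meta = AggregateMeta {
        name: "vec_sum".to_string(),
        args_count: 1,
        creator: Arc::new(|| -> Arc<dyn Accumulator> { Arc::new(SumAccumulator) }),
    };
    println!("{} ({} arg): {}", meta.name, meta.args_count, meta.create().describe());
}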

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
mod date_add; mod date_add;
mod date_format; mod date_format;
mod date_sub; mod date_sub;
@@ -26,8 +27,8 @@ pub(crate) struct DateFunction;
impl DateFunction { impl DateFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(DateAddFunction); registry.register(Arc::new(DateAddFunction));
registry.register_scalar(DateSubFunction); registry.register(Arc::new(DateSubFunction));
registry.register_scalar(DateFormatFunction); registry.register(Arc::new(DateFormatFunction));
} }
} }

View File

@@ -17,6 +17,8 @@ mod ctx;
mod is_null; mod is_null;
mod unary; mod unary;
use std::sync::Arc;
pub use binary::scalar_binary_op; pub use binary::scalar_binary_op;
pub use ctx::EvalContext; pub use ctx::EvalContext;
pub use unary::scalar_unary_op; pub use unary::scalar_unary_op;
@@ -28,6 +30,6 @@ pub(crate) struct ExpressionFunction;
impl ExpressionFunction { impl ExpressionFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(IsNullFunction); registry.register(Arc::new(IsNullFunction));
} }
} }

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
pub(crate) mod encoding;
mod geohash; mod geohash;
mod h3; mod h3;
pub(crate) mod helpers; mod helpers;
mod measure; mod measure;
mod relation; mod relation;
mod s2; mod s2;
@@ -27,57 +29,57 @@ pub(crate) struct GeoFunctions;
impl GeoFunctions { impl GeoFunctions {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
// geohash // geohash
registry.register_scalar(geohash::GeohashFunction); registry.register(Arc::new(geohash::GeohashFunction));
registry.register_scalar(geohash::GeohashNeighboursFunction); registry.register(Arc::new(geohash::GeohashNeighboursFunction));
// h3 index // h3 index
registry.register_scalar(h3::H3LatLngToCell); registry.register(Arc::new(h3::H3LatLngToCell));
registry.register_scalar(h3::H3LatLngToCellString); registry.register(Arc::new(h3::H3LatLngToCellString));
// h3 index inspection // h3 index inspection
registry.register_scalar(h3::H3CellBase); registry.register(Arc::new(h3::H3CellBase));
registry.register_scalar(h3::H3CellIsPentagon); registry.register(Arc::new(h3::H3CellIsPentagon));
registry.register_scalar(h3::H3StringToCell); registry.register(Arc::new(h3::H3StringToCell));
registry.register_scalar(h3::H3CellToString); registry.register(Arc::new(h3::H3CellToString));
registry.register_scalar(h3::H3CellCenterLatLng); registry.register(Arc::new(h3::H3CellCenterLatLng));
registry.register_scalar(h3::H3CellResolution); registry.register(Arc::new(h3::H3CellResolution));
// h3 hierarchical grid // h3 hierarchical grid
registry.register_scalar(h3::H3CellCenterChild); registry.register(Arc::new(h3::H3CellCenterChild));
registry.register_scalar(h3::H3CellParent); registry.register(Arc::new(h3::H3CellParent));
registry.register_scalar(h3::H3CellToChildren); registry.register(Arc::new(h3::H3CellToChildren));
registry.register_scalar(h3::H3CellToChildrenSize); registry.register(Arc::new(h3::H3CellToChildrenSize));
registry.register_scalar(h3::H3CellToChildPos); registry.register(Arc::new(h3::H3CellToChildPos));
registry.register_scalar(h3::H3ChildPosToCell); registry.register(Arc::new(h3::H3ChildPosToCell));
registry.register_scalar(h3::H3CellContains); registry.register(Arc::new(h3::H3CellContains));
// h3 grid traversal // h3 grid traversal
registry.register_scalar(h3::H3GridDisk); registry.register(Arc::new(h3::H3GridDisk));
registry.register_scalar(h3::H3GridDiskDistances); registry.register(Arc::new(h3::H3GridDiskDistances));
registry.register_scalar(h3::H3GridDistance); registry.register(Arc::new(h3::H3GridDistance));
registry.register_scalar(h3::H3GridPathCells); registry.register(Arc::new(h3::H3GridPathCells));
// h3 measurement // h3 measurement
registry.register_scalar(h3::H3CellDistanceSphereKm); registry.register(Arc::new(h3::H3CellDistanceSphereKm));
registry.register_scalar(h3::H3CellDistanceEuclideanDegree); registry.register(Arc::new(h3::H3CellDistanceEuclideanDegree));
// s2 // s2
registry.register_scalar(s2::S2LatLngToCell); registry.register(Arc::new(s2::S2LatLngToCell));
registry.register_scalar(s2::S2CellLevel); registry.register(Arc::new(s2::S2CellLevel));
registry.register_scalar(s2::S2CellToToken); registry.register(Arc::new(s2::S2CellToToken));
registry.register_scalar(s2::S2CellParent); registry.register(Arc::new(s2::S2CellParent));
// spatial data type // spatial data type
registry.register_scalar(wkt::LatLngToPointWkt); registry.register(Arc::new(wkt::LatLngToPointWkt));
// spatial relation // spatial relation
registry.register_scalar(relation::STContains); registry.register(Arc::new(relation::STContains));
registry.register_scalar(relation::STWithin); registry.register(Arc::new(relation::STWithin));
registry.register_scalar(relation::STIntersects); registry.register(Arc::new(relation::STIntersects));
// spatial measure // spatial measure
registry.register_scalar(measure::STDistance); registry.register(Arc::new(measure::STDistance));
registry.register_scalar(measure::STDistanceSphere); registry.register(Arc::new(measure::STDistanceSphere));
registry.register_scalar(measure::STArea); registry.register(Arc::new(measure::STArea));
} }
} }

View File

@@ -19,12 +19,9 @@ use common_error::status_code::StatusCode;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore}; use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{self, InvalidInputStateSnafu, Result}; use common_query::error::{self, InvalidInputStateSnafu, Result};
use common_query::logical_plan::accumulator::AggrFuncTypeStore; use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{ use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::prelude::AccumulatorCreatorFunction; use common_query::prelude::AccumulatorCreatorFunction;
use common_time::Timestamp; use common_time::Timestamp;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::ConcreteDataType; use datatypes::prelude::ConcreteDataType;
use datatypes::value::{ListValue, Value}; use datatypes::value::{ListValue, Value};
use datatypes::vectors::VectorRef; use datatypes::vectors::VectorRef;
@@ -50,16 +47,6 @@ impl JsonPathAccumulator {
timestamp_type, timestamp_type,
} }
} }
/// Create a new `AggregateUDF` for the `json_encode_path` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"json_encode_path".to_string(),
3,
Arc::new(JsonPathEncodeFunctionCreator::default()),
)
.into()
}
} }
impl Accumulator for JsonPathAccumulator { impl Accumulator for JsonPathAccumulator {

View File

@@ -37,7 +37,7 @@ macro_rules! ensure_columns_len {
}; };
} }
pub(crate) use ensure_columns_len; pub(super) use ensure_columns_len;
macro_rules! ensure_columns_n { macro_rules! ensure_columns_n {
($columns:ident, $n:literal) => { ($columns:ident, $n:literal) => {
@@ -58,7 +58,7 @@ macro_rules! ensure_columns_n {
}; };
} }
pub(crate) use ensure_columns_n; pub(super) use ensure_columns_n;
macro_rules! ensure_and_coerce { macro_rules! ensure_and_coerce {
($compare:expr, $coerce:expr) => {{ ($compare:expr, $coerce:expr) => {{
@@ -72,4 +72,4 @@ macro_rules! ensure_and_coerce {
}}; }};
} }
pub(crate) use ensure_and_coerce; pub(super) use ensure_and_coerce;
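
This hunk only switches these re-exports between pub(crate) and pub(super). For reference, a tiny sketch of the macro_rules!-plus-use re-export pattern itself, with an illustrative macro:

mod macros {
    // Defined with macro_rules!, then re-exported like an item so callers can
    // path-import it instead of relying on textual #[macro_use] scoping.
    macro_rules! ensure_columns_len {
        ($columns:ident, $n:expr) => {
            assert_eq!($columns.len(), $n, "unexpected column count");
        };
    }
    pub(crate) use ensure_columns_len;
}

use crate::macros::ensure_columns_len;

fn main() {
    let columns = vec![1, 2, 3];
    ensure_columns_len!(columns, 3);
    println!("ok");
}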

View File

@@ -16,6 +16,7 @@
use std::fmt; use std::fmt;
use std::fmt::Display; use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result}; use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility}; use common_query::prelude::{Signature, Volatility};
@@ -26,7 +27,7 @@ use datatypes::vectors::{BinaryVector, MutableVector, UInt64VectorBuilder, Vecto
use hyperloglogplus::HyperLogLog; use hyperloglogplus::HyperLogLog;
use snafu::OptionExt; use snafu::OptionExt;
use crate::aggrs::approximate::hll::HllStateType; use crate::aggr::HllStateType;
use crate::function::{Function, FunctionContext}; use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry; use crate::function_registry::FunctionRegistry;
@@ -43,7 +44,7 @@ pub struct HllCalcFunction;
impl HllCalcFunction { impl HllCalcFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(HllCalcFunction); registry.register(Arc::new(HllCalcFunction));
} }
} }
@@ -116,8 +117,6 @@ impl Function for HllCalcFunction {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc;
use datatypes::vectors::BinaryVector; use datatypes::vectors::BinaryVector;
use super::*; use super::*;

View File

@@ -17,6 +17,8 @@ mod ipv4;
mod ipv6; mod ipv6;
mod range; mod range;
use std::sync::Arc;
use cidr::{Ipv4ToCidr, Ipv6ToCidr}; use cidr::{Ipv4ToCidr, Ipv6ToCidr};
use ipv4::{Ipv4NumToString, Ipv4StringToNum}; use ipv4::{Ipv4NumToString, Ipv4StringToNum};
use ipv6::{Ipv6NumToString, Ipv6StringToNum}; use ipv6::{Ipv6NumToString, Ipv6StringToNum};
@@ -29,15 +31,15 @@ pub(crate) struct IpFunctions;
impl IpFunctions { impl IpFunctions {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
// Register IPv4 functions // Register IPv4 functions
registry.register_scalar(Ipv4NumToString); registry.register(Arc::new(Ipv4NumToString));
registry.register_scalar(Ipv4StringToNum); registry.register(Arc::new(Ipv4StringToNum));
registry.register_scalar(Ipv4ToCidr); registry.register(Arc::new(Ipv4ToCidr));
registry.register_scalar(Ipv4InRange); registry.register(Arc::new(Ipv4InRange));
// Register IPv6 functions // Register IPv6 functions
registry.register_scalar(Ipv6NumToString); registry.register(Arc::new(Ipv6NumToString));
registry.register_scalar(Ipv6StringToNum); registry.register(Arc::new(Ipv6StringToNum));
registry.register_scalar(Ipv6ToCidr); registry.register(Arc::new(Ipv6ToCidr));
registry.register_scalar(Ipv6InRange); registry.register(Arc::new(Ipv6InRange));
} }
} }

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
pub mod json_get; pub mod json_get;
mod json_is; mod json_is;
mod json_path_exists; mod json_path_exists;
@@ -32,23 +33,23 @@ pub(crate) struct JsonFunction;
impl JsonFunction { impl JsonFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(JsonToStringFunction); registry.register(Arc::new(JsonToStringFunction));
registry.register_scalar(ParseJsonFunction); registry.register(Arc::new(ParseJsonFunction));
registry.register_scalar(JsonGetInt); registry.register(Arc::new(JsonGetInt));
registry.register_scalar(JsonGetFloat); registry.register(Arc::new(JsonGetFloat));
registry.register_scalar(JsonGetString); registry.register(Arc::new(JsonGetString));
registry.register_scalar(JsonGetBool); registry.register(Arc::new(JsonGetBool));
registry.register_scalar(JsonIsNull); registry.register(Arc::new(JsonIsNull));
registry.register_scalar(JsonIsInt); registry.register(Arc::new(JsonIsInt));
registry.register_scalar(JsonIsFloat); registry.register(Arc::new(JsonIsFloat));
registry.register_scalar(JsonIsString); registry.register(Arc::new(JsonIsString));
registry.register_scalar(JsonIsBool); registry.register(Arc::new(JsonIsBool));
registry.register_scalar(JsonIsArray); registry.register(Arc::new(JsonIsArray));
registry.register_scalar(JsonIsObject); registry.register(Arc::new(JsonIsObject));
registry.register_scalar(json_path_exists::JsonPathExistsFunction); registry.register(Arc::new(json_path_exists::JsonPathExistsFunction));
registry.register_scalar(json_path_match::JsonPathMatchFunction); registry.register(Arc::new(json_path_match::JsonPathMatchFunction));
} }
} }

View File

@@ -38,11 +38,11 @@ use crate::function_registry::FunctionRegistry;
/// ///
/// Usage: matches(`<col>`, `<pattern>`) -> boolean /// Usage: matches(`<col>`, `<pattern>`) -> boolean
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct MatchesFunction; pub(crate) struct MatchesFunction;
impl MatchesFunction { impl MatchesFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(MatchesFunction); registry.register(Arc::new(MatchesFunction));
} }
} }

View File

@@ -77,7 +77,7 @@ pub struct MatchesTermFunction;
impl MatchesTermFunction { impl MatchesTermFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(MatchesTermFunction); registry.register(Arc::new(MatchesTermFunction));
} }
} }

View File

@@ -18,6 +18,7 @@ mod pow;
mod rate; mod rate;
use std::fmt; use std::fmt;
use std::sync::Arc;
pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction}; pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction};
use common_query::error::{GeneralDataFusionSnafu, Result}; use common_query::error::{GeneralDataFusionSnafu, Result};
@@ -38,13 +39,13 @@ pub(crate) struct MathFunction;
impl MathFunction { impl MathFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ModuloFunction); registry.register(Arc::new(ModuloFunction));
registry.register_scalar(PowFunction); registry.register(Arc::new(PowFunction));
registry.register_scalar(RateFunction); registry.register(Arc::new(RateFunction));
registry.register_scalar(RangeFunction); registry.register(Arc::new(RangeFunction));
registry.register_scalar(ClampFunction); registry.register(Arc::new(ClampFunction));
registry.register_scalar(ClampMinFunction); registry.register(Arc::new(ClampMinFunction));
registry.register_scalar(ClampMaxFunction); registry.register(Arc::new(ClampMaxFunction));
} }
} }

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
mod to_unixtime; mod to_unixtime;
use to_unixtime::ToUnixtimeFunction; use to_unixtime::ToUnixtimeFunction;
@@ -22,6 +23,6 @@ pub(crate) struct TimestampFunction;
impl TimestampFunction { impl TimestampFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ToUnixtimeFunction); registry.register(Arc::new(ToUnixtimeFunction));
} }
} }

View File

@@ -16,6 +16,7 @@
use std::fmt; use std::fmt;
use std::fmt::Display; use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result}; use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility}; use common_query::prelude::{Signature, Volatility};
@@ -43,7 +44,7 @@ pub struct UddSketchCalcFunction;
impl UddSketchCalcFunction { impl UddSketchCalcFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(UddSketchCalcFunction); registry.register(Arc::new(UddSketchCalcFunction));
} }
} }

View File

@@ -17,8 +17,10 @@ mod distance;
mod elem_product; mod elem_product;
mod elem_sum; mod elem_sum;
pub mod impl_conv; pub mod impl_conv;
pub(crate) mod product;
mod scalar_add; mod scalar_add;
mod scalar_mul; mod scalar_mul;
pub(crate) mod sum;
mod vector_add; mod vector_add;
mod vector_dim; mod vector_dim;
mod vector_div; mod vector_div;
@@ -28,34 +30,37 @@ mod vector_norm;
mod vector_sub; mod vector_sub;
mod vector_subvector; mod vector_subvector;
use std::sync::Arc;
use crate::function_registry::FunctionRegistry; use crate::function_registry::FunctionRegistry;
pub(crate) struct VectorFunction; pub(crate) struct VectorFunction;
impl VectorFunction { impl VectorFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
// conversion // conversion
registry.register_scalar(convert::ParseVectorFunction); registry.register(Arc::new(convert::ParseVectorFunction));
registry.register_scalar(convert::VectorToStringFunction); registry.register(Arc::new(convert::VectorToStringFunction));
// distance // distance
registry.register_scalar(distance::CosDistanceFunction); registry.register(Arc::new(distance::CosDistanceFunction));
registry.register_scalar(distance::DotProductFunction); registry.register(Arc::new(distance::DotProductFunction));
registry.register_scalar(distance::L2SqDistanceFunction); registry.register(Arc::new(distance::L2SqDistanceFunction));
// scalar calculation // scalar calculation
registry.register_scalar(scalar_add::ScalarAddFunction); registry.register(Arc::new(scalar_add::ScalarAddFunction));
registry.register_scalar(scalar_mul::ScalarMulFunction); registry.register(Arc::new(scalar_mul::ScalarMulFunction));
// vector calculation // vector calculation
registry.register_scalar(vector_add::VectorAddFunction); registry.register(Arc::new(vector_add::VectorAddFunction));
registry.register_scalar(vector_sub::VectorSubFunction); registry.register(Arc::new(vector_sub::VectorSubFunction));
registry.register_scalar(vector_mul::VectorMulFunction); registry.register(Arc::new(vector_mul::VectorMulFunction));
registry.register_scalar(vector_div::VectorDivFunction); registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register_scalar(vector_norm::VectorNormFunction); registry.register(Arc::new(vector_norm::VectorNormFunction));
registry.register_scalar(vector_dim::VectorDimFunction); registry.register(Arc::new(vector_dim::VectorDimFunction));
registry.register_scalar(vector_kth_elem::VectorKthElemFunction); registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
registry.register_scalar(vector_subvector::VectorSubvectorFunction); registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
registry.register_scalar(elem_sum::ElemSumFunction); registry.register(Arc::new(elem_sum::ElemSumFunction));
registry.register_scalar(elem_product::ElemProductFunction); registry.register(Arc::new(elem_product::ElemProductFunction));
} }
} }

View File

@@ -16,11 +16,8 @@ use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore}; use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu}; use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{ use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::prelude::AccumulatorCreatorFunction; use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *}; use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef; use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector}; use nalgebra::{Const, DVectorView, Dyn, OVector};
@@ -78,16 +75,6 @@ impl AggregateFunctionCreator for VectorProductCreator {
} }
impl VectorProduct { impl VectorProduct {
/// Create a new `AggregateUDF` for the `vec_product` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_product".to_string(),
1,
Arc::new(VectorProductCreator::default()),
)
.into()
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> { fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.product.get_or_insert_with(|| { self.product.get_or_insert_with(|| {
OVector::from_iterator_generic(Dyn(len), Const::<1>, (0..len).map(|_| 1.0)) OVector::from_iterator_generic(Dyn(len), Const::<1>, (0..len).map(|_| 1.0))

View File

@@ -16,11 +16,8 @@ use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore}; use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu}; use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{ use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::prelude::AccumulatorCreatorFunction; use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *}; use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef; use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector}; use nalgebra::{Const, DVectorView, Dyn, OVector};
@@ -28,7 +25,6 @@ use snafu::ensure;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit}; use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
/// The accumulator for the `vec_sum` aggregate function.
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct VectorSum { pub struct VectorSum {
sum: Option<OVector<f32, Dyn>>, sum: Option<OVector<f32, Dyn>>,
@@ -78,16 +74,6 @@ impl AggregateFunctionCreator for VectorSumCreator {
} }
impl VectorSum { impl VectorSum {
/// Create a new `AggregateUDF` for the `vec_sum` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_sum".to_string(),
1,
Arc::new(VectorSumCreator::default()),
)
.into()
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> { fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.sum self.sum
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>)) .get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))

View File

@@ -36,13 +36,13 @@ pub(crate) struct SystemFunction;
impl SystemFunction { impl SystemFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(BuildFunction); registry.register(Arc::new(BuildFunction));
registry.register_scalar(VersionFunction); registry.register(Arc::new(VersionFunction));
registry.register_scalar(CurrentSchemaFunction); registry.register(Arc::new(CurrentSchemaFunction));
registry.register_scalar(DatabaseFunction); registry.register(Arc::new(DatabaseFunction));
registry.register_scalar(SessionUserFunction); registry.register(Arc::new(SessionUserFunction));
registry.register_scalar(ReadPreferenceFunction); registry.register(Arc::new(ReadPreferenceFunction));
registry.register_scalar(TimezoneFunction); registry.register(Arc::new(TimezoneFunction));
registry.register_async(Arc::new(ProcedureStateFunction)); registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry); PGCatalogFunction::register(registry);
} }

View File

@@ -16,6 +16,8 @@ mod pg_get_userbyid;
mod table_is_visible; mod table_is_visible;
mod version; mod version;
use std::sync::Arc;
use pg_get_userbyid::PGGetUserByIdFunction; use pg_get_userbyid::PGGetUserByIdFunction;
use table_is_visible::PGTableIsVisibleFunction; use table_is_visible::PGTableIsVisibleFunction;
use version::PGVersionFunction; use version::PGVersionFunction;
@@ -33,8 +35,8 @@ pub(super) struct PGCatalogFunction;
impl PGCatalogFunction { impl PGCatalogFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(PGTableIsVisibleFunction); registry.register(Arc::new(PGTableIsVisibleFunction));
registry.register_scalar(PGGetUserByIdFunction); registry.register(Arc::new(PGGetUserByIdFunction));
registry.register_scalar(PGVersionFunction); registry.register(Arc::new(PGVersionFunction));
} }
} }

View File

@@ -296,8 +296,6 @@ pub struct ChannelConfig {
pub max_recv_message_size: ReadableSize, pub max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size // Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize, pub max_send_message_size: ReadableSize,
pub send_compression: bool,
pub accept_compression: bool,
} }
impl Default for ChannelConfig { impl Default for ChannelConfig {
@@ -318,8 +316,6 @@ impl Default for ChannelConfig {
client_tls: None, client_tls: None,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
send_compression: false,
accept_compression: false,
} }
} }
} }
@@ -570,8 +566,6 @@ mod tests {
client_tls: None, client_tls: None,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
send_compression: false,
accept_compression: false,
}, },
default_cfg default_cfg
); );
@@ -616,8 +610,6 @@ mod tests {
}), }),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
send_compression: false,
accept_compression: false,
}, },
cfg cfg
); );

View File

@@ -64,7 +64,6 @@ impl Default for FlightEncoder {
} }
impl FlightEncoder { impl FlightEncoder {
/// Creates new [FlightEncoder] with compression disabled.
pub fn with_compression_disabled() -> Self { pub fn with_compression_disabled() -> Self {
let write_options = writer::IpcWriteOptions::default() let write_options = writer::IpcWriteOptions::default()
.try_with_compression(None) .try_with_compression(None)
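The `with_compression_disabled` constructor above builds Arrow IPC write options without a compression codec. A small self-contained sketch of that one call, assuming the `arrow-ipc` crate; the surrounding FlightEncoder wrapper is omitted.

use arrow_ipc::writer::IpcWriteOptions;

fn main() {
    // Explicitly request no IPC buffer compression; passing Some(...) would
    // select a codec such as LZ4_FRAME or ZSTD instead.
    let write_options = IpcWriteOptions::default()
        .try_with_compression(None)
        .expect("disabling compression is always a valid option");
    // The options would then be handed to an IPC/Flight writer; here we only
    // confirm that construction succeeds.
    let _ = write_options;
}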

View File

@@ -35,9 +35,6 @@ pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
/// The lease seconds of metasrv leader. /// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5; pub const META_LEASE_SECS: u64 = 5;
/// The keep-alive interval of the Postgres connection.
pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
/// In a lease, there are two opportunities for renewal. /// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2; pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

View File

@@ -15,7 +15,7 @@
use std::sync::Arc; use std::sync::Arc;
use api::region::RegionResponse; use api::region::RegionResponse;
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse}; use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest}; use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows; pub use common_base::AffectedRows;
use common_query::request::QueryRequest; use common_query::request::QueryRequest;
@@ -42,9 +42,6 @@ pub trait Flownode: Send + Sync {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>; async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>; async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
/// Handles requests to mark time window as dirty.
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
} }
pub type FlownodeRef = Arc<dyn Flownode>; pub type FlownodeRef = Arc<dyn Flownode>;

View File

@@ -14,7 +14,6 @@
pub mod file; pub mod file;
use std::borrow::Cow;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::Instant; use std::time::Instant;
@@ -272,49 +271,6 @@ impl MetadataSnapshotManager {
Ok((filename.to_string(), num_keyvalues as u64)) Ok((filename.to_string(), num_keyvalues as u64))
} }
fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
format!("{} => {}", key, value)
}
pub async fn info(
object_store: &ObjectStore,
file_path: &str,
query_str: &str,
limit: Option<usize>,
) -> Result<Vec<String>> {
let path = Path::new(file_path);
let file_name = path
.file_name()
.and_then(|s| s.to_str())
.context(InvalidFilePathSnafu { file_path })?;
let filename = FileName::try_from(file_name)?;
let data = object_store
.read(file_path)
.await
.context(ReadObjectSnafu { file_path })?;
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
let metadata_content = document.into_metadata_content()?.values();
let mut results = Vec::with_capacity(limit.unwrap_or(256));
for kv in metadata_content {
let key_str = String::from_utf8_lossy(&kv.key);
if let Some(prefix) = query_str.strip_suffix('*') {
if key_str.starts_with(prefix) {
let value_str = String::from_utf8_lossy(&kv.value);
results.push(Self::format_output(key_str, value_str));
}
} else if key_str == query_str {
let value_str = String::from_utf8_lossy(&kv.value);
results.push(Self::format_output(key_str, value_str));
}
if results.len() == limit.unwrap_or(usize::MAX) {
break;
}
}
Ok(results)
}
} }
#[cfg(test)] #[cfg(test)]
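The removed `info` helper above scans snapshot key-value pairs, treats a trailing `*` in the query as a prefix wildcard, formats matches as `key => value`, and stops at an optional limit. Its matching logic boils down to the following stand-alone sketch; the real code works over decoded `KeyValue` structs read from an object store, and the names here are illustrative.

/// Filter key-value pairs by an exact key or a `prefix*` pattern,
/// stopping once `limit` matches have been collected.
fn filter_kvs<'a>(
    kvs: impl IntoIterator<Item = (&'a str, &'a str)>,
    query: &str,
    limit: Option<usize>,
) -> Vec<String> {
    let limit = limit.unwrap_or(usize::MAX);
    let mut results = Vec::new();
    for (key, value) in kvs {
        let matched = match query.strip_suffix('*') {
            Some(prefix) => key.starts_with(prefix),
            None => key == query,
        };
        if matched {
            results.push(format!("{key} => {value}"));
        }
        if results.len() == limit {
            break;
        }
    }
    results
}

fn main() {
    let kvs = [("__table/1", "t1"), ("__table/2", "t2"), ("__flow/1", "f1")];
    assert_eq!(filter_kvs(kvs, "__table/*", Some(1)).len(), 1);
    assert_eq!(filter_kvs(kvs, "__flow/1", None), vec!["__flow/1 => f1".to_string()]);
}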

View File

@@ -111,11 +111,6 @@ impl MetadataContent {
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> { pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
self.values.into_iter() self.values.into_iter()
} }
/// Returns the key-value pairs as a vector.
pub fn values(self) -> Vec<KeyValue> {
self.values
}
} }
/// The key-value pair of the backup file. /// The key-value pair of the backup file.

View File

@@ -15,7 +15,7 @@
use std::sync::Arc; use std::sync::Arc;
use api::region::RegionResponse; use api::region::RegionResponse;
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse}; use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest}; use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows; pub use common_base::AffectedRows;
use common_query::request::QueryRequest; use common_query::request::QueryRequest;
@@ -67,14 +67,6 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
) -> Result<FlowResponse> { ) -> Result<FlowResponse> {
unimplemented!() unimplemented!()
} }
async fn handle_mark_window_dirty(
&self,
_peer: &Peer,
_req: DirtyWindowRequest,
) -> Result<FlowResponse> {
unimplemented!()
}
} }
/// A mock struct implements [NodeManager] only implement the `datanode` method. /// A mock struct implements [NodeManager] only implement the `datanode` method.
@@ -142,10 +134,6 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> { async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
self.handler.handle_inserts(&self.peer, requests).await self.handler.handle_inserts(&self.peer, requests).await
} }
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
self.handler.handle_mark_window_dirty(&self.peer, req).await
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]

View File

@@ -372,7 +372,6 @@ impl DatanodeBuilder {
opts.max_concurrent_queries, opts.max_concurrent_queries,
//TODO: reevaluate the hardcoded timeout on the next version of datanode concurrency limiter. //TODO: reevaluate the hardcoded timeout on the next version of datanode concurrency limiter.
Duration::from_millis(100), Duration::from_millis(100),
opts.grpc.flight_compression,
); );
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?; let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;

View File

@@ -50,7 +50,6 @@ use query::QueryEngineRef;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult}; use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler; use servers::grpc::region_server::RegionServerHandler;
use servers::grpc::FlightCompression;
use session::context::{QueryContextBuilder, QueryContextRef}; use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{ use store_api::metric_engine_consts::{
@@ -81,7 +80,6 @@ use crate::event_listener::RegionServerEventListenerRef;
#[derive(Clone)] #[derive(Clone)]
pub struct RegionServer { pub struct RegionServer {
inner: Arc<RegionServerInner>, inner: Arc<RegionServerInner>,
flight_compression: FlightCompression,
} }
pub struct RegionStat { pub struct RegionStat {
@@ -95,7 +93,6 @@ impl RegionServer {
query_engine: QueryEngineRef, query_engine: QueryEngineRef,
runtime: Runtime, runtime: Runtime,
event_listener: RegionServerEventListenerRef, event_listener: RegionServerEventListenerRef,
flight_compression: FlightCompression,
) -> Self { ) -> Self {
Self::with_table_provider( Self::with_table_provider(
query_engine, query_engine,
@@ -104,7 +101,6 @@ impl RegionServer {
Arc::new(DummyTableProviderFactory), Arc::new(DummyTableProviderFactory),
0, 0,
Duration::from_millis(0), Duration::from_millis(0),
flight_compression,
) )
} }
@@ -115,7 +111,6 @@ impl RegionServer {
table_provider_factory: TableProviderFactoryRef, table_provider_factory: TableProviderFactoryRef,
max_concurrent_queries: usize, max_concurrent_queries: usize,
concurrent_query_limiter_timeout: Duration, concurrent_query_limiter_timeout: Duration,
flight_compression: FlightCompression,
) -> Self { ) -> Self {
Self { Self {
inner: Arc::new(RegionServerInner::new( inner: Arc::new(RegionServerInner::new(
@@ -128,7 +123,6 @@ impl RegionServer {
concurrent_query_limiter_timeout, concurrent_query_limiter_timeout,
), ),
)), )),
flight_compression,
} }
} }
@@ -542,11 +536,7 @@ impl FlightCraft for RegionServer {
.trace(tracing_context.attach(info_span!("RegionServer::handle_read"))) .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
.await?; .await?;
let stream = Box::pin(FlightRecordBatchStream::new( let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
result,
tracing_context,
self.flight_compression,
));
Ok(Response::new(stream)) Ok(Response::new(stream))
} }
} }

View File

@@ -19,16 +19,16 @@ use std::time::Duration;
use api::region::RegionResponse; use api::region::RegionResponse;
use async_trait::async_trait; use async_trait::async_trait;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_function::function_factory::ScalarFunctionFactory; use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::Output; use common_query::Output;
use common_runtime::runtime::{BuilderBuild, RuntimeTrait}; use common_runtime::runtime::{BuilderBuild, RuntimeTrait};
use common_runtime::Runtime; use common_runtime::Runtime;
use datafusion_expr::{AggregateUDF, LogicalPlan}; use datafusion_expr::LogicalPlan;
use query::dataframe::DataFrame; use query::dataframe::DataFrame;
use query::planner::LogicalPlanner; use query::planner::LogicalPlanner;
use query::query_engine::{DescribeResult, QueryEngineState}; use query::query_engine::{DescribeResult, QueryEngineState};
use query::{QueryEngine, QueryEngineContext}; use query::{QueryEngine, QueryEngineContext};
use servers::grpc::FlightCompression;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef; use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{ use store_api::region_engine::{
@@ -76,9 +76,9 @@ impl QueryEngine for MockQueryEngine {
unimplemented!() unimplemented!()
} }
fn register_aggregate_function(&self, _func: AggregateUDF) {} fn register_aggregate_function(&self, _func: AggregateFunctionMetaRef) {}
fn register_scalar_function(&self, _func: ScalarFunctionFactory) {} fn register_function(&self, _func: FunctionRef) {}
fn read_table(&self, _table: TableRef) -> query::error::Result<DataFrame> { fn read_table(&self, _table: TableRef) -> query::error::Result<DataFrame> {
unimplemented!() unimplemented!()
@@ -98,7 +98,6 @@ pub fn mock_region_server() -> RegionServer {
Arc::new(MockQueryEngine), Arc::new(MockQueryEngine),
Runtime::builder().build().unwrap(), Runtime::builder().build().unwrap(),
Box::new(NoopRegionServerEventListener), Box::new(NoopRegionServerEventListener),
FlightCompression::default(),
) )
} }

View File

@@ -316,7 +316,7 @@ impl StreamingEngine {
); );
METRIC_FLOW_ROWS METRIC_FLOW_ROWS
.with_label_values(&["out-streaming"]) .with_label_values(&["out"])
.inc_by(total_rows as u64); .inc_by(total_rows as u64);
let now = self.tick_manager.tick(); let now = self.tick_manager.tick();

View File

@@ -31,7 +31,6 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn}; use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value; use datatypes::value::Value;
use futures::TryStreamExt; use futures::TryStreamExt;
use greptime_proto::v1::flow::DirtyWindowRequest;
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder; use session::context::QueryContextBuilder;
@@ -48,7 +47,7 @@ use crate::error::{
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
}; };
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT}; use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow}; use crate::repr::{self, DiffRow};
use crate::{Error, FlowId}; use crate::{Error, FlowId};
@@ -691,9 +690,6 @@ impl FlowEngine for FlowDualEngine {
let mut to_stream_engine = Vec::with_capacity(request.requests.len()); let mut to_stream_engine = Vec::with_capacity(request.requests.len());
let mut to_batch_engine = request.requests; let mut to_batch_engine = request.requests;
let mut batching_row_cnt = 0;
let mut streaming_row_cnt = 0;
{ {
// not locking this, or flow recovery will be starved when also handling flow inserts // not locking this, or flow recovery will be starved when also handling flow inserts
let src_table2flow = self.src_table2flow.read().await; let src_table2flow = self.src_table2flow.read().await;
@@ -703,11 +699,9 @@ impl FlowEngine for FlowDualEngine {
let is_in_stream = src_table2flow.in_stream(table_id); let is_in_stream = src_table2flow.in_stream(table_id);
let is_in_batch = src_table2flow.in_batch(table_id); let is_in_batch = src_table2flow.in_batch(table_id);
if is_in_stream { if is_in_stream {
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
to_stream_engine.push(req.clone()); to_stream_engine.push(req.clone());
} }
if is_in_batch { if is_in_batch {
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
return true; return true;
} }
if !is_in_batch && !is_in_stream { if !is_in_batch && !is_in_stream {
@@ -720,14 +714,6 @@ impl FlowEngine for FlowDualEngine {
// can't use drop due to https://github.com/rust-lang/rust/pull/128846 // can't use drop due to https://github.com/rust-lang/rust/pull/128846
} }
METRIC_FLOW_ROWS
.with_label_values(&["in-streaming"])
.inc_by(streaming_row_cnt as u64);
METRIC_FLOW_ROWS
.with_label_values(&["in-batching"])
.inc_by(batching_row_cnt as u64);
let streaming_engine = self.streaming_engine.clone(); let streaming_engine = self.streaming_engine.clone();
let stream_handler: JoinHandle<Result<(), Error>> = let stream_handler: JoinHandle<Result<(), Error>> =
common_runtime::spawn_global(async move { common_runtime::spawn_global(async move {
@@ -853,11 +839,6 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
.map(|_| Default::default()) .map(|_| Default::default())
.map_err(to_meta_err(snafu::location!())) .map_err(to_meta_err(snafu::location!()))
} }
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
// todo: implement
unimplemented!()
}
} }
/// return a function to convert `crate::error::Error` to `common_meta::error::Error` /// return a function to convert `crate::error::Error` to `common_meta::error::Error`
@@ -880,98 +861,6 @@ fn to_meta_err(
} }
} }
#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for StreamingEngine {
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
let query_ctx = request
.header
.and_then(|h| h.query_context)
.map(|ctx| ctx.into());
match request.body {
Some(flow_request::Body::Create(CreateRequest {
flow_id: Some(task_id),
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_after,
comment,
sql,
flow_options,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
sink_table_name.catalog_name,
sink_table_name.schema_name,
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: Some(comment),
sql: sql.clone(),
flow_options,
query_ctx,
};
let ret = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
.into_iter()
.collect_vec(),
..Default::default()
})
}
Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(flow_id),
})) => {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
flow_id: Some(flow_id),
})) => {
let row = self
.flush_flow_inner(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(FlowResponse {
affected_flows: vec![flow_id],
affected_rows: row as u64,
..Default::default()
})
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
self.handle_inserts_inner(request)
.await
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
// todo: implement
unimplemented!()
}
}
impl FlowEngine for StreamingEngine { impl FlowEngine for StreamingEngine {
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> { async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
self.create_flow_inner(args).await self.create_flow_inner(args).await
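Earlier hunks in this file route incoming insert requests between the streaming and batching engines: a request is cloned to the streaming side when its source table feeds a streaming flow, kept for the batching side when it feeds a batching flow, and one side of the diff also accumulates per-direction row counts for metrics. Below is a simplified sketch of that split, using table-id/row-count pairs instead of the real protobuf `InsertRequests`.

use std::collections::HashSet;

/// Route requests: tables known to the streaming engine get a copy,
/// tables known to the batching engine stay in the returned batch list.
fn split_requests(
    requests: Vec<(u32 /* table_id */, usize /* rows */)>,
    in_stream: &HashSet<u32>,
    in_batch: &HashSet<u32>,
) -> (Vec<(u32, usize)>, Vec<(u32, usize)>, usize, usize) {
    let mut to_stream = Vec::new();
    let mut streaming_rows = 0;
    let mut batching_rows = 0;
    let to_batch: Vec<_> = requests
        .into_iter()
        .filter(|req| {
            let (table_id, rows) = *req;
            if in_stream.contains(&table_id) {
                streaming_rows += rows;
                to_stream.push((table_id, rows));
            }
            if in_batch.contains(&table_id) {
                batching_rows += rows;
                return true; // keep for the batching engine
            }
            false // tables in neither engine are filtered out in this sketch
        })
        .collect();
    (to_stream, to_batch, streaming_rows, batching_rows)
}

fn main() {
    let in_stream = HashSet::from([1]);
    let in_batch = HashSet::from([1, 2]);
    let (s, b, srows, brows) = split_requests(vec![(1, 10), (2, 5), (3, 7)], &in_stream, &in_batch);
    assert_eq!((s.len(), b.len(), srows, brows), (1, 2, 10, 5));
}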

View File

@@ -17,7 +17,6 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::sync::Arc; use std::sync::Arc;
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
use catalog::CatalogManagerRef; use catalog::CatalogManagerRef;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType; use common_meta::ddl::create_flow::FlowType;
@@ -30,7 +29,8 @@ use common_telemetry::{debug, info};
use common_time::TimeToLive; use common_time::TimeToLive;
use query::QueryEngineRef; use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId}; use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::{oneshot, RwLock}; use tokio::sync::{oneshot, RwLock};
use crate::batching_mode::frontend_client::FrontendClient; use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,7 +42,6 @@ use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu, UnexpectedSnafu, UnsupportedSnafu,
}; };
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE;
use crate::{CreateFlowArgs, Error, FlowId, TableName}; use crate::{CreateFlowArgs, Error, FlowId, TableName};
/// Batching mode Engine, responsible for driving all the batching mode tasks /// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -78,122 +77,6 @@ impl BatchingEngine {
} }
} }
pub async fn handle_mark_dirty_time_window(
&self,
reqs: DirtyWindowRequests,
) -> Result<FlowResponse, Error> {
let table_info_mgr = self.table_meta.table_info_manager();
let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
for r in reqs.requests {
let tid = TableId::from(r.table_id);
let entry = group_by_table_id.entry(tid).or_default();
entry.extend(r.dirty_time_ranges);
}
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
let table_infos =
table_info_mgr
.batch_get(&tids)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Failed to get table info for table ids: {:?}", tids),
})?;
let group_by_table_name = group_by_table_id
.into_iter()
.filter_map(|(id, rows)| {
let table_name = table_infos.get(&id).map(|info| info.table_name());
let Some(table_name) = table_name else {
warn!("Failed to get table infos for table id: {:?}", id);
return None;
};
let table_name = [
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
.data_type
.as_timestamp()
.unwrap()
.unit();
Some((table_name, (rows, time_index_unit)))
})
.collect::<HashMap<_, _>>();
let group_by_table_name = Arc::new(group_by_table_name);
let mut handles = Vec::new();
let tasks = self.tasks.read().await;
for (_flow_id, task) in tasks.iter() {
let src_table_names = &task.config.source_table_names;
if src_table_names
.iter()
.all(|name| !group_by_table_name.contains_key(name))
{
continue;
}
let group_by_table_name = group_by_table_name.clone();
let task = task.clone();
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = vec![];
for src_table_name in src_table_names {
if let Some((window_ranges, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
for window in window_ranges {
let align_start = expr
.eval(common_time::Timestamp::new(window.start_value, *unit))?
.0
.context(UnexpectedSnafu {
reason: "Failed to eval start value",
})?;
let align_end = expr
.eval(common_time::Timestamp::new(window.end_value, *unit))?
.1
.context(UnexpectedSnafu {
reason: "Failed to eval end value",
})?;
all_dirty_windows.push((align_start, align_end));
}
}
}
let mut state = task.state.write().unwrap();
let flow_id_label = task.config.flow_id.to_string();
for (s, e) in all_dirty_windows {
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE
.with_label_values(&[&flow_id_label])
.observe(e.sub(&s).unwrap_or_default().num_seconds() as f64);
state.dirty_time_windows.add_window(s, Some(e));
}
Ok(())
});
handles.push(handle);
}
drop(tasks);
for handle in handles {
match handle.await {
Err(e) => {
warn!("Failed to handle inserts: {e}");
}
Ok(Ok(())) => (),
Ok(Err(e)) => {
warn!("Failed to handle inserts: {e}");
}
}
}
Ok(Default::default())
}
pub async fn handle_inserts_inner( pub async fn handle_inserts_inner(
&self, &self,
request: api::v1::region::InsertRequests, request: api::v1::region::InsertRequests,
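The removed `handle_mark_dirty_time_window` above first groups the reported dirty ranges by table id before aligning them to each flow's time windows. That grouping step is a small `entry().or_default().extend()` idiom, sketched here with an illustrative `DirtyRange` type in place of the protobuf messages.

use std::collections::HashMap;

/// A dirty range reported for one source table, in that table's timestamp unit.
#[derive(Clone, Copy, Debug, PartialEq)]
struct DirtyRange {
    start: i64,
    end: i64,
}

/// Group per-request dirty ranges by table id so each flow only has to look at
/// the tables it actually reads from.
fn group_by_table(reqs: Vec<(u32, Vec<DirtyRange>)>) -> HashMap<u32, Vec<DirtyRange>> {
    let mut grouped: HashMap<u32, Vec<DirtyRange>> = HashMap::new();
    for (table_id, ranges) in reqs {
        grouped.entry(table_id).or_default().extend(ranges);
    }
    grouped
}

fn main() {
    let reqs = vec![
        (1, vec![DirtyRange { start: 0, end: 60 }]),
        (1, vec![DirtyRange { start: 120, end: 180 }]),
        (2, vec![DirtyRange { start: 0, end: 10 }]),
    ];
    let grouped = group_by_table(reqs);
    assert_eq!(grouped[&1].len(), 2);
    assert_eq!(grouped[&2].len(), 1);
}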

View File

@@ -286,7 +286,7 @@ impl FrontendClient {
/// Get the frontend with a recent enough (less than 1 minute from now) `last_activity_ts` /// Get the frontend with a recent enough (less than 1 minute from now) `last_activity_ts`
/// that is able to process queries /// that is able to process queries
async fn get_random_active_frontend( pub(crate) async fn get_random_active_frontend(
&self, &self,
catalog: &str, catalog: &str,
schema: &str, schema: &str,
@@ -382,7 +382,7 @@ impl FrontendClient {
}), }),
catalog, catalog,
schema, schema,
&mut None, None,
task, task,
) )
.await .await
@@ -394,16 +394,28 @@ impl FrontendClient {
req: api::v1::greptime_request::Request, req: api::v1::greptime_request::Request,
catalog: &str, catalog: &str,
schema: &str, schema: &str,
peer_desc: &mut Option<PeerDesc>, use_peer: Option<Peer>,
task: Option<&BatchingTask>, task: Option<&BatchingTask>,
) -> Result<u32, Error> { ) -> Result<u32, Error> {
match self { match self {
FrontendClient::Distributed { fe_stats, .. } => { FrontendClient::Distributed {
let db = self.get_random_active_frontend(catalog, schema).await?; fe_stats, chnl_mgr, ..
} => {
*peer_desc = Some(PeerDesc::Dist { let db = if let Some(peer) = use_peer {
peer: db.peer.clone(), DatabaseWithPeer::new(
}); Database::new(
catalog,
schema,
Client::with_manager_and_urls(
chnl_mgr.clone(),
vec![peer.addr.clone()],
),
),
peer,
)
} else {
self.get_random_active_frontend(catalog, schema).await?
};
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default(); let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id); let _guard = fe_stats.observe(&db.peer.addr, flow_id);
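This file's hunks change how the flow's frontend client picks a peer: either an explicitly supplied frontend is used, or a pseudo-random one is chosen among frontends whose `last_activity_ts` is recent enough. Below is a rough, dependency-free sketch of that selection logic; `Peer`, the 60-second freshness cutoff, and the clock-based random pick are illustrative stand-ins for the real implementation.

use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[derive(Clone, Debug, PartialEq)]
struct Peer {
    addr: String,
}

/// Use the explicitly requested peer if any, otherwise pick a pseudo-random
/// frontend whose last activity is recent enough (here: within 60 seconds).
fn choose_frontend(
    requested: Option<Peer>,
    frontends: &[(Peer, u64 /* last_activity_ts, seconds */)],
    now_secs: u64,
) -> Option<Peer> {
    if let Some(peer) = requested {
        return Some(peer);
    }
    let active: Vec<&Peer> = frontends
        .iter()
        .filter(|(_, ts)| now_secs.saturating_sub(*ts) < 60)
        .map(|(p, _)| p)
        .collect();
    if active.is_empty() {
        return None;
    }
    // Cheap "random" pick without extra dependencies: derive an index from the clock.
    let idx = (SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .subsec_nanos() as usize)
        % active.len();
    Some(active[idx].clone())
}

fn main() {
    let frontends = vec![
        (Peer { addr: "10.0.0.1:4001".into() }, 1_000),
        (Peer { addr: "10.0.0.2:4001".into() }, 10),
    ];
    // Relative to now = 1_030, only the first frontend is recent enough.
    let picked = choose_frontend(None, &frontends, 1_030).unwrap();
    assert_eq!(picked.addr, "10.0.0.1:4001");
}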

View File

@@ -31,9 +31,8 @@ use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::MIN_REFRESH_DURATION; use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu}; use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{ use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
}; };
use crate::{Error, FlowId}; use crate::{Error, FlowId};
@@ -54,6 +53,8 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>, pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle /// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>, pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// Slow Query metrics update task handle
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds /// min run interval in seconds
pub(crate) min_run_interval: Option<u64>, pub(crate) min_run_interval: Option<u64>,
@@ -70,6 +71,7 @@ impl TaskState {
exec_state: ExecState::Idle, exec_state: ExecState::Idle,
shutdown_rx, shutdown_rx,
task_handle: None, task_handle: None,
slow_query_metric_task: None,
min_run_interval: None, min_run_interval: None,
max_filter_num: None, max_filter_num: None,
} }
@@ -95,7 +97,7 @@ impl TaskState {
/// TODO: Make this behavior configurable. /// TODO: Make this behavior configurable.
pub fn get_next_start_query_time( pub fn get_next_start_query_time(
&self, &self,
_flow_id: FlowId, flow_id: FlowId,
_time_window_size: &Option<Duration>, _time_window_size: &Option<Duration>,
max_timeout: Option<Duration>, max_timeout: Option<Duration>,
) -> Instant { ) -> Instant {
@@ -109,7 +111,7 @@ impl TaskState {
); );
// if have dirty time window, execute immediately to clean dirty time window // if have dirty time window, execute immediately to clean dirty time window
/*if self.dirty_time_windows.windows.is_empty() { if self.dirty_time_windows.windows.is_empty() {
self.last_update_time + next_duration self.last_update_time + next_duration
} else { } else {
debug!( debug!(
@@ -119,10 +121,7 @@ impl TaskState {
self.dirty_time_windows.windows self.dirty_time_windows.windows
); );
Instant::now() Instant::now()
}*/ }
// wait for next duration anyway
self.last_update_time + next_duration
} }
} }
@@ -215,63 +214,57 @@ impl DirtyTimeWindows {
// get the first `window_cnt` time windows // get the first `window_cnt` time windows
let max_time_range = window_size * window_cnt as i32; let max_time_range = window_size * window_cnt as i32;
let nth = {
let mut to_be_query = BTreeMap::new(); let mut cur_time_range = chrono::Duration::zero();
let mut new_windows = self.windows.clone(); let mut nth_key = None;
let mut cur_time_range = chrono::Duration::zero(); for (idx, (start, end)) in self.windows.iter().enumerate() {
for (idx, (start, end)) in self.windows.iter().enumerate() { // if time range is too long, stop
let first_end = start if cur_time_range > max_time_range {
.add_duration(window_size.to_std().unwrap()) nth_key = Some(*start);
.context(TimeSnafu)?;
let end = end.unwrap_or(first_end);
// if time range is too long, stop
if cur_time_range >= max_time_range {
break;
}
// if we have enough time windows, stop
if idx >= window_cnt {
break;
}
if let Some(x) = end.sub(start) {
if cur_time_range + x <= max_time_range {
to_be_query.insert(*start, Some(end));
new_windows.remove(start);
cur_time_range += x;
} else {
// too large a window, split it
// split at window_size * times
let surplus = max_time_range - cur_time_range;
let times = surplus.num_seconds() / window_size.num_seconds();
let split_offset = window_size * times as i32;
let split_at = start
.add_duration(split_offset.to_std().unwrap())
.context(TimeSnafu)?;
to_be_query.insert(*start, Some(split_at));
// remove the original window
new_windows.remove(start);
new_windows.insert(split_at, Some(end));
cur_time_range += split_offset;
break; break;
} }
}
}
self.windows = new_windows; // if we have enough time windows, stop
if idx >= window_cnt {
nth_key = Some(*start);
break;
}
if let Some(end) = end {
if let Some(x) = end.sub(start) {
cur_time_range += x;
}
}
}
nth_key
};
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
std::mem::swap(&mut self.windows, &mut after);
after
} else {
std::mem::take(&mut self.windows)
}
};
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
.observe(to_be_query.len() as f64); flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(first_nth.len() as f64);
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(self.windows.len() as f64); .observe(self.windows.len() as f64);
let full_time_range = to_be_query let full_time_range = first_nth
.iter() .iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| { .fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end { if let Some(end) = end {
@@ -281,27 +274,15 @@ impl DirtyTimeWindows {
} }
}) })
.num_seconds() as f64; .num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(full_time_range); .observe(full_time_range);
let stalled_time_range =
self.windows
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end {
acc + end.sub(start).unwrap_or(chrono::Duration::zero())
} else {
acc
}
});
METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
.with_label_values(&[flow_id.to_string().as_str()])
.observe(stalled_time_range.num_seconds() as f64);
let mut expr_lst = vec![]; let mut expr_lst = vec![];
for (start, end) in to_be_query.into_iter() { for (start, end) in first_nth.into_iter() {
// align using time window exprs // align using time window exprs
let (start, end) = if let Some(ctx) = task_ctx { let (start, end) = if let Some(ctx) = task_ctx {
let Some(time_window_expr) = &ctx.config.time_window_expr else { let Some(time_window_expr) = &ctx.config.time_window_expr else {
@@ -535,64 +516,6 @@ mod test {
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))", "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
) )
), ),
// split range
(
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
60
)),
),
(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
Some(Timestamp::new_second(
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 2 min into 1 min
(
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
40 * 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 3s + 1min into 3s + 57s
(
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
3
)),
),(
Timestamp::new_second(20),
Some(Timestamp::new_second(
140
)),
)]),
Some(
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
)
),
// expired // expired
( (
vec![ vec![
@@ -609,8 +532,6 @@ mod test {
None None
), ),
]; ];
// let len = testcases.len();
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
testcases testcases
{ {
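The `DirtyTimeWindows` changes above pull a bounded batch of dirty windows off the front of an ordered map, capped both by a window count and by total covered time, and leave the rest stalled for a later run. One side of this hunk selects the batch by finding the nth key and calling `split_off`; the other also splits an oversized window at a multiple of the window size. The sketch below shows the simpler `split_off`/`swap` variant, using plain second offsets instead of `Timestamp` values.

use std::collections::BTreeMap;

/// Take at most `window_cnt` dirty windows from the front of `windows`,
/// additionally capping the total covered time at `max_secs`.
/// Windows that are not taken stay in the map for the next run.
fn take_first_windows(
    windows: &mut BTreeMap<i64, i64>, // start_secs -> end_secs
    window_cnt: usize,
    max_secs: i64,
) -> BTreeMap<i64, i64> {
    let mut covered = 0;
    let mut split_key = None;
    for (idx, (start, end)) in windows.iter().enumerate() {
        if idx >= window_cnt || covered > max_secs {
            split_key = Some(*start);
            break;
        }
        covered += end - start;
    }
    match split_key {
        Some(key) => {
            // `split_off(&key)` returns `key..` and keeps `..key` in `windows`;
            // after the swap, `windows` holds the remainder and `rest` holds the
            // picked prefix that we return.
            let mut rest = windows.split_off(&key);
            std::mem::swap(windows, &mut rest);
            rest
        }
        None => std::mem::take(windows),
    }
}

fn main() {
    let mut windows = BTreeMap::from([(0, 60), (120, 180), (240, 300)]);
    let picked = take_first_windows(&mut windows, 2, 3600);
    assert_eq!(picked, BTreeMap::from([(0, 60), (120, 180)]));
    assert_eq!(windows, BTreeMap::from([(240, 300)]));
}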

View File

@@ -61,9 +61,8 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu, SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
}; };
use crate::metrics::{ use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_ROWS,
}; };
use crate::{Error, FlowId}; use crate::{Error, FlowId};
@@ -83,6 +82,14 @@ pub struct TaskConfig {
query_type: QueryType, query_type: QueryType,
} }
impl TaskConfig {
pub fn time_window_size(&self) -> Option<Duration> {
self.time_window_expr
.as_ref()
.and_then(|expr| *expr.time_window_size())
}
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> { fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts = let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default()) ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
@@ -336,11 +343,53 @@ impl BatchingTask {
})?; })?;
let plan = expanded_plan; let plan = expanded_plan;
let mut peer_desc = None;
let db = frontend_client
.get_random_active_frontend(catalog, schema)
.await?;
let peer_desc = db.peer.clone();
let (tx, mut rx) = oneshot::channel();
let peer_inner = peer_desc.clone();
let window_size_pretty = format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
);
let inner_window_size_pretty = window_size_pretty.clone();
let flow_id = self.config.flow_id;
let slow_query_metric_task = tokio::task::spawn(async move {
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.add(1.0);
while rx.try_recv() == Err(TryRecvError::Empty) {
// sleep for a while before next update
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.sub(1.0);
});
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
let res = { let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.start_timer(); .start_timer();
// hack and special handling the insert logical plan // hack and special handling the insert logical plan
@@ -369,19 +418,18 @@ impl BatchingTask {
}; };
frontend_client frontend_client
.handle(req, catalog, schema, &mut peer_desc, Some(self)) .handle(req, catalog, schema, Some(db.peer), Some(self))
.await .await
}; };
// signaling the slow query metric task to stop
let _ = tx.send(());
let elapsed = instant.elapsed(); let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res { if let Ok(affected_rows) = &res {
debug!( debug!(
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}", "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
elapsed elapsed
); );
METRIC_FLOW_ROWS
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
.inc_by(*affected_rows as _);
} else if let Err(err) = &res { } else if let Err(err) = &res {
warn!( warn!(
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}", "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
@@ -398,7 +446,12 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[ .with_label_values(&[
flow_id.to_string().as_str(), flow_id.to_string().as_str(),
&peer_desc.unwrap_or_default().to_string(), &peer_desc.to_string(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
]) ])
.observe(elapsed.as_secs_f64()); .observe(elapsed.as_secs_f64());
} }
@@ -421,7 +474,6 @@ impl BatchingTask {
engine: QueryEngineRef, engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>, frontend_client: Arc<FrontendClient>,
) { ) {
let flow_id_str = self.config.flow_id.to_string();
loop { loop {
// first check if shutdown signal is received // first check if shutdown signal is received
// if so, break the loop // if so, break the loop
@@ -439,9 +491,6 @@ impl BatchingTask {
Err(TryRecvError::Empty) => (), Err(TryRecvError::Empty) => (),
} }
} }
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
.with_label_values(&[&flow_id_str])
.inc();
let new_query = match self.gen_insert_plan(&engine).await { let new_query = match self.gen_insert_plan(&engine).await {
Ok(new_query) => new_query, Ok(new_query) => new_query,
@@ -488,9 +537,6 @@ impl BatchingTask {
} }
// TODO(discord9): this error should have a better place to go, but for now just print the error; also, more context is needed // TODO(discord9): this error should have a better place to go, but for now just print the error; also, more context is needed
Err(err) => { Err(err) => {
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
.with_label_values(&[&flow_id_str])
.inc();
match new_query { match new_query {
Some(query) => { Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id) common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
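The added slow-query bookkeeping above spawns a side task that, once a threshold has elapsed, raises a gauge and keeps it raised until the query side signals completion through a oneshot channel. A minimal self-contained sketch of that pattern, with an `AtomicI64` standing in for the Prometheus gauge and deliberately tiny durations so the example finishes quickly:

use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;

#[tokio::main]
async fn main() {
    // Stand-in for a labeled Prometheus gauge counting in-flight slow queries.
    let slow_queries = Arc::new(AtomicI64::new(0));

    let (tx, mut rx) = oneshot::channel::<()>();
    let gauge = slow_queries.clone();
    let watcher = tokio::spawn(async move {
        // Only queries that outlive the threshold count as "slow".
        tokio::time::sleep(Duration::from_millis(10)).await;
        gauge.fetch_add(1, Ordering::Relaxed);
        // Keep the gauge raised until the query side sends the done signal.
        while rx.try_recv() == Err(TryRecvError::Empty) {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
        gauge.fetch_sub(1, Ordering::Relaxed);
    });

    // Simulate a query that runs past the slow threshold.
    tokio::time::sleep(Duration::from_millis(30)).await;
    let _ = tx.send(()); // signal completion
    watcher.await.unwrap();

    assert_eq!(slow_queries.load(Ordering::Relaxed), 0);
}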

View File

@@ -31,22 +31,29 @@ lazy_static! {
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!( pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_query_time_secs", "greptime_flow_batching_engine_query_time_secs",
"flow batching engine query time(seconds)", "flow batching engine query time(seconds)",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,] vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!( pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs", "greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)", "flow batching engine slow query(seconds), updated after query finished",
&["flow_id", "peer"], &["flow_id", "peer", "time_window_granularity"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.] vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_real_time_slow_query_number",
"flow batching engine real time slow query number, updated in real time",
&["flow_id", "peer", "time_window_granularity"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt", "greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count", "flow batching engine stalled query time window count",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.] vec![0.0, 5., 10., 20., 40.]
) )
.unwrap(); .unwrap();
@@ -54,48 +61,18 @@ lazy_static! {
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt", "greptime_flow_batching_engine_query_window_cnt",
"flow batching engine query time window count", "flow batching engine query time window count",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.] vec![0.0, 5., 10., 20., 40.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE: HistogramVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE: HistogramVec =
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_query_window_size_secs", "greptime_flow_batching_engine_query_time_range_secs",
"flow batching engine query window size(seconds)", "flow batching engine query time range(seconds)",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_window_size_secs",
"flow batching engine stalled window size(seconds)",
&["flow_id"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_bulk_mark_time_window_range_secs",
"flow batching engine query time window range marked by bulk memtable in seconds",
&["flow_id"],
vec![0.0, 60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_start_query_count",
"flow batching engine started query count",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_error_count",
"flow batching engine error count per flow id",
&["flow_id"]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_guess_fe_load", "greptime_flow_batching_engine_guess_fe_load",
@@ -108,7 +85,7 @@ lazy_static! {
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!( pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
"greptime_flow_processed_rows", "greptime_flow_processed_rows",
"Count of rows flowing through the system.", "Count of rows flowing through the system",
&["direction"] &["direction"]
) )
.unwrap(); .unwrap();
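The metric changes above add a `time_window_granularity` label and introduce a gauge for real-time slow-query counting, which can move both up and down. For reference, here is a small sketch of declaring and using such labeled collectors with the `prometheus` and `lazy_static` crates; the metric names and buckets are examples, not the ones registered by GreptimeDB.

use lazy_static::lazy_static;
use prometheus::{register_gauge_vec, register_histogram_vec, GaugeVec, HistogramVec};

lazy_static! {
    /// Query latency, partitioned by flow id and time-window granularity.
    static ref QUERY_TIME: HistogramVec = register_histogram_vec!(
        "example_flow_query_time_secs",
        "example flow query time (seconds)",
        &["flow_id", "time_window_granularity"],
        vec![0.0, 5., 10., 20., 40., 80.]
    )
    .unwrap();

    /// Number of queries currently running past the slow threshold.
    static ref SLOW_QUERY_INFLIGHT: GaugeVec = register_gauge_vec!(
        "example_flow_slow_query_inflight",
        "example in-flight slow query count",
        &["flow_id", "peer", "time_window_granularity"]
    )
    .unwrap();
}

fn main() {
    // Histograms record one sample per observation...
    QUERY_TIME
        .with_label_values(&["1024", "60s"])
        .observe(12.5);
    // ...while gauges move up when a slow query is detected and down when it finishes.
    let inflight = SLOW_QUERY_INFLIGHT.with_label_values(&["1024", "10.0.0.1:4001", "60s"]);
    inflight.add(1.0);
    inflight.sub(1.0);
}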

View File

@@ -17,7 +17,6 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use api::v1::flow::DirtyWindowRequests;
use api::v1::{RowDeleteRequests, RowInsertRequests}; use api::v1::{RowDeleteRequests, RowInsertRequests};
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME}; use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef; use catalog::CatalogManagerRef;
@@ -137,18 +136,6 @@ impl flow_server::Flow for FlowService {
.map(Response::new) .map(Response::new)
.map_err(to_status_with_last_err) .map_err(to_status_with_last_err)
} }
async fn handle_mark_dirty_time_window(
&self,
reqs: Request<DirtyWindowRequests>,
) -> Result<Response<FlowResponse>, Status> {
self.dual_engine
.batching_engine()
.handle_mark_dirty_time_window(reqs.into_inner())
.await
.map(Response::new)
.map_err(to_status_with_last_err)
}
} }
#[derive(Clone)] #[derive(Clone)]

View File

@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_function::function::{FunctionContext, FunctionRef}; use common_function::function::FunctionContext;
use datafusion_substrait::extensions::Extensions; use datafusion_substrait::extensions::Extensions;
use datatypes::data_type::ConcreteDataType as CDT; use datatypes::data_type::ConcreteDataType as CDT;
use query::QueryEngine; use query::QueryEngine;
@@ -108,13 +108,9 @@ impl FunctionExtensions {
/// register flow-specific functions to the query engine /// register flow-specific functions to the query engine
pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) { pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) {
let tumble_fn = Arc::new(TumbleFunction::new("tumble")) as FunctionRef; engine.register_function(Arc::new(TumbleFunction::new("tumble")));
let tumble_start_fn = Arc::new(TumbleFunction::new(TUMBLE_START)) as FunctionRef; engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_START)));
let tumble_end_fn = Arc::new(TumbleFunction::new(TUMBLE_END)) as FunctionRef; engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_END)));
engine.register_scalar_function(tumble_fn.into());
engine.register_scalar_function(tumble_start_fn.into());
engine.register_scalar_function(tumble_end_fn.into());
} }
#[derive(Debug)] #[derive(Debug)]

View File

@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::table_name::TableName; use table::table_name::TableName;
use table::TableRef;
use crate::error::{ use crate::error::{
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu, CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
@@ -235,33 +235,34 @@ impl GrpcQueryHandler for Instance {
async fn put_record_batch( async fn put_record_batch(
&self, &self,
table_name: &TableName, table: &TableName,
table_ref: &mut Option<TableRef>, table_id: &mut Option<TableId>,
decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
data: FlightData, data: FlightData,
) -> Result<AffectedRows> { ) -> Result<AffectedRows> {
let table = if let Some(table) = table_ref { let table_id = if let Some(table_id) = table_id {
table.clone() *table_id
} else { } else {
let table = self let table = self
.catalog_manager() .catalog_manager()
.table( .table(
&table_name.catalog_name, &table.catalog_name,
&table_name.schema_name, &table.schema_name,
&table_name.table_name, &table.table_name,
None, None,
) )
.await .await
.context(CatalogSnafu)? .context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu { .with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(), table_name: table.to_string(),
})?; })?;
*table_ref = Some(table.clone()); let id = table.table_info().table_id();
table *table_id = Some(id);
id
}; };
self.inserter self.inserter
.handle_bulk_insert(table, decoder, data) .handle_bulk_insert(table_id, decoder, data)
.await .await
.context(TableOperationSnafu) .context(TableOperationSnafu)
} }
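The `put_record_batch` hunk above caches the resolved table (the table id on one side, the full `TableRef` on the other) in an `&mut Option<_>` that the caller keeps across chunks, so only the first `FlightData` message of a stream pays for the catalog lookup. The underlying memoize-in-an-Option pattern, as a tiny generic sketch:

/// Resolve a value once and cache it in `slot`, so repeated calls
/// (e.g. one per FlightData chunk of the same table) skip the lookup.
fn resolve_cached<T: Clone>(
    slot: &mut Option<T>,
    lookup: impl FnOnce() -> Option<T>,
) -> Option<T> {
    if let Some(v) = slot {
        return Some(v.clone());
    }
    let v = lookup()?;
    *slot = Some(v.clone());
    Some(v)
}

fn main() {
    let mut cached: Option<u32> = None;
    let mut lookups = 0;
    for _ in 0..3 {
        let id = resolve_cached(&mut cached, || {
            lookups += 1; // pretend this is a catalog round-trip
            Some(42)
        });
        assert_eq!(id, Some(42));
    }
    assert_eq!(lookups, 1); // only the first call hit the "catalog"
}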

View File

@@ -130,13 +130,7 @@ impl JaegerQueryHandler for Instance {
.await?) .await?)
} }
async fn get_trace( async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
&self,
ctx: QueryContextRef,
trace_id: &str,
start_time: Option<i64>,
end_time: Option<i64>,
) -> ServerResult<Output> {
// It's equivalent to // It's equivalent to
// //
// ``` // ```
@@ -145,25 +139,13 @@ impl JaegerQueryHandler for Instance {
// FROM // FROM
// {db}.{trace_table} // {db}.{trace_table}
// WHERE // WHERE
// trace_id = '{trace_id}' AND // trace_id = '{trace_id}'
// timestamp >= {start_time} AND
// timestamp <= {end_time}
// ORDER BY // ORDER BY
// timestamp DESC // timestamp DESC
// ```. // ```.
let selects = vec![wildcard()]; let selects = vec![wildcard()];
let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))]; let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
if let Some(start_time) = start_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
}
if let Some(end_time) = end_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
}
Ok(query_trace_table( Ok(query_trace_table(
ctx, ctx,

View File

@@ -154,7 +154,6 @@ where
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()), ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
user_provider.clone(), user_provider.clone(),
runtime, runtime,
opts.grpc.flight_compression,
); );
let grpc_server = builder let grpc_server = builder

View File

@@ -31,6 +31,8 @@ use common_meta::kv_backend::rds::MySqlStore;
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::rds::PgStore; use common_meta::kv_backend::rds::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
#[cfg(feature = "pg_kvbackend")]
use common_telemetry::error;
use common_telemetry::info; use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime}; use deadpool_postgres::{Config, Runtime};
@@ -142,8 +144,7 @@ impl MetasrvInstance {
let (serve_state_tx, serve_state_rx) = oneshot::channel(); let (serve_state_tx, serve_state_rx) = oneshot::channel();
let socket_addr = let socket_addr =
bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx).await?; bootstrap_metasrv_with_router(&self.opts.bind_addr, router, serve_state_tx, rx).await?;
self.bind_addr = Some(socket_addr); self.bind_addr = Some(socket_addr);
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu { let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
@@ -259,7 +260,7 @@ pub async fn metasrv_builder(
let etcd_client = create_etcd_client(&opts.store_addrs).await?; let etcd_client = create_etcd_client(&opts.store_addrs).await?;
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops); let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
let election = EtcdElection::with_etcd_client( let election = EtcdElection::with_etcd_client(
&opts.grpc.server_addr, &opts.server_addr,
etcd_client, etcd_client,
opts.store_key_prefix.clone(), opts.store_key_prefix.clone(),
) )
@@ -269,41 +270,22 @@ pub async fn metasrv_builder(
} }
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => { (None, BackendImpl::PostgresStore) => {
use std::time::Duration;
use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
use crate::election::rds::postgres::ElectionPgClient;
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
let mut cfg = Config::new();
cfg.keepalives = Some(true);
cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
// We use a separate pool for election since we need a different session keep-alive idle time.
let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;
let election_client =
ElectionPgClient::new(pool, execution_timeout, meta_lease_ttl, statement_timeout)?;
let election = PgElection::with_pg_client(
opts.grpc.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
candidate_lease_ttl,
meta_lease_ttl,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
let pool = create_postgres_pool(&opts.store_addrs).await?; let pool = create_postgres_pool(&opts.store_addrs).await?;
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops) let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await .await
.context(error::KvBackendSnafu)?; .context(error::KvBackendSnafu)?;
// Client for election should be created separately since we need a different session keep-alive idle time.
let election_client = create_postgres_client(opts).await?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
META_LEASE_SECS,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
(kv_backend, Some(election)) (kv_backend, Some(election))
} }
#[cfg(feature = "mysql_kvbackend")] #[cfg(feature = "mysql_kvbackend")]
@@ -317,7 +299,7 @@ pub async fn metasrv_builder(
let election_table_name = opts.meta_table_name.clone() + "_election"; let election_table_name = opts.meta_table_name.clone() + "_election";
let election_client = create_mysql_client(opts).await?; let election_client = create_mysql_client(opts).await?;
let election = MySqlElection::with_mysql_client( let election = MySqlElection::with_mysql_client(
opts.grpc.server_addr.clone(), opts.server_addr.clone(),
election_client, election_client,
opts.store_key_prefix.clone(), opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS, CANDIDATE_LEASE_SECS,
@@ -390,24 +372,31 @@ pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
} }
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
/// Creates a pool for the Postgres backend. async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
/// let postgres_url = opts
/// It only uses the first store addr to create a pool. .store_addrs
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> { .first()
create_postgres_pool_with(store_addrs, Config::new()).await .context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!(e; "connection error");
}
});
Ok(client)
} }
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
/// Creates a pool for the Postgres backend. pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
///
/// It only uses the first store addr and the given config to create a pool.
pub async fn create_postgres_pool_with(
store_addrs: &[String],
mut cfg: Config,
) -> Result<deadpool_postgres::Pool> {
let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu { let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs", err_msg: "empty store addrs",
})?; })?;
let mut cfg = Config::new();
cfg.url = Some(postgres_url.to_string()); cfg.url = Some(postgres_url.to_string());
let pool = cfg let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls) .create_pool(Some(Runtime::Tokio1), NoTls)
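For reference, here is a minimal sketch of how an election-specific pool can be built with a longer TCP keep-alive idle time than the plain kv-backend pool, mirroring the `Config` fields and `create_pool` call shown in the hunk above. The 30-second idle value and the `election_pool` helper name are illustrative assumptions, not values taken from this diff.

// Illustrative sketch only; the keep-alive idle value is a placeholder.
use std::time::Duration;
use deadpool_postgres::{Config, Runtime};
use tokio_postgres::NoTls;

fn election_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool, Box<dyn std::error::Error>> {
    let mut cfg = Config::new();
    // Only the first store address is used, as in create_postgres_pool_with.
    cfg.url = store_addrs.first().cloned();
    // Keep idle election sessions alive so the advisory lock is not dropped silently.
    cfg.keepalives = Some(true);
    cfg.keepalives_idle = Some(Duration::from_secs(30));
    Ok(cfg.create_pool(Some(Runtime::Tokio1), NoTls)?)
}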
View File
@@ -157,11 +157,6 @@ pub trait Election: Send + Sync {
/// but only one can be the leader at a time. /// but only one can be the leader at a time.
async fn campaign(&self) -> Result<()>; async fn campaign(&self) -> Result<()>;
/// Resets the campaign.
///
/// Resets the client and the leader flag if needed.
async fn reset_campaign(&self) {}
/// Returns the leader value for the current election. /// Returns the leader value for the current election.
async fn leader(&self) -> Result<Self::Leader>; async fn leader(&self) -> Result<Self::Leader>;
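As a rough usage sketch, the trait is normally driven from a retry loop that campaigns until an error occurs and then resets before re-entering the election (the same shape as the `Metasrv` loop further down in this diff). The `election_loop` name is illustrative; `ElectionRef` is the alias used elsewhere in this crate.

// Illustrative only: campaign, and on error reset any broken client state before retrying.
async fn election_loop(election: ElectionRef) {
    loop {
        if let Err(e) = election.campaign().await {
            common_telemetry::warn!(e; "election error");
        }
        // The default reset_campaign is a no-op; backends such as the Postgres
        // election can rebuild their connection here before the next attempt.
        election.reset_campaign().await;
    }
}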
View File
@@ -18,12 +18,11 @@ use std::time::Duration;
use common_telemetry::{error, warn}; use common_telemetry::{error, warn};
use common_time::Timestamp; use common_time::Timestamp;
use deadpool_postgres::{Manager, Pool};
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{broadcast, RwLock}; use tokio::sync::broadcast;
use tokio::time::MissedTickBehavior; use tokio::time::MissedTickBehavior;
use tokio_postgres::types::ToSql; use tokio_postgres::types::ToSql;
use tokio_postgres::Row; use tokio_postgres::Client;
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP}; use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
use crate::election::{ use crate::election::{
@@ -31,14 +30,15 @@ use crate::election::{
CANDIDATES_ROOT, ELECTION_KEY, CANDIDATES_ROOT, ELECTION_KEY,
}; };
use crate::error::{ use crate::error::{
DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu, DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu, UnexpectedSnafu,
}; };
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
struct ElectionSqlFactory<'a> { struct ElectionSqlFactory<'a> {
lock_id: u64, lock_id: u64,
table_name: &'a str, table_name: &'a str,
meta_lease_ttl_secs: u64,
} }
struct ElectionSqlSet { struct ElectionSqlSet {
@@ -88,10 +88,11 @@ struct ElectionSqlSet {
} }
impl<'a> ElectionSqlFactory<'a> { impl<'a> ElectionSqlFactory<'a> {
fn new(lock_id: u64, table_name: &'a str) -> Self { fn new(lock_id: u64, table_name: &'a str, meta_lease_ttl_secs: u64) -> Self {
Self { Self {
lock_id, lock_id,
table_name, table_name,
meta_lease_ttl_secs,
} }
} }
@@ -107,6 +108,15 @@ impl<'a> ElectionSqlFactory<'a> {
} }
} }
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!(
"SET idle_session_timeout = '{}s';",
self.meta_lease_ttl_secs + 1
)
}
fn campaign_sql(&self) -> String { fn campaign_sql(&self) -> String {
format!("SELECT pg_try_advisory_lock({})", self.lock_id) format!("SELECT pg_try_advisory_lock({})", self.lock_id)
} }
@@ -161,165 +171,46 @@ impl<'a> ElectionSqlFactory<'a> {
} }
} }
/// PgClient for election.
pub struct ElectionPgClient {
current: Option<deadpool::managed::Object<Manager>>,
pool: Pool,
/// The client-side timeout for statement execution.
///
/// This timeout is enforced by the client application and is independent of any server-side timeouts.
/// If a statement takes longer than this duration to execute, the client will abort the operation.
execution_timeout: Duration,
/// The idle session timeout.
///
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
/// If a session remains idle for longer than this duration, the server will terminate it.
idle_session_timeout: Duration,
/// The statement timeout.
///
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
/// If a statement takes longer than this duration to execute, the server will abort it.
statement_timeout: Duration,
}
impl ElectionPgClient {
pub fn new(
pool: Pool,
execution_timeout: Duration,
idle_session_timeout: Duration,
statement_timeout: Duration,
) -> Result<ElectionPgClient> {
Ok(ElectionPgClient {
current: None,
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
})
}
fn set_idle_session_timeout_sql(&self) -> String {
format!(
"SET idle_session_timeout = '{}s';",
self.idle_session_timeout.as_secs()
)
}
fn set_statement_timeout_sql(&self) -> String {
format!(
"SET statement_timeout = '{}s';",
self.statement_timeout.as_secs()
)
}
async fn reset_client(&mut self) -> Result<()> {
self.current = None;
self.maybe_init_client().await
}
async fn maybe_init_client(&mut self) -> Result<()> {
if self.current.is_none() {
let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
self.current = Some(client);
// Set idle session timeout and statement timeout.
let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
self.execute(&idle_session_timeout_sql, &[]).await?;
let statement_timeout_sql = self.set_statement_timeout_sql();
self.execute(&statement_timeout_sql, &[]).await?;
}
Ok(())
}
/// Returns the result of the query.
///
/// # Panics
/// if `current` is `None`.
async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
let result = tokio::time::timeout(
self.execution_timeout,
self.current.as_ref().unwrap().execute(sql, params),
)
.await
.map_err(|_| {
SqlExecutionTimeoutSnafu {
sql: sql.to_string(),
duration: self.execution_timeout,
}
.build()
})?;
result.context(PostgresExecutionSnafu { sql })
}
/// Returns the result of the query.
///
/// # Panics
/// if `current` is `None`.
async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
let result = tokio::time::timeout(
self.execution_timeout,
self.current.as_ref().unwrap().query(sql, params),
)
.await
.map_err(|_| {
SqlExecutionTimeoutSnafu {
sql: sql.to_string(),
duration: self.execution_timeout,
}
.build()
})?;
result.context(PostgresExecutionSnafu { sql })
}
}
/// PostgreSql implementation of Election. /// PostgreSql implementation of Election.
pub struct PgElection { pub struct PgElection {
leader_value: String, leader_value: String,
pg_client: RwLock<ElectionPgClient>, client: Client,
is_leader: AtomicBool, is_leader: AtomicBool,
leader_infancy: AtomicBool, leader_infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>, leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String, store_key_prefix: String,
candidate_lease_ttl: Duration, candidate_lease_ttl_secs: u64,
meta_lease_ttl: Duration, meta_lease_ttl_secs: u64,
sql_set: ElectionSqlSet, sql_set: ElectionSqlSet,
} }
impl PgElection { impl PgElection {
async fn maybe_init_client(&self) -> Result<()> {
if self.pg_client.read().await.current.is_none() {
self.pg_client.write().await.maybe_init_client().await?;
}
Ok(())
}
pub async fn with_pg_client( pub async fn with_pg_client(
leader_value: String, leader_value: String,
pg_client: ElectionPgClient, client: Client,
store_key_prefix: String, store_key_prefix: String,
candidate_lease_ttl: Duration, candidate_lease_ttl_secs: u64,
meta_lease_ttl: Duration, meta_lease_ttl_secs: u64,
table_name: &str, table_name: &str,
lock_id: u64, lock_id: u64,
) -> Result<ElectionRef> { ) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(lock_id, table_name); let sql_factory = ElectionSqlFactory::new(lock_id, table_name, meta_lease_ttl_secs);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid a dead advisory lock.
client
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
.await
.context(PostgresExecutionSnafu)?;
let tx = listen_leader_change(leader_value.clone()); let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self { Ok(Arc::new(Self {
leader_value, leader_value,
pg_client: RwLock::new(pg_client), client,
is_leader: AtomicBool::new(false), is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(false), leader_infancy: AtomicBool::new(false),
leader_watcher: tx, leader_watcher: tx,
store_key_prefix, store_key_prefix,
candidate_lease_ttl, candidate_lease_ttl_secs,
meta_lease_ttl, meta_lease_ttl_secs,
sql_set: sql_factory.build(), sql_set: sql_factory.build(),
})) }))
} }
@@ -358,17 +249,18 @@ impl Election for PgElection {
input: format!("{node_info:?}"), input: format!("{node_info:?}"),
})?; })?;
let res = self let res = self
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl) .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?; .await?;
// May have been registered before, just update the lease. // May have been registered before, just update the lease.
if !res { if !res {
self.delete_value(&key).await?; self.delete_value(&key).await?;
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl) self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?; .await?;
} }
// Check if the current lease has expired and renew the lease. // Check if the current lease has expired and renew the lease.
let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2); let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
loop { loop {
let _ = keep_alive_interval.tick().await; let _ = keep_alive_interval.tick().await;
@@ -390,8 +282,13 @@ impl Election for PgElection {
); );
// Safety: origin is Some since we are using `get_value_with_lease` with `true`. // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl) self.update_value_with_lease(
.await?; &key,
&lease.origin,
&node_info,
self.candidate_lease_ttl_secs,
)
.await?;
} }
} }
@@ -424,17 +321,16 @@ impl Election for PgElection {
/// - If the lock is not acquired (result is false), it calls the `follower_action` method /// - If the lock is not acquired (result is false), it calls the `follower_action` method
/// to perform actions as a follower. /// to perform actions as a follower.
async fn campaign(&self) -> Result<()> { async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2); let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
self.maybe_init_client().await?;
loop { loop {
let res = self let res = self
.pg_client .client
.read()
.await
.query(&self.sql_set.campaign, &[]) .query(&self.sql_set.campaign, &[])
.await?; .await
.context(PostgresExecutionSnafu)?;
let row = res.first().context(UnexpectedSnafu { let row = res.first().context(UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock", violated: "Failed to get the result of acquiring advisory lock",
})?; })?;
@@ -453,12 +349,6 @@ impl Election for PgElection {
} }
} }
async fn reset_campaign(&self) {
if let Err(err) = self.pg_client.write().await.reset_client().await {
error!(err; "Failed to reset client");
}
}
async fn leader(&self) -> Result<Self::Leader> { async fn leader(&self) -> Result<Self::Leader> {
if self.is_leader.load(Ordering::Relaxed) { if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into()) Ok(self.leader_value.as_bytes().into())
@@ -486,13 +376,11 @@ impl PgElection {
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned. /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> { async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
let key = key.as_bytes(); let key = key.as_bytes();
self.maybe_init_client().await?;
let res = self let res = self
.pg_client .client
.read()
.await
.query(&self.sql_set.get_value_with_lease, &[&key]) .query(&self.sql_set.get_value_with_lease, &[&key])
.await?; .await
.context(PostgresExecutionSnafu)?;
if res.is_empty() { if res.is_empty() {
Ok(None) Ok(None)
@@ -526,13 +414,11 @@ impl PgElection {
key_prefix: &str, key_prefix: &str,
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> { ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec(); let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
self.maybe_init_client().await?;
let res = self let res = self
.pg_client .client
.read()
.await
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix]) .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
.await?; .await
.context(PostgresExecutionSnafu)?;
let mut values_with_leases = vec![]; let mut values_with_leases = vec![];
let mut current = Timestamp::default(); let mut current = Timestamp::default();
@@ -559,21 +445,18 @@ impl PgElection {
key: &str, key: &str,
prev: &str, prev: &str,
updated: &str, updated: &str,
lease_ttl: Duration, lease_ttl: u64,
) -> Result<()> { ) -> Result<()> {
let key = key.as_bytes(); let key = key.as_bytes();
let prev = prev.as_bytes(); let prev = prev.as_bytes();
self.maybe_init_client().await?;
let lease_ttl_secs = lease_ttl.as_secs() as f64;
let res = self let res = self
.pg_client .client
.read()
.await
.execute( .execute(
&self.sql_set.update_value_with_lease, &self.sql_set.update_value_with_lease,
&[&key, &prev, &updated, &lease_ttl_secs], &[&key, &prev, &updated, &(lease_ttl as f64)],
) )
.await?; .await
.context(PostgresExecutionSnafu)?;
ensure!( ensure!(
res == 1, res == 1,
@@ -590,18 +473,16 @@ impl PgElection {
&self, &self,
key: &str, key: &str,
value: &str, value: &str,
lease_ttl: Duration, lease_ttl_secs: u64,
) -> Result<bool> { ) -> Result<bool> {
let key = key.as_bytes(); let key = key.as_bytes();
let lease_ttl_secs = lease_ttl.as_secs() as f64; let lease_ttl_secs = lease_ttl_secs as f64;
let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs]; let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
self.maybe_init_client().await?;
let res = self let res = self
.pg_client .client
.read()
.await
.query(&self.sql_set.put_value_with_lease, &params) .query(&self.sql_set.put_value_with_lease, &params)
.await?; .await
.context(PostgresExecutionSnafu)?;
Ok(res.is_empty()) Ok(res.is_empty())
} }
@@ -609,13 +490,11 @@ impl PgElection {
/// Caution: Should only delete the key if the lease is expired. /// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &str) -> Result<bool> { async fn delete_value(&self, key: &str) -> Result<bool> {
let key = key.as_bytes(); let key = key.as_bytes();
self.maybe_init_client().await?;
let res = self let res = self
.pg_client .client
.read()
.await
.query(&self.sql_set.delete_value, &[&key]) .query(&self.sql_set.delete_value, &[&key])
.await?; .await
.context(PostgresExecutionSnafu)?;
Ok(res.len() == 1) Ok(res.len() == 1)
} }
@@ -657,7 +536,7 @@ impl PgElection {
&key, &key,
&lease.origin, &lease.origin,
&self.leader_value, &self.leader_value,
self.meta_lease_ttl, self.meta_lease_ttl_secs,
) )
.await?; .await?;
} }
@@ -726,12 +605,10 @@ impl PgElection {
..Default::default() ..Default::default()
}; };
self.delete_value(&key).await?; self.delete_value(&key).await?;
self.maybe_init_client().await?; self.client
self.pg_client
.read()
.await
.query(&self.sql_set.step_down, &[]) .query(&self.sql_set.step_down, &[])
.await?; .await
.context(PostgresExecutionSnafu)?;
send_leader_change_and_set_flags( send_leader_change_and_set_flags(
&self.is_leader, &self.is_leader,
&self.leader_infancy, &self.leader_infancy,
@@ -774,7 +651,7 @@ impl PgElection {
..Default::default() ..Default::default()
}; };
self.delete_value(&key).await?; self.delete_value(&key).await?;
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl) self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs)
.await?; .await?;
if self if self
@@ -797,21 +674,15 @@ impl PgElection {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::assert_matches::assert_matches;
use std::env; use std::env;
use common_meta::maybe_skip_postgres_integration_test; use common_meta::maybe_skip_postgres_integration_test;
use tokio_postgres::{Client, NoTls};
use super::*; use super::*;
use crate::bootstrap::create_postgres_pool; use crate::error::PostgresExecutionSnafu;
use crate::error;
async fn create_postgres_client( async fn create_postgres_client(table_name: Option<&str>) -> Result<Client> {
table_name: Option<&str>,
execution_timeout: Duration,
idle_session_timeout: Duration,
statement_timeout: Duration,
) -> Result<ElectionPgClient> {
let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
if endpoint.is_empty() { if endpoint.is_empty() {
return UnexpectedSnafu { return UnexpectedSnafu {
@@ -819,34 +690,25 @@ mod tests {
} }
.fail(); .fail();
} }
let pool = create_postgres_pool(&[endpoint]).await.unwrap(); let (client, connection) = tokio_postgres::connect(&endpoint, NoTls)
let mut pg_client = ElectionPgClient::new( .await
pool, .context(PostgresExecutionSnafu)?;
execution_timeout, tokio::spawn(async move {
idle_session_timeout, connection.await.context(PostgresExecutionSnafu).unwrap();
statement_timeout, });
)
.unwrap();
pg_client.maybe_init_client().await?;
if let Some(table_name) = table_name { if let Some(table_name) = table_name {
let create_table_sql = format!( let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);", "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
table_name table_name
); );
pg_client.execute(&create_table_sql, &[]).await?; client.execute(&create_table_sql, &[]).await.unwrap();
} }
Ok(pg_client) Ok(client)
} }
async fn drop_table(pg_election: &PgElection, table_name: &str) { async fn drop_table(client: &Client, table_name: &str) {
let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name); let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
pg_election client.execute(&sql, &[]).await.unwrap();
.pg_client
.read()
.await
.execute(&sql, &[])
.await
.unwrap();
} }
#[tokio::test] #[tokio::test]
@@ -857,35 +719,23 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string(); let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_postgres_crud_greptime_metakv"; let table_name = "test_postgres_crud_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(10); let client = create_postgres_client(Some(table_name)).await.unwrap();
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, _) = broadcast::channel(100); let (tx, _) = broadcast::channel(100);
let pg_election = PgElection { let pg_election = PgElection {
leader_value: "test_leader".to_string(), leader_value: "test_leader".to_string(),
pg_client: RwLock::new(client), client,
is_leader: AtomicBool::new(false), is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true), leader_infancy: AtomicBool::new(true),
leader_watcher: tx, leader_watcher: tx,
store_key_prefix: uuid, store_key_prefix: uuid,
candidate_lease_ttl, candidate_lease_ttl_secs: 10,
meta_lease_ttl, meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name).build(), sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
}; };
let res = pg_election let res = pg_election
.put_value_with_lease(&key, &value, candidate_lease_ttl) .put_value_with_lease(&key, &value, 10)
.await .await
.unwrap(); .unwrap();
assert!(res); assert!(res);
@@ -898,7 +748,7 @@ mod tests {
assert_eq!(lease.leader_value, value); assert_eq!(lease.leader_value, value);
pg_election pg_election
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl) .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl_secs)
.await .await
.unwrap(); .unwrap();
@@ -912,7 +762,7 @@ mod tests {
let key = format!("test_key_{}", i); let key = format!("test_key_{}", i);
let value = format!("test_value_{}", i); let value = format!("test_value_{}", i);
pg_election pg_election
.put_value_with_lease(&key, &value, candidate_lease_ttl) .put_value_with_lease(&key, &value, 10)
.await .await
.unwrap(); .unwrap();
} }
@@ -937,39 +787,28 @@ mod tests {
assert!(res.is_empty()); assert!(res.is_empty());
assert!(current == Timestamp::default()); assert!(current == Timestamp::default());
drop_table(&pg_election, table_name).await; drop_table(&pg_election.client, table_name).await;
} }
async fn candidate( async fn candidate(
leader_value: String, leader_value: String,
candidate_lease_ttl: Duration, candidate_lease_ttl_secs: u64,
store_key_prefix: String, store_key_prefix: String,
table_name: String, table_name: String,
) { ) {
let execution_timeout = Duration::from_secs(10); let client = create_postgres_client(None).await.unwrap();
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
None,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, _) = broadcast::channel(100); let (tx, _) = broadcast::channel(100);
let pg_election = PgElection { let pg_election = PgElection {
leader_value, leader_value,
pg_client: RwLock::new(client), client,
is_leader: AtomicBool::new(false), is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true), leader_infancy: AtomicBool::new(true),
leader_watcher: tx, leader_watcher: tx,
store_key_prefix, store_key_prefix,
candidate_lease_ttl, candidate_lease_ttl_secs,
meta_lease_ttl, meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, &table_name).build(), sql_set: ElectionSqlFactory::new(28319, &table_name, 2).build(),
}; };
let node_info = MetasrvNodeInfo { let node_info = MetasrvNodeInfo {
@@ -985,28 +824,17 @@ mod tests {
async fn test_candidate_registration() { async fn test_candidate_registration() {
maybe_skip_postgres_integration_test!(); maybe_skip_postgres_integration_test!();
let leader_value_prefix = "test_leader".to_string(); let leader_value_prefix = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string(); let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_candidate_registration_greptime_metakv"; let table_name = "test_candidate_registration_greptime_metakv";
let mut handles = vec![]; let mut handles = vec![];
let candidate_lease_ttl = Duration::from_secs(5); let client = create_postgres_client(Some(table_name)).await.unwrap();
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
for i in 0..10 { for i in 0..10 {
let leader_value = format!("{}{}", leader_value_prefix, i); let leader_value = format!("{}{}", leader_value_prefix, i);
let handle = tokio::spawn(candidate( let handle = tokio::spawn(candidate(
leader_value, leader_value,
candidate_lease_ttl, candidate_lease_ttl_secs,
uuid.clone(), uuid.clone(),
table_name.to_string(), table_name.to_string(),
)); ));
@@ -1019,14 +847,14 @@ mod tests {
let leader_value = "test_leader".to_string(); let leader_value = "test_leader".to_string();
let pg_election = PgElection { let pg_election = PgElection {
leader_value, leader_value,
pg_client: RwLock::new(client), client,
is_leader: AtomicBool::new(false), is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true), leader_infancy: AtomicBool::new(true),
leader_watcher: tx, leader_watcher: tx,
store_key_prefix: uuid.clone(), store_key_prefix: uuid.clone(),
candidate_lease_ttl, candidate_lease_ttl_secs,
meta_lease_ttl, meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name).build(), sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
}; };
let candidates = pg_election.all_candidates().await.unwrap(); let candidates = pg_election.all_candidates().await.unwrap();
@@ -1048,40 +876,29 @@ mod tests {
assert!(res); assert!(res);
} }
drop_table(&pg_election, table_name).await; drop_table(&pg_election.client, table_name).await;
} }
#[tokio::test] #[tokio::test]
async fn test_elected_and_step_down() { async fn test_elected_and_step_down() {
maybe_skip_postgres_integration_test!(); maybe_skip_postgres_integration_test!();
let leader_value = "test_leader".to_string(); let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string(); let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_elected_and_step_down_greptime_metakv"; let table_name = "test_elected_and_step_down_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5); let client = create_postgres_client(Some(table_name)).await.unwrap();
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, mut rx) = broadcast::channel(100); let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection { let leader_pg_election = PgElection {
leader_value: leader_value.clone(), leader_value: leader_value.clone(),
pg_client: RwLock::new(client), client,
is_leader: AtomicBool::new(false), is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true), leader_infancy: AtomicBool::new(true),
leader_watcher: tx, leader_watcher: tx,
store_key_prefix: uuid, store_key_prefix: uuid,
candidate_lease_ttl, candidate_lease_ttl_secs,
meta_lease_ttl, meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28320, table_name).build(), sql_set: ElectionSqlFactory::new(28320, table_name, 2).build(),
}; };
leader_pg_election.elected().await.unwrap(); leader_pg_election.elected().await.unwrap();
@@ -1173,7 +990,7 @@ mod tests {
_ => panic!("Expected LeaderChangeMessage::StepDown"), _ => panic!("Expected LeaderChangeMessage::StepDown"),
} }
drop_table(&leader_pg_election, table_name).await; drop_table(&leader_pg_election.client, table_name).await;
} }
#[tokio::test] #[tokio::test]
@@ -1182,38 +999,25 @@ mod tests {
let leader_value = "test_leader".to_string(); let leader_value = "test_leader".to_string();
let uuid = uuid::Uuid::new_v4().to_string(); let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_leader_action_greptime_metakv"; let table_name = "test_leader_action_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5); let candidate_lease_ttl_secs = 5;
let execution_timeout = Duration::from_secs(10); let client = create_postgres_client(Some(table_name)).await.unwrap();
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, mut rx) = broadcast::channel(100); let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection { let leader_pg_election = PgElection {
leader_value: leader_value.clone(), leader_value: leader_value.clone(),
pg_client: RwLock::new(client), client,
is_leader: AtomicBool::new(false), is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true), leader_infancy: AtomicBool::new(true),
leader_watcher: tx, leader_watcher: tx,
store_key_prefix: uuid, store_key_prefix: uuid,
candidate_lease_ttl, candidate_lease_ttl_secs,
meta_lease_ttl, meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28321, table_name).build(), sql_set: ElectionSqlFactory::new(28321, table_name, 2).build(),
}; };
// Step 1: No leader exists, campaign and elected. // Step 1: No leader exists, campaign and elected.
let res = leader_pg_election let res = leader_pg_election
.pg_client .client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[]) .query(&leader_pg_election.sql_set.campaign, &[])
.await .await
.unwrap(); .unwrap();
@@ -1244,9 +1048,7 @@ mod tests {
// Step 2: As a leader, renew the lease. // Step 2: As a leader, renew the lease.
let res = leader_pg_election let res = leader_pg_election
.pg_client .client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[]) .query(&leader_pg_election.sql_set.campaign, &[])
.await .await
.unwrap(); .unwrap();
@@ -1268,9 +1070,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
let res = leader_pg_election let res = leader_pg_election
.pg_client .client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[]) .query(&leader_pg_election.sql_set.campaign, &[])
.await .await
.unwrap(); .unwrap();
@@ -1298,9 +1098,7 @@ mod tests {
// Step 4: Re-campaign and elected. // Step 4: Re-campaign and elected.
let res = leader_pg_election let res = leader_pg_election
.pg_client .client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[]) .query(&leader_pg_election.sql_set.campaign, &[])
.await .await
.unwrap(); .unwrap();
@@ -1357,9 +1155,7 @@ mod tests {
// Step 6: Re-campaign and elected. // Step 6: Re-campaign and elected.
let res = leader_pg_election let res = leader_pg_election
.pg_client .client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[]) .query(&leader_pg_election.sql_set.campaign, &[])
.await .await
.unwrap(); .unwrap();
@@ -1390,9 +1186,7 @@ mod tests {
// Step 7: Something wrong, the leader key changed by others. // Step 7: Something wrong, the leader key changed by others.
let res = leader_pg_election let res = leader_pg_election
.pg_client .client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[]) .query(&leader_pg_election.sql_set.campaign, &[])
.await .await
.unwrap(); .unwrap();
@@ -1403,11 +1197,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
leader_pg_election leader_pg_election
.put_value_with_lease( .put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
&leader_pg_election.election_key(),
"test",
Duration::from_secs(10),
)
.await .await
.unwrap(); .unwrap();
leader_pg_election.leader_action().await.unwrap(); leader_pg_election.leader_action().await.unwrap();
@@ -1433,74 +1223,52 @@ mod tests {
// Clean up // Clean up
leader_pg_election leader_pg_election
.pg_client .client
.read()
.await
.query(&leader_pg_election.sql_set.step_down, &[]) .query(&leader_pg_election.sql_set.step_down, &[])
.await .await
.unwrap(); .unwrap();
drop_table(&leader_pg_election, table_name).await; drop_table(&leader_pg_election.client, table_name).await;
} }
#[tokio::test] #[tokio::test]
async fn test_follower_action() { async fn test_follower_action() {
maybe_skip_postgres_integration_test!(); maybe_skip_postgres_integration_test!();
common_telemetry::init_default_ut_logging(); common_telemetry::init_default_ut_logging();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string(); let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_follower_action_greptime_metakv"; let table_name = "test_follower_action_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5); let follower_client = create_postgres_client(Some(table_name)).await.unwrap();
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let follower_client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, mut rx) = broadcast::channel(100); let (tx, mut rx) = broadcast::channel(100);
let follower_pg_election = PgElection { let follower_pg_election = PgElection {
leader_value: "test_follower".to_string(), leader_value: "test_follower".to_string(),
pg_client: RwLock::new(follower_client), client: follower_client,
is_leader: AtomicBool::new(false), is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true), leader_infancy: AtomicBool::new(true),
leader_watcher: tx, leader_watcher: tx,
store_key_prefix: uuid.clone(), store_key_prefix: uuid.clone(),
candidate_lease_ttl, candidate_lease_ttl_secs,
meta_lease_ttl, meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name).build(), sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
}; };
let leader_client = create_postgres_client( let leader_client = create_postgres_client(Some(table_name)).await.unwrap();
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let (tx, _) = broadcast::channel(100); let (tx, _) = broadcast::channel(100);
let leader_pg_election = PgElection { let leader_pg_election = PgElection {
leader_value: "test_leader".to_string(), leader_value: "test_leader".to_string(),
pg_client: RwLock::new(leader_client), client: leader_client,
is_leader: AtomicBool::new(false), is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true), leader_infancy: AtomicBool::new(true),
leader_watcher: tx, leader_watcher: tx,
store_key_prefix: uuid, store_key_prefix: uuid,
candidate_lease_ttl, candidate_lease_ttl_secs,
meta_lease_ttl, meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name).build(), sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
}; };
leader_pg_election leader_pg_election
.pg_client .client
.read()
.await
.query(&leader_pg_election.sql_set.campaign, &[]) .query(&leader_pg_election.sql_set.campaign, &[])
.await .await
.unwrap(); .unwrap();
@@ -1541,41 +1309,11 @@ mod tests {
// Clean up // Clean up
leader_pg_election leader_pg_election
.pg_client .client
.read()
.await
.query(&leader_pg_election.sql_set.step_down, &[]) .query(&leader_pg_election.sql_set.step_down, &[])
.await .await
.unwrap(); .unwrap();
drop_table(&follower_pg_election, table_name).await; drop_table(&follower_pg_election.client, table_name).await;
}
#[tokio::test]
async fn test_idle_session_timeout() {
maybe_skip_postgres_integration_test!();
common_telemetry::init_default_ut_logging();
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let idle_session_timeout = Duration::from_secs(1);
let mut client = create_postgres_client(
None,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1100)).await;
// Wait for the idle session timeout.
let err = client.query("SELECT 1", &[]).await.unwrap_err();
assert_matches!(err, error::Error::PostgresExecution { .. });
let error::Error::PostgresExecution { error, .. } = err else {
panic!("Expected PostgresExecution error");
};
assert!(error.is_closed());
// Reset the client and try again.
client.reset_client().await.unwrap();
let _ = client.query("SELECT 1", &[]).await.unwrap();
} }
} }
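Condensed to its essentials, the leadership check in this module is a small SQL round trip: set server-side session guards, then try a session-level advisory lock. The sketch below is illustrative only; the timeout values and lock id are placeholders, error handling is elided, and `idle_session_timeout` requires PostgreSQL 14 or newer.

// Illustrative sketch of the Postgres election round trip (values are placeholders).
async fn try_campaign(client: &tokio_postgres::Client, lock_id: u64) -> Result<bool, tokio_postgres::Error> {
    // Server-side guards: terminate idle sessions and runaway statements so a
    // crashed leader cannot hold the advisory lock forever.
    client.execute("SET idle_session_timeout = '3s';", &[]).await?;
    client.execute("SET statement_timeout = '2s';", &[]).await?;
    // Session-level advisory lock: only the current leader gets `true` back.
    let rows = client
        .query(&format!("SELECT pg_try_advisory_lock({lock_id})"), &[])
        .await?;
    Ok(rows[0].get::<_, bool>(0))
}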
View File
@@ -748,31 +748,21 @@ pub enum Error {
}, },
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to execute via postgres, sql: {}", sql))] #[snafu(display("Failed to execute via postgres"))]
PostgresExecution { PostgresExecution {
#[snafu(source)] #[snafu(source)]
error: tokio_postgres::Error, error: tokio_postgres::Error,
sql: String,
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
}, },
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to get Postgres client"))] #[snafu(display("Failed to connect to Postgres"))]
GetPostgresClient { ConnectPostgres {
#[snafu(implicit)]
location: Location,
#[snafu(source)] #[snafu(source)]
error: deadpool::managed::PoolError<tokio_postgres::Error>, error: tokio_postgres::Error,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
SqlExecutionTimeout {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
sql: String,
duration: std::time::Duration,
}, },
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
@@ -1015,10 +1005,9 @@ impl ErrorExt for Error {
Error::LookupPeer { source, .. } => source.status_code(), Error::LookupPeer { source, .. } => source.status_code(),
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
Error::CreatePostgresPool { .. } Error::CreatePostgresPool { .. }
| Error::GetPostgresClient { .. }
| Error::GetPostgresConnection { .. } | Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. } | Error::PostgresExecution { .. }
| Error::SqlExecutionTimeout { .. } => StatusCode::Internal, | Error::ConnectPostgres { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")] #[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution { .. } Error::MySqlExecution { .. }
| Error::CreateMySqlPool { .. } | Error::CreateMySqlPool { .. }
View File
@@ -46,7 +46,6 @@ use common_telemetry::{error, info, warn};
use common_wal::config::MetasrvWalConfig; use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption; use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions; use servers::http::HttpOptions;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId; use store_api::storage::RegionId;
@@ -97,10 +96,8 @@ pub enum BackendImpl {
#[serde(default)] #[serde(default)]
pub struct MetasrvOptions { pub struct MetasrvOptions {
/// The address the server listens on. /// The address the server listens on.
#[deprecated(note = "Use grpc.bind_addr instead")]
pub bind_addr: String, pub bind_addr: String,
/// The address the server advertises to the clients. /// The address the server advertises to the clients.
#[deprecated(note = "Use grpc.server_addr instead")]
pub server_addr: String, pub server_addr: String,
/// The address of the store, e.g., etcd. /// The address of the store, e.g., etcd.
pub store_addrs: Vec<String>, pub store_addrs: Vec<String>,
@@ -115,7 +112,6 @@ pub struct MetasrvOptions {
/// If it's true, the region failover will be allowed even if the local WAL is used. /// If it's true, the region failover will be allowed even if the local WAL is used.
/// Note that this option is not recommended to be set to true, because it may lead to data loss during failover. /// Note that this option is not recommended to be set to true, because it may lead to data loss during failover.
pub allow_region_failover_on_local_wal: bool, pub allow_region_failover_on_local_wal: bool,
pub grpc: GrpcOptions,
/// The HTTP server options. /// The HTTP server options.
pub http: HttpOptions, pub http: HttpOptions,
/// The logging options. /// The logging options.
@@ -170,6 +166,8 @@ impl fmt::Debug for MetasrvOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("MetasrvOptions"); let mut debug_struct = f.debug_struct("MetasrvOptions");
debug_struct debug_struct
.field("bind_addr", &self.bind_addr)
.field("server_addr", &self.server_addr)
.field("store_addrs", &self.sanitize_store_addrs()) .field("store_addrs", &self.sanitize_store_addrs())
.field("selector", &self.selector) .field("selector", &self.selector)
.field("use_memory_store", &self.use_memory_store) .field("use_memory_store", &self.use_memory_store)
@@ -178,7 +176,6 @@ impl fmt::Debug for MetasrvOptions {
"allow_region_failover_on_local_wal", "allow_region_failover_on_local_wal",
&self.allow_region_failover_on_local_wal, &self.allow_region_failover_on_local_wal,
) )
.field("grpc", &self.grpc)
.field("http", &self.http) .field("http", &self.http)
.field("logging", &self.logging) .field("logging", &self.logging)
.field("procedure", &self.procedure) .field("procedure", &self.procedure)
@@ -211,19 +208,14 @@ const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
impl Default for MetasrvOptions { impl Default for MetasrvOptions {
fn default() -> Self { fn default() -> Self {
Self { Self {
#[allow(deprecated)] bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
bind_addr: String::new(), // If server_addr is not set, the server will use the local ip address as the server address.
#[allow(deprecated)]
server_addr: String::new(), server_addr: String::new(),
store_addrs: vec!["127.0.0.1:2379".to_string()], store_addrs: vec!["127.0.0.1:2379".to_string()],
selector: SelectorType::default(), selector: SelectorType::default(),
use_memory_store: false, use_memory_store: false,
enable_region_failover: false, enable_region_failover: false,
allow_region_failover_on_local_wal: false, allow_region_failover_on_local_wal: false,
grpc: GrpcOptions {
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
..Default::default()
},
http: HttpOptions::default(), http: HttpOptions::default(),
logging: LoggingOptions::default(), logging: LoggingOptions::default(),
procedure: ProcedureConfig { procedure: ProcedureConfig {
@@ -261,6 +253,37 @@ impl Configurable for MetasrvOptions {
} }
impl MetasrvOptions { impl MetasrvOptions {
/// Detects the server address if it is not explicitly set.
#[cfg(not(target_os = "android"))]
pub fn detect_server_addr(&mut self) {
if self.server_addr.is_empty() {
match local_ip_address::local_ip() {
Ok(ip) => {
let detected_addr = format!(
"{}:{}",
ip,
self.bind_addr
.split(':')
.nth(1)
.unwrap_or(DEFAULT_METASRV_ADDR_PORT)
);
info!("Using detected: {} as server address", detected_addr);
self.server_addr = detected_addr;
}
Err(e) => {
error!("Failed to detect local ip address: {}", e);
}
}
}
}
#[cfg(target_os = "android")]
pub fn detect_server_addr(&mut self) {
if self.server_addr.is_empty() {
common_telemetry::debug!("detect local IP is not supported on Android");
}
}
fn sanitize_store_addrs(&self) -> Vec<String> { fn sanitize_store_addrs(&self) -> Vec<String> {
self.store_addrs self.store_addrs
.iter() .iter()
@@ -559,7 +582,6 @@ impl Metasrv {
if let Err(e) = res { if let Err(e) = res {
warn!(e; "Metasrv election error"); warn!(e; "Metasrv election error");
} }
election.reset_campaign().await;
info!("Metasrv re-initiate election"); info!("Metasrv re-initiate election");
} }
info!("Metasrv stopped"); info!("Metasrv stopped");
@@ -616,7 +638,7 @@ impl Metasrv {
pub fn node_info(&self) -> MetasrvNodeInfo { pub fn node_info(&self) -> MetasrvNodeInfo {
let build_info = common_version::build_info(); let build_info = common_version::build_info();
MetasrvNodeInfo { MetasrvNodeInfo {
addr: self.options().grpc.server_addr.clone(), addr: self.options().server_addr.clone(),
version: build_info.version.to_string(), version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(), git_commit: build_info.commit_short.to_string(),
start_time_ms: self.start_time_ms(), start_time_ms: self.start_time_ms(),
@@ -708,7 +730,7 @@ impl Metasrv {
#[inline] #[inline]
pub fn new_ctx(&self) -> Context { pub fn new_ctx(&self) -> Context {
let server_addr = self.options().grpc.server_addr.clone(); let server_addr = self.options().server_addr.clone();
let in_memory = self.in_memory.clone(); let in_memory = self.in_memory.clone();
let kv_backend = self.kv_backend.clone(); let kv_backend = self.kv_backend.clone();
let leader_cached_kv_backend = self.leader_cached_kv_backend.clone(); let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
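The address-detection logic above only re-uses the port part of `bind_addr` and pairs it with the detected local IP. A minimal stand-alone sketch of that behaviour, with a hard-coded IP instead of `local_ip_address::local_ip()`, might look like:

// Illustrative only: derive an advertised address from a bind address and a detected IP.
fn advertised_addr(bind_addr: &str, detected_ip: &str, default_port: &str) -> String {
    let port = bind_addr.split(':').nth(1).unwrap_or(default_port);
    format!("{detected_ip}:{port}")
}

// advertised_addr("0.0.0.0:3002", "192.168.1.10", "3002") == "192.168.1.10:3002"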
View File
@@ -179,8 +179,8 @@ impl MetasrvBuilder {
let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new())); let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
let state = Arc::new(RwLock::new(match election { let state = Arc::new(RwLock::new(match election {
None => State::leader(options.grpc.server_addr.to_string(), true), None => State::leader(options.server_addr.to_string(), true),
Some(_) => State::follower(options.grpc.server_addr.to_string()), Some(_) => State::follower(options.server_addr.to_string()),
})); }));
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new( let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
@@ -203,7 +203,7 @@ impl MetasrvBuilder {
)); ));
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone())); let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
let selector_ctx = SelectorContext { let selector_ctx = SelectorContext {
server_addr: options.grpc.server_addr.clone(), server_addr: options.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS, datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS, flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
kv_backend: kv_backend.clone(), kv_backend: kv_backend.clone(),
@@ -272,7 +272,7 @@ impl MetasrvBuilder {
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new( let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
mailbox.clone(), mailbox.clone(),
MetasrvInfo { MetasrvInfo {
server_addr: options.grpc.server_addr.clone(), server_addr: options.server_addr.clone(),
}, },
)); ));
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone())); let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
@@ -315,7 +315,7 @@ impl MetasrvBuilder {
memory_region_keeper.clone(), memory_region_keeper.clone(),
region_failure_detector_controller.clone(), region_failure_detector_controller.clone(),
mailbox.clone(), mailbox.clone(),
options.grpc.server_addr.clone(), options.server_addr.clone(),
cache_invalidator.clone(), cache_invalidator.clone(),
), ),
)); ));
@@ -390,7 +390,7 @@ impl MetasrvBuilder {
client: Arc::new(kafka_client), client: Arc::new(kafka_client),
table_metadata_manager: table_metadata_manager.clone(), table_metadata_manager: table_metadata_manager.clone(),
leader_region_registry: leader_region_registry.clone(), leader_region_registry: leader_region_registry.clone(),
server_addr: options.grpc.server_addr.clone(), server_addr: options.server_addr.clone(),
mailbox: mailbox.clone(), mailbox: mailbox.clone(),
}; };
let wal_prune_manager = WalPruneManager::new( let wal_prune_manager = WalPruneManager::new(
View File
@@ -26,7 +26,6 @@ use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use servers::grpc::GrpcOptions;
use tonic::codec::CompressionEncoding; use tonic::codec::CompressionEncoding;
use tower::service_fn; use tower::service_fn;
@@ -48,10 +47,7 @@ pub async fn mock_with_memstore() -> MockInfo {
let in_memory = Arc::new(MemoryKvBackend::new()); let in_memory = Arc::new(MemoryKvBackend::new());
mock( mock(
MetasrvOptions { MetasrvOptions {
grpc: GrpcOptions { server_addr: "127.0.0.1:3002".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
..Default::default() ..Default::default()
}, },
kv_backend, kv_backend,
@@ -66,10 +62,7 @@ pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap(); let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
mock( mock(
MetasrvOptions { MetasrvOptions {
grpc: GrpcOptions { server_addr: "127.0.0.1:3002".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
..Default::default() ..Default::default()
}, },
kv_backend, kv_backend,
@@ -87,7 +80,7 @@ pub async fn mock(
datanode_clients: Option<Arc<NodeClients>>, datanode_clients: Option<Arc<NodeClients>>,
in_memory: Option<ResettableKvBackendRef>, in_memory: Option<ResettableKvBackendRef>,
) -> MockInfo { ) -> MockInfo {
let server_addr = opts.grpc.server_addr.clone(); let server_addr = opts.server_addr.clone();
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_metadata_manager.init().await.unwrap(); table_metadata_manager.init().await.unwrap();
View File
@@ -88,7 +88,7 @@ impl cluster_server::Cluster for Metasrv {
return Ok(Response::new(resp)); return Ok(Response::new(resp));
} }
let leader_addr = &self.options().grpc.server_addr; let leader_addr = &self.options().server_addr;
let (leader, followers) = match self.election() { let (leader, followers) = match self.election() {
Some(election) => { Some(election) => {
let nodes = election.all_candidates().await?; let nodes = election.all_candidates().await?;
View File
@@ -190,7 +190,6 @@ mod tests {
use api::v1::meta::*; use api::v1::meta::*;
use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::memory::MemoryKvBackend;
use common_telemetry::tracing_context::W3cTrace; use common_telemetry::tracing_context::W3cTrace;
use servers::grpc::GrpcOptions;
use tonic::IntoRequest; use tonic::IntoRequest;
use super::get_node_id; use super::get_node_id;
@@ -204,10 +203,7 @@ mod tests {
let metasrv = MetasrvBuilder::new() let metasrv = MetasrvBuilder::new()
.kv_backend(kv_backend) .kv_backend(kv_backend)
.options(MetasrvOptions { .options(MetasrvOptions {
grpc: GrpcOptions { server_addr: "127.0.0.1:3002".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
..Default::default() ..Default::default()
}) })
.build() .build()
@@ -220,7 +216,7 @@ mod tests {
let res = metasrv.ask_leader(req.into_request()).await.unwrap(); let res = metasrv.ask_leader(req.into_request()).await.unwrap();
let res = res.into_inner(); let res = res.into_inner();
assert_eq!(metasrv.options().grpc.server_addr, res.leader.unwrap().addr); assert_eq!(metasrv.options().bind_addr, res.leader.unwrap().addr);
} }
#[test] #[test]
View File
@@ -282,15 +282,14 @@ mod tests {
use std::sync::Arc; use std::sync::Arc;
use api::v1::SemanticType; use api::v1::SemanticType;
use common_function::function::FunctionRef; use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::function_factory::ScalarFunctionFactory; use common_function::scalars::udf::create_udf;
use common_function::scalars::matches::MatchesFunction;
use common_function::scalars::matches_term::MatchesTermFunction;
use datafusion::functions::string::lower; use datafusion::functions::string::lower;
use datafusion_common::Column; use datafusion_common::Column;
use datafusion_expr::expr::ScalarFunction; use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::ScalarUDF; use datafusion_expr::ScalarUDF;
use datatypes::schema::ColumnSchema; use datatypes::schema::ColumnSchema;
use session::context::QueryContext;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId; use store_api::storage::RegionId;
@@ -318,17 +317,19 @@ mod tests {
} }
fn matches_func() -> Arc<ScalarUDF> { fn matches_func() -> Arc<ScalarUDF> {
Arc::new( Arc::new(create_udf(
ScalarFunctionFactory::from(Arc::new(MatchesFunction) as FunctionRef) FUNCTION_REGISTRY.get_function("matches").unwrap(),
.provide(Default::default()), QueryContext::arc(),
) Default::default(),
))
} }
fn matches_term_func() -> Arc<ScalarUDF> { fn matches_term_func() -> Arc<ScalarUDF> {
Arc::new( Arc::new(create_udf(
ScalarFunctionFactory::from(Arc::new(MatchesTermFunction) as FunctionRef) FUNCTION_REGISTRY.get_function("matches_term").unwrap(),
.provide(Default::default()), QueryContext::arc(),
) Default::default(),
))
} }
#[test] #[test]
View File
@@ -12,28 +12,18 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::collections::HashSet;
use ahash::{HashMap, HashMapExt}; use ahash::{HashMap, HashMapExt};
use api::v1::flow::{DirtyWindowRequest, WindowRange};
use api::v1::region::{ use api::v1::region::{
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
}; };
use api::v1::ArrowIpc; use api::v1::ArrowIpc;
use arrow::array::{
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
use arrow::record_batch::RecordBatch;
use common_base::AffectedRows; use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData; use common_grpc::FlightData;
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext; use common_telemetry::tracing_context::TracingContext;
use snafu::{OptionExt, ResultExt}; use snafu::ResultExt;
use store_api::storage::{RegionId, TableId}; use store_api::storage::RegionId;
use table::TableRef; use table::metadata::TableId;
use crate::insert::Inserter; use crate::insert::Inserter;
use crate::{error, metrics}; use crate::{error, metrics};
@@ -42,11 +32,10 @@ impl Inserter {
/// Handle bulk insert request. /// Handle bulk insert request.
pub async fn handle_bulk_insert( pub async fn handle_bulk_insert(
&self, &self,
table: TableRef, table_id: TableId,
decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
data: FlightData, data: FlightData,
) -> error::Result<AffectedRows> { ) -> error::Result<AffectedRows> {
let table_id = table.table_info().table_id();
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["decode_request"]) .with_label_values(&["decode_request"])
.start_timer(); .start_timer();
@@ -59,20 +48,6 @@ impl Inserter {
return Ok(0); return Ok(0);
}; };
decode_timer.observe_duration(); decode_timer.observe_duration();
if let Some((min, max)) = compute_timestamp_range(
&record_batch,
&table
.table_info()
.meta
.schema
.timestamp_column()
.as_ref()
.unwrap()
.name,
)? {
// notify flownode to update dirty time windows.
self.update_flow_dirty_window(table_id, min, max);
}
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64); metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
metrics::BULK_REQUEST_ROWS metrics::BULK_REQUEST_ROWS
.with_label_values(&["raw"]) .with_label_values(&["raw"])
@@ -241,88 +216,4 @@ impl Inserter {
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64); crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
Ok(rows_inserted) Ok(rows_inserted)
} }
fn update_flow_dirty_window(&self, table_id: TableId, min: i64, max: i64) {
let table_flownode_set_cache = self.table_flownode_set_cache.clone();
let node_manager = self.node_manager.clone();
common_runtime::spawn_global(async move {
let result = table_flownode_set_cache
.get(table_id)
.await
.context(error::RequestInsertsSnafu);
let flownodes = match result {
Ok(flownodes) => flownodes.unwrap_or_default(),
Err(e) => {
error!(e; "Failed to get flownodes for table id: {}", table_id);
return;
}
};
let peers: HashSet<_> = flownodes.values().cloned().collect();
for peer in peers {
let node_manager = node_manager.clone();
common_runtime::spawn_global(async move {
if let Err(e) = node_manager
.flownode(&peer)
.await
.handle_mark_window_dirty(DirtyWindowRequest {
table_id,
dirty_time_ranges: vec![WindowRange {
start_value: min,
end_value: max,
}],
})
.await
.context(error::RequestInsertsSnafu)
{
error!(e; "Failed to mark time window as dirty, table: {}, min: {}, max: {}", table_id, min, max);
}
});
}
});
}
}
/// Calculate the timestamp range of record batch. Return `None` if record batch is empty.
fn compute_timestamp_range(
rb: &RecordBatch,
timestamp_index_name: &str,
) -> error::Result<Option<(i64, i64)>> {
let ts_col = rb
.column_by_name(timestamp_index_name)
.context(error::ColumnNotFoundSnafu {
msg: timestamp_index_name,
})?;
if rb.num_rows() == 0 {
return Ok(None);
}
let primitive = match ts_col.data_type() {
DataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => ts_col
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Millisecond => ts_col
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Microsecond => ts_col
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Nanosecond => ts_col
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
},
t => {
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
}
};
Ok(arrow::compute::min(&primitive).zip(arrow::compute::max(&primitive)))
} }
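The compute_timestamp_range helper in this hunk boils down to viewing the time-index column as i64 and taking min/max with arrow's aggregate kernels. A self-contained sketch of that idea, using only the arrow crate and a millisecond column for simplicity (the real helper also handles the other time units and the missing-column error path):

    use arrow::array::{Array, TimestampMillisecondArray};
    use arrow::datatypes::Int64Type;

    /// Returns (min, max) of the timestamp column, or None if it is empty.
    fn timestamp_range(col: &TimestampMillisecondArray) -> Option<(i64, i64)> {
        if col.is_empty() {
            return None;
        }
        // View the timestamps as plain i64 values; nulls are skipped by the kernels.
        let as_i64 = col.reinterpret_cast::<Int64Type>();
        arrow::compute::min(&as_i64).zip(arrow::compute::max(&as_i64))
    }

    fn main() {
        let col = TimestampMillisecondArray::from(vec![Some(5_000), None, Some(1_000)]);
        assert_eq!(timestamp_range(&col), Some((1_000, 5_000)));
        println!("range: {:?}", timestamp_range(&col));
    }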

View File

@@ -837,13 +837,6 @@ pub enum Error {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
}, },
#[snafu(display("Invalid time index type: {}", ty))]
InvalidTimeIndexType {
ty: arrow::datatypes::DataType,
#[snafu(implicit)]
location: Location,
},
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@@ -971,7 +964,6 @@ impl ErrorExt for Error {
Error::ColumnOptions { source, .. } => source.status_code(), Error::ColumnOptions { source, .. } => source.status_code(),
Error::DecodeFlightData { source, .. } => source.status_code(), Error::DecodeFlightData { source, .. } => source.status_code(),
Error::ComputeArrow { .. } => StatusCode::Internal, Error::ComputeArrow { .. } => StatusCode::Internal,
Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
} }
} }

View File

@@ -78,7 +78,7 @@ pub struct Inserter {
catalog_manager: CatalogManagerRef, catalog_manager: CatalogManagerRef,
pub(crate) partition_manager: PartitionRuleManagerRef, pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef, pub(crate) node_manager: NodeManagerRef,
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef, table_flownode_set_cache: TableFlownodeSetCacheRef,
} }
pub type InserterRef = Arc<Inserter>; pub type InserterRef = Arc<Inserter>;

View File

@@ -13,17 +13,20 @@
// limitations under the License. // limitations under the License.
use clap::Parser; use clap::Parser;
use cli::{BenchTableMetadataCommand, DataCommand, MetaCommand, Tool}; use cli::{
BenchTableMetadataCommand, ExportCommand, ImportCommand, MetaRestoreCommand,
MetaSnapshotCommand, Tool,
};
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
#[derive(Parser)] #[derive(Parser)]
pub enum SubCommand { pub enum SubCommand {
// Attach(AttachCommand), // Attach(AttachCommand),
Bench(BenchTableMetadataCommand), Bench(BenchTableMetadataCommand),
#[clap(subcommand)] Export(ExportCommand),
Data(DataCommand), Import(ImportCommand),
#[clap(subcommand)] MetaSnapshot(MetaSnapshotCommand),
Meta(MetaCommand), MetaRestore(MetaRestoreCommand),
} }
impl SubCommand { impl SubCommand {
@@ -31,8 +34,10 @@ impl SubCommand {
match self { match self {
// SubCommand::Attach(cmd) => cmd.build().await, // SubCommand::Attach(cmd) => cmd.build().await,
SubCommand::Bench(cmd) => cmd.build().await, SubCommand::Bench(cmd) => cmd.build().await,
SubCommand::Data(cmd) => cmd.build().await, SubCommand::Export(cmd) => cmd.build().await,
SubCommand::Meta(cmd) => cmd.build().await, SubCommand::Import(cmd) => cmd.build().await,
SubCommand::MetaSnapshot(cmd) => cmd.build().await,
SubCommand::MetaRestore(cmd) => cmd.build().await,
} }
} }
} }
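One side of the hunk above groups the CLI tools behind nested Data and Meta subcommands via #[clap(subcommand)]. A standalone sketch of that nesting with clap's derive API (assumes the clap crate with the derive feature; the inner command and variant names are illustrative, and the real tools' flags are omitted):

    use clap::{Parser, Subcommand};

    #[derive(Parser)]
    enum SubCommand {
        /// Data tools, invoked as `tool data export` / `tool data import`.
        #[clap(subcommand)]
        Data(DataCommand),
        /// Metadata tools, invoked as `tool meta snapshot` / `tool meta restore`.
        #[clap(subcommand)]
        Meta(MetaCommand),
    }

    #[derive(Subcommand)]
    enum DataCommand {
        Export,
        Import,
    }

    #[derive(Subcommand)]
    enum MetaCommand {
        Snapshot,
        Restore,
    }

    fn main() {
        // e.g. `cargo run -- data export`
        match SubCommand::parse() {
            SubCommand::Data(DataCommand::Export) => println!("export data"),
            SubCommand::Data(DataCommand::Import) => println!("import data"),
            SubCommand::Meta(MetaCommand::Snapshot) => println!("snapshot metadata"),
            SubCommand::Meta(MetaCommand::Restore) => println!("restore metadata"),
        }
    }

The nesting keeps the top-level help short, while the flat variant on the other side of the hunk exposes every tool directly at the top level.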

View File

@@ -25,7 +25,8 @@ use async_trait::async_trait;
use common_base::Plugins; use common_base::Plugins;
use common_catalog::consts::is_readonly_schema; use common_catalog::consts::is_readonly_schema;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_function::function_factory::ScalarFunctionFactory; use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::{Output, OutputData, OutputMeta}; use common_query::{Output, OutputData, OutputMeta};
use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
@@ -34,9 +35,7 @@ use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::ResolvedTableReference; use datafusion_common::ResolvedTableReference;
use datafusion_expr::{ use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp};
AggregateUDF, DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp,
};
use datatypes::prelude::VectorRef; use datatypes::prelude::VectorRef;
use datatypes::schema::Schema; use datatypes::schema::Schema;
use futures_util::StreamExt; use futures_util::StreamExt;
@@ -455,14 +454,14 @@ impl QueryEngine for DatafusionQueryEngine {
/// `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"` /// `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`
/// ///
/// So it's better to make UDAF name lowercase when creating one. /// So it's better to make UDAF name lowercase when creating one.
fn register_aggregate_function(&self, func: AggregateUDF) { fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
self.state.register_aggr_function(func); self.state.register_aggregate_function(func);
} }
/// Register an scalar function. /// Register an UDF function.
/// Will override if the function with same name is already registered. /// Will override if the function with same name is already registered.
fn register_scalar_function(&self, func: ScalarFunctionFactory) { fn register_function(&self, func: FunctionRef) {
self.state.register_scalar_function(func); self.state.register_function(func);
} }
fn read_table(&self, table: TableRef) -> Result<DataFrame> { fn read_table(&self, table: TableRef) -> Result<DataFrame> {

View File

@@ -18,7 +18,12 @@ use std::sync::Arc;
use arrow_schema::DataType; use arrow_schema::DataType;
use catalog::table_source::DfTableSourceProvider; use catalog::table_source::DfTableSourceProvider;
use common_function::function::FunctionContext; use common_function::aggr::{
GeoPathAccumulator, HllState, UddSketchState, GEO_PATH_NAME, HLL_MERGE_NAME, HLL_NAME,
UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME,
};
use common_function::scalars::udf::create_udf;
use common_query::logical_plan::create_aggregate_function;
use datafusion::common::TableReference; use datafusion::common::TableReference;
use datafusion::datasource::cte_worktable::CteWorkTable; use datafusion::datasource::cte_worktable::CteWorkTable;
use datafusion::datasource::file_format::{format_as_file_type, FileFormatFactory}; use datafusion::datasource::file_format::{format_as_file_type, FileFormatFactory};
@@ -146,21 +151,38 @@ impl ContextProvider for DfContextProviderAdapter {
} }
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> { fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.engine_state.scalar_function(name).map_or_else( self.engine_state.udf_function(name).map_or_else(
|| self.session_state.scalar_functions().get(name).cloned(), || self.session_state.scalar_functions().get(name).cloned(),
|func| { |func| {
Some(Arc::new(func.provide(FunctionContext { Some(Arc::new(create_udf(
query_ctx: self.query_ctx.clone(), func,
state: self.engine_state.function_state(), self.query_ctx.clone(),
}))) self.engine_state.function_state(),
)))
}, },
) )
} }
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> { fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
self.engine_state.aggr_function(name).map_or_else( if name == UDDSKETCH_STATE_NAME {
return Some(Arc::new(UddSketchState::state_udf_impl()));
} else if name == UDDSKETCH_MERGE_NAME {
return Some(Arc::new(UddSketchState::merge_udf_impl()));
} else if name == HLL_NAME {
return Some(Arc::new(HllState::state_udf_impl()));
} else if name == HLL_MERGE_NAME {
return Some(Arc::new(HllState::merge_udf_impl()));
} else if name == GEO_PATH_NAME {
return Some(Arc::new(GeoPathAccumulator::udf_impl()));
}
self.engine_state.aggregate_function(name).map_or_else(
|| self.session_state.aggregate_functions().get(name).cloned(), || self.session_state.aggregate_functions().get(name).cloned(),
|func| Some(Arc::new(func)), |func| {
Some(Arc::new(
create_aggregate_function(func.name(), func.args_count(), func.create()).into(),
))
},
) )
} }
@@ -191,13 +213,13 @@ impl ContextProvider for DfContextProviderAdapter {
} }
fn udf_names(&self) -> Vec<String> { fn udf_names(&self) -> Vec<String> {
let mut names = self.engine_state.scalar_names(); let mut names = self.engine_state.udf_names();
names.extend(self.session_state.scalar_functions().keys().cloned()); names.extend(self.session_state.scalar_functions().keys().cloned());
names names
} }
fn udaf_names(&self) -> Vec<String> { fn udaf_names(&self) -> Vec<String> {
let mut names = self.engine_state.aggr_names(); let mut names = self.engine_state.udaf_names();
names.extend(self.session_state.aggregate_functions().keys().cloned()); names.extend(self.session_state.aggregate_functions().keys().cloned());
names names
} }
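A tiny, dependency-free sketch of the lookup order used by get_function_meta above, with plain HashMaps standing in for the engine-local registry and the DataFusion session state: prefer the engine's own entry, otherwise fall back to the session's built-ins.

    use std::collections::HashMap;

    fn lookup<'a>(
        engine: &'a HashMap<String, String>,
        session: &'a HashMap<String, String>,
        name: &str,
    ) -> Option<&'a String> {
        // Same shape as the map_or_else fallback in the hunk above.
        engine.get(name).map_or_else(|| session.get(name), Some)
    }

    fn main() {
        let mut engine = HashMap::new();
        engine.insert("matches_term".to_string(), "engine impl".to_string());
        let mut session = HashMap::new();
        session.insert("abs".to_string(), "session built-in".to_string());

        assert_eq!(lookup(&engine, &session, "matches_term").unwrap(), "engine impl");
        assert_eq!(lookup(&engine, &session, "abs").unwrap(), "session built-in");
        assert!(lookup(&engine, &session, "missing").is_none());
    }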

View File

@@ -2444,7 +2444,7 @@ impl PromPlanner {
LogicalPlanBuilder::from(left) LogicalPlanBuilder::from(left)
.alias(left_table_ref) .alias(left_table_ref)
.context(DataFusionPlanningSnafu)? .context(DataFusionPlanningSnafu)?
.join_detailed( .join(
right, right,
JoinType::Inner, JoinType::Inner,
( (
@@ -2458,7 +2458,6 @@ impl PromPlanner {
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
), ),
None, None,
true,
) )
.context(DataFusionPlanningSnafu)? .context(DataFusionPlanningSnafu)?
.build() .build()

View File

@@ -22,13 +22,14 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use catalog::CatalogManagerRef; use catalog::CatalogManagerRef;
use common_base::Plugins; use common_base::Plugins;
use common_function::function_factory::ScalarFunctionFactory; use common_function::function::FunctionRef;
use common_function::function_registry::FUNCTION_REGISTRY; use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::handlers::{ use common_function::handlers::{
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef, FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
}; };
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::Output; use common_query::Output;
use datafusion_expr::{AggregateUDF, LogicalPlan}; use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema; use datatypes::schema::Schema;
pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer}; pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
use session::context::QueryContextRef; use session::context::QueryContextRef;
@@ -78,11 +79,11 @@ pub trait QueryEngine: Send + Sync {
/// ///
/// # Panics /// # Panics
/// Will panic if the function with same name is already registered. /// Will panic if the function with same name is already registered.
fn register_aggregate_function(&self, func: AggregateUDF); fn register_aggregate_function(&self, func: AggregateFunctionMetaRef);
/// Register a scalar function. /// Register a SQL function.
/// Will override if the function with same name is already registered. /// Will override if the function with same name is already registered.
fn register_scalar_function(&self, func: ScalarFunctionFactory); fn register_function(&self, func: FunctionRef);
/// Create a DataFrame from a table. /// Create a DataFrame from a table.
fn read_table(&self, table: TableRef) -> Result<DataFrame>; fn read_table(&self, table: TableRef) -> Result<DataFrame>;
@@ -153,8 +154,8 @@ impl QueryEngineFactory {
/// Register all functions implemented by GreptimeDB /// Register all functions implemented by GreptimeDB
fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) { fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
for func in FUNCTION_REGISTRY.scalar_functions() { for func in FUNCTION_REGISTRY.functions() {
query_engine.register_scalar_function(func); query_engine.register_function(func);
} }
for accumulator in FUNCTION_REGISTRY.aggregate_functions() { for accumulator in FUNCTION_REGISTRY.aggregate_functions() {

View File

@@ -15,8 +15,9 @@
use std::sync::Arc; use std::sync::Arc;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_function::function::FunctionContext; use common_function::aggr::{GeoPathAccumulator, HllState, UddSketchState};
use common_function::function_registry::FUNCTION_REGISTRY; use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::scalars::udf::create_udf;
use common_query::error::RegisterUdfSnafu; use common_query::error::RegisterUdfSnafu;
use common_query::logical_plan::SubstraitPlanDecoder; use common_query::logical_plan::SubstraitPlanDecoder;
use datafusion::catalog::CatalogProviderList; use datafusion::catalog::CatalogProviderList;
@@ -123,46 +124,43 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
// if they have the same name as the default UDFs or their alias. // if they have the same name as the default UDFs or their alias.
// e.g. The default UDF `to_char()` has an alias `date_format()`, if we register a UDF with the name `date_format()` // e.g. The default UDF `to_char()` has an alias `date_format()`, if we register a UDF with the name `date_format()`
// before we build the session state, the UDF will be lost. // before we build the session state, the UDF will be lost.
for func in FUNCTION_REGISTRY.scalar_functions() { for func in FUNCTION_REGISTRY.functions() {
let udf = func.provide(FunctionContext { let udf = Arc::new(create_udf(
query_ctx: self.query_ctx.clone(), func.clone(),
state: Default::default(), self.query_ctx.clone(),
}); Default::default(),
));
session_state session_state
.register_udf(Arc::new(udf)) .register_udf(udf)
.context(RegisterUdfSnafu { name: func.name() })?; .context(RegisterUdfSnafu { name: func.name() })?;
let _ = session_state.register_udaf(Arc::new(UddSketchState::state_udf_impl()));
let _ = session_state.register_udaf(Arc::new(UddSketchState::merge_udf_impl()));
let _ = session_state.register_udaf(Arc::new(HllState::state_udf_impl()));
let _ = session_state.register_udaf(Arc::new(HllState::merge_udf_impl()));
let _ = session_state.register_udaf(Arc::new(GeoPathAccumulator::udf_impl()));
let _ = session_state.register_udaf(quantile_udaf());
let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Round::scalar_udf()));
let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
// TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round
} }
for func in FUNCTION_REGISTRY.aggregate_functions() {
let name = func.name().to_string();
session_state
.register_udaf(Arc::new(func))
.context(RegisterUdfSnafu { name })?;
}
let _ = session_state.register_udaf(quantile_udaf());
let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Round::scalar_udf()));
let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
// TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round
let logical_plan = DFLogicalSubstraitConvertor let logical_plan = DFLogicalSubstraitConvertor
.decode(message, session_state) .decode(message, session_state)
.await .await

View File

@@ -19,10 +19,11 @@ use std::sync::{Arc, RwLock};
use async_trait::async_trait; use async_trait::async_trait;
use catalog::CatalogManagerRef; use catalog::CatalogManagerRef;
use common_base::Plugins; use common_base::Plugins;
use common_function::function_factory::ScalarFunctionFactory; use common_function::function::FunctionRef;
use common_function::handlers::{ use common_function::handlers::{
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef, FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
}; };
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::state::FunctionState; use common_function::state::FunctionState;
use common_telemetry::warn; use common_telemetry::warn;
use datafusion::dataframe::DataFrame; use datafusion::dataframe::DataFrame;
@@ -36,7 +37,7 @@ use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}; use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan}; use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule; use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule}; use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
use datafusion_optimizer::optimizer::Optimizer; use datafusion_optimizer::optimizer::Optimizer;
@@ -69,8 +70,8 @@ pub struct QueryEngineState {
df_context: SessionContext, df_context: SessionContext,
catalog_manager: CatalogManagerRef, catalog_manager: CatalogManagerRef,
function_state: Arc<FunctionState>, function_state: Arc<FunctionState>,
scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>, udf_functions: Arc<RwLock<HashMap<String, FunctionRef>>>,
aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>, aggregate_functions: Arc<RwLock<HashMap<String, AggregateFunctionMetaRef>>>,
extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>, extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
plugins: Plugins, plugins: Plugins,
} }
@@ -185,10 +186,10 @@ impl QueryEngineState {
procedure_service_handler, procedure_service_handler,
flow_service_handler, flow_service_handler,
}), }),
aggr_functions: Arc::new(RwLock::new(HashMap::new())), aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
extension_rules, extension_rules,
plugins, plugins,
scalar_functions: Arc::new(RwLock::new(HashMap::new())), udf_functions: Arc::new(RwLock::new(HashMap::new())),
} }
} }
@@ -221,28 +222,38 @@ impl QueryEngineState {
self.session_state().optimize(&plan) self.session_state().optimize(&plan)
} }
/// Retrieve the scalar function by name /// Register an udf function.
pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> { /// Will override if the function with same name is already registered.
self.scalar_functions pub fn register_function(&self, func: FunctionRef) {
let name = func.name().to_string();
let x = self
.udf_functions
.write()
.unwrap()
.insert(name.clone(), func);
if x.is_some() {
warn!("Already registered udf function '{name}'");
}
}
/// Retrieve the udf function by name
pub fn udf_function(&self, function_name: &str) -> Option<FunctionRef> {
self.udf_functions
.read() .read()
.unwrap() .unwrap()
.get(function_name) .get(function_name)
.cloned() .cloned()
} }
/// Retrieve scalar function names. /// Retrieve udf function names.
pub fn scalar_names(&self) -> Vec<String> { pub fn udf_names(&self) -> Vec<String> {
self.scalar_functions self.udf_functions.read().unwrap().keys().cloned().collect()
.read()
.unwrap()
.keys()
.cloned()
.collect()
} }
/// Retrieve the aggregate function by name /// Retrieve the aggregate function by name
pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> { pub fn aggregate_function(&self, function_name: &str) -> Option<AggregateFunctionMetaRef> {
self.aggr_functions self.aggregate_functions
.read() .read()
.unwrap() .unwrap()
.get(function_name) .get(function_name)
@@ -250,8 +261,8 @@ impl QueryEngineState {
} }
/// Retrieve aggregate function names. /// Retrieve aggregate function names.
pub fn aggr_names(&self) -> Vec<String> { pub fn udaf_names(&self) -> Vec<String> {
self.aggr_functions self.aggregate_functions
.read() .read()
.unwrap() .unwrap()
.keys() .keys()
@@ -259,21 +270,6 @@ impl QueryEngineState {
.collect() .collect()
} }
/// Register an scalar function.
/// Will override if the function with same name is already registered.
pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
let name = func.name().to_string();
let x = self
.scalar_functions
.write()
.unwrap()
.insert(name.clone(), func);
if x.is_some() {
warn!("Already registered scalar function '{name}'");
}
}
/// Register an aggregate function. /// Register an aggregate function.
/// ///
/// # Panics /// # Panics
@@ -282,10 +278,10 @@ impl QueryEngineState {
/// Panicking consideration: currently the aggregated functions are all statically registered, /// Panicking consideration: currently the aggregated functions are all statically registered,
/// user cannot define their own aggregate functions on the fly. So we can panic here. If that /// user cannot define their own aggregate functions on the fly. So we can panic here. If that
/// invariant is broken in the future, we should return an error instead of panicking. /// invariant is broken in the future, we should return an error instead of panicking.
pub fn register_aggr_function(&self, func: AggregateUDF) { pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
let name = func.name().to_string(); let name = func.name();
let x = self let x = self
.aggr_functions .aggregate_functions
.write() .write()
.unwrap() .unwrap()
.insert(name.clone(), func); .insert(name.clone(), func);
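The QueryEngineState changes above all revolve around the same registry shape. A self-contained sketch of that pattern using only std (Function and the names are illustrative, not GreptimeDB's types): registration takes the write lock and warns when it overrides an existing entry, while lookups clone values out of the read lock so it is never held for long.

    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    #[derive(Clone, Debug)]
    struct Function {
        name: String,
    }

    #[derive(Default)]
    struct Registry {
        functions: Arc<RwLock<HashMap<String, Function>>>,
    }

    impl Registry {
        /// Register a function, overriding (with a warning) any previous entry.
        fn register(&self, func: Function) {
            let previous = self
                .functions
                .write()
                .unwrap()
                .insert(func.name.clone(), func);
            if let Some(prev) = previous {
                eprintln!("already registered function '{}', overriding it", prev.name);
            }
        }

        /// Retrieve a function by name.
        fn get(&self, name: &str) -> Option<Function> {
            self.functions.read().unwrap().get(name).cloned()
        }

        /// List all registered function names.
        fn names(&self) -> Vec<String> {
            self.functions.read().unwrap().keys().cloned().collect()
        }
    }

    fn main() {
        let registry = Registry::default();
        registry.register(Function { name: "matches_term".to_string() });
        assert!(registry.get("matches_term").is_some());
        assert_eq!(registry.names(), vec!["matches_term".to_string()]);
    }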

View File

@@ -16,12 +16,11 @@ use std::fmt::Debug;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use common_function::scalars::aggregate::AggregateFunctionMeta;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore}; use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Result as QueryResult}; use common_query::error::{CreateAccumulatorSnafu, Result as QueryResult};
use common_query::logical_plan::accumulator::AggrFuncTypeStore; use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{ use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::prelude::*; use common_query::prelude::*;
use common_recordbatch::{RecordBatch, RecordBatches}; use common_recordbatch::{RecordBatch, RecordBatches};
use datatypes::prelude::*; use datatypes::prelude::*;
@@ -208,14 +207,11 @@ where
let engine = new_query_engine_with_table(testing_table); let engine = new_query_engine_with_table(testing_table);
engine.register_aggregate_function( engine.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
create_aggregate_function( "my_sum",
"my_sum".to_string(), 1,
1, Arc::new(|| Arc::new(MySumAccumulatorCreator::default())),
Arc::new(MySumAccumulatorCreator::default()), )));
)
.into(),
);
let sql = format!("select MY_SUM({column_name}) as my_sum from {table_name}"); let sql = format!("select MY_SUM({column_name}) as my_sum from {table_name}");
let batches = exec_selection(engine, &sql).await; let batches = exec_selection(engine, &sql).await;
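The my_sum test above registers an aggregate through a creator closure. A dependency-free sketch of that factory idea (the Accumulator trait, i64-only values, and MySumAccumulator are illustrative stand-ins for the common_query types): the engine stores the factory and calls it whenever it needs a fresh accumulator for a new aggregation state.

    trait Accumulator {
        fn update(&mut self, value: i64);
        fn evaluate(&self) -> i64;
    }

    #[derive(Default)]
    struct MySumAccumulator {
        sum: i64,
    }

    impl Accumulator for MySumAccumulator {
        fn update(&mut self, value: i64) {
            self.sum += value;
        }
        fn evaluate(&self) -> i64 {
            self.sum
        }
    }

    // The engine only stores the factory; each aggregation gets its own accumulator.
    type AccumulatorFactory = Box<dyn Fn() -> Box<dyn Accumulator> + Send + Sync>;

    fn main() {
        let factory: AccumulatorFactory =
            Box::new(|| Box::new(MySumAccumulator::default()) as Box<dyn Accumulator>);
        let mut acc = factory();
        for v in [1, 2, 3] {
            acc.update(v);
        }
        assert_eq!(acc.evaluate(), 6);
        println!("my_sum = {}", acc.evaluate());
    }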

View File

@@ -66,8 +66,6 @@ pub struct GrpcOptions {
pub max_recv_message_size: ReadableSize, pub max_recv_message_size: ReadableSize,
/// Max gRPC sending(encoding) message size /// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize, pub max_send_message_size: ReadableSize,
/// Compression mode in Arrow Flight service.
pub flight_compression: FlightCompression,
pub runtime_size: usize, pub runtime_size: usize,
#[serde(default = "Default::default")] #[serde(default = "Default::default")]
pub tls: TlsOption, pub tls: TlsOption,
@@ -116,7 +114,6 @@ impl Default for GrpcOptions {
server_addr: String::new(), server_addr: String::new(),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8, runtime_size: 8,
tls: TlsOption::default(), tls: TlsOption::default(),
} }
@@ -135,30 +132,6 @@ impl GrpcOptions {
} }
} }
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
/// Disable all compression in Arrow Flight service.
#[default]
None,
/// Enable only transport layer compression (zstd).
Transport,
/// Enable only payload compression (lz4)
ArrowIpc,
/// Enable all compression.
All,
}
impl FlightCompression {
pub fn transport_compression(&self) -> bool {
self == &FlightCompression::Transport || self == &FlightCompression::All
}
pub fn arrow_compression(&self) -> bool {
self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
}
}
pub struct GrpcServer { pub struct GrpcServer {
// states // states
shutdown_tx: Mutex<Option<Sender<()>>>, shutdown_tx: Mutex<Option<Sender<()>>>,
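The removed FlightCompression option above maps snake_case config strings onto an enum via serde and exposes two helper predicates. A minimal sketch of that wiring (assumes the serde crate with derive plus the toml crate for the demo; field and variant names mirror the hunk, everything else about GrpcOptions is omitted):

    use serde::Deserialize;

    #[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
    #[serde(rename_all = "snake_case")]
    enum FlightCompression {
        /// Disable all compression in the Arrow Flight service.
        #[default]
        None,
        /// Transport-level (zstd) compression only.
        Transport,
        /// Arrow IPC payload (lz4) compression only.
        ArrowIpc,
        /// Both of the above.
        All,
    }

    impl FlightCompression {
        fn transport_compression(&self) -> bool {
            matches!(self, FlightCompression::Transport | FlightCompression::All)
        }
        fn arrow_compression(&self) -> bool {
            matches!(self, FlightCompression::ArrowIpc | FlightCompression::All)
        }
    }

    #[derive(Debug, Deserialize)]
    struct GrpcOptions {
        #[serde(default)]
        flight_compression: FlightCompression,
    }

    fn main() {
        let opts: GrpcOptions = toml::from_str(r#"flight_compression = "arrow_ipc""#).unwrap();
        assert!(opts.flight_compression.arrow_compression());
        assert!(!opts.flight_compression.transport_compression());
        println!("{:?}", opts.flight_compression);
    }

With this shape, transport (zstd) and Arrow IPC (lz4) compression can be toggled independently or combined via the all value, matching the predicates in the removed impl.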

Some files were not shown because too many files have changed in this diff.