mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 16:32:54 +00:00
Compare commits
25 Commits
flow/admin
...
feat/bulk-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51454c93d9 | ||
|
|
5dd6f92b60 | ||
|
|
11e4a8abb3 | ||
|
|
a9cbcbb0c8 | ||
|
|
ebd815d288 | ||
|
|
403958f5ba | ||
|
|
ffe84af343 | ||
|
|
41c40688c4 | ||
|
|
59a82a3f29 | ||
|
|
0d7012a1ea | ||
|
|
88da98c829 | ||
|
|
73ce5914f2 | ||
|
|
f5eac3528c | ||
|
|
9cd61d221d | ||
|
|
7527ff976e | ||
|
|
1d53dd26ae | ||
|
|
01796c9cc0 | ||
|
|
9469a8f8f2 | ||
|
|
2fabe346a1 | ||
|
|
c26138963e | ||
|
|
12648f388a | ||
|
|
2979aa048e | ||
|
|
74222c3070 | ||
|
|
0311db3089 | ||
|
|
e434294a0c |
@@ -59,7 +59,7 @@ runs:
|
||||
--set base.podTemplate.main.resources.requests.cpu=50m \
|
||||
--set base.podTemplate.main.resources.requests.memory=256Mi \
|
||||
--set base.podTemplate.main.resources.limits.cpu=2000m \
|
||||
--set base.podTemplate.main.resources.limits.memory=2Gi \
|
||||
--set base.podTemplate.main.resources.limits.memory=3Gi \
|
||||
--set frontend.replicas=${{ inputs.frontend-replicas }} \
|
||||
--set datanode.replicas=${{ inputs.datanode-replicas }} \
|
||||
--set meta.replicas=${{ inputs.meta-replicas }} \
|
||||
|
||||
@@ -30,7 +30,7 @@ update_helm_charts_version() {
|
||||
|
||||
# Commit the changes.
|
||||
git add .
|
||||
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git push origin $BRANCH_NAME
|
||||
|
||||
# Create a Pull Request.
|
||||
|
||||
@@ -26,7 +26,7 @@ update_homebrew_greptime_version() {
|
||||
|
||||
# Commit the changes.
|
||||
git add .
|
||||
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git push origin $BRANCH_NAME
|
||||
|
||||
# Create a Pull Request.
|
||||
|
||||
15
.github/workflows/develop.yml
vendored
15
.github/workflows/develop.yml
vendored
@@ -250,6 +250,11 @@ jobs:
|
||||
name: unstable-fuzz-logs
|
||||
path: /tmp/unstable-greptime/
|
||||
retention-days: 3
|
||||
- name: Describe pods
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe pod -n my-greptimedb
|
||||
|
||||
build-greptime-ci:
|
||||
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
|
||||
@@ -405,6 +410,11 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
- name: Describe pod
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe pod -n my-greptimedb
|
||||
- name: Export kind logs
|
||||
if: failure()
|
||||
shell: bash
|
||||
@@ -554,6 +564,11 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
- name: Describe pods
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe pod -n my-greptimedb
|
||||
- name: Export kind logs
|
||||
if: failure()
|
||||
shell: bash
|
||||
|
||||
70
Cargo.lock
generated
70
Cargo.lock
generated
@@ -3252,7 +3252,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -3303,7 +3303,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-catalog"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"async-trait",
|
||||
@@ -3323,7 +3323,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-catalog-listing"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-schema 54.3.1",
|
||||
@@ -3346,7 +3346,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3371,7 +3371,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-common-runtime"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"log",
|
||||
"tokio",
|
||||
@@ -3380,12 +3380,12 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-doc"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
|
||||
[[package]]
|
||||
name = "datafusion-execution"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"dashmap",
|
||||
@@ -3403,7 +3403,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-expr"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"chrono",
|
||||
@@ -3423,7 +3423,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-expr-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"datafusion-common",
|
||||
@@ -3434,7 +3434,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-buffer 54.3.1",
|
||||
@@ -3463,7 +3463,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-aggregate"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3484,7 +3484,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-aggregate-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3496,7 +3496,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-nested"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -3518,7 +3518,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-table"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"async-trait",
|
||||
@@ -3533,7 +3533,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-window"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"datafusion-common",
|
||||
"datafusion-doc",
|
||||
@@ -3549,7 +3549,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-window-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"datafusion-common",
|
||||
"datafusion-physical-expr-common",
|
||||
@@ -3558,7 +3558,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-macros"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"datafusion-expr",
|
||||
"quote",
|
||||
@@ -3568,7 +3568,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-optimizer"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"chrono",
|
||||
@@ -3586,7 +3586,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-expr"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3609,7 +3609,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-expr-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3622,7 +3622,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-optimizer"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-schema 54.3.1",
|
||||
@@ -3643,7 +3643,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-plan"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3673,7 +3673,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-sql"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -3691,7 +3691,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-substrait"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"async-recursion",
|
||||
"async-trait",
|
||||
@@ -4511,9 +4511,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.1.1"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece"
|
||||
checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d"
|
||||
dependencies = [
|
||||
"crc32fast",
|
||||
"libz-rs-sys",
|
||||
@@ -5133,7 +5133,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=52083925a15d741c259800a9a54eba3467939180#52083925a15d741c259800a9a54eba3467939180"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"serde",
|
||||
@@ -5146,9 +5146,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grok"
|
||||
version = "2.0.0"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "273797968160270573071022613fc4aa28b91fe68f3eef6c96a1b2a1947ddfbd"
|
||||
checksum = "6c52724b609896f661a3f4641dd3a44dc602958ef615857c12d00756b4e9355b"
|
||||
dependencies = [
|
||||
"glob",
|
||||
"onig",
|
||||
@@ -6716,9 +6716,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libz-rs-sys"
|
||||
version = "0.5.0"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6489ca9bd760fe9642d7644e827b0c9add07df89857b0416ee15c1cc1a3b8c5a"
|
||||
checksum = "172a788537a2221661b480fee8dc5f96c580eb34fa88764d3205dc356c7e4221"
|
||||
dependencies = [
|
||||
"zlib-rs",
|
||||
]
|
||||
@@ -9664,9 +9664,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "psl"
|
||||
version = "2.1.112"
|
||||
version = "2.1.119"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1c6b4c497a0c6bfb466f75167c728b1a861b0cdc39de9c35b877208a270a9590"
|
||||
checksum = "d0e49aa528239f2ca13ad87387977c208e59c3fb8c437609f95f1b3898ec6ef1"
|
||||
dependencies = [
|
||||
"psl-types",
|
||||
]
|
||||
@@ -14701,9 +14701,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zlib-rs"
|
||||
version = "0.5.0"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "868b928d7949e09af2f6086dfc1e01936064cc7a819253bce650d4e2a2d63ba8"
|
||||
checksum = "626bd9fa9734751fc50d6060752170984d7053f5a39061f524cda68023d4db8a"
|
||||
|
||||
[[package]]
|
||||
name = "zstd"
|
||||
|
||||
20
Cargo.toml
20
Cargo.toml
@@ -116,15 +116,15 @@ clap = { version = "4.4", features = ["derive"] }
|
||||
config = "0.13.0"
|
||||
crossbeam-utils = "0.8"
|
||||
dashmap = "6.1"
|
||||
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
deadpool = "0.12"
|
||||
deadpool-postgres = "0.14"
|
||||
derive_builder = "0.20"
|
||||
@@ -133,7 +133,7 @@ etcd-client = "0.14"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "52083925a15d741c259800a9a54eba3467939180" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -195,13 +195,13 @@
|
||||
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
|
||||
| `slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics` | -- | -- | The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommended to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
@@ -232,6 +232,7 @@
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
||||
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
|
||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||
@@ -298,13 +299,11 @@
|
||||
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
|
||||
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
|
||||
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
@@ -315,11 +314,9 @@
|
||||
| Key | Type | Default | Descriptions |
|
||||
| --- | -----| ------- | ----------- |
|
||||
| `data_home` | String | `./greptimedb_data` | The working home directory. |
|
||||
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
|
||||
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
|
||||
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
|
||||
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
|
||||
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store` |
|
||||
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` |
|
||||
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
|
||||
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
|
||||
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
|
||||
@@ -331,6 +328,12 @@
|
||||
| `runtime` | -- | -- | The runtime options. |
|
||||
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
|
||||
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
|
||||
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
@@ -372,13 +375,11 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
@@ -404,6 +405,7 @@
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
|
||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||
@@ -536,13 +538,11 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
|
||||
@@ -44,6 +44,13 @@ runtime_size = 8
|
||||
max_recv_message_size = "512MB"
|
||||
## The maximum send message size for gRPC server.
|
||||
max_send_message_size = "512MB"
|
||||
## Compression mode for datanode side Arrow IPC service. Available options:
|
||||
## - `none`: disable all compression
|
||||
## - `transport`: only enable gRPC transport compression (zstd)
|
||||
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||
## - `all`: enable all compression.
|
||||
## Default to `none`
|
||||
flight_compression = "arrow_ipc"
|
||||
|
||||
## gRPC server TLS options, see `mysql.tls` section.
|
||||
[grpc.tls]
|
||||
@@ -635,24 +642,16 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## @toml2docs:none-default
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -54,6 +54,13 @@ bind_addr = "127.0.0.1:4001"
|
||||
server_addr = "127.0.0.1:4001"
|
||||
## The number of server worker threads.
|
||||
runtime_size = 8
|
||||
## Compression mode for frontend side Arrow IPC service. Available options:
|
||||
## - `none`: disable all compression
|
||||
## - `transport`: only enable gRPC transport compression (zstd)
|
||||
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||
## - `all`: enable all compression.
|
||||
## Default to `none`
|
||||
flight_compression = "arrow_ipc"
|
||||
|
||||
## gRPC server TLS options, see `mysql.tls` section.
|
||||
[grpc.tls]
|
||||
@@ -247,24 +254,16 @@ sample_ratio = 1.0
|
||||
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
|
||||
ttl = "30d"
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## @toml2docs:none-default
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -1,14 +1,6 @@
|
||||
## The working home directory.
|
||||
data_home = "./greptimedb_data"
|
||||
|
||||
## The bind address of metasrv.
|
||||
bind_addr = "127.0.0.1:3002"
|
||||
|
||||
## The communication server address for the frontend and datanode to connect to metasrv.
|
||||
## If left empty or unset, the server will automatically use the IP address of the first network interface
|
||||
## on the host, with the same port number as the one specified in `bind_addr`.
|
||||
server_addr = "127.0.0.1:3002"
|
||||
|
||||
## Store server address default to etcd store.
|
||||
## For postgres store, the format is:
|
||||
## "password=password dbname=postgres user=postgres host=localhost port=5432"
|
||||
@@ -24,6 +16,7 @@ store_key_prefix = ""
|
||||
## - `etcd_store` (default value)
|
||||
## - `memory_store`
|
||||
## - `postgres_store`
|
||||
## - `mysql_store`
|
||||
backend = "etcd_store"
|
||||
|
||||
## Table name in RDS to store metadata. Effect when using a RDS kvbackend.
|
||||
@@ -67,6 +60,21 @@ node_max_idle_time = "24hours"
|
||||
## The number of threads to execute the runtime for global write operations.
|
||||
#+ compact_rt_size = 4
|
||||
|
||||
## The gRPC server options.
|
||||
[grpc]
|
||||
## The address to bind the gRPC server.
|
||||
bind_addr = "127.0.0.1:3002"
|
||||
## The communication server address for the frontend and datanode to connect to metasrv.
|
||||
## If left empty or unset, the server will automatically use the IP address of the first network interface
|
||||
## on the host, with the same port number as the one specified in `bind_addr`.
|
||||
server_addr = "127.0.0.1:3002"
|
||||
## The number of server worker threads.
|
||||
runtime_size = 8
|
||||
## The maximum receive message size for gRPC server.
|
||||
max_recv_message_size = "512MB"
|
||||
## The maximum send message size for gRPC server.
|
||||
max_send_message_size = "512MB"
|
||||
|
||||
## The HTTP server options.
|
||||
[http]
|
||||
## The address to bind the HTTP server.
|
||||
@@ -229,24 +237,16 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## @toml2docs:none-default
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -750,13 +750,11 @@ default_ratio = 1.0
|
||||
## @toml2docs:none-default
|
||||
#+ sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
@@ -767,7 +765,7 @@ write_interval = "30s"
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -58,6 +58,7 @@ where
|
||||
info!("{desc}, average operation cost: {cost:.2} ms");
|
||||
}
|
||||
|
||||
/// Command to benchmark table metadata operations.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct BenchTableMetadataCommand {
|
||||
#[clap(long)]
|
||||
|
||||
@@ -244,6 +244,18 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Unsupported memory backend"))]
|
||||
UnsupportedMemoryBackend {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("File path invalid: {}", msg))]
|
||||
InvalidFilePath {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -262,6 +274,8 @@ impl ErrorExt for Error {
|
||||
| Error::ConnectEtcd { .. }
|
||||
| Error::CreateDir { .. }
|
||||
| Error::EmptyResult { .. }
|
||||
| Error::InvalidFilePath { .. }
|
||||
| Error::UnsupportedMemoryBackend { .. }
|
||||
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::StartProcedureManager { source, .. }
|
||||
|
||||
@@ -50,6 +50,7 @@ enum ExportTarget {
|
||||
All,
|
||||
}
|
||||
|
||||
/// Command for exporting data from the GreptimeDB.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct ExportCommand {
|
||||
/// Server address to connect
|
||||
|
||||
@@ -40,6 +40,7 @@ enum ImportTarget {
|
||||
All,
|
||||
}
|
||||
|
||||
/// Command to import data from a directory into a GreptimeDB instance.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct ImportCommand {
|
||||
/// Server address to connect
|
||||
|
||||
@@ -20,7 +20,7 @@ mod import;
|
||||
mod meta_snapshot;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
use clap::{Parser, Subcommand};
|
||||
use common_error::ext::BoxedError;
|
||||
pub use database::DatabaseClient;
|
||||
use error::Result;
|
||||
@@ -28,7 +28,7 @@ use error::Result;
|
||||
pub use crate::bench::BenchTableMetadataCommand;
|
||||
pub use crate::export::ExportCommand;
|
||||
pub use crate::import::ImportCommand;
|
||||
pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
|
||||
pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand};
|
||||
|
||||
#[async_trait]
|
||||
pub trait Tool: Send + Sync {
|
||||
@@ -51,3 +51,19 @@ impl AttachCommand {
|
||||
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
|
||||
}
|
||||
}
|
||||
|
||||
/// Subcommand for data operations like export and import.
|
||||
#[derive(Subcommand)]
|
||||
pub enum DataCommand {
|
||||
Export(ExportCommand),
|
||||
Import(ImportCommand),
|
||||
}
|
||||
|
||||
impl DataCommand {
|
||||
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
DataCommand::Export(cmd) => cmd.build().await,
|
||||
DataCommand::Import(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,10 +12,11 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
use clap::{Parser, Subcommand};
|
||||
use common_base::secrets::{ExposeSecret, SecretString};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::kv_backend::chroot::ChrootKvBackend;
|
||||
@@ -26,10 +27,50 @@ use meta_srv::bootstrap::create_etcd_client;
|
||||
use meta_srv::metasrv::BackendImpl;
|
||||
use object_store::services::{Fs, S3};
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
|
||||
use crate::error::{
|
||||
InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu,
|
||||
UnsupportedMemoryBackendSnafu,
|
||||
};
|
||||
use crate::Tool;
|
||||
|
||||
/// Subcommand for metadata snapshot management.
|
||||
#[derive(Subcommand)]
|
||||
pub enum MetaCommand {
|
||||
#[clap(subcommand)]
|
||||
Snapshot(MetaSnapshotCommand),
|
||||
}
|
||||
|
||||
impl MetaCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
MetaCommand::Snapshot(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Subcommand for metadata snapshot operations. such as save, restore and info.
|
||||
#[derive(Subcommand)]
|
||||
pub enum MetaSnapshotCommand {
|
||||
/// Export metadata snapshot tool.
|
||||
Save(MetaSaveCommand),
|
||||
/// Restore metadata snapshot tool.
|
||||
Restore(MetaRestoreCommand),
|
||||
/// Explore metadata from metadata snapshot.
|
||||
Info(MetaInfoCommand),
|
||||
}
|
||||
|
||||
impl MetaSnapshotCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
MetaSnapshotCommand::Save(cmd) => cmd.build().await,
|
||||
MetaSnapshotCommand::Restore(cmd) => cmd.build().await,
|
||||
MetaSnapshotCommand::Info(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Parser)]
|
||||
struct MetaConnection {
|
||||
/// The endpoint of store. one of etcd, pg or mysql.
|
||||
@@ -91,6 +132,9 @@ impl MetaConnection {
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
}
|
||||
Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu
|
||||
.fail()
|
||||
.map_err(BoxedError::new),
|
||||
_ => KvBackendNotSetSnafu { backend: "all" }
|
||||
.fail()
|
||||
.map_err(BoxedError::new),
|
||||
@@ -170,7 +214,7 @@ impl S3Config {
|
||||
/// It will dump the metadata snapshot to local file or s3 bucket.
|
||||
/// The snapshot file will be in binary format.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaSnapshotCommand {
|
||||
pub struct MetaSaveCommand {
|
||||
/// The connection to the metadata store.
|
||||
#[clap(flatten)]
|
||||
connection: MetaConnection,
|
||||
@@ -196,7 +240,7 @@ fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError>
|
||||
Ok(object_store)
|
||||
}
|
||||
|
||||
impl MetaSnapshotCommand {
|
||||
impl MetaSaveCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let kvbackend = self.connection.build().await?;
|
||||
let output_dir = &self.output_dir;
|
||||
@@ -327,3 +371,89 @@ impl Tool for MetaRestoreTool {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Explore metadata from metadata snapshot.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaInfoCommand {
|
||||
/// The s3 config.
|
||||
#[clap(flatten)]
|
||||
s3_config: S3Config,
|
||||
/// The name of the target snapshot file. we will add the file extension automatically.
|
||||
#[clap(long, default_value = "metadata_snapshot")]
|
||||
file_name: String,
|
||||
/// The query string to filter the metadata.
|
||||
#[clap(long, default_value = "*")]
|
||||
inspect_key: String,
|
||||
/// The limit of the metadata to query.
|
||||
#[clap(long)]
|
||||
limit: Option<usize>,
|
||||
}
|
||||
|
||||
pub struct MetaInfoTool {
|
||||
inner: ObjectStore,
|
||||
source_file: String,
|
||||
inspect_key: String,
|
||||
limit: Option<usize>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for MetaInfoTool {
|
||||
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
|
||||
let result = MetadataSnapshotManager::info(
|
||||
&self.inner,
|
||||
&self.source_file,
|
||||
&self.inspect_key,
|
||||
self.limit,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
for item in result {
|
||||
println!("{}", item);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl MetaInfoCommand {
|
||||
fn decide_object_store_root_for_local_store(
|
||||
file_path: &str,
|
||||
) -> Result<(&str, &str), BoxedError> {
|
||||
let path = Path::new(file_path);
|
||||
let parent = path
|
||||
.parent()
|
||||
.and_then(|p| p.to_str())
|
||||
.context(InvalidFilePathSnafu { msg: file_path })
|
||||
.map_err(BoxedError::new)?;
|
||||
let file_name = path
|
||||
.file_name()
|
||||
.and_then(|f| f.to_str())
|
||||
.context(InvalidFilePathSnafu { msg: file_path })
|
||||
.map_err(BoxedError::new)?;
|
||||
let root = if parent.is_empty() { "." } else { parent };
|
||||
Ok((root, file_name))
|
||||
}
|
||||
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
|
||||
if let Some(store) = object_store {
|
||||
let tool = MetaInfoTool {
|
||||
inner: store,
|
||||
source_file: self.file_name.clone(),
|
||||
inspect_key: self.inspect_key.clone(),
|
||||
limit: self.limit,
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
} else {
|
||||
let (root, file_name) =
|
||||
Self::decide_object_store_root_for_local_store(&self.file_name)?;
|
||||
let object_store = create_local_file_object_store(root)?;
|
||||
let tool = MetaInfoTool {
|
||||
inner: object_store,
|
||||
source_file: file_name.to_string(),
|
||||
inspect_key: self.inspect_key.clone(),
|
||||
limit: self.limit,
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,12 +162,23 @@ impl Client {
|
||||
.as_bytes() as usize
|
||||
}
|
||||
|
||||
pub fn make_flight_client(&self) -> Result<FlightClient> {
|
||||
pub fn make_flight_client(
|
||||
&self,
|
||||
send_compression: bool,
|
||||
accept_compression: bool,
|
||||
) -> Result<FlightClient> {
|
||||
let (addr, channel) = self.find_channel()?;
|
||||
|
||||
let client = FlightServiceClient::new(channel)
|
||||
let mut client = FlightServiceClient::new(channel)
|
||||
.max_decoding_message_size(self.max_grpc_recv_message_size())
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size());
|
||||
// todo(hl): support compression methods.
|
||||
if send_compression {
|
||||
client = client.send_compressed(CompressionEncoding::Zstd);
|
||||
}
|
||||
if accept_compression {
|
||||
client = client.accept_compressed(CompressionEncoding::Zstd);
|
||||
}
|
||||
|
||||
Ok(FlightClient { addr, client })
|
||||
}
|
||||
|
||||
@@ -49,7 +49,16 @@ impl NodeManager for NodeClients {
|
||||
async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
|
||||
let client = self.get_client(datanode).await;
|
||||
|
||||
Arc::new(RegionRequester::new(client))
|
||||
let ChannelConfig {
|
||||
send_compression,
|
||||
accept_compression,
|
||||
..
|
||||
} = self.channel_manager.config();
|
||||
Arc::new(RegionRequester::new(
|
||||
client,
|
||||
*send_compression,
|
||||
*accept_compression,
|
||||
))
|
||||
}
|
||||
|
||||
async fn flownode(&self, flownode: &Peer) -> FlownodeRef {
|
||||
|
||||
@@ -196,12 +196,22 @@ impl Database {
|
||||
|
||||
/// Retry if connection fails, max_retries is the max number of retries, so the total wait time
|
||||
/// is `max_retries * GRPC_CONN_TIMEOUT`
|
||||
pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result<u32> {
|
||||
pub async fn handle_with_retry(
|
||||
&self,
|
||||
request: Request,
|
||||
max_retries: u32,
|
||||
hints: &[(&str, &str)],
|
||||
) -> Result<u32> {
|
||||
let mut client = make_database_client(&self.client)?.inner;
|
||||
let mut retries = 0;
|
||||
|
||||
let request = self.to_rpc_request(request);
|
||||
|
||||
loop {
|
||||
let raw_response = client.handle(request.clone()).await;
|
||||
let mut tonic_request = tonic::Request::new(request.clone());
|
||||
let metadata = tonic_request.metadata_mut();
|
||||
Self::put_hints(metadata, hints)?;
|
||||
let raw_response = client.handle(tonic_request).await;
|
||||
match (raw_response, retries < max_retries) {
|
||||
(Ok(resp), _) => return from_grpc_response(resp.into_inner()),
|
||||
(Err(err), true) => {
|
||||
@@ -287,7 +297,7 @@ impl Database {
|
||||
let mut request = tonic::Request::new(request);
|
||||
Self::put_hints(request.metadata_mut(), hints)?;
|
||||
|
||||
let mut client = self.client.make_flight_client()?;
|
||||
let mut client = self.client.make_flight_client(false, false)?;
|
||||
|
||||
let response = client.mut_inner().do_get(request).await.or_else(|e| {
|
||||
let tonic_code = e.code();
|
||||
@@ -409,7 +419,7 @@ impl Database {
|
||||
MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?,
|
||||
);
|
||||
|
||||
let mut client = self.client.make_flight_client()?;
|
||||
let mut client = self.client.make_flight_client(false, false)?;
|
||||
let response = client.mut_inner().do_put(request).await?;
|
||||
let response = response
|
||||
.into_inner()
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
|
||||
use api::v1::region::InsertRequests;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::node_manager::Flownode;
|
||||
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(
|
||||
&self,
|
||||
req: DirtyWindowRequest,
|
||||
) -> common_meta::error::Result<FlowResponse> {
|
||||
self.handle_mark_window_dirty(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowRequester {
|
||||
@@ -91,4 +101,20 @@ impl FlowRequester {
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||
let (addr, mut client) = self.client.raw_flow_client()?;
|
||||
let response = client
|
||||
.handle_mark_dirty_time_window(DirtyWindowRequests {
|
||||
requests: vec![req],
|
||||
})
|
||||
.await
|
||||
.or_else(|e| {
|
||||
let code = e.code();
|
||||
let err: crate::error::Error = e.into();
|
||||
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
|
||||
})?
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +46,8 @@ use crate::{metrics, Client, Error};
|
||||
#[derive(Debug)]
|
||||
pub struct RegionRequester {
|
||||
client: Client,
|
||||
send_compression: bool,
|
||||
accept_compression: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -89,12 +91,18 @@ impl Datanode for RegionRequester {
|
||||
}
|
||||
|
||||
impl RegionRequester {
|
||||
pub fn new(client: Client) -> Self {
|
||||
Self { client }
|
||||
pub fn new(client: Client, send_compression: bool, accept_compression: bool) -> Self {
|
||||
Self {
|
||||
client,
|
||||
send_compression,
|
||||
accept_compression,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
|
||||
let mut flight_client = self.client.make_flight_client()?;
|
||||
let mut flight_client = self
|
||||
.client
|
||||
.make_flight_client(self.send_compression, self.accept_compression)?;
|
||||
let response = flight_client
|
||||
.mut_inner()
|
||||
.do_get(ticket)
|
||||
|
||||
@@ -146,6 +146,7 @@ mod tests {
|
||||
let output_dir = tempfile::tempdir().unwrap();
|
||||
let cli = cli::Command::parse_from([
|
||||
"cli",
|
||||
"data",
|
||||
"export",
|
||||
"--addr",
|
||||
"127.0.0.1:4000",
|
||||
|
||||
@@ -364,12 +364,16 @@ impl StartCommand {
|
||||
|
||||
// frontend to datanode need not timeout.
|
||||
// Some queries are expected to take long time.
|
||||
let channel_config = ChannelConfig {
|
||||
let mut channel_config = ChannelConfig {
|
||||
timeout: None,
|
||||
tcp_nodelay: opts.datanode.client.tcp_nodelay,
|
||||
connect_timeout: Some(opts.datanode.client.connect_timeout),
|
||||
..Default::default()
|
||||
};
|
||||
if opts.grpc.flight_compression.transport_compression() {
|
||||
channel_config.accept_compression = true;
|
||||
channel_config.send_compression = true;
|
||||
}
|
||||
let client = NodeClients::new(channel_config);
|
||||
|
||||
let instance = FrontendBuilder::new(
|
||||
|
||||
@@ -237,12 +237,20 @@ impl StartCommand {
|
||||
tokio_console_addr: global_options.tokio_console_addr.clone(),
|
||||
};
|
||||
|
||||
#[allow(deprecated)]
|
||||
if let Some(addr) = &self.rpc_bind_addr {
|
||||
opts.bind_addr.clone_from(addr);
|
||||
opts.grpc.bind_addr.clone_from(addr);
|
||||
} else if !opts.bind_addr.is_empty() {
|
||||
opts.grpc.bind_addr.clone_from(&opts.bind_addr);
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
if let Some(addr) = &self.rpc_server_addr {
|
||||
opts.server_addr.clone_from(addr);
|
||||
opts.grpc.server_addr.clone_from(addr);
|
||||
} else if !opts.server_addr.is_empty() {
|
||||
opts.grpc.server_addr.clone_from(&opts.server_addr);
|
||||
}
|
||||
|
||||
if let Some(addrs) = &self.store_addrs {
|
||||
@@ -319,7 +327,7 @@ impl StartCommand {
|
||||
|
||||
let plugin_opts = opts.plugins;
|
||||
let mut opts = opts.component;
|
||||
opts.detect_server_addr();
|
||||
opts.grpc.detect_server_addr();
|
||||
|
||||
info!("Metasrv options: {:#?}", opts);
|
||||
|
||||
@@ -363,7 +371,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let options = cmd.load_options(&Default::default()).unwrap().component;
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
|
||||
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
|
||||
assert_eq!(SelectorType::LoadBased, options.selector);
|
||||
}
|
||||
@@ -396,8 +404,8 @@ mod tests {
|
||||
};
|
||||
|
||||
let options = cmd.load_options(&Default::default()).unwrap().component;
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
|
||||
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
|
||||
assert_eq!(SelectorType::LeaseBased, options.selector);
|
||||
assert_eq!("debug", options.logging.level.as_ref().unwrap());
|
||||
@@ -509,10 +517,10 @@ mod tests {
|
||||
let opts = command.load_options(&Default::default()).unwrap().component;
|
||||
|
||||
// Should be read from env, env > default values.
|
||||
assert_eq!(opts.bind_addr, "127.0.0.1:14002");
|
||||
assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");
|
||||
|
||||
// Should be read from config file, config file > env > default values.
|
||||
assert_eq!(opts.server_addr, "127.0.0.1:3002");
|
||||
assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");
|
||||
|
||||
// Should be read from cli, cli > config file > env > default values.
|
||||
assert_eq!(opts.http.addr, "127.0.0.1:14000");
|
||||
|
||||
@@ -95,7 +95,7 @@ fn test_load_datanode_example_config() {
|
||||
..Default::default()
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: Some(Default::default()),
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
@@ -148,7 +148,7 @@ fn test_load_frontend_example_config() {
|
||||
},
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: Some(Default::default()),
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
@@ -176,7 +176,11 @@ fn test_load_metasrv_example_config() {
|
||||
component: MetasrvOptions {
|
||||
selector: SelectorType::default(),
|
||||
data_home: DEFAULT_DATA_HOME.to_string(),
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
grpc: GrpcOptions {
|
||||
bind_addr: "127.0.0.1:3002".to_string(),
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
logging: LoggingOptions {
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
@@ -195,7 +199,7 @@ fn test_load_metasrv_example_config() {
|
||||
},
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: Some(Default::default()),
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
|
||||
@@ -12,11 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod geo_path;
|
||||
mod hll;
|
||||
mod uddsketch_state;
|
||||
|
||||
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
|
||||
pub(crate) use hll::HllStateType;
|
||||
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
|
||||
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};
|
||||
pub mod approximate;
|
||||
#[cfg(feature = "geo")]
|
||||
pub mod geo;
|
||||
pub mod vector;
|
||||
32
src/common/function/src/aggrs/approximate.rs
Normal file
32
src/common/function/src/aggrs/approximate.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
pub mod hll;
|
||||
pub mod uddsketch;
|
||||
|
||||
pub(crate) struct ApproximateFunction;
|
||||
|
||||
impl ApproximateFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// uddsketch
|
||||
registry.register_aggr(uddsketch::UddSketchState::state_udf_impl());
|
||||
registry.register_aggr(uddsketch::UddSketchState::merge_udf_impl());
|
||||
|
||||
// hll
|
||||
registry.register_aggr(hll::HllState::state_udf_impl());
|
||||
registry.register_aggr(hll::HllState::merge_udf_impl());
|
||||
}
|
||||
}
|
||||
27
src/common/function/src/aggrs/geo.rs
Normal file
27
src/common/function/src/aggrs/geo.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
mod encoding;
|
||||
mod geo_path;
|
||||
|
||||
pub(crate) struct GeoFunction;
|
||||
|
||||
impl GeoFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_aggr(geo_path::GeoPathAccumulator::uadf_impl());
|
||||
registry.register_aggr(encoding::JsonPathAccumulator::uadf_impl());
|
||||
}
|
||||
}
|
||||
@@ -19,9 +19,12 @@ use common_error::status_code::StatusCode;
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{self, InvalidInputStateSnafu, Result};
|
||||
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::logical_plan::{
|
||||
create_aggregate_function, Accumulator, AggregateFunctionCreator,
|
||||
};
|
||||
use common_query::prelude::AccumulatorCreatorFunction;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::{ListValue, Value};
|
||||
use datatypes::vectors::VectorRef;
|
||||
@@ -47,6 +50,16 @@ impl JsonPathAccumulator {
|
||||
timestamp_type,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `AggregateUDF` for the `json_encode_path` aggregate function.
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_aggregate_function(
|
||||
"json_encode_path".to_string(),
|
||||
3,
|
||||
Arc::new(JsonPathEncodeFunctionCreator::default()),
|
||||
)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl Accumulator for JsonPathAccumulator {
|
||||
@@ -47,7 +47,7 @@ impl GeoPathAccumulator {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn udf_impl() -> AggregateUDF {
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_udaf(
|
||||
GEO_PATH_NAME,
|
||||
// Input types: lat, lng, timestamp
|
||||
29
src/common/function/src/aggrs/vector.rs
Normal file
29
src/common/function/src/aggrs/vector.rs
Normal file
@@ -0,0 +1,29 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::aggrs::vector::product::VectorProduct;
|
||||
use crate::aggrs::vector::sum::VectorSum;
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
mod product;
|
||||
mod sum;
|
||||
|
||||
pub(crate) struct VectorFunction;
|
||||
|
||||
impl VectorFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_aggr(VectorSum::uadf_impl());
|
||||
registry.register_aggr(VectorProduct::uadf_impl());
|
||||
}
|
||||
}
|
||||
@@ -16,8 +16,11 @@ use std::sync::Arc;
|
||||
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::logical_plan::{
|
||||
create_aggregate_function, Accumulator, AggregateFunctionCreator,
|
||||
};
|
||||
use common_query::prelude::AccumulatorCreatorFunction;
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use datatypes::prelude::{ConcreteDataType, Value, *};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use nalgebra::{Const, DVectorView, Dyn, OVector};
|
||||
@@ -75,6 +78,16 @@ impl AggregateFunctionCreator for VectorProductCreator {
|
||||
}
|
||||
|
||||
impl VectorProduct {
|
||||
/// Create a new `AggregateUDF` for the `vec_product` aggregate function.
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_aggregate_function(
|
||||
"vec_product".to_string(),
|
||||
1,
|
||||
Arc::new(VectorProductCreator::default()),
|
||||
)
|
||||
.into()
|
||||
}
|
||||
|
||||
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
|
||||
self.product.get_or_insert_with(|| {
|
||||
OVector::from_iterator_generic(Dyn(len), Const::<1>, (0..len).map(|_| 1.0))
|
||||
@@ -16,8 +16,11 @@ use std::sync::Arc;
|
||||
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::logical_plan::{
|
||||
create_aggregate_function, Accumulator, AggregateFunctionCreator,
|
||||
};
|
||||
use common_query::prelude::AccumulatorCreatorFunction;
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use datatypes::prelude::{ConcreteDataType, Value, *};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use nalgebra::{Const, DVectorView, Dyn, OVector};
|
||||
@@ -25,6 +28,7 @@ use snafu::ensure;
|
||||
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
|
||||
|
||||
/// The accumulator for the `vec_sum` aggregate function.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct VectorSum {
|
||||
sum: Option<OVector<f32, Dyn>>,
|
||||
@@ -74,6 +78,16 @@ impl AggregateFunctionCreator for VectorSumCreator {
|
||||
}
|
||||
|
||||
impl VectorSum {
|
||||
/// Create a new `AggregateUDF` for the `vec_sum` aggregate function.
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_aggregate_function(
|
||||
"vec_sum".to_string(),
|
||||
1,
|
||||
Arc::new(VectorSumCreator::default()),
|
||||
)
|
||||
.into()
|
||||
}
|
||||
|
||||
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
|
||||
self.sum
|
||||
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))
|
||||
63
src/common/function/src/function_factory.rs
Normal file
63
src/common/function/src/function_factory.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion_expr::ScalarUDF;
|
||||
|
||||
use crate::function::{FunctionContext, FunctionRef};
|
||||
use crate::scalars::udf::create_udf;
|
||||
|
||||
/// A factory for creating `ScalarUDF` that require a function context.
|
||||
#[derive(Clone)]
|
||||
pub struct ScalarFunctionFactory {
|
||||
name: String,
|
||||
factory: Arc<dyn Fn(FunctionContext) -> ScalarUDF + Send + Sync>,
|
||||
}
|
||||
|
||||
impl ScalarFunctionFactory {
|
||||
/// Returns the name of the function.
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
/// Returns a `ScalarUDF` when given a function context.
|
||||
pub fn provide(&self, ctx: FunctionContext) -> ScalarUDF {
|
||||
(self.factory)(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ScalarUDF> for ScalarFunctionFactory {
|
||||
fn from(df_udf: ScalarUDF) -> Self {
|
||||
let name = df_udf.name().to_string();
|
||||
let func = Arc::new(move |_ctx| df_udf.clone());
|
||||
Self {
|
||||
name,
|
||||
factory: func,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FunctionRef> for ScalarFunctionFactory {
|
||||
fn from(func: FunctionRef) -> Self {
|
||||
let name = func.name().to_string();
|
||||
let func = Arc::new(move |ctx: FunctionContext| {
|
||||
create_udf(func.clone(), ctx.query_ctx, ctx.state)
|
||||
});
|
||||
Self {
|
||||
name,
|
||||
factory: func,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,11 +16,14 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::admin::AdminFunction;
|
||||
use crate::function::{AsyncFunctionRef, FunctionRef};
|
||||
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
|
||||
use crate::aggrs::approximate::ApproximateFunction;
|
||||
use crate::aggrs::vector::VectorFunction as VectorAggrFunction;
|
||||
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
|
||||
use crate::function_factory::ScalarFunctionFactory;
|
||||
use crate::scalars::date::DateFunction;
|
||||
use crate::scalars::expression::ExpressionFunction;
|
||||
use crate::scalars::hll_count::HllCalcFunction;
|
||||
@@ -31,18 +34,19 @@ use crate::scalars::matches_term::MatchesTermFunction;
|
||||
use crate::scalars::math::MathFunction;
|
||||
use crate::scalars::timestamp::TimestampFunction;
|
||||
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
|
||||
use crate::scalars::vector::VectorFunction;
|
||||
use crate::scalars::vector::VectorFunction as VectorScalarFunction;
|
||||
use crate::system::SystemFunction;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct FunctionRegistry {
|
||||
functions: RwLock<HashMap<String, FunctionRef>>,
|
||||
functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
|
||||
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
|
||||
aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
|
||||
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
|
||||
}
|
||||
|
||||
impl FunctionRegistry {
|
||||
pub fn register(&self, func: FunctionRef) {
|
||||
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
|
||||
let func = func.into();
|
||||
let _ = self
|
||||
.functions
|
||||
.write()
|
||||
@@ -50,6 +54,10 @@ impl FunctionRegistry {
|
||||
.insert(func.name().to_string(), func);
|
||||
}
|
||||
|
||||
pub fn register_scalar(&self, func: impl Function + 'static) {
|
||||
self.register(Arc::new(func) as FunctionRef);
|
||||
}
|
||||
|
||||
pub fn register_async(&self, func: AsyncFunctionRef) {
|
||||
let _ = self
|
||||
.async_functions
|
||||
@@ -58,6 +66,14 @@ impl FunctionRegistry {
|
||||
.insert(func.name().to_string(), func);
|
||||
}
|
||||
|
||||
pub fn register_aggr(&self, func: AggregateUDF) {
|
||||
let _ = self
|
||||
.aggregate_functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(func.name().to_string(), func);
|
||||
}
|
||||
|
||||
pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> {
|
||||
self.async_functions.read().unwrap().get(name).cloned()
|
||||
}
|
||||
@@ -71,27 +87,16 @@ impl FunctionRegistry {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
|
||||
let _ = self
|
||||
.aggregate_functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(func.name(), func);
|
||||
}
|
||||
|
||||
pub fn get_aggr_function(&self, name: &str) -> Option<AggregateFunctionMetaRef> {
|
||||
self.aggregate_functions.read().unwrap().get(name).cloned()
|
||||
}
|
||||
|
||||
pub fn get_function(&self, name: &str) -> Option<FunctionRef> {
|
||||
#[cfg(test)]
|
||||
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
|
||||
self.functions.read().unwrap().get(name).cloned()
|
||||
}
|
||||
|
||||
pub fn functions(&self) -> Vec<FunctionRef> {
|
||||
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
|
||||
self.functions.read().unwrap().values().cloned().collect()
|
||||
}
|
||||
|
||||
pub fn aggregate_functions(&self) -> Vec<AggregateFunctionMetaRef> {
|
||||
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
|
||||
self.aggregate_functions
|
||||
.read()
|
||||
.unwrap()
|
||||
@@ -112,9 +117,6 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
||||
UddSketchCalcFunction::register(&function_registry);
|
||||
HllCalcFunction::register(&function_registry);
|
||||
|
||||
// Aggregate functions
|
||||
AggregateFunctions::register(&function_registry);
|
||||
|
||||
// Full text search function
|
||||
MatchesFunction::register(&function_registry);
|
||||
MatchesTermFunction::register(&function_registry);
|
||||
@@ -127,15 +129,21 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
||||
JsonFunction::register(&function_registry);
|
||||
|
||||
// Vector related functions
|
||||
VectorFunction::register(&function_registry);
|
||||
VectorScalarFunction::register(&function_registry);
|
||||
VectorAggrFunction::register(&function_registry);
|
||||
|
||||
// Geo functions
|
||||
#[cfg(feature = "geo")]
|
||||
crate::scalars::geo::GeoFunctions::register(&function_registry);
|
||||
#[cfg(feature = "geo")]
|
||||
crate::aggrs::geo::GeoFunction::register(&function_registry);
|
||||
|
||||
// Ip functions
|
||||
IpFunctions::register(&function_registry);
|
||||
|
||||
// Approximate functions
|
||||
ApproximateFunction::register(&function_registry);
|
||||
|
||||
Arc::new(function_registry)
|
||||
});
|
||||
|
||||
@@ -147,12 +155,11 @@ mod tests {
|
||||
#[test]
|
||||
fn test_function_registry() {
|
||||
let registry = FunctionRegistry::default();
|
||||
let func = Arc::new(TestAndFunction);
|
||||
|
||||
assert!(registry.get_function("test_and").is_none());
|
||||
assert!(registry.functions().is_empty());
|
||||
registry.register(func);
|
||||
assert!(registry.scalar_functions().is_empty());
|
||||
registry.register_scalar(TestAndFunction);
|
||||
let _ = registry.get_function("test_and").unwrap();
|
||||
assert_eq!(1, registry.functions().len());
|
||||
assert_eq!(1, registry.scalar_functions().len());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,13 +19,14 @@ mod adjust_flow;
|
||||
mod admin;
|
||||
mod flush_flow;
|
||||
mod macros;
|
||||
pub mod scalars;
|
||||
mod system;
|
||||
|
||||
pub mod aggr;
|
||||
pub mod aggrs;
|
||||
pub mod function;
|
||||
pub mod function_factory;
|
||||
pub mod function_registry;
|
||||
pub mod handlers;
|
||||
pub mod helper;
|
||||
pub mod scalars;
|
||||
pub mod state;
|
||||
pub mod utils;
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod aggregate;
|
||||
pub(crate) mod date;
|
||||
pub mod expression;
|
||||
#[cfg(feature = "geo")]
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! # Deprecate Warning:
|
||||
//!
|
||||
//! This module is deprecated and will be removed in the future.
|
||||
//! All UDAF implementation here are not maintained and should
|
||||
//! not be used before they are refactored into the `src/aggr`
|
||||
//! version.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::logical_plan::AggregateFunctionCreatorRef;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
use crate::scalars::vector::product::VectorProductCreator;
|
||||
use crate::scalars::vector::sum::VectorSumCreator;
|
||||
|
||||
/// A function creates `AggregateFunctionCreator`.
|
||||
/// "Aggregator" *is* AggregatorFunction. Since the later one is long, we named an short alias for it.
|
||||
/// The two names might be used interchangeably.
|
||||
type AggregatorCreatorFunction = Arc<dyn Fn() -> AggregateFunctionCreatorRef + Send + Sync>;
|
||||
|
||||
/// `AggregateFunctionMeta` dynamically creates AggregateFunctionCreator.
|
||||
#[derive(Clone)]
|
||||
pub struct AggregateFunctionMeta {
|
||||
name: String,
|
||||
args_count: u8,
|
||||
creator: AggregatorCreatorFunction,
|
||||
}
|
||||
|
||||
pub type AggregateFunctionMetaRef = Arc<AggregateFunctionMeta>;
|
||||
|
||||
impl AggregateFunctionMeta {
|
||||
pub fn new(name: &str, args_count: u8, creator: AggregatorCreatorFunction) -> Self {
|
||||
Self {
|
||||
name: name.to_string(),
|
||||
args_count,
|
||||
creator,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn name(&self) -> String {
|
||||
self.name.to_string()
|
||||
}
|
||||
|
||||
pub fn args_count(&self) -> u8 {
|
||||
self.args_count
|
||||
}
|
||||
|
||||
pub fn create(&self) -> AggregateFunctionCreatorRef {
|
||||
(self.creator)()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct AggregateFunctions;
|
||||
|
||||
impl AggregateFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
|
||||
"vec_sum",
|
||||
1,
|
||||
Arc::new(|| Arc::new(VectorSumCreator::default())),
|
||||
)));
|
||||
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
|
||||
"vec_product",
|
||||
1,
|
||||
Arc::new(|| Arc::new(VectorProductCreator::default())),
|
||||
)));
|
||||
|
||||
#[cfg(feature = "geo")]
|
||||
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
|
||||
"json_encode_path",
|
||||
3,
|
||||
Arc::new(|| Arc::new(super::geo::encoding::JsonPathEncodeFunctionCreator::default())),
|
||||
)));
|
||||
}
|
||||
}
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
mod date_add;
|
||||
mod date_format;
|
||||
mod date_sub;
|
||||
@@ -27,8 +26,8 @@ pub(crate) struct DateFunction;
|
||||
|
||||
impl DateFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(DateAddFunction));
|
||||
registry.register(Arc::new(DateSubFunction));
|
||||
registry.register(Arc::new(DateFormatFunction));
|
||||
registry.register_scalar(DateAddFunction);
|
||||
registry.register_scalar(DateSubFunction);
|
||||
registry.register_scalar(DateFormatFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,8 +17,6 @@ mod ctx;
|
||||
mod is_null;
|
||||
mod unary;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use binary::scalar_binary_op;
|
||||
pub use ctx::EvalContext;
|
||||
pub use unary::scalar_unary_op;
|
||||
@@ -30,6 +28,6 @@ pub(crate) struct ExpressionFunction;
|
||||
|
||||
impl ExpressionFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(IsNullFunction));
|
||||
registry.register_scalar(IsNullFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
pub(crate) mod encoding;
|
||||
mod geohash;
|
||||
mod h3;
|
||||
mod helpers;
|
||||
pub(crate) mod helpers;
|
||||
mod measure;
|
||||
mod relation;
|
||||
mod s2;
|
||||
@@ -29,57 +27,57 @@ pub(crate) struct GeoFunctions;
|
||||
impl GeoFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// geohash
|
||||
registry.register(Arc::new(geohash::GeohashFunction));
|
||||
registry.register(Arc::new(geohash::GeohashNeighboursFunction));
|
||||
registry.register_scalar(geohash::GeohashFunction);
|
||||
registry.register_scalar(geohash::GeohashNeighboursFunction);
|
||||
|
||||
// h3 index
|
||||
registry.register(Arc::new(h3::H3LatLngToCell));
|
||||
registry.register(Arc::new(h3::H3LatLngToCellString));
|
||||
registry.register_scalar(h3::H3LatLngToCell);
|
||||
registry.register_scalar(h3::H3LatLngToCellString);
|
||||
|
||||
// h3 index inspection
|
||||
registry.register(Arc::new(h3::H3CellBase));
|
||||
registry.register(Arc::new(h3::H3CellIsPentagon));
|
||||
registry.register(Arc::new(h3::H3StringToCell));
|
||||
registry.register(Arc::new(h3::H3CellToString));
|
||||
registry.register(Arc::new(h3::H3CellCenterLatLng));
|
||||
registry.register(Arc::new(h3::H3CellResolution));
|
||||
registry.register_scalar(h3::H3CellBase);
|
||||
registry.register_scalar(h3::H3CellIsPentagon);
|
||||
registry.register_scalar(h3::H3StringToCell);
|
||||
registry.register_scalar(h3::H3CellToString);
|
||||
registry.register_scalar(h3::H3CellCenterLatLng);
|
||||
registry.register_scalar(h3::H3CellResolution);
|
||||
|
||||
// h3 hierarchical grid
|
||||
registry.register(Arc::new(h3::H3CellCenterChild));
|
||||
registry.register(Arc::new(h3::H3CellParent));
|
||||
registry.register(Arc::new(h3::H3CellToChildren));
|
||||
registry.register(Arc::new(h3::H3CellToChildrenSize));
|
||||
registry.register(Arc::new(h3::H3CellToChildPos));
|
||||
registry.register(Arc::new(h3::H3ChildPosToCell));
|
||||
registry.register(Arc::new(h3::H3CellContains));
|
||||
registry.register_scalar(h3::H3CellCenterChild);
|
||||
registry.register_scalar(h3::H3CellParent);
|
||||
registry.register_scalar(h3::H3CellToChildren);
|
||||
registry.register_scalar(h3::H3CellToChildrenSize);
|
||||
registry.register_scalar(h3::H3CellToChildPos);
|
||||
registry.register_scalar(h3::H3ChildPosToCell);
|
||||
registry.register_scalar(h3::H3CellContains);
|
||||
|
||||
// h3 grid traversal
|
||||
registry.register(Arc::new(h3::H3GridDisk));
|
||||
registry.register(Arc::new(h3::H3GridDiskDistances));
|
||||
registry.register(Arc::new(h3::H3GridDistance));
|
||||
registry.register(Arc::new(h3::H3GridPathCells));
|
||||
registry.register_scalar(h3::H3GridDisk);
|
||||
registry.register_scalar(h3::H3GridDiskDistances);
|
||||
registry.register_scalar(h3::H3GridDistance);
|
||||
registry.register_scalar(h3::H3GridPathCells);
|
||||
|
||||
// h3 measurement
|
||||
registry.register(Arc::new(h3::H3CellDistanceSphereKm));
|
||||
registry.register(Arc::new(h3::H3CellDistanceEuclideanDegree));
|
||||
registry.register_scalar(h3::H3CellDistanceSphereKm);
|
||||
registry.register_scalar(h3::H3CellDistanceEuclideanDegree);
|
||||
|
||||
// s2
|
||||
registry.register(Arc::new(s2::S2LatLngToCell));
|
||||
registry.register(Arc::new(s2::S2CellLevel));
|
||||
registry.register(Arc::new(s2::S2CellToToken));
|
||||
registry.register(Arc::new(s2::S2CellParent));
|
||||
registry.register_scalar(s2::S2LatLngToCell);
|
||||
registry.register_scalar(s2::S2CellLevel);
|
||||
registry.register_scalar(s2::S2CellToToken);
|
||||
registry.register_scalar(s2::S2CellParent);
|
||||
|
||||
// spatial data type
|
||||
registry.register(Arc::new(wkt::LatLngToPointWkt));
|
||||
registry.register_scalar(wkt::LatLngToPointWkt);
|
||||
|
||||
// spatial relation
|
||||
registry.register(Arc::new(relation::STContains));
|
||||
registry.register(Arc::new(relation::STWithin));
|
||||
registry.register(Arc::new(relation::STIntersects));
|
||||
registry.register_scalar(relation::STContains);
|
||||
registry.register_scalar(relation::STWithin);
|
||||
registry.register_scalar(relation::STIntersects);
|
||||
|
||||
// spatial measure
|
||||
registry.register(Arc::new(measure::STDistance));
|
||||
registry.register(Arc::new(measure::STDistanceSphere));
|
||||
registry.register(Arc::new(measure::STArea));
|
||||
registry.register_scalar(measure::STDistance);
|
||||
registry.register_scalar(measure::STDistanceSphere);
|
||||
registry.register_scalar(measure::STArea);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ macro_rules! ensure_columns_len {
|
||||
};
|
||||
}
|
||||
|
||||
pub(super) use ensure_columns_len;
|
||||
pub(crate) use ensure_columns_len;
|
||||
|
||||
macro_rules! ensure_columns_n {
|
||||
($columns:ident, $n:literal) => {
|
||||
@@ -58,7 +58,7 @@ macro_rules! ensure_columns_n {
|
||||
};
|
||||
}
|
||||
|
||||
pub(super) use ensure_columns_n;
|
||||
pub(crate) use ensure_columns_n;
|
||||
|
||||
macro_rules! ensure_and_coerce {
|
||||
($compare:expr, $coerce:expr) => {{
|
||||
@@ -72,4 +72,4 @@ macro_rules! ensure_and_coerce {
|
||||
}};
|
||||
}
|
||||
|
||||
pub(super) use ensure_and_coerce;
|
||||
pub(crate) use ensure_and_coerce;
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
@@ -27,7 +26,7 @@ use datatypes::vectors::{BinaryVector, MutableVector, UInt64VectorBuilder, Vecto
|
||||
use hyperloglogplus::HyperLogLog;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::aggr::HllStateType;
|
||||
use crate::aggrs::approximate::hll::HllStateType;
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
@@ -44,7 +43,7 @@ pub struct HllCalcFunction;
|
||||
|
||||
impl HllCalcFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(HllCalcFunction));
|
||||
registry.register_scalar(HllCalcFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,6 +116,8 @@ impl Function for HllCalcFunction {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::vectors::BinaryVector;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -17,8 +17,6 @@ mod ipv4;
|
||||
mod ipv6;
|
||||
mod range;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use cidr::{Ipv4ToCidr, Ipv6ToCidr};
|
||||
use ipv4::{Ipv4NumToString, Ipv4StringToNum};
|
||||
use ipv6::{Ipv6NumToString, Ipv6StringToNum};
|
||||
@@ -31,15 +29,15 @@ pub(crate) struct IpFunctions;
|
||||
impl IpFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// Register IPv4 functions
|
||||
registry.register(Arc::new(Ipv4NumToString));
|
||||
registry.register(Arc::new(Ipv4StringToNum));
|
||||
registry.register(Arc::new(Ipv4ToCidr));
|
||||
registry.register(Arc::new(Ipv4InRange));
|
||||
registry.register_scalar(Ipv4NumToString);
|
||||
registry.register_scalar(Ipv4StringToNum);
|
||||
registry.register_scalar(Ipv4ToCidr);
|
||||
registry.register_scalar(Ipv4InRange);
|
||||
|
||||
// Register IPv6 functions
|
||||
registry.register(Arc::new(Ipv6NumToString));
|
||||
registry.register(Arc::new(Ipv6StringToNum));
|
||||
registry.register(Arc::new(Ipv6ToCidr));
|
||||
registry.register(Arc::new(Ipv6InRange));
|
||||
registry.register_scalar(Ipv6NumToString);
|
||||
registry.register_scalar(Ipv6StringToNum);
|
||||
registry.register_scalar(Ipv6ToCidr);
|
||||
registry.register_scalar(Ipv6InRange);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
pub mod json_get;
|
||||
mod json_is;
|
||||
mod json_path_exists;
|
||||
@@ -33,23 +32,23 @@ pub(crate) struct JsonFunction;
|
||||
|
||||
impl JsonFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(JsonToStringFunction));
|
||||
registry.register(Arc::new(ParseJsonFunction));
|
||||
registry.register_scalar(JsonToStringFunction);
|
||||
registry.register_scalar(ParseJsonFunction);
|
||||
|
||||
registry.register(Arc::new(JsonGetInt));
|
||||
registry.register(Arc::new(JsonGetFloat));
|
||||
registry.register(Arc::new(JsonGetString));
|
||||
registry.register(Arc::new(JsonGetBool));
|
||||
registry.register_scalar(JsonGetInt);
|
||||
registry.register_scalar(JsonGetFloat);
|
||||
registry.register_scalar(JsonGetString);
|
||||
registry.register_scalar(JsonGetBool);
|
||||
|
||||
registry.register(Arc::new(JsonIsNull));
|
||||
registry.register(Arc::new(JsonIsInt));
|
||||
registry.register(Arc::new(JsonIsFloat));
|
||||
registry.register(Arc::new(JsonIsString));
|
||||
registry.register(Arc::new(JsonIsBool));
|
||||
registry.register(Arc::new(JsonIsArray));
|
||||
registry.register(Arc::new(JsonIsObject));
|
||||
registry.register_scalar(JsonIsNull);
|
||||
registry.register_scalar(JsonIsInt);
|
||||
registry.register_scalar(JsonIsFloat);
|
||||
registry.register_scalar(JsonIsString);
|
||||
registry.register_scalar(JsonIsBool);
|
||||
registry.register_scalar(JsonIsArray);
|
||||
registry.register_scalar(JsonIsObject);
|
||||
|
||||
registry.register(Arc::new(json_path_exists::JsonPathExistsFunction));
|
||||
registry.register(Arc::new(json_path_match::JsonPathMatchFunction));
|
||||
registry.register_scalar(json_path_exists::JsonPathExistsFunction);
|
||||
registry.register_scalar(json_path_match::JsonPathMatchFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,11 +38,11 @@ use crate::function_registry::FunctionRegistry;
|
||||
///
|
||||
/// Usage: matches(`<col>`, `<pattern>`) -> boolean
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub(crate) struct MatchesFunction;
|
||||
pub struct MatchesFunction;
|
||||
|
||||
impl MatchesFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(MatchesFunction));
|
||||
registry.register_scalar(MatchesFunction);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ pub struct MatchesTermFunction;
|
||||
|
||||
impl MatchesTermFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(MatchesTermFunction));
|
||||
registry.register_scalar(MatchesTermFunction);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ mod pow;
|
||||
mod rate;
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction};
|
||||
use common_query::error::{GeneralDataFusionSnafu, Result};
|
||||
@@ -39,13 +38,13 @@ pub(crate) struct MathFunction;
|
||||
|
||||
impl MathFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(ModuloFunction));
|
||||
registry.register(Arc::new(PowFunction));
|
||||
registry.register(Arc::new(RateFunction));
|
||||
registry.register(Arc::new(RangeFunction));
|
||||
registry.register(Arc::new(ClampFunction));
|
||||
registry.register(Arc::new(ClampMinFunction));
|
||||
registry.register(Arc::new(ClampMaxFunction));
|
||||
registry.register_scalar(ModuloFunction);
|
||||
registry.register_scalar(PowFunction);
|
||||
registry.register_scalar(RateFunction);
|
||||
registry.register_scalar(RangeFunction);
|
||||
registry.register_scalar(ClampFunction);
|
||||
registry.register_scalar(ClampMinFunction);
|
||||
registry.register_scalar(ClampMaxFunction);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
mod to_unixtime;
|
||||
|
||||
use to_unixtime::ToUnixtimeFunction;
|
||||
@@ -23,6 +22,6 @@ pub(crate) struct TimestampFunction;
|
||||
|
||||
impl TimestampFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(ToUnixtimeFunction));
|
||||
registry.register_scalar(ToUnixtimeFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
@@ -44,7 +43,7 @@ pub struct UddSketchCalcFunction;
|
||||
|
||||
impl UddSketchCalcFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(UddSketchCalcFunction));
|
||||
registry.register_scalar(UddSketchCalcFunction);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,10 +17,8 @@ mod distance;
|
||||
mod elem_product;
|
||||
mod elem_sum;
|
||||
pub mod impl_conv;
|
||||
pub(crate) mod product;
|
||||
mod scalar_add;
|
||||
mod scalar_mul;
|
||||
pub(crate) mod sum;
|
||||
mod vector_add;
|
||||
mod vector_dim;
|
||||
mod vector_div;
|
||||
@@ -30,37 +28,34 @@ mod vector_norm;
|
||||
mod vector_sub;
|
||||
mod vector_subvector;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
pub(crate) struct VectorFunction;
|
||||
|
||||
impl VectorFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// conversion
|
||||
registry.register(Arc::new(convert::ParseVectorFunction));
|
||||
registry.register(Arc::new(convert::VectorToStringFunction));
|
||||
registry.register_scalar(convert::ParseVectorFunction);
|
||||
registry.register_scalar(convert::VectorToStringFunction);
|
||||
|
||||
// distance
|
||||
registry.register(Arc::new(distance::CosDistanceFunction));
|
||||
registry.register(Arc::new(distance::DotProductFunction));
|
||||
registry.register(Arc::new(distance::L2SqDistanceFunction));
|
||||
registry.register_scalar(distance::CosDistanceFunction);
|
||||
registry.register_scalar(distance::DotProductFunction);
|
||||
registry.register_scalar(distance::L2SqDistanceFunction);
|
||||
|
||||
// scalar calculation
|
||||
registry.register(Arc::new(scalar_add::ScalarAddFunction));
|
||||
registry.register(Arc::new(scalar_mul::ScalarMulFunction));
|
||||
registry.register_scalar(scalar_add::ScalarAddFunction);
|
||||
registry.register_scalar(scalar_mul::ScalarMulFunction);
|
||||
|
||||
// vector calculation
|
||||
registry.register(Arc::new(vector_add::VectorAddFunction));
|
||||
registry.register(Arc::new(vector_sub::VectorSubFunction));
|
||||
registry.register(Arc::new(vector_mul::VectorMulFunction));
|
||||
registry.register(Arc::new(vector_div::VectorDivFunction));
|
||||
registry.register(Arc::new(vector_norm::VectorNormFunction));
|
||||
registry.register(Arc::new(vector_dim::VectorDimFunction));
|
||||
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
|
||||
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
|
||||
registry.register(Arc::new(elem_sum::ElemSumFunction));
|
||||
registry.register(Arc::new(elem_product::ElemProductFunction));
|
||||
registry.register_scalar(vector_add::VectorAddFunction);
|
||||
registry.register_scalar(vector_sub::VectorSubFunction);
|
||||
registry.register_scalar(vector_mul::VectorMulFunction);
|
||||
registry.register_scalar(vector_div::VectorDivFunction);
|
||||
registry.register_scalar(vector_norm::VectorNormFunction);
|
||||
registry.register_scalar(vector_dim::VectorDimFunction);
|
||||
registry.register_scalar(vector_kth_elem::VectorKthElemFunction);
|
||||
registry.register_scalar(vector_subvector::VectorSubvectorFunction);
|
||||
registry.register_scalar(elem_sum::ElemSumFunction);
|
||||
registry.register_scalar(elem_product::ElemProductFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,13 +36,13 @@ pub(crate) struct SystemFunction;
|
||||
|
||||
impl SystemFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(BuildFunction));
|
||||
registry.register(Arc::new(VersionFunction));
|
||||
registry.register(Arc::new(CurrentSchemaFunction));
|
||||
registry.register(Arc::new(DatabaseFunction));
|
||||
registry.register(Arc::new(SessionUserFunction));
|
||||
registry.register(Arc::new(ReadPreferenceFunction));
|
||||
registry.register(Arc::new(TimezoneFunction));
|
||||
registry.register_scalar(BuildFunction);
|
||||
registry.register_scalar(VersionFunction);
|
||||
registry.register_scalar(CurrentSchemaFunction);
|
||||
registry.register_scalar(DatabaseFunction);
|
||||
registry.register_scalar(SessionUserFunction);
|
||||
registry.register_scalar(ReadPreferenceFunction);
|
||||
registry.register_scalar(TimezoneFunction);
|
||||
registry.register_async(Arc::new(ProcedureStateFunction));
|
||||
PGCatalogFunction::register(registry);
|
||||
}
|
||||
|
||||
@@ -16,8 +16,6 @@ mod pg_get_userbyid;
|
||||
mod table_is_visible;
|
||||
mod version;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use pg_get_userbyid::PGGetUserByIdFunction;
|
||||
use table_is_visible::PGTableIsVisibleFunction;
|
||||
use version::PGVersionFunction;
|
||||
@@ -35,8 +33,8 @@ pub(super) struct PGCatalogFunction;
|
||||
|
||||
impl PGCatalogFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(PGTableIsVisibleFunction));
|
||||
registry.register(Arc::new(PGGetUserByIdFunction));
|
||||
registry.register(Arc::new(PGVersionFunction));
|
||||
registry.register_scalar(PGTableIsVisibleFunction);
|
||||
registry.register_scalar(PGGetUserByIdFunction);
|
||||
registry.register_scalar(PGVersionFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,6 +296,8 @@ pub struct ChannelConfig {
|
||||
pub max_recv_message_size: ReadableSize,
|
||||
// Max gRPC sending(encoding) message size
|
||||
pub max_send_message_size: ReadableSize,
|
||||
pub send_compression: bool,
|
||||
pub accept_compression: bool,
|
||||
}
|
||||
|
||||
impl Default for ChannelConfig {
|
||||
@@ -316,6 +318,8 @@ impl Default for ChannelConfig {
|
||||
client_tls: None,
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
send_compression: false,
|
||||
accept_compression: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -566,6 +570,8 @@ mod tests {
|
||||
client_tls: None,
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
send_compression: false,
|
||||
accept_compression: false,
|
||||
},
|
||||
default_cfg
|
||||
);
|
||||
@@ -610,6 +616,8 @@ mod tests {
|
||||
}),
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
send_compression: false,
|
||||
accept_compression: false,
|
||||
},
|
||||
cfg
|
||||
);
|
||||
|
||||
@@ -64,6 +64,7 @@ impl Default for FlightEncoder {
|
||||
}
|
||||
|
||||
impl FlightEncoder {
|
||||
/// Creates new [FlightEncoder] with compression disabled.
|
||||
pub fn with_compression_disabled() -> Self {
|
||||
let write_options = writer::IpcWriteOptions::default()
|
||||
.try_with_compression(None)
|
||||
|
||||
@@ -35,6 +35,9 @@ pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
|
||||
/// The lease seconds of metasrv leader.
|
||||
pub const META_LEASE_SECS: u64 = 5;
|
||||
|
||||
/// The keep-alive interval of the Postgres connection.
|
||||
pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
|
||||
|
||||
/// In a lease, there are two opportunities for renewal.
|
||||
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
|
||||
use api::v1::region::{InsertRequests, RegionRequest};
|
||||
pub use common_base::AffectedRows;
|
||||
use common_query::request::QueryRequest;
|
||||
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
|
||||
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
|
||||
|
||||
/// Handles requests to mark time window as dirty.
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
|
||||
}
|
||||
|
||||
pub type FlownodeRef = Arc<dyn Flownode>;
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
pub mod file;
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Instant;
|
||||
@@ -271,6 +272,49 @@ impl MetadataSnapshotManager {
|
||||
|
||||
Ok((filename.to_string(), num_keyvalues as u64))
|
||||
}
|
||||
|
||||
fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
|
||||
format!("{} => {}", key, value)
|
||||
}
|
||||
|
||||
pub async fn info(
|
||||
object_store: &ObjectStore,
|
||||
file_path: &str,
|
||||
query_str: &str,
|
||||
limit: Option<usize>,
|
||||
) -> Result<Vec<String>> {
|
||||
let path = Path::new(file_path);
|
||||
|
||||
let file_name = path
|
||||
.file_name()
|
||||
.and_then(|s| s.to_str())
|
||||
.context(InvalidFilePathSnafu { file_path })?;
|
||||
|
||||
let filename = FileName::try_from(file_name)?;
|
||||
let data = object_store
|
||||
.read(file_path)
|
||||
.await
|
||||
.context(ReadObjectSnafu { file_path })?;
|
||||
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
|
||||
let metadata_content = document.into_metadata_content()?.values();
|
||||
let mut results = Vec::with_capacity(limit.unwrap_or(256));
|
||||
for kv in metadata_content {
|
||||
let key_str = String::from_utf8_lossy(&kv.key);
|
||||
if let Some(prefix) = query_str.strip_suffix('*') {
|
||||
if key_str.starts_with(prefix) {
|
||||
let value_str = String::from_utf8_lossy(&kv.value);
|
||||
results.push(Self::format_output(key_str, value_str));
|
||||
}
|
||||
} else if key_str == query_str {
|
||||
let value_str = String::from_utf8_lossy(&kv.value);
|
||||
results.push(Self::format_output(key_str, value_str));
|
||||
}
|
||||
if results.len() == limit.unwrap_or(usize::MAX) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(results)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -111,6 +111,11 @@ impl MetadataContent {
|
||||
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
|
||||
self.values.into_iter()
|
||||
}
|
||||
|
||||
/// Returns the key-value pairs as a vector.
|
||||
pub fn values(self) -> Vec<KeyValue> {
|
||||
self.values
|
||||
}
|
||||
}
|
||||
|
||||
/// The key-value pair of the backup file.
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
|
||||
use api::v1::region::{InsertRequests, RegionRequest};
|
||||
pub use common_base::AffectedRows;
|
||||
use common_query::request::QueryRequest;
|
||||
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
|
||||
) -> Result<FlowResponse> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_req: DirtyWindowRequest,
|
||||
) -> Result<FlowResponse> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
/// A mock struct implements [NodeManager] only implement the `datanode` method.
|
||||
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
|
||||
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
|
||||
self.handler.handle_inserts(&self.peer, requests).await
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||
self.handler.handle_mark_window_dirty(&self.peer, req).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -372,6 +372,7 @@ impl DatanodeBuilder {
|
||||
opts.max_concurrent_queries,
|
||||
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
|
||||
Duration::from_millis(100),
|
||||
opts.grpc.flight_compression,
|
||||
);
|
||||
|
||||
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
|
||||
|
||||
@@ -50,6 +50,7 @@ use query::QueryEngineRef;
|
||||
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
|
||||
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
|
||||
use servers::grpc::region_server::RegionServerHandler;
|
||||
use servers::grpc::FlightCompression;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::metric_engine_consts::{
|
||||
@@ -80,6 +81,7 @@ use crate::event_listener::RegionServerEventListenerRef;
|
||||
#[derive(Clone)]
|
||||
pub struct RegionServer {
|
||||
inner: Arc<RegionServerInner>,
|
||||
flight_compression: FlightCompression,
|
||||
}
|
||||
|
||||
pub struct RegionStat {
|
||||
@@ -93,6 +95,7 @@ impl RegionServer {
|
||||
query_engine: QueryEngineRef,
|
||||
runtime: Runtime,
|
||||
event_listener: RegionServerEventListenerRef,
|
||||
flight_compression: FlightCompression,
|
||||
) -> Self {
|
||||
Self::with_table_provider(
|
||||
query_engine,
|
||||
@@ -101,6 +104,7 @@ impl RegionServer {
|
||||
Arc::new(DummyTableProviderFactory),
|
||||
0,
|
||||
Duration::from_millis(0),
|
||||
flight_compression,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -111,6 +115,7 @@ impl RegionServer {
|
||||
table_provider_factory: TableProviderFactoryRef,
|
||||
max_concurrent_queries: usize,
|
||||
concurrent_query_limiter_timeout: Duration,
|
||||
flight_compression: FlightCompression,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(RegionServerInner::new(
|
||||
@@ -123,6 +128,7 @@ impl RegionServer {
|
||||
concurrent_query_limiter_timeout,
|
||||
),
|
||||
)),
|
||||
flight_compression,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -536,7 +542,11 @@ impl FlightCraft for RegionServer {
|
||||
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
|
||||
.await?;
|
||||
|
||||
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
|
||||
let stream = Box::pin(FlightRecordBatchStream::new(
|
||||
result,
|
||||
tracing_context,
|
||||
self.flight_compression,
|
||||
));
|
||||
Ok(Response::new(stream))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,16 +19,16 @@ use std::time::Duration;
|
||||
use api::region::RegionResponse;
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
|
||||
use common_function::function_factory::ScalarFunctionFactory;
|
||||
use common_query::Output;
|
||||
use common_runtime::runtime::{BuilderBuild, RuntimeTrait};
|
||||
use common_runtime::Runtime;
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use datafusion_expr::{AggregateUDF, LogicalPlan};
|
||||
use query::dataframe::DataFrame;
|
||||
use query::planner::LogicalPlanner;
|
||||
use query::query_engine::{DescribeResult, QueryEngineState};
|
||||
use query::{QueryEngine, QueryEngineContext};
|
||||
use servers::grpc::FlightCompression;
|
||||
use session::context::QueryContextRef;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{
|
||||
@@ -76,9 +76,9 @@ impl QueryEngine for MockQueryEngine {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn register_aggregate_function(&self, _func: AggregateFunctionMetaRef) {}
|
||||
fn register_aggregate_function(&self, _func: AggregateUDF) {}
|
||||
|
||||
fn register_function(&self, _func: FunctionRef) {}
|
||||
fn register_scalar_function(&self, _func: ScalarFunctionFactory) {}
|
||||
|
||||
fn read_table(&self, _table: TableRef) -> query::error::Result<DataFrame> {
|
||||
unimplemented!()
|
||||
@@ -98,6 +98,7 @@ pub fn mock_region_server() -> RegionServer {
|
||||
Arc::new(MockQueryEngine),
|
||||
Runtime::builder().build().unwrap(),
|
||||
Box::new(NoopRegionServerEventListener),
|
||||
FlightCompression::default(),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -316,7 +316,7 @@ impl StreamingEngine {
|
||||
);
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["out"])
|
||||
.with_label_values(&["out-streaming"])
|
||||
.inc_by(total_rows as u64);
|
||||
|
||||
let now = self.tick_manager.tick();
|
||||
|
||||
@@ -31,6 +31,7 @@ use common_runtime::JoinHandle;
|
||||
use common_telemetry::{error, info, trace, warn};
|
||||
use datatypes::value::Value;
|
||||
use futures::TryStreamExt;
|
||||
use greptime_proto::v1::flow::DirtyWindowRequest;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::QueryContextBuilder;
|
||||
@@ -47,7 +48,7 @@ use crate::error::{
|
||||
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
||||
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
||||
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
|
||||
use crate::repr::{self, DiffRow};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -690,6 +691,9 @@ impl FlowEngine for FlowDualEngine {
|
||||
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
||||
let mut to_batch_engine = request.requests;
|
||||
|
||||
let mut batching_row_cnt = 0;
|
||||
let mut streaming_row_cnt = 0;
|
||||
|
||||
{
|
||||
// not locking this, or recover flows will be starved when also handling flow inserts
|
||||
let src_table2flow = self.src_table2flow.read().await;
|
||||
@@ -699,9 +703,11 @@ impl FlowEngine for FlowDualEngine {
|
||||
let is_in_stream = src_table2flow.in_stream(table_id);
|
||||
let is_in_batch = src_table2flow.in_batch(table_id);
|
||||
if is_in_stream {
|
||||
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||
to_stream_engine.push(req.clone());
|
||||
}
|
||||
if is_in_batch {
|
||||
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||
return true;
|
||||
}
|
||||
if !is_in_batch && !is_in_stream {
|
||||
@@ -714,6 +720,14 @@ impl FlowEngine for FlowDualEngine {
|
||||
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
||||
}
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["in-streaming"])
|
||||
.inc_by(streaming_row_cnt as u64);
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["in-batching"])
|
||||
.inc_by(batching_row_cnt as u64);
|
||||
|
||||
let streaming_engine = self.streaming_engine.clone();
|
||||
let stream_handler: JoinHandle<Result<(), Error>> =
|
||||
common_runtime::spawn_global(async move {
|
||||
@@ -839,6 +853,11 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
|
||||
// todo: implement
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
|
||||
@@ -861,6 +880,98 @@ fn to_meta_err(
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl common_meta::node_manager::Flownode for StreamingEngine {
|
||||
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
|
||||
let query_ctx = request
|
||||
.header
|
||||
.and_then(|h| h.query_context)
|
||||
.map(|ctx| ctx.into());
|
||||
match request.body {
|
||||
Some(flow_request::Body::Create(CreateRequest {
|
||||
flow_id: Some(task_id),
|
||||
source_table_ids,
|
||||
sink_table_name: Some(sink_table_name),
|
||||
create_if_not_exists,
|
||||
expire_after,
|
||||
comment,
|
||||
sql,
|
||||
flow_options,
|
||||
or_replace,
|
||||
})) => {
|
||||
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
|
||||
let sink_table_name = [
|
||||
sink_table_name.catalog_name,
|
||||
sink_table_name.schema_name,
|
||||
sink_table_name.table_name,
|
||||
];
|
||||
let expire_after = expire_after.map(|e| e.value);
|
||||
let args = CreateFlowArgs {
|
||||
flow_id: task_id.id as u64,
|
||||
sink_table_name,
|
||||
source_table_ids,
|
||||
create_if_not_exists,
|
||||
or_replace,
|
||||
expire_after,
|
||||
comment: Some(comment),
|
||||
sql: sql.clone(),
|
||||
flow_options,
|
||||
query_ctx,
|
||||
};
|
||||
let ret = self
|
||||
.create_flow(args)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.inc();
|
||||
Ok(FlowResponse {
|
||||
affected_flows: ret
|
||||
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
|
||||
.into_iter()
|
||||
.collect_vec(),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Drop(DropRequest {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
self.remove_flow(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.dec();
|
||||
Ok(Default::default())
|
||||
}
|
||||
Some(flow_request::Body::Flush(FlushFlow {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
let row = self
|
||||
.flush_flow_inner(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(FlowResponse {
|
||||
affected_flows: vec![flow_id],
|
||||
affected_rows: row as u64,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
|
||||
self.handle_inserts_inner(request)
|
||||
.await
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
|
||||
// todo: implement
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for StreamingEngine {
|
||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
self.create_flow_inner(args).await
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::ddl::create_flow::FlowType;
|
||||
@@ -29,8 +30,7 @@ use common_telemetry::{debug, info};
|
||||
use common_time::TimeToLive;
|
||||
use query::QueryEngineRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::TableId;
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
use tokio::sync::{oneshot, RwLock};
|
||||
|
||||
use crate::batching_mode::frontend_client::FrontendClient;
|
||||
@@ -42,6 +42,7 @@ use crate::error::{
|
||||
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
|
||||
UnexpectedSnafu, UnsupportedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE;
|
||||
use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
||||
|
||||
/// Batching mode Engine, responsible for driving all the batching mode tasks
|
||||
@@ -77,6 +78,122 @@ impl BatchingEngine {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_mark_dirty_time_window(
|
||||
&self,
|
||||
reqs: DirtyWindowRequests,
|
||||
) -> Result<FlowResponse, Error> {
|
||||
let table_info_mgr = self.table_meta.table_info_manager();
|
||||
|
||||
let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
|
||||
for r in reqs.requests {
|
||||
let tid = TableId::from(r.table_id);
|
||||
let entry = group_by_table_id.entry(tid).or_default();
|
||||
entry.extend(r.dirty_time_ranges);
|
||||
}
|
||||
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
|
||||
let table_infos =
|
||||
table_info_mgr
|
||||
.batch_get(&tids)
|
||||
.await
|
||||
.with_context(|_| TableNotFoundMetaSnafu {
|
||||
msg: format!("Failed to get table info for table ids: {:?}", tids),
|
||||
})?;
|
||||
|
||||
let group_by_table_name = group_by_table_id
|
||||
.into_iter()
|
||||
.filter_map(|(id, rows)| {
|
||||
let table_name = table_infos.get(&id).map(|info| info.table_name());
|
||||
let Some(table_name) = table_name else {
|
||||
warn!("Failed to get table infos for table id: {:?}", id);
|
||||
return None;
|
||||
};
|
||||
let table_name = [
|
||||
table_name.catalog_name,
|
||||
table_name.schema_name,
|
||||
table_name.table_name,
|
||||
];
|
||||
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
|
||||
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
|
||||
.data_type
|
||||
.as_timestamp()
|
||||
.unwrap()
|
||||
.unit();
|
||||
Some((table_name, (rows, time_index_unit)))
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let group_by_table_name = Arc::new(group_by_table_name);
|
||||
|
||||
let mut handles = Vec::new();
|
||||
let tasks = self.tasks.read().await;
|
||||
|
||||
for (_flow_id, task) in tasks.iter() {
|
||||
let src_table_names = &task.config.source_table_names;
|
||||
|
||||
if src_table_names
|
||||
.iter()
|
||||
.all(|name| !group_by_table_name.contains_key(name))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let group_by_table_name = group_by_table_name.clone();
|
||||
let task = task.clone();
|
||||
|
||||
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
|
||||
let src_table_names = &task.config.source_table_names;
|
||||
let mut all_dirty_windows = vec![];
|
||||
for src_table_name in src_table_names {
|
||||
if let Some((window_ranges, unit)) = group_by_table_name.get(src_table_name) {
|
||||
let Some(expr) = &task.config.time_window_expr else {
|
||||
continue;
|
||||
};
|
||||
for window in window_ranges {
|
||||
let align_start = expr
|
||||
.eval(common_time::Timestamp::new(window.start_value, *unit))?
|
||||
.0
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Failed to eval start value",
|
||||
})?;
|
||||
|
||||
let align_end = expr
|
||||
.eval(common_time::Timestamp::new(window.end_value, *unit))?
|
||||
.1
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Failed to eval end value",
|
||||
})?;
|
||||
all_dirty_windows.push((align_start, align_end));
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut state = task.state.write().unwrap();
|
||||
let flow_id_label = task.config.flow_id.to_string();
|
||||
for (s, e) in all_dirty_windows {
|
||||
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE
|
||||
.with_label_values(&[&flow_id_label])
|
||||
.observe(e.sub(&s).unwrap_or_default().num_seconds() as f64);
|
||||
state.dirty_time_windows.add_window(s, Some(e));
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
handles.push(handle);
|
||||
}
|
||||
drop(tasks);
|
||||
for handle in handles {
|
||||
match handle.await {
|
||||
Err(e) => {
|
||||
warn!("Failed to handle inserts: {e}");
|
||||
}
|
||||
Ok(Ok(())) => (),
|
||||
Ok(Err(e)) => {
|
||||
warn!("Failed to handle inserts: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
pub async fn handle_inserts_inner(
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
|
||||
@@ -409,7 +409,7 @@ impl FrontendClient {
|
||||
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
|
||||
|
||||
db.database
|
||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES, &[("query_parallelism", "1")])
|
||||
.await
|
||||
.with_context(|_| InvalidRequestSnafu {
|
||||
context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
|
||||
|
||||
@@ -31,8 +31,9 @@ use crate::batching_mode::time_window::TimeWindowExpr;
|
||||
use crate::batching_mode::MIN_REFRESH_DURATION;
|
||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -58,6 +59,11 @@ pub struct TaskState {
|
||||
pub(crate) min_run_interval: Option<u64>,
|
||||
/// max filter number per query
|
||||
pub(crate) max_filter_num: Option<usize>,
|
||||
/// Current filter count, will grow when query succeeds(capped by `max_filter_num`),
|
||||
/// and reset to 1 when query fails.
|
||||
///
|
||||
/// This is useful for controlling resource usage
|
||||
pub(crate) cur_filter_cnt: usize,
|
||||
}
|
||||
impl TaskState {
|
||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||
@@ -71,6 +77,7 @@ impl TaskState {
|
||||
task_handle: None,
|
||||
min_run_interval: None,
|
||||
max_filter_num: None,
|
||||
cur_filter_cnt: 1,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,24 +102,32 @@ impl TaskState {
|
||||
pub fn get_next_start_query_time(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
_time_window_size: &Option<Duration>,
|
||||
time_window_size: &Option<Duration>,
|
||||
max_timeout: Option<Duration>,
|
||||
) -> Instant {
|
||||
let next_duration = max_timeout
|
||||
.unwrap_or(self.last_query_duration)
|
||||
.min(self.last_query_duration)
|
||||
.max(
|
||||
self.min_run_interval
|
||||
// = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
|
||||
let lower = self.min_run_interval
|
||||
.map(Duration::from_secs)
|
||||
.unwrap_or(MIN_REFRESH_DURATION),
|
||||
);
|
||||
.unwrap_or(MIN_REFRESH_DURATION).max(time_window_size.unwrap_or_default());
|
||||
let next_duration = self.last_query_duration.max(lower);
|
||||
let next_duration = if let Some(max_timeout) = max_timeout {
|
||||
next_duration.min(max_timeout)
|
||||
} else {
|
||||
next_duration
|
||||
};
|
||||
|
||||
// if have dirty time window, execute immediately to clean dirty time window
|
||||
if self.dirty_time_windows.windows.is_empty() {
|
||||
let cur_dirty_window_size = self.dirty_time_windows.window_size();
|
||||
let max_query_update_range = time_window_size.clone().unwrap_or_default().mul_f64(
|
||||
self.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM) as f64,
|
||||
);
|
||||
if cur_dirty_window_size < max_query_update_range {
|
||||
self.last_update_time + next_duration
|
||||
} else {
|
||||
// if dirty time windows can't be clean up in one query, execute immediately to faster
|
||||
// clean up dirty time windows
|
||||
debug!(
|
||||
"Flow id = {}, still have {} dirty time window({:?}), execute immediately",
|
||||
"Flow id = {}, still have too many{} dirty time window({:?}), execute immediately",
|
||||
flow_id,
|
||||
self.dirty_time_windows.windows.len(),
|
||||
self.dirty_time_windows.windows
|
||||
@@ -131,6 +146,17 @@ pub struct DirtyTimeWindows {
|
||||
windows: BTreeMap<Timestamp, Option<Timestamp>>,
|
||||
}
|
||||
|
||||
/// Time windows that are being worked on, which are not dirty but are currently being processed
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct WorkingTimeWindows {
|
||||
/// windows's `start -> end` and non-overlapping
|
||||
/// `end` is exclusive(and optional)
|
||||
pub windows: BTreeMap<Timestamp, Option<Timestamp>>,
|
||||
/// Filter expression for the time windows
|
||||
/// This is used to filter the data in the time windows.
|
||||
pub filter: Option<datafusion_expr::Expr>,
|
||||
}
|
||||
|
||||
impl DirtyTimeWindows {
|
||||
/// Time window merge distance
|
||||
///
|
||||
@@ -152,10 +178,28 @@ impl DirtyTimeWindows {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn window_size(&self) -> Duration {
|
||||
let mut ret = Duration::from_secs(0);
|
||||
for (start, end) in &self.windows {
|
||||
if let Some(end) = end {
|
||||
if let Some(duration) = end.sub(start) {
|
||||
ret += duration.to_std().unwrap_or_default();
|
||||
}
|
||||
}
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
|
||||
self.windows.insert(start, end);
|
||||
}
|
||||
|
||||
pub fn add_windows(&mut self, windows: BTreeMap<Timestamp, Option<Timestamp>>) {
|
||||
for (start, end) in windows {
|
||||
self.windows.insert(start, end);
|
||||
}
|
||||
}
|
||||
|
||||
/// Clean all dirty time windows, useful when can't found time window expr
|
||||
pub fn clean(&mut self) {
|
||||
self.windows.clear();
|
||||
@@ -174,7 +218,7 @@ impl DirtyTimeWindows {
|
||||
window_cnt: usize,
|
||||
flow_id: FlowId,
|
||||
task_ctx: Option<&BatchingTask>,
|
||||
) -> Result<Option<datafusion_expr::Expr>, Error> {
|
||||
) -> Result<WorkingTimeWindows, Error> {
|
||||
debug!(
|
||||
"expire_lower_bound: {:?}, window_size: {:?}",
|
||||
expire_lower_bound.map(|t| t.to_iso8601_string()),
|
||||
@@ -211,51 +255,63 @@ impl DirtyTimeWindows {
|
||||
|
||||
// get the first `window_cnt` time windows
|
||||
let max_time_range = window_size * window_cnt as i32;
|
||||
let nth = {
|
||||
let mut cur_time_range = chrono::Duration::zero();
|
||||
let mut nth_key = None;
|
||||
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
||||
// if time range is too long, stop
|
||||
if cur_time_range > max_time_range {
|
||||
nth_key = Some(*start);
|
||||
break;
|
||||
}
|
||||
|
||||
// if we have enough time windows, stop
|
||||
if idx >= window_cnt {
|
||||
nth_key = Some(*start);
|
||||
break;
|
||||
}
|
||||
let mut to_be_query = BTreeMap::new();
|
||||
let mut new_windows = self.windows.clone();
|
||||
let mut cur_time_range = chrono::Duration::zero();
|
||||
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
||||
let first_end = start
|
||||
.add_duration(window_size.to_std().unwrap())
|
||||
.context(TimeSnafu)?;
|
||||
let end = end.unwrap_or(first_end);
|
||||
|
||||
if let Some(end) = end {
|
||||
if let Some(x) = end.sub(start) {
|
||||
cur_time_range += x;
|
||||
}
|
||||
}
|
||||
// if time range is too long, stop
|
||||
if cur_time_range >= max_time_range {
|
||||
break;
|
||||
}
|
||||
|
||||
nth_key
|
||||
};
|
||||
let first_nth = {
|
||||
if let Some(nth) = nth {
|
||||
let mut after = self.windows.split_off(&nth);
|
||||
std::mem::swap(&mut self.windows, &mut after);
|
||||
|
||||
after
|
||||
} else {
|
||||
std::mem::take(&mut self.windows)
|
||||
// if we have enough time windows, stop
|
||||
if idx >= window_cnt {
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(x) = end.sub(start) {
|
||||
if cur_time_range + x <= max_time_range {
|
||||
to_be_query.insert(*start, Some(end));
|
||||
new_windows.remove(start);
|
||||
cur_time_range += x;
|
||||
} else {
|
||||
// too large a window, split it
|
||||
// split at window_size * times
|
||||
let surplus = max_time_range - cur_time_range;
|
||||
let times = surplus.num_seconds() / window_size.num_seconds();
|
||||
|
||||
let split_offset = window_size * times as i32;
|
||||
let split_at = start
|
||||
.add_duration(split_offset.to_std().unwrap())
|
||||
.context(TimeSnafu)?;
|
||||
to_be_query.insert(*start, Some(split_at));
|
||||
|
||||
// remove the original window
|
||||
new_windows.remove(start);
|
||||
new_windows.insert(split_at, Some(end));
|
||||
cur_time_range += split_offset;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.windows = new_windows;
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(first_nth.len() as f64);
|
||||
.observe(to_be_query.len() as f64);
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(self.windows.len() as f64);
|
||||
|
||||
let full_time_range = first_nth
|
||||
let full_time_range = to_be_query
|
||||
.iter()
|
||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||
if let Some(end) = end {
|
||||
@@ -265,12 +321,27 @@ impl DirtyTimeWindows {
|
||||
}
|
||||
})
|
||||
.num_seconds() as f64;
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(full_time_range);
|
||||
|
||||
let stalled_time_range =
|
||||
self.windows
|
||||
.iter()
|
||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||
if let Some(end) = end {
|
||||
acc + end.sub(start).unwrap_or(chrono::Duration::zero())
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
});
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(stalled_time_range.num_seconds() as f64);
|
||||
|
||||
let mut expr_lst = vec![];
|
||||
for (start, end) in first_nth.into_iter() {
|
||||
for (start, end) in to_be_query.clone().into_iter() {
|
||||
// align using time window exprs
|
||||
let (start, end) = if let Some(ctx) = task_ctx {
|
||||
let Some(time_window_expr) = &ctx.config.time_window_expr else {
|
||||
@@ -302,7 +373,12 @@ impl DirtyTimeWindows {
|
||||
expr_lst.push(expr);
|
||||
}
|
||||
let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
|
||||
Ok(expr)
|
||||
|
||||
let working = WorkingTimeWindows {
|
||||
windows: to_be_query,
|
||||
filter: expr,
|
||||
};
|
||||
Ok(working)
|
||||
}
|
||||
|
||||
fn align_time_window(
|
||||
@@ -504,6 +580,64 @@ mod test {
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split range
|
||||
(
|
||||
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
|
||||
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||
))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
60
|
||||
)),
|
||||
),
|
||||
(
|
||||
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||
Some(Timestamp::new_second(
|
||||
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split 2 min into 1 min
|
||||
(
|
||||
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
40 * 3
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split 3s + 1min into 3s + 57s
|
||||
(
|
||||
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
3
|
||||
)),
|
||||
),(
|
||||
Timestamp::new_second(20),
|
||||
Some(Timestamp::new_second(
|
||||
140
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
|
||||
)
|
||||
),
|
||||
// expired
|
||||
(
|
||||
vec![
|
||||
@@ -520,6 +654,8 @@ mod test {
|
||||
None
|
||||
),
|
||||
];
|
||||
// let len = testcases.len();
|
||||
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
|
||||
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
|
||||
testcases
|
||||
{
|
||||
@@ -538,7 +674,8 @@ mod test {
|
||||
0,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
.unwrap()
|
||||
.filter;
|
||||
|
||||
let unparser = datafusion::sql::unparser::Unparser::default();
|
||||
let to_sql = filter_expr
|
||||
|
||||
@@ -46,7 +46,7 @@ use tokio::time::Instant;
|
||||
|
||||
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
|
||||
use crate::batching_mode::frontend_client::FrontendClient;
|
||||
use crate::batching_mode::state::{DirtyTimeWindows, TaskState};
|
||||
use crate::batching_mode::state::{DirtyTimeWindows, TaskState, WorkingTimeWindows};
|
||||
use crate::batching_mode::time_window::TimeWindowExpr;
|
||||
use crate::batching_mode::utils::{
|
||||
get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
|
||||
@@ -61,7 +61,9 @@ use crate::error::{
|
||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT, METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, METRIC_FLOW_ROWS,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -108,6 +110,13 @@ enum QueryType {
|
||||
Sql,
|
||||
}
|
||||
|
||||
/// A plan with working time windows, used to track the time windows that are currently being processed
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PlanWithWindow {
|
||||
pub plan: Option<LogicalPlan>,
|
||||
pub working_windows: WorkingTimeWindows,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BatchingTask {
|
||||
pub config: Arc<TaskConfig>,
|
||||
@@ -216,30 +225,29 @@ impl BatchingTask {
|
||||
engine: &QueryEngineRef,
|
||||
frontend_client: &Arc<FrontendClient>,
|
||||
) -> Result<Option<(u32, Duration)>, Error> {
|
||||
if let Some(new_query) = self.gen_insert_plan(engine).await? {
|
||||
if let Some(new_query) = self.gen_insert_plan(engine).await?.plan {
|
||||
debug!("Generate new query: {}", new_query);
|
||||
self.execute_logical_plan(frontend_client, &new_query).await
|
||||
self.execute_logical_plan(frontend_client, &new_query)
|
||||
.await
|
||||
.map(Some)
|
||||
} else {
|
||||
debug!("Generate no query");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn gen_insert_plan(
|
||||
&self,
|
||||
engine: &QueryEngineRef,
|
||||
) -> Result<Option<LogicalPlan>, Error> {
|
||||
pub async fn gen_insert_plan(&self, engine: &QueryEngineRef) -> Result<PlanWithWindow, Error> {
|
||||
let (table, df_schema) = get_table_info_df_schema(
|
||||
self.config.catalog_manager.clone(),
|
||||
self.config.sink_table_name.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let new_query = self
|
||||
let new_query_info = self
|
||||
.gen_query_with_time_window(engine.clone(), &table.meta.schema)
|
||||
.await?;
|
||||
|
||||
let insert_into = if let Some((new_query, _column_cnt)) = new_query {
|
||||
let insert_into = if let Some(new_query) = new_query_info.plan {
|
||||
// first check if all columns in input query exists in sink table
|
||||
// since insert into ref to names in record batch generate by given query
|
||||
let table_columns = df_schema
|
||||
@@ -270,12 +278,15 @@ impl BatchingTask {
|
||||
Arc::new(new_query),
|
||||
))
|
||||
} else {
|
||||
return Ok(None);
|
||||
return Ok(new_query_info);
|
||||
};
|
||||
let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
|
||||
context: "Failed to recompute schema",
|
||||
})?;
|
||||
Ok(Some(insert_into))
|
||||
Ok(PlanWithWindow {
|
||||
plan: Some(insert_into),
|
||||
working_windows: new_query_info.working_windows,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn create_table(
|
||||
@@ -295,7 +306,7 @@ impl BatchingTask {
|
||||
&self,
|
||||
frontend_client: &Arc<FrontendClient>,
|
||||
plan: &LogicalPlan,
|
||||
) -> Result<Option<(u32, Duration)>, Error> {
|
||||
) -> Result<(u32, Duration), Error> {
|
||||
let instant = Instant::now();
|
||||
let flow_id = self.config.flow_id;
|
||||
|
||||
@@ -377,6 +388,9 @@ impl BatchingTask {
|
||||
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
|
||||
elapsed
|
||||
);
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
|
||||
.inc_by(*affected_rows as _);
|
||||
} else if let Err(err) = &res {
|
||||
warn!(
|
||||
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
|
||||
@@ -405,7 +419,7 @@ impl BatchingTask {
|
||||
|
||||
let res = res?;
|
||||
|
||||
Ok(Some((res, elapsed)))
|
||||
Ok((res, elapsed))
|
||||
}
|
||||
|
||||
/// start executing query in a loop, break when receive shutdown signal
|
||||
@@ -416,6 +430,7 @@ impl BatchingTask {
|
||||
engine: QueryEngineRef,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
) {
|
||||
let flow_id_str = self.config.flow_id.to_string();
|
||||
loop {
|
||||
// first check if shutdown signal is received
|
||||
// if so, break the loop
|
||||
@@ -433,8 +448,11 @@ impl BatchingTask {
|
||||
Err(TryRecvError::Empty) => (),
|
||||
}
|
||||
}
|
||||
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
|
||||
let new_query = match self.gen_insert_plan(&engine).await {
|
||||
let new_query_info = match self.gen_insert_plan(&engine).await {
|
||||
Ok(new_query) => new_query,
|
||||
Err(err) => {
|
||||
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
|
||||
@@ -444,8 +462,10 @@ impl BatchingTask {
|
||||
}
|
||||
};
|
||||
|
||||
let res = if let Some(new_query) = &new_query {
|
||||
self.execute_logical_plan(&frontend_client, new_query).await
|
||||
let res = if let Some(new_query) = &new_query_info.plan {
|
||||
self.execute_logical_plan(&frontend_client, new_query)
|
||||
.await
|
||||
.map(Some)
|
||||
} else {
|
||||
Ok(None)
|
||||
};
|
||||
@@ -454,7 +474,17 @@ impl BatchingTask {
|
||||
// normal execute, sleep for some time before doing next query
|
||||
Ok(Some(_)) => {
|
||||
let sleep_until = {
|
||||
let state = self.state.write().unwrap();
|
||||
let mut state = self.state.write().unwrap();
|
||||
|
||||
// double cur_filter_cnt
|
||||
state.cur_filter_cnt = state.cur_filter_cnt.saturating_mul(2).min(
|
||||
state
|
||||
.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM),
|
||||
);
|
||||
METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.set(state.cur_filter_cnt as i64);
|
||||
|
||||
state.get_next_start_query_time(
|
||||
self.config.flow_id,
|
||||
@@ -479,7 +509,10 @@ impl BatchingTask {
|
||||
}
|
||||
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
|
||||
Err(err) => {
|
||||
match new_query {
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
match new_query_info.plan {
|
||||
Some(query) => {
|
||||
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
||||
}
|
||||
@@ -487,6 +520,17 @@ impl BatchingTask {
|
||||
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
|
||||
}
|
||||
}
|
||||
{
|
||||
// return working windows to dirty windows, and reset current filter cnt so next time we generate query only generate a small query
|
||||
let mut state = self.state.write().unwrap();
|
||||
state
|
||||
.dirty_time_windows
|
||||
.add_windows(new_query_info.working_windows.windows);
|
||||
state.cur_filter_cnt = 1;
|
||||
METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.set(state.cur_filter_cnt as i64);
|
||||
}
|
||||
// also sleep for a little while before try again to prevent flooding logs
|
||||
tokio::time::sleep(MIN_REFRESH_DURATION).await;
|
||||
}
|
||||
@@ -515,7 +559,7 @@ impl BatchingTask {
|
||||
&self,
|
||||
engine: QueryEngineRef,
|
||||
sink_table_schema: &Arc<Schema>,
|
||||
) -> Result<Option<(LogicalPlan, usize)>, Error> {
|
||||
) -> Result<PlanWithWindow, Error> {
|
||||
let query_ctx = self.state.read().unwrap().query_ctx.clone();
|
||||
let start = SystemTime::now();
|
||||
let since_the_epoch = start
|
||||
@@ -528,7 +572,6 @@ impl BatchingTask {
|
||||
.unwrap_or(u64::MIN);
|
||||
|
||||
let low_bound = Timestamp::new_second(low_bound as i64);
|
||||
let schema_len = self.config.output_schema.fields().len();
|
||||
|
||||
let expire_time_window_bound = self
|
||||
.config
|
||||
@@ -561,10 +604,12 @@ impl BatchingTask {
|
||||
context: format!("Failed to rewrite plan:\n {}\n", plan),
|
||||
})?
|
||||
.data;
|
||||
let schema_len = plan.schema().fields().len();
|
||||
|
||||
// since no time window lower/upper bound is found, just return the original query(with auto columns)
|
||||
return Ok(Some((plan, schema_len)));
|
||||
return Ok(PlanWithWindow {
|
||||
plan: Some(plan),
|
||||
working_windows: WorkingTimeWindows::default(),
|
||||
});
|
||||
};
|
||||
|
||||
debug!(
|
||||
@@ -586,16 +631,14 @@ impl BatchingTask {
|
||||
),
|
||||
})?;
|
||||
|
||||
let expr = {
|
||||
let working_windows = {
|
||||
let mut state = self.state.write().unwrap();
|
||||
let max_window_cnt = state
|
||||
.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
|
||||
let cur_wnd_cnt = state.cur_filter_cnt;
|
||||
state.dirty_time_windows.gen_filter_exprs(
|
||||
&col_name,
|
||||
Some(l),
|
||||
window_size,
|
||||
max_window_cnt,
|
||||
cur_wnd_cnt,
|
||||
self.config.flow_id,
|
||||
Some(self),
|
||||
)?
|
||||
@@ -604,7 +647,9 @@ impl BatchingTask {
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
self.config.flow_id,
|
||||
expr.as_ref()
|
||||
working_windows
|
||||
.filter
|
||||
.as_ref()
|
||||
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to generate filter expr from {expr:?}"),
|
||||
}))
|
||||
@@ -612,13 +657,16 @@ impl BatchingTask {
|
||||
.map(|s| s.to_string())
|
||||
);
|
||||
|
||||
let Some(expr) = expr else {
|
||||
let Some(expr) = &working_windows.filter else {
|
||||
// no new data, hence no need to update
|
||||
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
|
||||
return Ok(None);
|
||||
return Ok(PlanWithWindow {
|
||||
plan: None,
|
||||
working_windows,
|
||||
});
|
||||
};
|
||||
|
||||
let mut add_filter = AddFilterRewriter::new(expr);
|
||||
let mut add_filter = AddFilterRewriter::new(expr.clone());
|
||||
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
|
||||
|
||||
let plan =
|
||||
@@ -634,7 +682,10 @@ impl BatchingTask {
|
||||
// only apply optimize after complex rewrite is done
|
||||
let new_plan = apply_df_optimizer(rewrite).await?;
|
||||
|
||||
Ok(Some((new_plan, schema_len)))
|
||||
Ok(PlanWithWindow {
|
||||
plan: Some(new_plan),
|
||||
working_windows,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -58,14 +58,50 @@ lazy_static! {
|
||||
vec![0.0, 5., 10., 20., 40.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE: HistogramVec =
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_time_range_secs",
|
||||
"flow batching engine query time range(seconds)",
|
||||
"greptime_flow_batching_engine_query_window_size_secs",
|
||||
"flow batching engine query window size(seconds)",
|
||||
&["flow_id"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_stalled_window_size_secs",
|
||||
"flow batching engine stalled window size(seconds)",
|
||||
&["flow_id"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_bulk_mark_time_window_range_secs",
|
||||
"flow batching engine query time window range marked by bulk memtable in seconds",
|
||||
&["flow_id"],
|
||||
vec![0.0, 60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
|
||||
register_int_counter_vec!(
|
||||
"greptime_flow_batching_start_query_count",
|
||||
"flow batching engine started query count",
|
||||
&["flow_id"],
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
|
||||
register_int_counter_vec!(
|
||||
"greptime_flow_batching_error_count",
|
||||
"flow batching engine error count per flow id",
|
||||
&["flow_id"]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT: IntGaugeVec = register_int_gauge_vec!(
|
||||
"greptime_flow_batching_current_window_count",
|
||||
"flow batching engine current query window count per flow id",
|
||||
&["flow_id"]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_guess_fe_load",
|
||||
@@ -78,7 +114,7 @@ lazy_static! {
|
||||
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
||||
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
||||
"greptime_flow_processed_rows",
|
||||
"Count of rows flowing through the system",
|
||||
"Count of rows flowing through the system.",
|
||||
&["direction"]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::DirtyWindowRequests;
|
||||
use api::v1::{RowDeleteRequests, RowInsertRequests};
|
||||
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
|
||||
use catalog::CatalogManagerRef;
|
||||
@@ -136,6 +137,18 @@ impl flow_server::Flow for FlowService {
|
||||
.map(Response::new)
|
||||
.map_err(to_status_with_last_err)
|
||||
}
|
||||
|
||||
async fn handle_mark_dirty_time_window(
|
||||
&self,
|
||||
reqs: Request<DirtyWindowRequests>,
|
||||
) -> Result<Response<FlowResponse>, Status> {
|
||||
self.dual_engine
|
||||
.batching_engine()
|
||||
.handle_mark_dirty_time_window(reqs.into_inner())
|
||||
.await
|
||||
.map(Response::new)
|
||||
.map_err(to_status_with_last_err)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::function::FunctionContext;
|
||||
use common_function::function::{FunctionContext, FunctionRef};
|
||||
use datafusion_substrait::extensions::Extensions;
|
||||
use datatypes::data_type::ConcreteDataType as CDT;
|
||||
use query::QueryEngine;
|
||||
@@ -108,9 +108,13 @@ impl FunctionExtensions {
|
||||
|
||||
/// register flow-specific functions to the query engine
|
||||
pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) {
|
||||
engine.register_function(Arc::new(TumbleFunction::new("tumble")));
|
||||
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_START)));
|
||||
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_END)));
|
||||
let tumble_fn = Arc::new(TumbleFunction::new("tumble")) as FunctionRef;
|
||||
let tumble_start_fn = Arc::new(TumbleFunction::new(TUMBLE_START)) as FunctionRef;
|
||||
let tumble_end_fn = Arc::new(TumbleFunction::new(TUMBLE_END)) as FunctionRef;
|
||||
|
||||
engine.register_scalar_function(tumble_fn.into());
|
||||
engine.register_scalar_function(tumble_start_fn.into());
|
||||
engine.register_scalar_function(tumble_end_fn.into());
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
use table::table_name::TableName;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
|
||||
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {
|
||||
|
||||
async fn put_record_batch(
|
||||
&self,
|
||||
table: &TableName,
|
||||
table_id: &mut Option<TableId>,
|
||||
table_name: &TableName,
|
||||
table_ref: &mut Option<TableRef>,
|
||||
decoder: &mut FlightDecoder,
|
||||
data: FlightData,
|
||||
) -> Result<AffectedRows> {
|
||||
let table_id = if let Some(table_id) = table_id {
|
||||
*table_id
|
||||
let table = if let Some(table) = table_ref {
|
||||
table.clone()
|
||||
} else {
|
||||
let table = self
|
||||
.catalog_manager()
|
||||
.table(
|
||||
&table.catalog_name,
|
||||
&table.schema_name,
|
||||
&table.table_name,
|
||||
&table_name.catalog_name,
|
||||
&table_name.schema_name,
|
||||
&table_name.table_name,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
})?;
|
||||
let id = table.table_info().table_id();
|
||||
*table_id = Some(id);
|
||||
id
|
||||
*table_ref = Some(table.clone());
|
||||
table
|
||||
};
|
||||
|
||||
self.inserter
|
||||
.handle_bulk_insert(table_id, decoder, data)
|
||||
.handle_bulk_insert(table, decoder, data)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
|
||||
@@ -130,7 +130,13 @@ impl JaegerQueryHandler for Instance {
|
||||
.await?)
|
||||
}
|
||||
|
||||
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
|
||||
async fn get_trace(
|
||||
&self,
|
||||
ctx: QueryContextRef,
|
||||
trace_id: &str,
|
||||
start_time: Option<i64>,
|
||||
end_time: Option<i64>,
|
||||
) -> ServerResult<Output> {
|
||||
// It's equivalent to
|
||||
//
|
||||
// ```
|
||||
@@ -139,13 +145,25 @@ impl JaegerQueryHandler for Instance {
|
||||
// FROM
|
||||
// {db}.{trace_table}
|
||||
// WHERE
|
||||
// trace_id = '{trace_id}'
|
||||
// trace_id = '{trace_id}' AND
|
||||
// timestamp >= {start_time} AND
|
||||
// timestamp <= {end_time}
|
||||
// ORDER BY
|
||||
// timestamp DESC
|
||||
// ```.
|
||||
let selects = vec![wildcard()];
|
||||
|
||||
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
|
||||
let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
|
||||
|
||||
if let Some(start_time) = start_time {
|
||||
// Microseconds to nanoseconds.
|
||||
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
|
||||
}
|
||||
|
||||
if let Some(end_time) = end_time {
|
||||
// Microseconds to nanoseconds.
|
||||
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
|
||||
}
|
||||
|
||||
Ok(query_trace_table(
|
||||
ctx,
|
||||
|
||||
@@ -154,6 +154,7 @@ where
|
||||
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
|
||||
user_provider.clone(),
|
||||
runtime,
|
||||
opts.grpc.flight_compression,
|
||||
);
|
||||
|
||||
let grpc_server = builder
|
||||
|
||||
@@ -31,8 +31,6 @@ use common_meta::kv_backend::rds::MySqlStore;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use common_meta::kv_backend::rds::PgStore;
|
||||
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use common_telemetry::error;
|
||||
use common_telemetry::info;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use deadpool_postgres::{Config, Runtime};
|
||||
@@ -144,7 +142,8 @@ impl MetasrvInstance {
|
||||
let (serve_state_tx, serve_state_rx) = oneshot::channel();
|
||||
|
||||
let socket_addr =
|
||||
bootstrap_metasrv_with_router(&self.opts.bind_addr, router, serve_state_tx, rx).await?;
|
||||
bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
|
||||
.await?;
|
||||
self.bind_addr = Some(socket_addr);
|
||||
|
||||
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
|
||||
@@ -260,7 +259,7 @@ pub async fn metasrv_builder(
|
||||
let etcd_client = create_etcd_client(&opts.store_addrs).await?;
|
||||
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
|
||||
let election = EtcdElection::with_etcd_client(
|
||||
&opts.server_addr,
|
||||
&opts.grpc.server_addr,
|
||||
etcd_client,
|
||||
opts.store_key_prefix.clone(),
|
||||
)
|
||||
@@ -270,22 +269,41 @@ pub async fn metasrv_builder(
|
||||
}
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
(None, BackendImpl::PostgresStore) => {
|
||||
let pool = create_postgres_pool(&opts.store_addrs).await?;
|
||||
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
|
||||
.await
|
||||
.context(error::KvBackendSnafu)?;
|
||||
// Client for election should be created separately since we need a different session keep-alive idle time.
|
||||
let election_client = create_postgres_client(opts).await?;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
|
||||
|
||||
use crate::election::rds::postgres::ElectionPgClient;
|
||||
|
||||
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
|
||||
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
|
||||
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
|
||||
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
|
||||
|
||||
let mut cfg = Config::new();
|
||||
cfg.keepalives = Some(true);
|
||||
cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
|
||||
// We use a separate pool for election since we need a different session keep-alive idle time.
|
||||
let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;
|
||||
|
||||
let election_client =
|
||||
ElectionPgClient::new(pool, execution_timeout, meta_lease_ttl, statement_timeout)?;
|
||||
let election = PgElection::with_pg_client(
|
||||
opts.server_addr.clone(),
|
||||
opts.grpc.server_addr.clone(),
|
||||
election_client,
|
||||
opts.store_key_prefix.clone(),
|
||||
CANDIDATE_LEASE_SECS,
|
||||
META_LEASE_SECS,
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
&opts.meta_table_name,
|
||||
opts.meta_election_lock_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pool = create_postgres_pool(&opts.store_addrs).await?;
|
||||
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
|
||||
.await
|
||||
.context(error::KvBackendSnafu)?;
|
||||
|
||||
(kv_backend, Some(election))
|
||||
}
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
@@ -299,7 +317,7 @@ pub async fn metasrv_builder(
|
||||
let election_table_name = opts.meta_table_name.clone() + "_election";
|
||||
let election_client = create_mysql_client(opts).await?;
|
||||
let election = MySqlElection::with_mysql_client(
|
||||
opts.server_addr.clone(),
|
||||
opts.grpc.server_addr.clone(),
|
||||
election_client,
|
||||
opts.store_key_prefix.clone(),
|
||||
CANDIDATE_LEASE_SECS,
|
||||
@@ -372,31 +390,24 @@ pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
|
||||
}
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
|
||||
let postgres_url = opts
|
||||
.store_addrs
|
||||
.first()
|
||||
.context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
|
||||
.await
|
||||
.context(error::ConnectPostgresSnafu)?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
error!(e; "connection error");
|
||||
}
|
||||
});
|
||||
Ok(client)
|
||||
/// Creates a pool for the Postgres backend.
|
||||
///
|
||||
/// It only use first store addr to create a pool.
|
||||
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
|
||||
create_postgres_pool_with(store_addrs, Config::new()).await
|
||||
}
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
|
||||
/// Creates a pool for the Postgres backend.
|
||||
///
|
||||
/// It only use first store addr to create a pool, and use the given config to create a pool.
|
||||
pub async fn create_postgres_pool_with(
|
||||
store_addrs: &[String],
|
||||
mut cfg: Config,
|
||||
) -> Result<deadpool_postgres::Pool> {
|
||||
let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(postgres_url.to_string());
|
||||
let pool = cfg
|
||||
.create_pool(Some(Runtime::Tokio1), NoTls)
|
||||
|
||||
@@ -157,6 +157,11 @@ pub trait Election: Send + Sync {
|
||||
/// but only one can be the leader at a time.
|
||||
async fn campaign(&self) -> Result<()>;
|
||||
|
||||
/// Resets the campaign.
|
||||
///
|
||||
/// Reset the client and the leader flag if needed.
|
||||
async fn reset_campaign(&self) {}
|
||||
|
||||
/// Returns the leader value for the current election.
|
||||
async fn leader(&self) -> Result<Self::Leader>;
|
||||
|
||||
|
||||
@@ -18,11 +18,12 @@ use std::time::Duration;
|
||||
|
||||
use common_telemetry::{error, warn};
|
||||
use common_time::Timestamp;
|
||||
use deadpool_postgres::{Manager, Pool};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
use tokio::time::MissedTickBehavior;
|
||||
use tokio_postgres::types::ToSql;
|
||||
use tokio_postgres::Client;
|
||||
use tokio_postgres::Row;
|
||||
|
||||
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
|
||||
use crate::election::{
|
||||
@@ -30,15 +31,14 @@ use crate::election::{
|
||||
CANDIDATES_ROOT, ELECTION_KEY,
|
||||
};
|
||||
use crate::error::{
|
||||
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
|
||||
UnexpectedSnafu,
|
||||
DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
|
||||
Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
|
||||
|
||||
struct ElectionSqlFactory<'a> {
|
||||
lock_id: u64,
|
||||
table_name: &'a str,
|
||||
meta_lease_ttl_secs: u64,
|
||||
}
|
||||
|
||||
struct ElectionSqlSet {
|
||||
@@ -88,11 +88,10 @@ struct ElectionSqlSet {
|
||||
}
|
||||
|
||||
impl<'a> ElectionSqlFactory<'a> {
|
||||
fn new(lock_id: u64, table_name: &'a str, meta_lease_ttl_secs: u64) -> Self {
|
||||
fn new(lock_id: u64, table_name: &'a str) -> Self {
|
||||
Self {
|
||||
lock_id,
|
||||
table_name,
|
||||
meta_lease_ttl_secs,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,15 +107,6 @@ impl<'a> ElectionSqlFactory<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
// Currently the session timeout is longer than the leader lease time.
|
||||
// So the leader will renew the lease twice before the session timeout if everything goes well.
|
||||
fn set_idle_session_timeout_sql(&self) -> String {
|
||||
format!(
|
||||
"SET idle_session_timeout = '{}s';",
|
||||
self.meta_lease_ttl_secs + 1
|
||||
)
|
||||
}
|
||||
|
||||
fn campaign_sql(&self) -> String {
|
||||
format!("SELECT pg_try_advisory_lock({})", self.lock_id)
|
||||
}
|
||||
@@ -171,46 +161,165 @@ impl<'a> ElectionSqlFactory<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// PgClient for election.
|
||||
pub struct ElectionPgClient {
|
||||
current: Option<deadpool::managed::Object<Manager>>,
|
||||
pool: Pool,
|
||||
/// The client-side timeout for statement execution.
|
||||
///
|
||||
/// This timeout is enforced by the client application and is independent of any server-side timeouts.
|
||||
/// If a statement takes longer than this duration to execute, the client will abort the operation.
|
||||
execution_timeout: Duration,
|
||||
|
||||
/// The idle session timeout.
|
||||
///
|
||||
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
|
||||
/// If a session remains idle for longer than this duration, the server will terminate it.
|
||||
idle_session_timeout: Duration,
|
||||
|
||||
/// The statement timeout.
|
||||
///
|
||||
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
|
||||
/// If a statement takes longer than this duration to execute, the server will abort it.
|
||||
statement_timeout: Duration,
|
||||
}
|
||||
|
||||
impl ElectionPgClient {
|
||||
pub fn new(
|
||||
pool: Pool,
|
||||
execution_timeout: Duration,
|
||||
idle_session_timeout: Duration,
|
||||
statement_timeout: Duration,
|
||||
) -> Result<ElectionPgClient> {
|
||||
Ok(ElectionPgClient {
|
||||
current: None,
|
||||
pool,
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
})
|
||||
}
|
||||
|
||||
fn set_idle_session_timeout_sql(&self) -> String {
|
||||
format!(
|
||||
"SET idle_session_timeout = '{}s';",
|
||||
self.idle_session_timeout.as_secs()
|
||||
)
|
||||
}
|
||||
|
||||
fn set_statement_timeout_sql(&self) -> String {
|
||||
format!(
|
||||
"SET statement_timeout = '{}s';",
|
||||
self.statement_timeout.as_secs()
|
||||
)
|
||||
}
|
||||
|
||||
async fn reset_client(&mut self) -> Result<()> {
|
||||
self.current = None;
|
||||
self.maybe_init_client().await
|
||||
}
|
||||
|
||||
async fn maybe_init_client(&mut self) -> Result<()> {
|
||||
if self.current.is_none() {
|
||||
let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
|
||||
|
||||
self.current = Some(client);
|
||||
// Set idle session timeout and statement timeout.
|
||||
let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
|
||||
self.execute(&idle_session_timeout_sql, &[]).await?;
|
||||
let statement_timeout_sql = self.set_statement_timeout_sql();
|
||||
self.execute(&statement_timeout_sql, &[]).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the result of the query.
|
||||
///
|
||||
/// # Panics
|
||||
/// if `current` is `None`.
|
||||
async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
|
||||
let result = tokio::time::timeout(
|
||||
self.execution_timeout,
|
||||
self.current.as_ref().unwrap().execute(sql, params),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
SqlExecutionTimeoutSnafu {
|
||||
sql: sql.to_string(),
|
||||
duration: self.execution_timeout,
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
result.context(PostgresExecutionSnafu { sql })
|
||||
}
|
||||
|
||||
/// Returns the result of the query.
|
||||
///
|
||||
/// # Panics
|
||||
/// if `current` is `None`.
|
||||
async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
|
||||
let result = tokio::time::timeout(
|
||||
self.execution_timeout,
|
||||
self.current.as_ref().unwrap().query(sql, params),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
SqlExecutionTimeoutSnafu {
|
||||
sql: sql.to_string(),
|
||||
duration: self.execution_timeout,
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
result.context(PostgresExecutionSnafu { sql })
|
||||
}
|
||||
}
|
||||
|
||||
/// PostgreSql implementation of Election.
|
||||
pub struct PgElection {
|
||||
leader_value: String,
|
||||
client: Client,
|
||||
pg_client: RwLock<ElectionPgClient>,
|
||||
is_leader: AtomicBool,
|
||||
leader_infancy: AtomicBool,
|
||||
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
|
||||
store_key_prefix: String,
|
||||
candidate_lease_ttl_secs: u64,
|
||||
meta_lease_ttl_secs: u64,
|
||||
candidate_lease_ttl: Duration,
|
||||
meta_lease_ttl: Duration,
|
||||
sql_set: ElectionSqlSet,
|
||||
}
|
||||
|
||||
impl PgElection {
|
||||
async fn maybe_init_client(&self) -> Result<()> {
|
||||
if self.pg_client.read().await.current.is_none() {
|
||||
self.pg_client.write().await.maybe_init_client().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn with_pg_client(
|
||||
leader_value: String,
|
||||
client: Client,
|
||||
pg_client: ElectionPgClient,
|
||||
store_key_prefix: String,
|
||||
candidate_lease_ttl_secs: u64,
|
||||
meta_lease_ttl_secs: u64,
|
||||
candidate_lease_ttl: Duration,
|
||||
meta_lease_ttl: Duration,
|
||||
table_name: &str,
|
||||
lock_id: u64,
|
||||
) -> Result<ElectionRef> {
|
||||
let sql_factory = ElectionSqlFactory::new(lock_id, table_name, meta_lease_ttl_secs);
|
||||
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
|
||||
client
|
||||
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
|
||||
|
||||
let tx = listen_leader_change(leader_value.clone());
|
||||
Ok(Arc::new(Self {
|
||||
leader_value,
|
||||
client,
|
||||
pg_client: RwLock::new(pg_client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(false),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix,
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs,
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: sql_factory.build(),
|
||||
}))
|
||||
}
|
||||
@@ -249,18 +358,17 @@ impl Election for PgElection {
|
||||
input: format!("{node_info:?}"),
|
||||
})?;
|
||||
let res = self
|
||||
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
|
||||
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
|
||||
.await?;
|
||||
// May registered before, just update the lease.
|
||||
if !res {
|
||||
self.delete_value(&key).await?;
|
||||
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
|
||||
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Check if the current lease has expired and renew the lease.
|
||||
let mut keep_alive_interval =
|
||||
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
|
||||
let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
|
||||
loop {
|
||||
let _ = keep_alive_interval.tick().await;
|
||||
|
||||
@@ -282,13 +390,8 @@ impl Election for PgElection {
|
||||
);
|
||||
|
||||
// Safety: origin is Some since we are using `get_value_with_lease` with `true`.
|
||||
self.update_value_with_lease(
|
||||
&key,
|
||||
&lease.origin,
|
||||
&node_info,
|
||||
self.candidate_lease_ttl_secs,
|
||||
)
|
||||
.await?;
|
||||
self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -321,16 +424,17 @@ impl Election for PgElection {
|
||||
/// - If the lock is not acquired (result is false), it calls the `follower_action` method
|
||||
/// to perform actions as a follower.
|
||||
async fn campaign(&self) -> Result<()> {
|
||||
let mut keep_alive_interval =
|
||||
tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2));
|
||||
let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
|
||||
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
|
||||
self.maybe_init_client().await?;
|
||||
loop {
|
||||
let res = self
|
||||
.client
|
||||
.query(&self.sql_set.campaign, &[])
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.campaign, &[])
|
||||
.await?;
|
||||
let row = res.first().context(UnexpectedSnafu {
|
||||
violated: "Failed to get the result of acquiring advisory lock",
|
||||
})?;
|
||||
@@ -349,6 +453,12 @@ impl Election for PgElection {
|
||||
}
|
||||
}
|
||||
|
||||
async fn reset_campaign(&self) {
|
||||
if let Err(err) = self.pg_client.write().await.reset_client().await {
|
||||
error!(err; "Failed to reset client");
|
||||
}
|
||||
}
|
||||
|
||||
async fn leader(&self) -> Result<Self::Leader> {
|
||||
if self.is_leader.load(Ordering::Relaxed) {
|
||||
Ok(self.leader_value.as_bytes().into())
|
||||
@@ -376,11 +486,13 @@ impl PgElection {
|
||||
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
|
||||
async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
|
||||
let key = key.as_bytes();
|
||||
self.maybe_init_client().await?;
|
||||
let res = self
|
||||
.client
|
||||
.query(&self.sql_set.get_value_with_lease, &[&key])
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.get_value_with_lease, &[&key])
|
||||
.await?;
|
||||
|
||||
if res.is_empty() {
|
||||
Ok(None)
|
||||
@@ -414,11 +526,13 @@ impl PgElection {
|
||||
key_prefix: &str,
|
||||
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
|
||||
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
|
||||
self.maybe_init_client().await?;
|
||||
let res = self
|
||||
.client
|
||||
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
|
||||
.await?;
|
||||
|
||||
let mut values_with_leases = vec![];
|
||||
let mut current = Timestamp::default();
|
||||
@@ -445,18 +559,21 @@ impl PgElection {
|
||||
key: &str,
|
||||
prev: &str,
|
||||
updated: &str,
|
||||
lease_ttl: u64,
|
||||
lease_ttl: Duration,
|
||||
) -> Result<()> {
|
||||
let key = key.as_bytes();
|
||||
let prev = prev.as_bytes();
|
||||
self.maybe_init_client().await?;
|
||||
let lease_ttl_secs = lease_ttl.as_secs() as f64;
|
||||
let res = self
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.execute(
|
||||
&self.sql_set.update_value_with_lease,
|
||||
&[&key, &prev, &updated, &(lease_ttl as f64)],
|
||||
&[&key, &prev, &updated, &lease_ttl_secs],
|
||||
)
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.await?;
|
||||
|
||||
ensure!(
|
||||
res == 1,
|
||||
@@ -473,16 +590,18 @@ impl PgElection {
|
||||
&self,
|
||||
key: &str,
|
||||
value: &str,
|
||||
lease_ttl_secs: u64,
|
||||
lease_ttl: Duration,
|
||||
) -> Result<bool> {
|
||||
let key = key.as_bytes();
|
||||
let lease_ttl_secs = lease_ttl_secs as f64;
|
||||
let lease_ttl_secs = lease_ttl.as_secs() as f64;
|
||||
let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
|
||||
self.maybe_init_client().await?;
|
||||
let res = self
|
||||
.client
|
||||
.query(&self.sql_set.put_value_with_lease, ¶ms)
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.put_value_with_lease, ¶ms)
|
||||
.await?;
|
||||
Ok(res.is_empty())
|
||||
}
|
||||
|
||||
@@ -490,11 +609,13 @@ impl PgElection {
|
||||
/// Caution: Should only delete the key if the lease is expired.
|
||||
async fn delete_value(&self, key: &str) -> Result<bool> {
|
||||
let key = key.as_bytes();
|
||||
self.maybe_init_client().await?;
|
||||
let res = self
|
||||
.client
|
||||
.query(&self.sql_set.delete_value, &[&key])
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.delete_value, &[&key])
|
||||
.await?;
|
||||
|
||||
Ok(res.len() == 1)
|
||||
}
|
||||
@@ -536,7 +657,7 @@ impl PgElection {
|
||||
&key,
|
||||
&lease.origin,
|
||||
&self.leader_value,
|
||||
self.meta_lease_ttl_secs,
|
||||
self.meta_lease_ttl,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -605,10 +726,12 @@ impl PgElection {
|
||||
..Default::default()
|
||||
};
|
||||
self.delete_value(&key).await?;
|
||||
self.client
|
||||
.query(&self.sql_set.step_down, &[])
|
||||
self.maybe_init_client().await?;
|
||||
self.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.step_down, &[])
|
||||
.await?;
|
||||
send_leader_change_and_set_flags(
|
||||
&self.is_leader,
|
||||
&self.leader_infancy,
|
||||
@@ -651,7 +774,7 @@ impl PgElection {
|
||||
..Default::default()
|
||||
};
|
||||
self.delete_value(&key).await?;
|
||||
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs)
|
||||
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
|
||||
.await?;
|
||||
|
||||
if self
|
||||
@@ -674,15 +797,21 @@ impl PgElection {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::env;
|
||||
|
||||
use common_meta::maybe_skip_postgres_integration_test;
|
||||
use tokio_postgres::{Client, NoTls};
|
||||
|
||||
use super::*;
|
||||
use crate::error::PostgresExecutionSnafu;
|
||||
use crate::bootstrap::create_postgres_pool;
|
||||
use crate::error;
|
||||
|
||||
async fn create_postgres_client(table_name: Option<&str>) -> Result<Client> {
|
||||
async fn create_postgres_client(
|
||||
table_name: Option<&str>,
|
||||
execution_timeout: Duration,
|
||||
idle_session_timeout: Duration,
|
||||
statement_timeout: Duration,
|
||||
) -> Result<ElectionPgClient> {
|
||||
let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
|
||||
if endpoint.is_empty() {
|
||||
return UnexpectedSnafu {
|
||||
@@ -690,25 +819,34 @@ mod tests {
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
let (client, connection) = tokio_postgres::connect(&endpoint, NoTls)
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
tokio::spawn(async move {
|
||||
connection.await.context(PostgresExecutionSnafu).unwrap();
|
||||
});
|
||||
let pool = create_postgres_pool(&[endpoint]).await.unwrap();
|
||||
let mut pg_client = ElectionPgClient::new(
|
||||
pool,
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.unwrap();
|
||||
pg_client.maybe_init_client().await?;
|
||||
if let Some(table_name) = table_name {
|
||||
let create_table_sql = format!(
|
||||
"CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
|
||||
table_name
|
||||
);
|
||||
client.execute(&create_table_sql, &[]).await.unwrap();
|
||||
pg_client.execute(&create_table_sql, &[]).await?;
|
||||
}
|
||||
Ok(client)
|
||||
Ok(pg_client)
|
||||
}
|
||||
|
||||
async fn drop_table(client: &Client, table_name: &str) {
|
||||
async fn drop_table(pg_election: &PgElection, table_name: &str) {
|
||||
let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
|
||||
client.execute(&sql, &[]).await.unwrap();
|
||||
pg_election
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.execute(&sql, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -719,23 +857,35 @@ mod tests {
|
||||
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let table_name = "test_postgres_crud_greptime_metakv";
|
||||
let client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let candidate_lease_ttl = Duration::from_secs(10);
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (tx, _) = broadcast::channel(100);
|
||||
let pg_election = PgElection {
|
||||
leader_value: "test_leader".to_string(),
|
||||
client,
|
||||
pg_client: RwLock::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid,
|
||||
candidate_lease_ttl_secs: 10,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
|
||||
};
|
||||
|
||||
let res = pg_election
|
||||
.put_value_with_lease(&key, &value, 10)
|
||||
.put_value_with_lease(&key, &value, candidate_lease_ttl)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res);
|
||||
@@ -748,7 +898,7 @@ mod tests {
|
||||
assert_eq!(lease.leader_value, value);
|
||||
|
||||
pg_election
|
||||
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl_secs)
|
||||
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -762,7 +912,7 @@ mod tests {
|
||||
let key = format!("test_key_{}", i);
|
||||
let value = format!("test_value_{}", i);
|
||||
pg_election
|
||||
.put_value_with_lease(&key, &value, 10)
|
||||
.put_value_with_lease(&key, &value, candidate_lease_ttl)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
@@ -787,28 +937,39 @@ mod tests {
|
||||
assert!(res.is_empty());
|
||||
assert!(current == Timestamp::default());
|
||||
|
||||
drop_table(&pg_election.client, table_name).await;
|
||||
drop_table(&pg_election, table_name).await;
|
||||
}
|
||||
|
||||
async fn candidate(
|
||||
leader_value: String,
|
||||
candidate_lease_ttl_secs: u64,
|
||||
candidate_lease_ttl: Duration,
|
||||
store_key_prefix: String,
|
||||
table_name: String,
|
||||
) {
|
||||
let client = create_postgres_client(None).await.unwrap();
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let client = create_postgres_client(
|
||||
None,
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (tx, _) = broadcast::channel(100);
|
||||
let pg_election = PgElection {
|
||||
leader_value,
|
||||
client,
|
||||
pg_client: RwLock::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix,
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28319, &table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
|
||||
};
|
||||
|
||||
let node_info = MetasrvNodeInfo {
|
||||
@@ -824,17 +985,28 @@ mod tests {
|
||||
async fn test_candidate_registration() {
|
||||
maybe_skip_postgres_integration_test!();
|
||||
let leader_value_prefix = "test_leader".to_string();
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let table_name = "test_candidate_registration_greptime_metakv";
|
||||
let mut handles = vec![];
|
||||
let client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let candidate_lease_ttl = Duration::from_secs(5);
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for i in 0..10 {
|
||||
let leader_value = format!("{}{}", leader_value_prefix, i);
|
||||
let handle = tokio::spawn(candidate(
|
||||
leader_value,
|
||||
candidate_lease_ttl_secs,
|
||||
candidate_lease_ttl,
|
||||
uuid.clone(),
|
||||
table_name.to_string(),
|
||||
));
|
||||
@@ -847,14 +1019,14 @@ mod tests {
|
||||
let leader_value = "test_leader".to_string();
|
||||
let pg_election = PgElection {
|
||||
leader_value,
|
||||
client,
|
||||
pg_client: RwLock::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid.clone(),
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
|
||||
};
|
||||
|
||||
let candidates = pg_election.all_candidates().await.unwrap();
|
||||
@@ -876,29 +1048,40 @@ mod tests {
|
||||
assert!(res);
|
||||
}
|
||||
|
||||
drop_table(&pg_election.client, table_name).await;
|
||||
drop_table(&pg_election, table_name).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_elected_and_step_down() {
|
||||
maybe_skip_postgres_integration_test!();
|
||||
let leader_value = "test_leader".to_string();
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let table_name = "test_elected_and_step_down_greptime_metakv";
|
||||
let client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let candidate_lease_ttl = Duration::from_secs(5);
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let leader_pg_election = PgElection {
|
||||
leader_value: leader_value.clone(),
|
||||
client,
|
||||
pg_client: RwLock::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid,
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28320, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28320, table_name).build(),
|
||||
};
|
||||
|
||||
leader_pg_election.elected().await.unwrap();
|
||||
@@ -990,7 +1173,7 @@ mod tests {
|
||||
_ => panic!("Expected LeaderChangeMessage::StepDown"),
|
||||
}
|
||||
|
||||
drop_table(&leader_pg_election.client, table_name).await;
|
||||
drop_table(&leader_pg_election, table_name).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -999,25 +1182,38 @@ mod tests {
|
||||
let leader_value = "test_leader".to_string();
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let table_name = "test_leader_action_greptime_metakv";
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
let client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let candidate_lease_ttl = Duration::from_secs(5);
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let leader_pg_election = PgElection {
|
||||
leader_value: leader_value.clone(),
|
||||
client,
|
||||
pg_client: RwLock::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid,
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28321, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28321, table_name).build(),
|
||||
};
|
||||
|
||||
// Step 1: No leader exists, campaign and elected.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1048,7 +1244,9 @@ mod tests {
|
||||
|
||||
// Step 2: As a leader, renew the lease.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1070,7 +1268,9 @@ mod tests {
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1098,7 +1298,9 @@ mod tests {
|
||||
|
||||
// Step 4: Re-campaign and elected.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1155,7 +1357,9 @@ mod tests {
|
||||
|
||||
// Step 6: Re-campaign and elected.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1186,7 +1390,9 @@ mod tests {
|
||||
|
||||
// Step 7: Something wrong, the leader key changed by others.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1197,7 +1403,11 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
leader_pg_election
|
||||
.put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
|
||||
.put_value_with_lease(
|
||||
&leader_pg_election.election_key(),
|
||||
"test",
|
||||
Duration::from_secs(10),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
@@ -1223,52 +1433,74 @@ mod tests {
|
||||
|
||||
// Clean up
|
||||
leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.step_down, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
drop_table(&leader_pg_election.client, table_name).await;
|
||||
drop_table(&leader_pg_election, table_name).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_follower_action() {
|
||||
maybe_skip_postgres_integration_test!();
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let table_name = "test_follower_action_greptime_metakv";
|
||||
|
||||
let follower_client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let candidate_lease_ttl = Duration::from_secs(5);
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let follower_client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let follower_pg_election = PgElection {
|
||||
leader_value: "test_follower".to_string(),
|
||||
client: follower_client,
|
||||
pg_client: RwLock::new(follower_client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid.clone(),
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
|
||||
};
|
||||
|
||||
let leader_client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let leader_client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let (tx, _) = broadcast::channel(100);
|
||||
let leader_pg_election = PgElection {
|
||||
leader_value: "test_leader".to_string(),
|
||||
client: leader_client,
|
||||
pg_client: RwLock::new(leader_client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid,
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
|
||||
};
|
||||
|
||||
leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1309,11 +1541,41 @@ mod tests {
|
||||
|
||||
// Clean up
|
||||
leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.step_down, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
drop_table(&follower_pg_election.client, table_name).await;
|
||||
drop_table(&follower_pg_election, table_name).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_idle_session_timeout() {
|
||||
maybe_skip_postgres_integration_test!();
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let idle_session_timeout = Duration::from_secs(1);
|
||||
let mut client = create_postgres_client(
|
||||
None,
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(1100)).await;
|
||||
// Wait for the idle session timeout.
|
||||
let err = client.query("SELECT 1", &[]).await.unwrap_err();
|
||||
assert_matches!(err, error::Error::PostgresExecution { .. });
|
||||
let error::Error::PostgresExecution { error, .. } = err else {
|
||||
panic!("Expected PostgresExecution error");
|
||||
};
|
||||
assert!(error.is_closed());
|
||||
// Reset the client and try again.
|
||||
client.reset_client().await.unwrap();
|
||||
let _ = client.query("SELECT 1", &[]).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -748,21 +748,31 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Failed to execute via postgres"))]
|
||||
#[snafu(display("Failed to execute via postgres, sql: {}", sql))]
|
||||
PostgresExecution {
|
||||
#[snafu(source)]
|
||||
error: tokio_postgres::Error,
|
||||
sql: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Failed to connect to Postgres"))]
|
||||
ConnectPostgres {
|
||||
#[snafu(source)]
|
||||
error: tokio_postgres::Error,
|
||||
#[snafu(display("Failed to get Postgres client"))]
|
||||
GetPostgresClient {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: deadpool::managed::PoolError<tokio_postgres::Error>,
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
|
||||
SqlExecutionTimeout {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
sql: String,
|
||||
duration: std::time::Duration,
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
@@ -1005,9 +1015,10 @@ impl ErrorExt for Error {
|
||||
Error::LookupPeer { source, .. } => source.status_code(),
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
Error::CreatePostgresPool { .. }
|
||||
| Error::GetPostgresClient { .. }
|
||||
| Error::GetPostgresConnection { .. }
|
||||
| Error::PostgresExecution { .. }
|
||||
| Error::ConnectPostgres { .. } => StatusCode::Internal,
|
||||
| Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
Error::MySqlExecution { .. }
|
||||
| Error::CreateMySqlPool { .. }
|
||||
|
||||
@@ -46,6 +46,7 @@ use common_telemetry::{error, info, warn};
|
||||
use common_wal::config::MetasrvWalConfig;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::export_metrics::ExportMetricsOption;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
@@ -96,8 +97,10 @@ pub enum BackendImpl {
|
||||
#[serde(default)]
|
||||
pub struct MetasrvOptions {
|
||||
/// The address the server listens on.
|
||||
#[deprecated(note = "Use grpc.bind_addr instead")]
|
||||
pub bind_addr: String,
|
||||
/// The address the server advertises to the clients.
|
||||
#[deprecated(note = "Use grpc.server_addr instead")]
|
||||
pub server_addr: String,
|
||||
/// The address of the store, e.g., etcd.
|
||||
pub store_addrs: Vec<String>,
|
||||
@@ -112,6 +115,7 @@ pub struct MetasrvOptions {
|
||||
/// If it's true, the region failover will be allowed even if the local WAL is used.
|
||||
/// Note that this option is not recommended to be set to true, because it may lead to data loss during failover.
|
||||
pub allow_region_failover_on_local_wal: bool,
|
||||
pub grpc: GrpcOptions,
|
||||
/// The HTTP server options.
|
||||
pub http: HttpOptions,
|
||||
/// The logging options.
|
||||
@@ -166,8 +170,6 @@ impl fmt::Debug for MetasrvOptions {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let mut debug_struct = f.debug_struct("MetasrvOptions");
|
||||
debug_struct
|
||||
.field("bind_addr", &self.bind_addr)
|
||||
.field("server_addr", &self.server_addr)
|
||||
.field("store_addrs", &self.sanitize_store_addrs())
|
||||
.field("selector", &self.selector)
|
||||
.field("use_memory_store", &self.use_memory_store)
|
||||
@@ -176,6 +178,7 @@ impl fmt::Debug for MetasrvOptions {
|
||||
"allow_region_failover_on_local_wal",
|
||||
&self.allow_region_failover_on_local_wal,
|
||||
)
|
||||
.field("grpc", &self.grpc)
|
||||
.field("http", &self.http)
|
||||
.field("logging", &self.logging)
|
||||
.field("procedure", &self.procedure)
|
||||
@@ -208,14 +211,19 @@ const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
|
||||
impl Default for MetasrvOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
|
||||
// If server_addr is not set, the server will use the local ip address as the server address.
|
||||
#[allow(deprecated)]
|
||||
bind_addr: String::new(),
|
||||
#[allow(deprecated)]
|
||||
server_addr: String::new(),
|
||||
store_addrs: vec!["127.0.0.1:2379".to_string()],
|
||||
selector: SelectorType::default(),
|
||||
use_memory_store: false,
|
||||
enable_region_failover: false,
|
||||
allow_region_failover_on_local_wal: false,
|
||||
grpc: GrpcOptions {
|
||||
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
|
||||
..Default::default()
|
||||
},
|
||||
http: HttpOptions::default(),
|
||||
logging: LoggingOptions::default(),
|
||||
procedure: ProcedureConfig {
|
||||
@@ -253,37 +261,6 @@ impl Configurable for MetasrvOptions {
|
||||
}
|
||||
|
||||
impl MetasrvOptions {
|
||||
/// Detect server address.
|
||||
#[cfg(not(target_os = "android"))]
|
||||
pub fn detect_server_addr(&mut self) {
|
||||
if self.server_addr.is_empty() {
|
||||
match local_ip_address::local_ip() {
|
||||
Ok(ip) => {
|
||||
let detected_addr = format!(
|
||||
"{}:{}",
|
||||
ip,
|
||||
self.bind_addr
|
||||
.split(':')
|
||||
.nth(1)
|
||||
.unwrap_or(DEFAULT_METASRV_ADDR_PORT)
|
||||
);
|
||||
info!("Using detected: {} as server address", detected_addr);
|
||||
self.server_addr = detected_addr;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to detect local ip address: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "android")]
|
||||
pub fn detect_server_addr(&mut self) {
|
||||
if self.server_addr.is_empty() {
|
||||
common_telemetry::debug!("detect local IP is not supported on Android");
|
||||
}
|
||||
}
|
||||
|
||||
fn sanitize_store_addrs(&self) -> Vec<String> {
|
||||
self.store_addrs
|
||||
.iter()
|
||||
@@ -582,6 +559,7 @@ impl Metasrv {
|
||||
if let Err(e) = res {
|
||||
warn!(e; "Metasrv election error");
|
||||
}
|
||||
election.reset_campaign().await;
|
||||
info!("Metasrv re-initiate election");
|
||||
}
|
||||
info!("Metasrv stopped");
|
||||
@@ -638,7 +616,7 @@ impl Metasrv {
|
||||
pub fn node_info(&self) -> MetasrvNodeInfo {
|
||||
let build_info = common_version::build_info();
|
||||
MetasrvNodeInfo {
|
||||
addr: self.options().server_addr.clone(),
|
||||
addr: self.options().grpc.server_addr.clone(),
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms: self.start_time_ms(),
|
||||
@@ -730,7 +708,7 @@ impl Metasrv {
|
||||
|
||||
#[inline]
|
||||
pub fn new_ctx(&self) -> Context {
|
||||
let server_addr = self.options().server_addr.clone();
|
||||
let server_addr = self.options().grpc.server_addr.clone();
|
||||
let in_memory = self.in_memory.clone();
|
||||
let kv_backend = self.kv_backend.clone();
|
||||
let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
|
||||
|
||||
@@ -179,8 +179,8 @@ impl MetasrvBuilder {
|
||||
let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
|
||||
|
||||
let state = Arc::new(RwLock::new(match election {
|
||||
None => State::leader(options.server_addr.to_string(), true),
|
||||
Some(_) => State::follower(options.server_addr.to_string()),
|
||||
None => State::leader(options.grpc.server_addr.to_string(), true),
|
||||
Some(_) => State::follower(options.grpc.server_addr.to_string()),
|
||||
}));
|
||||
|
||||
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
|
||||
@@ -203,7 +203,7 @@ impl MetasrvBuilder {
|
||||
));
|
||||
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
|
||||
let selector_ctx = SelectorContext {
|
||||
server_addr: options.server_addr.clone(),
|
||||
server_addr: options.grpc.server_addr.clone(),
|
||||
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
|
||||
flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
|
||||
kv_backend: kv_backend.clone(),
|
||||
@@ -272,7 +272,7 @@ impl MetasrvBuilder {
|
||||
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
|
||||
mailbox.clone(),
|
||||
MetasrvInfo {
|
||||
server_addr: options.server_addr.clone(),
|
||||
server_addr: options.grpc.server_addr.clone(),
|
||||
},
|
||||
));
|
||||
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
|
||||
@@ -315,7 +315,7 @@ impl MetasrvBuilder {
|
||||
memory_region_keeper.clone(),
|
||||
region_failure_detector_controller.clone(),
|
||||
mailbox.clone(),
|
||||
options.server_addr.clone(),
|
||||
options.grpc.server_addr.clone(),
|
||||
cache_invalidator.clone(),
|
||||
),
|
||||
));
|
||||
@@ -390,7 +390,7 @@ impl MetasrvBuilder {
|
||||
client: Arc::new(kafka_client),
|
||||
table_metadata_manager: table_metadata_manager.clone(),
|
||||
leader_region_registry: leader_region_registry.clone(),
|
||||
server_addr: options.server_addr.clone(),
|
||||
server_addr: options.grpc.server_addr.clone(),
|
||||
mailbox: mailbox.clone(),
|
||||
};
|
||||
let wal_prune_manager = WalPruneManager::new(
|
||||
|
||||
@@ -26,6 +26,7 @@ use common_meta::kv_backend::etcd::EtcdStore;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use tonic::codec::CompressionEncoding;
|
||||
use tower::service_fn;
|
||||
|
||||
@@ -47,7 +48,10 @@ pub async fn mock_with_memstore() -> MockInfo {
|
||||
let in_memory = Arc::new(MemoryKvBackend::new());
|
||||
mock(
|
||||
MetasrvOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
grpc: GrpcOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
kv_backend,
|
||||
@@ -62,7 +66,10 @@ pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
|
||||
let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
|
||||
mock(
|
||||
MetasrvOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
grpc: GrpcOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
kv_backend,
|
||||
@@ -80,7 +87,7 @@ pub async fn mock(
|
||||
datanode_clients: Option<Arc<NodeClients>>,
|
||||
in_memory: Option<ResettableKvBackendRef>,
|
||||
) -> MockInfo {
|
||||
let server_addr = opts.server_addr.clone();
|
||||
let server_addr = opts.grpc.server_addr.clone();
|
||||
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
|
||||
|
||||
table_metadata_manager.init().await.unwrap();
|
||||
|
||||
@@ -88,7 +88,7 @@ impl cluster_server::Cluster for Metasrv {
|
||||
return Ok(Response::new(resp));
|
||||
}
|
||||
|
||||
let leader_addr = &self.options().server_addr;
|
||||
let leader_addr = &self.options().grpc.server_addr;
|
||||
let (leader, followers) = match self.election() {
|
||||
Some(election) => {
|
||||
let nodes = election.all_candidates().await?;
|
||||
|
||||
@@ -190,6 +190,7 @@ mod tests {
|
||||
use api::v1::meta::*;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_telemetry::tracing_context::W3cTrace;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use tonic::IntoRequest;
|
||||
|
||||
use super::get_node_id;
|
||||
@@ -203,7 +204,10 @@ mod tests {
|
||||
let metasrv = MetasrvBuilder::new()
|
||||
.kv_backend(kv_backend)
|
||||
.options(MetasrvOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
grpc: GrpcOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
})
|
||||
.build()
|
||||
@@ -216,7 +220,7 @@ mod tests {
|
||||
|
||||
let res = metasrv.ask_leader(req.into_request()).await.unwrap();
|
||||
let res = res.into_inner();
|
||||
assert_eq!(metasrv.options().bind_addr, res.leader.unwrap().addr);
|
||||
assert_eq!(metasrv.options().grpc.server_addr, res.leader.unwrap().addr);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -962,19 +962,22 @@ impl StreamContext {
|
||||
}
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
write!(f, "{{")?;
|
||||
}
|
||||
write!(
|
||||
f,
|
||||
"partition_count={} ({} memtable ranges, {} file {} ranges)",
|
||||
"\"partition_count\":{{\"count\":{}, \"mem_ranges\":{}, \"files\":{}, \"file_ranges\":{}}}",
|
||||
self.ranges.len(),
|
||||
num_mem_ranges,
|
||||
self.input.num_files(),
|
||||
num_file_ranges,
|
||||
)?;
|
||||
if let Some(selector) = &self.input.series_row_selector {
|
||||
write!(f, ", selector={}", selector)?;
|
||||
write!(f, ", \"selector\":\"{}\"", selector)?;
|
||||
}
|
||||
if let Some(distribution) = &self.input.distribution {
|
||||
write!(f, ", distribution={}", distribution)?;
|
||||
write!(f, ", \"distribution\":\"{}\"", distribution)?;
|
||||
}
|
||||
|
||||
if verbose {
|
||||
@@ -991,14 +994,15 @@ impl StreamContext {
|
||||
|
||||
impl fmt::Debug for FileWrapper<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let (start, end) = self.file.time_range();
|
||||
write!(
|
||||
f,
|
||||
"[file={}, time_range=({}::{}, {}::{}), rows={}, size={}, index_size={}]",
|
||||
r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
|
||||
self.file.file_id(),
|
||||
self.file.time_range().0.value(),
|
||||
self.file.time_range().0.unit(),
|
||||
self.file.time_range().1.value(),
|
||||
self.file.time_range().1.unit(),
|
||||
start.value(),
|
||||
start.unit(),
|
||||
end.value(),
|
||||
end.unit(),
|
||||
self.file.num_rows(),
|
||||
self.file.size(),
|
||||
self.file.index_size()
|
||||
@@ -1014,25 +1018,22 @@ impl StreamContext {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let output_schema = self.input.mapper.output_schema();
|
||||
if !output_schema.is_empty() {
|
||||
write!(f, ", projection=")?;
|
||||
f.debug_list()
|
||||
.entries(output_schema.column_schemas().iter().map(|col| &col.name))
|
||||
.finish()?;
|
||||
let names: Vec<_> = output_schema
|
||||
.column_schemas()
|
||||
.iter()
|
||||
.map(|col| &col.name)
|
||||
.collect();
|
||||
write!(f, ", \"projection\": {:?}", names)?;
|
||||
}
|
||||
if let Some(predicate) = &self.input.predicate.predicate() {
|
||||
if !predicate.exprs().is_empty() {
|
||||
write!(f, ", filters=[")?;
|
||||
for (i, expr) in predicate.exprs().iter().enumerate() {
|
||||
if i == predicate.exprs().len() - 1 {
|
||||
write!(f, "{}]", expr)?;
|
||||
} else {
|
||||
write!(f, "{}, ", expr)?;
|
||||
}
|
||||
}
|
||||
let exprs: Vec<_> =
|
||||
predicate.exprs().iter().map(|e| e.to_string()).collect();
|
||||
write!(f, ", \"filters\": {:?}", exprs)?;
|
||||
}
|
||||
}
|
||||
if !self.input.files.is_empty() {
|
||||
write!(f, ", files=")?;
|
||||
write!(f, ", \"files\": ")?;
|
||||
f.debug_list()
|
||||
.entries(self.input.files.iter().map(|file| FileWrapper { file }))
|
||||
.finish()?;
|
||||
|
||||
@@ -142,36 +142,36 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
|
||||
write!(
|
||||
f,
|
||||
"{{prepare_scan_cost={prepare_scan_cost:?}, \
|
||||
build_reader_cost={build_reader_cost:?}, \
|
||||
scan_cost={scan_cost:?}, \
|
||||
convert_cost={convert_cost:?}, \
|
||||
yield_cost={yield_cost:?}, \
|
||||
total_cost={total_cost:?}, \
|
||||
num_rows={num_rows}, \
|
||||
num_batches={num_batches}, \
|
||||
num_mem_ranges={num_mem_ranges}, \
|
||||
num_file_ranges={num_file_ranges}, \
|
||||
build_parts_cost={build_parts_cost:?}, \
|
||||
rg_total={rg_total}, \
|
||||
rg_fulltext_filtered={rg_fulltext_filtered}, \
|
||||
rg_inverted_filtered={rg_inverted_filtered}, \
|
||||
rg_minmax_filtered={rg_minmax_filtered}, \
|
||||
rg_bloom_filtered={rg_bloom_filtered}, \
|
||||
rows_before_filter={rows_before_filter}, \
|
||||
rows_fulltext_filtered={rows_fulltext_filtered}, \
|
||||
rows_inverted_filtered={rows_inverted_filtered}, \
|
||||
rows_bloom_filtered={rows_bloom_filtered}, \
|
||||
rows_precise_filtered={rows_precise_filtered}, \
|
||||
num_sst_record_batches={num_sst_record_batches}, \
|
||||
num_sst_batches={num_sst_batches}, \
|
||||
num_sst_rows={num_sst_rows}, \
|
||||
first_poll={first_poll:?}, \
|
||||
num_series_send_timeout={num_series_send_timeout}, \
|
||||
num_distributor_rows={num_distributor_rows}, \
|
||||
num_distributor_batches={num_distributor_batches}, \
|
||||
distributor_scan_cost={distributor_scan_cost:?}, \
|
||||
distributor_yield_cost={distributor_yield_cost:?}}},"
|
||||
"{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
|
||||
\"build_reader_cost\":\"{build_reader_cost:?}\", \
|
||||
\"scan_cost\":\"{scan_cost:?}\", \
|
||||
\"convert_cost\":\"{convert_cost:?}\", \
|
||||
\"yield_cost\":\"{yield_cost:?}\", \
|
||||
\"total_cost\":\"{total_cost:?}\", \
|
||||
\"num_rows\":{num_rows}, \
|
||||
\"num_batches\":{num_batches}, \
|
||||
\"num_mem_ranges\":{num_mem_ranges}, \
|
||||
\"num_file_ranges\":{num_file_ranges}, \
|
||||
\"build_parts_cost\":\"{build_parts_cost:?}\", \
|
||||
\"rg_total\":{rg_total}, \
|
||||
\"rg_fulltext_filtered\":{rg_fulltext_filtered}, \
|
||||
\"rg_inverted_filtered\":{rg_inverted_filtered}, \
|
||||
\"rg_minmax_filtered\":{rg_minmax_filtered}, \
|
||||
\"rg_bloom_filtered\":{rg_bloom_filtered}, \
|
||||
\"rows_before_filter\":{rows_before_filter}, \
|
||||
\"rows_fulltext_filtered\":{rows_fulltext_filtered}, \
|
||||
\"rows_inverted_filtered\":{rows_inverted_filtered}, \
|
||||
\"rows_bloom_filtered\":{rows_bloom_filtered}, \
|
||||
\"rows_precise_filtered\":{rows_precise_filtered}, \
|
||||
\"num_sst_record_batches\":{num_sst_record_batches}, \
|
||||
\"num_sst_batches\":{num_sst_batches}, \
|
||||
\"num_sst_rows\":{num_sst_rows}, \
|
||||
\"first_poll\":\"{first_poll:?}\", \
|
||||
\"num_series_send_timeout\":{num_series_send_timeout}, \
|
||||
\"num_distributor_rows\":{num_distributor_rows}, \
|
||||
\"num_distributor_batches\":{num_distributor_batches}, \
|
||||
\"distributor_scan_cost\":\"{distributor_scan_cost:?}\", \
|
||||
\"distributor_yield_cost\":\"{distributor_yield_cost:?}\"}}"
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -390,10 +390,11 @@ impl PartitionMetricsList {
|
||||
/// Format verbose metrics for each partition for explain.
|
||||
pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let list = self.0.lock().unwrap();
|
||||
write!(f, ", metrics_per_partition: ")?;
|
||||
write!(f, ", \"metrics_per_partition\": ")?;
|
||||
f.debug_list()
|
||||
.entries(list.iter().filter_map(|p| p.as_ref()))
|
||||
.finish()
|
||||
.finish()?;
|
||||
write!(f, "}}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -488,7 +489,11 @@ impl PartitionMetrics {
|
||||
impl fmt::Debug for PartitionMetrics {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let metrics = self.0.metrics.lock().unwrap();
|
||||
write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
|
||||
write!(
|
||||
f,
|
||||
r#"{{"partition":{}, "metrics":{:?}}}"#,
|
||||
self.0.partition, metrics
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -282,14 +282,15 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use common_function::function_registry::FUNCTION_REGISTRY;
|
||||
use common_function::scalars::udf::create_udf;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::function_factory::ScalarFunctionFactory;
|
||||
use common_function::scalars::matches::MatchesFunction;
|
||||
use common_function::scalars::matches_term::MatchesTermFunction;
|
||||
use datafusion::functions::string::lower;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_expr::expr::ScalarFunction;
|
||||
use datafusion_expr::ScalarUDF;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use session::context::QueryContext;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
@@ -317,19 +318,17 @@ mod tests {
|
||||
}
|
||||
|
||||
fn matches_func() -> Arc<ScalarUDF> {
|
||||
Arc::new(create_udf(
|
||||
FUNCTION_REGISTRY.get_function("matches").unwrap(),
|
||||
QueryContext::arc(),
|
||||
Default::default(),
|
||||
))
|
||||
Arc::new(
|
||||
ScalarFunctionFactory::from(Arc::new(MatchesFunction) as FunctionRef)
|
||||
.provide(Default::default()),
|
||||
)
|
||||
}
|
||||
|
||||
fn matches_term_func() -> Arc<ScalarUDF> {
|
||||
Arc::new(create_udf(
|
||||
FUNCTION_REGISTRY.get_function("matches_term").unwrap(),
|
||||
QueryContext::arc(),
|
||||
Default::default(),
|
||||
))
|
||||
Arc::new(
|
||||
ScalarFunctionFactory::from(Arc::new(MatchesTermFunction) as FunctionRef)
|
||||
.provide(Default::default()),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -12,18 +12,28 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::v1::flow::{DirtyWindowRequest, WindowRange};
|
||||
use api::v1::region::{
|
||||
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
|
||||
};
|
||||
use api::v1::ArrowIpc;
|
||||
use arrow::array::{
|
||||
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
|
||||
TimestampSecondArray,
|
||||
};
|
||||
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use common_base::AffectedRows;
|
||||
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
|
||||
use common_grpc::FlightData;
|
||||
use common_telemetry::error;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::TableId;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::insert::Inserter;
|
||||
use crate::{error, metrics};
|
||||
@@ -32,10 +42,11 @@ impl Inserter {
|
||||
/// Handle bulk insert request.
|
||||
pub async fn handle_bulk_insert(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
table: TableRef,
|
||||
decoder: &mut FlightDecoder,
|
||||
data: FlightData,
|
||||
) -> error::Result<AffectedRows> {
|
||||
let table_id = table.table_info().table_id();
|
||||
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["decode_request"])
|
||||
.start_timer();
|
||||
@@ -48,6 +59,20 @@ impl Inserter {
|
||||
return Ok(0);
|
||||
};
|
||||
decode_timer.observe_duration();
|
||||
if let Some((min, max)) = compute_timestamp_range(
|
||||
&record_batch,
|
||||
&table
|
||||
.table_info()
|
||||
.meta
|
||||
.schema
|
||||
.timestamp_column()
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.name,
|
||||
)? {
|
||||
// notify flownode to update dirty time windows.
|
||||
self.update_flow_dirty_window(table_id, min, max);
|
||||
}
|
||||
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
|
||||
metrics::BULK_REQUEST_ROWS
|
||||
.with_label_values(&["raw"])
|
||||
@@ -216,4 +241,88 @@ impl Inserter {
|
||||
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
|
||||
Ok(rows_inserted)
|
||||
}
|
||||
|
||||
fn update_flow_dirty_window(&self, table_id: TableId, min: i64, max: i64) {
|
||||
let table_flownode_set_cache = self.table_flownode_set_cache.clone();
|
||||
let node_manager = self.node_manager.clone();
|
||||
common_runtime::spawn_global(async move {
|
||||
let result = table_flownode_set_cache
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(error::RequestInsertsSnafu);
|
||||
let flownodes = match result {
|
||||
Ok(flownodes) => flownodes.unwrap_or_default(),
|
||||
Err(e) => {
|
||||
error!(e; "Failed to get flownodes for table id: {}", table_id);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let peers: HashSet<_> = flownodes.values().cloned().collect();
|
||||
for peer in peers {
|
||||
let node_manager = node_manager.clone();
|
||||
common_runtime::spawn_global(async move {
|
||||
if let Err(e) = node_manager
|
||||
.flownode(&peer)
|
||||
.await
|
||||
.handle_mark_window_dirty(DirtyWindowRequest {
|
||||
table_id,
|
||||
dirty_time_ranges: vec![WindowRange {
|
||||
start_value: min,
|
||||
end_value: max,
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.context(error::RequestInsertsSnafu)
|
||||
{
|
||||
error!(e; "Failed to mark time window as dirty, table: {}, min: {}, max: {}", table_id, min, max);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate the timestamp range of record batch. Return `None` if record batch is empty.
|
||||
fn compute_timestamp_range(
|
||||
rb: &RecordBatch,
|
||||
timestamp_index_name: &str,
|
||||
) -> error::Result<Option<(i64, i64)>> {
|
||||
let ts_col = rb
|
||||
.column_by_name(timestamp_index_name)
|
||||
.context(error::ColumnNotFoundSnafu {
|
||||
msg: timestamp_index_name,
|
||||
})?;
|
||||
if rb.num_rows() == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
let primitive = match ts_col.data_type() {
|
||||
DataType::Timestamp(unit, _) => match unit {
|
||||
TimeUnit::Second => ts_col
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampSecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<Int64Type>(),
|
||||
TimeUnit::Millisecond => ts_col
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<Int64Type>(),
|
||||
TimeUnit::Microsecond => ts_col
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMicrosecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<Int64Type>(),
|
||||
TimeUnit::Nanosecond => ts_col
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampNanosecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<Int64Type>(),
|
||||
},
|
||||
t => {
|
||||
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
|
||||
}
|
||||
};
|
||||
|
||||
Ok(arrow::compute::min(&primitive).zip(arrow::compute::max(&primitive)))
|
||||
}
|
||||
|
||||
@@ -837,6 +837,13 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid time index type: {}", ty))]
|
||||
InvalidTimeIndexType {
|
||||
ty: arrow::datatypes::DataType,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -964,6 +971,7 @@ impl ErrorExt for Error {
|
||||
Error::ColumnOptions { source, .. } => source.status_code(),
|
||||
Error::DecodeFlightData { source, .. } => source.status_code(),
|
||||
Error::ComputeArrow { .. } => StatusCode::Internal,
|
||||
Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -78,7 +78,7 @@ pub struct Inserter {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
pub(crate) partition_manager: PartitionRuleManagerRef,
|
||||
pub(crate) node_manager: NodeManagerRef,
|
||||
table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||
}
|
||||
|
||||
pub type InserterRef = Arc<Inserter>;
|
||||
|
||||
@@ -13,20 +13,17 @@
|
||||
// limitations under the License.
|
||||
|
||||
use clap::Parser;
|
||||
use cli::{
|
||||
BenchTableMetadataCommand, ExportCommand, ImportCommand, MetaRestoreCommand,
|
||||
MetaSnapshotCommand, Tool,
|
||||
};
|
||||
use cli::{BenchTableMetadataCommand, DataCommand, MetaCommand, Tool};
|
||||
use common_error::ext::BoxedError;
|
||||
|
||||
#[derive(Parser)]
|
||||
pub enum SubCommand {
|
||||
// Attach(AttachCommand),
|
||||
Bench(BenchTableMetadataCommand),
|
||||
Export(ExportCommand),
|
||||
Import(ImportCommand),
|
||||
MetaSnapshot(MetaSnapshotCommand),
|
||||
MetaRestore(MetaRestoreCommand),
|
||||
#[clap(subcommand)]
|
||||
Data(DataCommand),
|
||||
#[clap(subcommand)]
|
||||
Meta(MetaCommand),
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
@@ -34,10 +31,8 @@ impl SubCommand {
|
||||
match self {
|
||||
// SubCommand::Attach(cmd) => cmd.build().await,
|
||||
SubCommand::Bench(cmd) => cmd.build().await,
|
||||
SubCommand::Export(cmd) => cmd.build().await,
|
||||
SubCommand::Import(cmd) => cmd.build().await,
|
||||
SubCommand::MetaSnapshot(cmd) => cmd.build().await,
|
||||
SubCommand::MetaRestore(cmd) => cmd.build().await,
|
||||
SubCommand::Data(cmd) => cmd.build().await,
|
||||
SubCommand::Meta(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,8 +25,7 @@ use async_trait::async_trait;
|
||||
use common_base::Plugins;
|
||||
use common_catalog::consts::is_readonly_schema;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
|
||||
use common_function::function_factory::ScalarFunctionFactory;
|
||||
use common_query::{Output, OutputData, OutputMeta};
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
|
||||
@@ -35,7 +34,9 @@ use datafusion::physical_plan::analyze::AnalyzeExec;
|
||||
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
|
||||
use datafusion::physical_plan::ExecutionPlan;
|
||||
use datafusion_common::ResolvedTableReference;
|
||||
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp};
|
||||
use datafusion_expr::{
|
||||
AggregateUDF, DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp,
|
||||
};
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::schema::Schema;
|
||||
use futures_util::StreamExt;
|
||||
@@ -454,14 +455,14 @@ impl QueryEngine for DatafusionQueryEngine {
|
||||
/// `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`
|
||||
///
|
||||
/// So it's better to make UDAF name lowercase when creating one.
|
||||
fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
|
||||
self.state.register_aggregate_function(func);
|
||||
fn register_aggregate_function(&self, func: AggregateUDF) {
|
||||
self.state.register_aggr_function(func);
|
||||
}
|
||||
|
||||
/// Register an UDF function.
|
||||
/// Register an scalar function.
|
||||
/// Will override if the function with same name is already registered.
|
||||
fn register_function(&self, func: FunctionRef) {
|
||||
self.state.register_function(func);
|
||||
fn register_scalar_function(&self, func: ScalarFunctionFactory) {
|
||||
self.state.register_scalar_function(func);
|
||||
}
|
||||
|
||||
fn read_table(&self, table: TableRef) -> Result<DataFrame> {
|
||||
@@ -477,6 +478,21 @@ impl QueryEngine for DatafusionQueryEngine {
|
||||
fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
|
||||
let mut state = self.state.session_state();
|
||||
state.config_mut().set_extension(query_ctx.clone());
|
||||
// note that hints in "x-greptime-hints" is automatically parsed
|
||||
// and set to query context's extension, so we can get it from query context.
|
||||
if let Some(parallelism) = query_ctx.extension("query_parallelism") {
|
||||
if let Ok(n) = parallelism.parse::<u64>() {
|
||||
if n > 0 {
|
||||
let new_cfg = state.config().clone().with_target_partitions(n as usize);
|
||||
*state.config_mut() = new_cfg;
|
||||
}
|
||||
} else {
|
||||
common_telemetry::warn!(
|
||||
"Failed to parse query_parallelism: {}, using default value",
|
||||
parallelism
|
||||
);
|
||||
}
|
||||
}
|
||||
QueryEngineContext::new(state, query_ctx)
|
||||
}
|
||||
|
||||
|
||||
@@ -18,12 +18,7 @@ use std::sync::Arc;
|
||||
|
||||
use arrow_schema::DataType;
|
||||
use catalog::table_source::DfTableSourceProvider;
|
||||
use common_function::aggr::{
|
||||
GeoPathAccumulator, HllState, UddSketchState, GEO_PATH_NAME, HLL_MERGE_NAME, HLL_NAME,
|
||||
UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME,
|
||||
};
|
||||
use common_function::scalars::udf::create_udf;
|
||||
use common_query::logical_plan::create_aggregate_function;
|
||||
use common_function::function::FunctionContext;
|
||||
use datafusion::common::TableReference;
|
||||
use datafusion::datasource::cte_worktable::CteWorkTable;
|
||||
use datafusion::datasource::file_format::{format_as_file_type, FileFormatFactory};
|
||||
@@ -151,38 +146,21 @@ impl ContextProvider for DfContextProviderAdapter {
|
||||
}
|
||||
|
||||
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
|
||||
self.engine_state.udf_function(name).map_or_else(
|
||||
self.engine_state.scalar_function(name).map_or_else(
|
||||
|| self.session_state.scalar_functions().get(name).cloned(),
|
||||
|func| {
|
||||
Some(Arc::new(create_udf(
|
||||
func,
|
||||
self.query_ctx.clone(),
|
||||
self.engine_state.function_state(),
|
||||
)))
|
||||
Some(Arc::new(func.provide(FunctionContext {
|
||||
query_ctx: self.query_ctx.clone(),
|
||||
state: self.engine_state.function_state(),
|
||||
})))
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
|
||||
if name == UDDSKETCH_STATE_NAME {
|
||||
return Some(Arc::new(UddSketchState::state_udf_impl()));
|
||||
} else if name == UDDSKETCH_MERGE_NAME {
|
||||
return Some(Arc::new(UddSketchState::merge_udf_impl()));
|
||||
} else if name == HLL_NAME {
|
||||
return Some(Arc::new(HllState::state_udf_impl()));
|
||||
} else if name == HLL_MERGE_NAME {
|
||||
return Some(Arc::new(HllState::merge_udf_impl()));
|
||||
} else if name == GEO_PATH_NAME {
|
||||
return Some(Arc::new(GeoPathAccumulator::udf_impl()));
|
||||
}
|
||||
|
||||
self.engine_state.aggregate_function(name).map_or_else(
|
||||
self.engine_state.aggr_function(name).map_or_else(
|
||||
|| self.session_state.aggregate_functions().get(name).cloned(),
|
||||
|func| {
|
||||
Some(Arc::new(
|
||||
create_aggregate_function(func.name(), func.args_count(), func.create()).into(),
|
||||
))
|
||||
},
|
||||
|func| Some(Arc::new(func)),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -213,13 +191,13 @@ impl ContextProvider for DfContextProviderAdapter {
|
||||
}
|
||||
|
||||
fn udf_names(&self) -> Vec<String> {
|
||||
let mut names = self.engine_state.udf_names();
|
||||
let mut names = self.engine_state.scalar_names();
|
||||
names.extend(self.session_state.scalar_functions().keys().cloned());
|
||||
names
|
||||
}
|
||||
|
||||
fn udaf_names(&self) -> Vec<String> {
|
||||
let mut names = self.engine_state.udaf_names();
|
||||
let mut names = self.engine_state.aggr_names();
|
||||
names.extend(self.session_state.aggregate_functions().keys().cloned());
|
||||
names
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::debug;
|
||||
use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion::error::Result as DfResult;
|
||||
use datafusion_common::config::ConfigOptions;
|
||||
@@ -148,12 +149,13 @@ struct PlanRewriter {
|
||||
level: usize,
|
||||
/// Simulated stack for the `rewrite` recursion
|
||||
stack: Vec<(LogicalPlan, usize)>,
|
||||
/// Stages to be expanded
|
||||
/// Stages to be expanded, will be added as parent node of merge scan one by one
|
||||
stage: Vec<LogicalPlan>,
|
||||
status: RewriterStatus,
|
||||
/// Partition columns of the table in current pass
|
||||
partition_cols: Option<Vec<String>>,
|
||||
column_requirements: HashSet<Column>,
|
||||
expand_on_next_call: bool,
|
||||
}
|
||||
|
||||
impl PlanRewriter {
|
||||
@@ -174,6 +176,10 @@ impl PlanRewriter {
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if self.expand_on_next_call {
|
||||
self.expand_on_next_call = false;
|
||||
return true;
|
||||
}
|
||||
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
|
||||
Commutativity::Commutative => {}
|
||||
Commutativity::PartialCommutative => {
|
||||
@@ -190,12 +196,20 @@ impl PlanRewriter {
|
||||
self.stage.push(plan)
|
||||
}
|
||||
}
|
||||
Commutativity::TransformedCommutative(transformer) => {
|
||||
Commutativity::TransformedCommutative {
|
||||
transformer,
|
||||
expand_on_parent,
|
||||
} => {
|
||||
if let Some(transformer) = transformer
|
||||
&& let Some(plan) = transformer(plan)
|
||||
&& let Some(changed_plan) = transformer(plan)
|
||||
{
|
||||
self.update_column_requirements(&plan);
|
||||
self.stage.push(plan)
|
||||
debug!("PlanRewriter: transformed plan: {changed_plan:#?} from {plan}");
|
||||
if let Some(last_stage) = changed_plan.last() {
|
||||
// update the column requirements from the last stage
|
||||
self.update_column_requirements(last_stage);
|
||||
}
|
||||
self.stage.extend(changed_plan.into_iter().rev());
|
||||
self.expand_on_next_call = expand_on_parent;
|
||||
}
|
||||
}
|
||||
Commutativity::NonCommutative
|
||||
@@ -391,10 +405,21 @@ impl TreeNodeRewriter for PlanRewriter {
|
||||
return Ok(Transformed::yes(node));
|
||||
};
|
||||
|
||||
let parent = parent.clone();
|
||||
|
||||
// TODO(ruihang): avoid this clone
|
||||
if self.should_expand(&parent.clone()) {
|
||||
if self.should_expand(&parent) {
|
||||
// TODO(ruihang): does this work for nodes with multiple children?;
|
||||
let node = self.expand(node)?;
|
||||
debug!("PlanRewriter: should expand child:\n {node}\n Of Parent: {parent}");
|
||||
let node = self.expand(node);
|
||||
debug!(
|
||||
"PlanRewriter: expanded plan: {}",
|
||||
match &node {
|
||||
Ok(n) => n.to_string(),
|
||||
Err(e) => format!("Error expanding plan: {e}"),
|
||||
}
|
||||
);
|
||||
let node = node?;
|
||||
self.pop_stack();
|
||||
return Ok(Transformed::yes(node));
|
||||
}
|
||||
|
||||
@@ -15,7 +15,12 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
|
||||
use common_function::aggrs::approximate::hll::{HllState, HLL_NAME};
|
||||
use common_function::aggrs::approximate::uddsketch::{UddSketchState, UDDSKETCH_STATE_NAME};
|
||||
use common_telemetry::debug;
|
||||
use datafusion::functions_aggregate::sum::sum_udaf;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_expr::{Expr, LogicalPlan, Projection, UserDefinedLogicalNode};
|
||||
use promql::extension_plan::{
|
||||
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
|
||||
};
|
||||
@@ -23,12 +28,190 @@ use promql::extension_plan::{
|
||||
use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan};
|
||||
use crate::dist_plan::MergeScanLogicalPlan;
|
||||
|
||||
/// generate the upper aggregation plan that will execute on the frontend.
|
||||
/// Basically a logical plan resembling the following:
|
||||
/// Projection:
|
||||
/// Aggregate:
|
||||
///
|
||||
/// from Aggregate
|
||||
///
|
||||
/// The upper Projection exists sole to make sure parent plan can recognize the output
|
||||
/// of the upper aggregation plan.
|
||||
pub fn step_aggr_to_upper_aggr(
|
||||
aggr_plan: &LogicalPlan,
|
||||
) -> datafusion_common::Result<Vec<LogicalPlan>> {
|
||||
let LogicalPlan::Aggregate(input_aggr) = aggr_plan else {
|
||||
return Err(datafusion_common::DataFusionError::Plan(
|
||||
"step_aggr_to_upper_aggr only accepts Aggregate plan".to_string(),
|
||||
));
|
||||
};
|
||||
if !is_all_aggr_exprs_steppable(&input_aggr.aggr_expr) {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(
|
||||
"Some aggregate expressions are not steppable".to_string(),
|
||||
));
|
||||
}
|
||||
let mut upper_aggr_expr = vec![];
|
||||
for aggr_expr in &input_aggr.aggr_expr {
|
||||
let Some(aggr_func) = get_aggr_func(aggr_expr) else {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(
|
||||
"Aggregate function not found".to_string(),
|
||||
));
|
||||
};
|
||||
let col_name = aggr_expr.qualified_name();
|
||||
let input_column = Expr::Column(datafusion_common::Column::new(col_name.0, col_name.1));
|
||||
let upper_func = match aggr_func.func.name() {
|
||||
"sum" | "min" | "max" | "last_value" | "first_value" => {
|
||||
// aggr_calc(aggr_merge(input_column))) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
"count" => {
|
||||
// sum(input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = sum_udaf();
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
UDDSKETCH_STATE_NAME => {
|
||||
// udd_merge(bucket_size, error_rate input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = Arc::new(UddSketchState::merge_udf_impl());
|
||||
new_aggr_func.args[2] = input_column.clone();
|
||||
new_aggr_func
|
||||
}
|
||||
HLL_NAME => {
|
||||
// hll_merge(input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = Arc::new(HllState::merge_udf_impl());
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
_ => {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(format!(
|
||||
"Aggregate function {} is not supported for Step aggregation",
|
||||
aggr_func.func.name()
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
// deal with nested alias case
|
||||
let mut new_aggr_expr = aggr_expr.clone();
|
||||
{
|
||||
let new_aggr_func = get_aggr_func_mut(&mut new_aggr_expr).unwrap();
|
||||
*new_aggr_func = upper_func;
|
||||
}
|
||||
|
||||
upper_aggr_expr.push(new_aggr_expr);
|
||||
}
|
||||
let mut new_aggr = input_aggr.clone();
|
||||
// use lower aggregate plan as input, this will be replace by merge scan plan later
|
||||
new_aggr.input = Arc::new(LogicalPlan::Aggregate(input_aggr.clone()));
|
||||
|
||||
new_aggr.aggr_expr = upper_aggr_expr;
|
||||
|
||||
// group by expr also need to be all ref by column to avoid duplicated computing
|
||||
let mut new_group_expr = new_aggr.group_expr.clone();
|
||||
for expr in &mut new_group_expr {
|
||||
if let Expr::Column(_) = expr {
|
||||
// already a column, no need to change
|
||||
continue;
|
||||
}
|
||||
let col_name = expr.qualified_name();
|
||||
let input_column = Expr::Column(datafusion_common::Column::new(col_name.0, col_name.1));
|
||||
*expr = input_column;
|
||||
}
|
||||
new_aggr.group_expr = new_group_expr.clone();
|
||||
|
||||
let mut new_projection_exprs = new_group_expr;
|
||||
// the upper aggr expr need to be aliased to the input aggr expr's name,
|
||||
// so that the parent plan can recognize it.
|
||||
for (lower_aggr_expr, upper_aggr_expr) in
|
||||
input_aggr.aggr_expr.iter().zip(new_aggr.aggr_expr.iter())
|
||||
{
|
||||
let lower_col_name = lower_aggr_expr.qualified_name();
|
||||
let (table, col_name) = upper_aggr_expr.qualified_name();
|
||||
let aggr_out_column = Column::new(table, col_name);
|
||||
let aliased_output_aggr_expr =
|
||||
Expr::Column(aggr_out_column).alias_qualified(lower_col_name.0, lower_col_name.1);
|
||||
new_projection_exprs.push(aliased_output_aggr_expr);
|
||||
}
|
||||
let upper_aggr_plan = LogicalPlan::Aggregate(new_aggr);
|
||||
debug!("Before recompute schema: {upper_aggr_plan:?}");
|
||||
let upper_aggr_plan = upper_aggr_plan.recompute_schema()?;
|
||||
debug!("After recompute schema: {upper_aggr_plan:?}");
|
||||
// create a projection on top of the new aggregate plan
|
||||
let new_projection =
|
||||
Projection::try_new(new_projection_exprs, Arc::new(upper_aggr_plan.clone()))?;
|
||||
let projection = LogicalPlan::Projection(new_projection);
|
||||
// return the new logical plan
|
||||
Ok(vec![projection, upper_aggr_plan])
|
||||
}
|
||||
|
||||
/// Check if the given aggregate expression is steppable.
|
||||
/// As in if it can be split into multiple steps:
|
||||
/// i.e. on datanode first call `state(input)` then
|
||||
/// on frontend call `calc(merge(state))` to get the final result.
|
||||
///
|
||||
pub fn is_all_aggr_exprs_steppable(aggr_exprs: &[Expr]) -> bool {
|
||||
let step_action = HashSet::from([
|
||||
"sum",
|
||||
"count",
|
||||
"min",
|
||||
"max",
|
||||
"first_value",
|
||||
"last_value",
|
||||
UDDSKETCH_STATE_NAME,
|
||||
HLL_NAME,
|
||||
]);
|
||||
aggr_exprs.iter().all(|expr| {
|
||||
if let Some(aggr_func) = get_aggr_func(expr) {
|
||||
if aggr_func.distinct {
|
||||
// Distinct aggregate functions are not steppable(yet).
|
||||
return false;
|
||||
}
|
||||
step_action.contains(aggr_func.func.name())
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_aggr_func(expr: &Expr) -> Option<&datafusion_expr::expr::AggregateFunction> {
|
||||
let mut expr_ref = expr;
|
||||
while let Expr::Alias(alias) = expr_ref {
|
||||
expr_ref = &alias.expr;
|
||||
}
|
||||
if let Expr::AggregateFunction(aggr_func) = expr_ref {
|
||||
Some(aggr_func)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_aggr_func_mut(expr: &mut Expr) -> Option<&mut datafusion_expr::expr::AggregateFunction> {
|
||||
let mut expr_ref = expr;
|
||||
while let Expr::Alias(alias) = expr_ref {
|
||||
expr_ref = &mut alias.expr;
|
||||
}
|
||||
if let Expr::AggregateFunction(aggr_func) = expr_ref {
|
||||
Some(aggr_func)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub enum Commutativity {
|
||||
Commutative,
|
||||
PartialCommutative,
|
||||
ConditionalCommutative(Option<Transformer>),
|
||||
TransformedCommutative(Option<Transformer>),
|
||||
TransformedCommutative {
|
||||
/// Return plans from parent to child order
|
||||
transformer: Option<StageTransformer>,
|
||||
/// whether the transformer changes the child to parent
|
||||
expand_on_parent: bool,
|
||||
},
|
||||
NonCommutative,
|
||||
Unimplemented,
|
||||
/// For unrelated plans like DDL
|
||||
@@ -55,7 +238,21 @@ impl Categorizer {
|
||||
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
|
||||
LogicalPlan::Window(_) => Commutativity::Unimplemented,
|
||||
LogicalPlan::Aggregate(aggr) => {
|
||||
if !Self::check_partition(&aggr.group_expr, &partition_cols) {
|
||||
let is_all_steppable = is_all_aggr_exprs_steppable(&aggr.aggr_expr);
|
||||
let matches_partition = Self::check_partition(&aggr.group_expr, &partition_cols);
|
||||
if !matches_partition && is_all_steppable {
|
||||
debug!("Plan is steppable: {plan}");
|
||||
return Commutativity::TransformedCommutative {
|
||||
transformer: Some(Arc::new(|plan: &LogicalPlan| {
|
||||
debug!("Before Step optimize: {plan}");
|
||||
let ret = step_aggr_to_upper_aggr(plan);
|
||||
debug!("After Step Optimize: {ret:?}");
|
||||
ret.ok()
|
||||
})),
|
||||
expand_on_parent: true,
|
||||
};
|
||||
}
|
||||
if !matches_partition {
|
||||
return Commutativity::NonCommutative;
|
||||
}
|
||||
for expr in &aggr.aggr_expr {
|
||||
@@ -169,7 +366,8 @@ impl Categorizer {
|
||||
| Expr::Negative(_)
|
||||
| Expr::Between(_)
|
||||
| Expr::Exists(_)
|
||||
| Expr::InList(_) => Commutativity::Commutative,
|
||||
| Expr::InList(_)
|
||||
| Expr::Case(_) => Commutativity::Commutative,
|
||||
Expr::ScalarFunction(_udf) => Commutativity::Commutative,
|
||||
Expr::AggregateFunction(_udaf) => Commutativity::Commutative,
|
||||
|
||||
@@ -177,7 +375,6 @@ impl Categorizer {
|
||||
| Expr::SimilarTo(_)
|
||||
| Expr::IsUnknown(_)
|
||||
| Expr::IsNotUnknown(_)
|
||||
| Expr::Case(_)
|
||||
| Expr::Cast(_)
|
||||
| Expr::TryCast(_)
|
||||
| Expr::WindowFunction(_)
|
||||
@@ -217,6 +414,8 @@ impl Categorizer {
|
||||
|
||||
pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;
|
||||
|
||||
pub type StageTransformer = Arc<dyn Fn(&LogicalPlan) -> Option<Vec<LogicalPlan>>>;
|
||||
|
||||
pub fn partial_commutative_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
|
||||
Some(plan.clone())
|
||||
}
|
||||
|
||||
@@ -2444,7 +2444,7 @@ impl PromPlanner {
|
||||
LogicalPlanBuilder::from(left)
|
||||
.alias(left_table_ref)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.join(
|
||||
.join_detailed(
|
||||
right,
|
||||
JoinType::Inner,
|
||||
(
|
||||
@@ -2458,6 +2458,7 @@ impl PromPlanner {
|
||||
.collect::<Vec<_>>(),
|
||||
),
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
|
||||
@@ -22,14 +22,13 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_base::Plugins;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::function_factory::ScalarFunctionFactory;
|
||||
use common_function::function_registry::FUNCTION_REGISTRY;
|
||||
use common_function::handlers::{
|
||||
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
|
||||
};
|
||||
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
|
||||
use common_query::Output;
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use datafusion_expr::{AggregateUDF, LogicalPlan};
|
||||
use datatypes::schema::Schema;
|
||||
pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
|
||||
use session::context::QueryContextRef;
|
||||
@@ -79,11 +78,11 @@ pub trait QueryEngine: Send + Sync {
|
||||
///
|
||||
/// # Panics
|
||||
/// Will panic if the function with same name is already registered.
|
||||
fn register_aggregate_function(&self, func: AggregateFunctionMetaRef);
|
||||
fn register_aggregate_function(&self, func: AggregateUDF);
|
||||
|
||||
/// Register a SQL function.
|
||||
/// Register a scalar function.
|
||||
/// Will override if the function with same name is already registered.
|
||||
fn register_function(&self, func: FunctionRef);
|
||||
fn register_scalar_function(&self, func: ScalarFunctionFactory);
|
||||
|
||||
/// Create a DataFrame from a table.
|
||||
fn read_table(&self, table: TableRef) -> Result<DataFrame>;
|
||||
@@ -154,8 +153,8 @@ impl QueryEngineFactory {
|
||||
|
||||
/// Register all functions implemented by GreptimeDB
|
||||
fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
|
||||
for func in FUNCTION_REGISTRY.functions() {
|
||||
query_engine.register_function(func);
|
||||
for func in FUNCTION_REGISTRY.scalar_functions() {
|
||||
query_engine.register_scalar_function(func);
|
||||
}
|
||||
|
||||
for accumulator in FUNCTION_REGISTRY.aggregate_functions() {
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user