Compare commits

..

5 Commits

Author SHA1 Message Date
Lei, HUANG
0c392da638 enable arrow ipc compression 2025-06-10 11:25:22 +08:00
discord9
9722482043 feat: better metrics 2025-06-10 11:01:07 +08:00
discord9
b5c185ed59 feat(exp): adjust_flow admin function 2025-06-10 11:01:07 +08:00
discord9
9df5f94662 feat: flownode to frontend load balance with guess 2025-06-10 11:01:07 +08:00
discord9
838d3ab04e feat: steppable aggr fn
poc: step aggr query

feat: mvp poc stuff

test: sqlness

chore: import missing

feat: support first/last_value

fix: check also include first/last value
2025-06-10 11:01:07 +08:00
174 changed files with 2637 additions and 3176 deletions

View File

@@ -10,13 +10,13 @@ inputs:
meta-replicas:
default: 2
description: "Number of Metasrv replicas"
image-registry:
image-registry:
default: "docker.io"
description: "Image registry"
image-repository:
image-repository:
default: "greptime/greptimedb"
description: "Image repository"
image-tag:
image-tag:
default: "latest"
description: 'Image tag'
etcd-endpoints:
@@ -32,12 +32,12 @@ runs:
steps:
- name: Install GreptimeDB operator
uses: nick-fields/retry@v3
with:
with:
timeout_minutes: 3
max_attempts: 3
shell: bash
command: |
helm repo add greptime https://greptimeteam.github.io/helm-charts/
helm repo add greptime https://greptimeteam.github.io/helm-charts/
helm repo update
helm upgrade \
--install \
@@ -48,10 +48,10 @@ runs:
--wait-for-jobs
- name: Install GreptimeDB cluster
shell: bash
run: |
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set meta.etcdEndpoints=${{ inputs.etcd-endpoints }} \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \
@@ -72,7 +72,7 @@ runs:
- name: Wait for GreptimeDB
shell: bash
run: |
while true; do
while true; do
PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
if [ "$PHASE" == "Running" ]; then
echo "Cluster is ready"
@@ -86,10 +86,10 @@ runs:
- name: Print GreptimeDB info
if: always()
shell: bash
run: |
run: |
kubectl get all --show-labels -n my-greptimedb
- name: Describe Nodes
if: always()
shell: bash
run: |
run: |
kubectl describe nodes

View File

@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {
helm install "$cluster_name" greptime/greptimedb-cluster \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
-n "$install_namespace"
# Wait for greptimedb cluster to be ready.
@@ -103,7 +103,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set storage.s3.region="$AWS_REGION" \
--set storage.s3.root="$DATA_ROOT" \

View File

@@ -30,7 +30,7 @@ update_helm_charts_version() {
# Commit the changes.
git add .
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME
# Create a Pull Request.

View File

@@ -26,7 +26,7 @@ update_homebrew_greptime_version() {
# Commit the changes.
git add .
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME
# Create a Pull Request.

63
Cargo.lock generated
View File

@@ -1623,7 +1623,6 @@ dependencies = [
"chrono",
"common-catalog",
"common-error",
"common-frontend",
"common-macro",
"common-meta",
"common-procedure",
@@ -2294,14 +2293,8 @@ version = "0.15.0"
dependencies = [
"async-trait",
"common-error",
"common-grpc",
"common-macro",
"common-meta",
"greptime-proto",
"meta-client",
"snafu 0.8.5",
"tokio",
"tonic 0.12.3",
]
[[package]]
@@ -3259,7 +3252,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3310,7 +3303,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"async-trait",
@@ -3330,7 +3323,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog-listing"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -3353,7 +3346,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3378,7 +3371,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"log",
"tokio",
@@ -3387,12 +3380,12 @@ dependencies = [
[[package]]
name = "datafusion-doc"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
[[package]]
name = "datafusion-execution"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"dashmap",
@@ -3410,7 +3403,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -3430,7 +3423,7 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"datafusion-common",
@@ -3441,7 +3434,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-buffer 54.3.1",
@@ -3470,7 +3463,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3491,7 +3484,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3503,7 +3496,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3525,7 +3518,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-table"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"async-trait",
@@ -3540,7 +3533,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-common",
"datafusion-doc",
@@ -3556,7 +3549,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-common",
"datafusion-physical-expr-common",
@@ -3565,7 +3558,7 @@ dependencies = [
[[package]]
name = "datafusion-macros"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-expr",
"quote",
@@ -3575,7 +3568,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -3593,7 +3586,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3616,7 +3609,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3629,7 +3622,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -3650,7 +3643,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3680,7 +3673,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3698,7 +3691,7 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"async-recursion",
"async-trait",
@@ -4708,7 +4701,6 @@ dependencies = [
"common-config",
"common-datasource",
"common-error",
"common-frontend",
"common-function",
"common-grpc",
"common-macro",
@@ -5141,7 +5133,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5f6119ac7952878d39dcde0343c4bf828d18ffc8#5f6119ac7952878d39dcde0343c4bf828d18ffc8"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
dependencies = [
"prost 0.13.5",
"serde",
@@ -8893,9 +8885,9 @@ dependencies = [
[[package]]
name = "pgwire"
version = "0.30.2"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca6c26b25be998208a13ff2f0c55b567363f34675410e6d6f1c513a150583fd"
checksum = "ec79ee18e6cafde8698885646780b967ecc905120798b8359dd0da64f9688e89"
dependencies = [
"async-trait",
"bytes",
@@ -11143,7 +11135,6 @@ dependencies = [
"common-catalog",
"common-config",
"common-error",
"common-frontend",
"common-grpc",
"common-macro",
"common-mem-prof",

View File

@@ -116,15 +116,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
@@ -133,7 +133,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5f6119ac7952878d39dcde0343c4bf828d18ffc8" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -232,7 +232,6 @@
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -405,7 +404,6 @@
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |

View File

@@ -44,13 +44,6 @@ runtime_size = 8
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## Compression mode for datanode side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
## Default to `none`
flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]

View File

@@ -54,13 +54,6 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## Compression mode for frontend side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
## Default to `none`
flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]

View File

@@ -9,7 +9,7 @@ We highly recommend using the self-monitoring feature provided by [GreptimeDB Op
- **Metrics Dashboards**
- `dashboards/metrics/cluster/dashboard.json`: The Grafana dashboard for the GreptimeDB cluster. Read the [dashboard.md](./dashboards/metrics/cluster/dashboard.md) for more details.
- `dashboards/metrics/standalone/dashboard.json`: The Grafana dashboard for the standalone GreptimeDB instance. **It's generated from the `cluster/dashboard.json` by removing the instance filter through the `make dashboards` command**. Read the [dashboard.md](./dashboards/metrics/standalone/dashboard.md) for more details.
- **Logs Dashboard**
@@ -83,7 +83,7 @@ If you use the [Helm Chart](https://github.com/GreptimeTeam/helm-charts) to depl
- `monitoring.enabled=true`: Deploys a standalone GreptimeDB instance dedicated to monitoring the cluster;
- `grafana.enabled=true`: Deploys Grafana and automatically imports the monitoring dashboard;
The standalone GreptimeDB instance will collect metrics from your cluster, and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/user-guide/deployments-administration/deploy-on-kubernetes/getting-started).
The standalone GreptimeDB instance will collect metrics from your cluster, and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/nightly/user-guide/deployments/deploy-on-kubernetes/getting-started).
### Self-host Prometheus and import dashboards manually

View File

@@ -19,7 +19,6 @@ async-trait.workspace = true
bytes.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-procedure.workspace = true

View File

@@ -277,13 +277,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to list frontend nodes"))]
ListProcess {
source: common_frontend::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -352,7 +345,6 @@ impl ErrorExt for Error {
Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
source.status_code()
}
Error::ListProcess { source, .. } => source.status_code(),
}
}

View File

@@ -51,7 +51,6 @@ use crate::error::{
};
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;
@@ -85,7 +84,6 @@ impl KvBackendCatalogManager {
backend: KvBackendRef,
cache_registry: LayeredCacheRegistryRef,
procedure_manager: Option<ProcedureManagerRef>,
process_manager: Option<ProcessManagerRef>,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
information_extension,
@@ -104,14 +102,12 @@ impl KvBackendCatalogManager {
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
Arc::new(FlowMetadataManager::new(backend.clone())),
process_manager.clone(),
)),
pg_catalog_provider: Arc::new(PGCatalogProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
backend,
process_manager,
},
cache_registry,
procedure_manager,
@@ -423,7 +419,6 @@ struct SystemCatalog {
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
backend: KvBackendRef,
process_manager: Option<ProcessManagerRef>,
}
impl SystemCatalog {
@@ -491,7 +486,6 @@ impl SystemCatalog {
catalog.to_string(),
self.catalog_manager.clone(),
Arc::new(FlowMetadataManager::new(self.backend.clone())),
self.process_manager.clone(),
))
});
information_schema_provider.table(table_name)

View File

@@ -40,7 +40,6 @@ pub mod information_schema {
pub use crate::system_schema::information_schema::*;
}
pub mod process_manager;
pub mod table_source;
#[async_trait::async_trait]

View File

@@ -356,7 +356,6 @@ impl MemoryCatalogManager {
catalog,
Arc::downgrade(self) as Weak<dyn CatalogManager>,
Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
None, // we don't need ProcessManager on regions server.
);
let information_schema = information_schema_provider.tables().clone();

View File

@@ -1,188 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use api::v1::frontend::{ListProcessRequest, ProcessInfo};
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_telemetry::{debug, info};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use snafu::ResultExt;
use crate::error;
pub type ProcessManagerRef = Arc<ProcessManager>;
pub struct ProcessManager {
server_addr: String,
next_id: AtomicU64,
catalogs: RwLock<HashMap<String, HashMap<u64, ProcessInfo>>>,
frontend_selector: Option<MetaClientSelector>,
}
impl ProcessManager {
/// Create a [ProcessManager] instance with server address and kv client.
pub fn new(server_addr: String, meta_client: Option<MetaClientRef>) -> Self {
let frontend_selector = meta_client.map(MetaClientSelector::new);
Self {
server_addr,
next_id: Default::default(),
catalogs: Default::default(),
frontend_selector,
}
}
}
impl ProcessManager {
/// Registers a submitted query. Use the provided id if present.
pub fn register_query(
self: &Arc<Self>,
catalog: String,
schemas: Vec<String>,
query: String,
client: String,
id: Option<u64>,
) -> Ticket {
let id = id.unwrap_or_else(|| self.next_id.fetch_add(1, Ordering::Relaxed));
let process = ProcessInfo {
id,
catalog: catalog.clone(),
schemas,
query,
start_timestamp: current_time_millis(),
client,
frontend: self.server_addr.clone(),
};
self.catalogs
.write()
.unwrap()
.entry(catalog.clone())
.or_default()
.insert(id, process);
Ticket {
catalog,
manager: self.clone(),
id,
}
}
/// Generates the next process id.
pub fn next_id(&self) -> u64 {
self.next_id.fetch_add(1, Ordering::Relaxed)
}
/// De-register a query from process list.
pub fn deregister_query(&self, catalog: String, id: u64) {
if let Entry::Occupied(mut o) = self.catalogs.write().unwrap().entry(catalog) {
let process = o.get_mut().remove(&id);
debug!("Deregister process: {:?}", process);
if o.get_mut().is_empty() {
o.remove();
}
}
}
pub fn deregister_all_queries(&self) {
self.catalogs.write().unwrap().clear();
info!("All queries on {} has been deregistered", self.server_addr);
}
/// List local running processes in given catalog.
pub fn local_processes(&self, catalog: Option<&str>) -> error::Result<Vec<ProcessInfo>> {
let catalogs = self.catalogs.read().unwrap();
let result = if let Some(catalog) = catalog {
if let Some(catalogs) = catalogs.get(catalog) {
catalogs.values().cloned().collect()
} else {
vec![]
}
} else {
catalogs
.values()
.flat_map(|v| v.values().cloned())
.collect()
};
Ok(result)
}
pub async fn list_all_processes(
&self,
catalog: Option<&str>,
) -> error::Result<Vec<ProcessInfo>> {
let mut processes = vec![];
if let Some(remote_frontend_selector) = self.frontend_selector.as_ref() {
let frontends = remote_frontend_selector
.select(|node| node.peer.addr != self.server_addr)
.await
.context(error::ListProcessSnafu)?;
for mut f in frontends {
processes.extend(
f.list_process(ListProcessRequest {
catalog: catalog.unwrap_or_default().to_string(),
})
.await
.context(error::ListProcessSnafu)?
.processes,
);
}
}
processes.extend(self.local_processes(catalog)?);
Ok(processes)
}
}
pub struct Ticket {
pub(crate) catalog: String,
pub(crate) manager: ProcessManagerRef,
pub(crate) id: u64,
}
impl Drop for Ticket {
fn drop(&mut self) {
self.manager
.deregister_query(std::mem::take(&mut self.catalog), self.id);
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::process_manager::ProcessManager;
#[tokio::test]
async fn test_register_query() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"".to_string(),
None,
);
let running_processes = process_manager.local_processes(None).unwrap();
assert_eq!(running_processes.len(), 1);
assert_eq!(&running_processes[0].frontend, "127.0.0.1:8000");
assert_eq!(running_processes[0].id, ticket.id);
assert_eq!(&running_processes[0].query, "SELECT * FROM table");
drop(ticket);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
}

View File

@@ -19,7 +19,6 @@ mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
mod process_list;
pub mod region_peers;
mod region_statistics;
mod runtime_metrics;
@@ -43,7 +42,6 @@ use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
use paste::paste;
use process_list::InformationSchemaProcessList;
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
use table::TableRef;
@@ -52,7 +50,6 @@ use views::InformationSchemaViews;
use self::columns::InformationSchemaColumns;
use crate::error::{Error, Result};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
use crate::system_schema::information_schema::flows::InformationSchemaFlows;
use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
@@ -116,7 +113,6 @@ macro_rules! setup_memory_table {
pub struct InformationSchemaProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
process_manager: Option<ProcessManagerRef>,
flow_metadata_manager: Arc<FlowMetadataManager>,
tables: HashMap<String, TableRef>,
}
@@ -211,10 +207,6 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
self.catalog_manager.clone(),
),
) as _),
PROCESS_LIST => self
.process_manager
.as_ref()
.map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
_ => None,
}
}
@@ -225,13 +217,11 @@ impl InformationSchemaProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
process_manager: Option<ProcessManagerRef>,
) -> Self {
let mut provider = Self {
catalog_name,
catalog_manager,
flow_metadata_manager,
process_manager,
tables: HashMap::new(),
};
@@ -287,9 +277,6 @@ impl InformationSchemaProvider {
self.build_table(TABLE_CONSTRAINTS).unwrap(),
);
tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
if let Some(process_list) = self.build_table(PROCESS_LIST) {
tables.insert(PROCESS_LIST.to_string(), process_list);
}
// Add memory tables
for name in MEMORY_TABLES.iter() {
tables.insert((*name).to_string(), self.build_table(name).expect(name));

View File

@@ -1,189 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID;
use common_error::ext::BoxedError;
use common_frontend::DisplayProcessId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::util::current_time_millis;
use common_time::{Duration, Timestamp};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
DurationMillisecondVectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
VectorRef,
};
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};
use crate::error::{self, InternalSnafu};
use crate::information_schema::Predicates;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::InformationTable;
/// Column names of `information_schema.process_list`
const ID: &str = "id";
const CATALOG: &str = "catalog";
const SCHEMAS: &str = "schemas";
const QUERY: &str = "query";
const CLIENT: &str = "client";
const FRONTEND: &str = "frontend";
const START_TIMESTAMP: &str = "start_timestamp";
const ELAPSED_TIME: &str = "elapsed_time";
/// `information_schema.process_list` table implementation that tracks running
/// queries in current cluster.
pub struct InformationSchemaProcessList {
schema: SchemaRef,
process_manager: ProcessManagerRef,
}
impl InformationSchemaProcessList {
pub fn new(process_manager: ProcessManagerRef) -> Self {
Self {
schema: Self::schema(),
process_manager,
}
}
fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(ID, CDT::string_datatype(), false),
ColumnSchema::new(CATALOG, CDT::string_datatype(), false),
ColumnSchema::new(SCHEMAS, CDT::string_datatype(), false),
ColumnSchema::new(QUERY, CDT::string_datatype(), false),
ColumnSchema::new(CLIENT, CDT::string_datatype(), false),
ColumnSchema::new(FRONTEND, CDT::string_datatype(), false),
ColumnSchema::new(
START_TIMESTAMP,
CDT::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new(ELAPSED_TIME, CDT::duration_millisecond_datatype(), false),
]))
}
}
impl InformationTable for InformationSchemaProcessList {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID
}
fn table_name(&self) -> &'static str {
"process_list"
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn to_stream(&self, request: ScanRequest) -> error::Result<SendableRecordBatchStream> {
let process_manager = self.process_manager.clone();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
self.schema.arrow_schema().clone(),
futures::stream::once(async move {
make_process_list(process_manager, request)
.await
.map(RecordBatch::into_df_record_batch)
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}
/// Build running process list.
async fn make_process_list(
process_manager: ProcessManagerRef,
request: ScanRequest,
) -> error::Result<RecordBatch> {
let predicates = Predicates::from_scan_request(&Some(request));
let current_time = current_time_millis();
// todo(hl): find a way to extract user catalog to filter queries from other users.
let queries = process_manager.list_all_processes(None).await?;
let mut id_builder = StringVectorBuilder::with_capacity(queries.len());
let mut catalog_builder = StringVectorBuilder::with_capacity(queries.len());
let mut schemas_builder = StringVectorBuilder::with_capacity(queries.len());
let mut query_builder = StringVectorBuilder::with_capacity(queries.len());
let mut client_builder = StringVectorBuilder::with_capacity(queries.len());
let mut frontend_builder = StringVectorBuilder::with_capacity(queries.len());
let mut start_time_builder = TimestampMillisecondVectorBuilder::with_capacity(queries.len());
let mut elapsed_time_builder = DurationMillisecondVectorBuilder::with_capacity(queries.len());
for process in queries {
let display_id = DisplayProcessId {
server_addr: process.frontend.to_string(),
id: process.id,
}
.to_string();
let schemas = process.schemas.join(",");
let id = Value::from(display_id);
let catalog = Value::from(process.catalog);
let schemas = Value::from(schemas);
let query = Value::from(process.query);
let client = Value::from(process.client);
let frontend = Value::from(process.frontend);
let start_timestamp = Value::from(Timestamp::new_millisecond(process.start_timestamp));
let elapsed_time = Value::from(Duration::new_millisecond(
current_time - process.start_timestamp,
));
let row = [
(ID, &id),
(CATALOG, &catalog),
(SCHEMAS, &schemas),
(QUERY, &query),
(CLIENT, &client),
(FRONTEND, &frontend),
(START_TIMESTAMP, &start_timestamp),
(ELAPSED_TIME, &elapsed_time),
];
if predicates.eval(&row) {
id_builder.push(id.as_string().as_deref());
catalog_builder.push(catalog.as_string().as_deref());
schemas_builder.push(schemas.as_string().as_deref());
query_builder.push(query.as_string().as_deref());
client_builder.push(client.as_string().as_deref());
frontend_builder.push(frontend.as_string().as_deref());
start_time_builder.push(start_timestamp.as_timestamp().map(|t| t.value().into()));
elapsed_time_builder.push(elapsed_time.as_duration().map(|d| d.value().into()));
}
}
RecordBatch::new(
InformationSchemaProcessList::schema(),
vec![
Arc::new(id_builder.finish()) as VectorRef,
Arc::new(catalog_builder.finish()) as VectorRef,
Arc::new(schemas_builder.finish()) as VectorRef,
Arc::new(query_builder.finish()) as VectorRef,
Arc::new(client_builder.finish()) as VectorRef,
Arc::new(frontend_builder.finish()) as VectorRef,
Arc::new(start_time_builder.finish()) as VectorRef,
Arc::new(elapsed_time_builder.finish()) as VectorRef,
],
)
.context(error::CreateRecordBatchSnafu)
}

View File

@@ -47,4 +47,3 @@ pub const VIEWS: &str = "views";
pub const FLOWS: &str = "flows";
pub const PROCEDURE_INFO: &str = "procedure_info";
pub const REGION_STATISTICS: &str = "region_statistics";
pub const PROCESS_LIST: &str = "process_list";

View File

@@ -328,7 +328,6 @@ mod tests {
backend.clone(),
layered_cache_registry,
None,
None,
);
let table_metadata_manager = TableMetadataManager::new(backend);
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);

View File

@@ -58,7 +58,6 @@ where
info!("{desc}, average operation cost: {cost:.2} ms");
}
/// Command to benchmark table metadata operations.
#[derive(Debug, Default, Parser)]
pub struct BenchTableMetadataCommand {
#[clap(long)]

View File

@@ -1,39 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod export;
mod import;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::data::export::ExportCommand;
use crate::data::import::ImportCommand;
use crate::Tool;
/// Command for data operations including exporting data from and importing data into GreptimeDB.
#[derive(Subcommand)]
pub enum DataCommand {
Export(ExportCommand),
Import(ImportCommand),
}
impl DataCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
DataCommand::Export(cmd) => cmd.build().await,
DataCommand::Import(cmd) => cmd.build().await,
}
}
}

View File

@@ -30,7 +30,6 @@ pub enum Error {
location: Location,
msg: String,
},
#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
#[snafu(implicit)]
@@ -229,41 +228,19 @@ pub enum Error {
#[snafu(source)]
error: ObjectStoreError,
},
#[snafu(display("S3 config need be set"))]
S3ConfigNotSet {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Output directory not set"))]
OutputDirNotSet {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Empty store addresses"))]
EmptyStoreAddrs {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported memory backend"))]
UnsupportedMemoryBackend {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("File path invalid: {}", msg))]
InvalidFilePath {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid arguments: {}", msg))]
InvalidArguments {
msg: String,
#[snafu(display("KV backend not set: {}", backend))]
KvBackendNotSet {
backend: String,
#[snafu(implicit)]
location: Location,
},
@@ -285,9 +262,6 @@ impl ErrorExt for Error {
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. }
| Error::InvalidFilePath { .. }
| Error::UnsupportedMemoryBackend { .. }
| Error::InvalidArguments { .. }
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. }
@@ -308,7 +282,7 @@ impl ErrorExt for Error {
Error::OpenDal { .. } => StatusCode::Internal,
Error::S3ConfigNotSet { .. }
| Error::OutputDirNotSet { .. }
| Error::EmptyStoreAddrs { .. } => StatusCode::InvalidArguments,
| Error::KvBackendNotSet { .. } => StatusCode::InvalidArguments,
Error::BuildRuntime { source, .. } => source.status_code(),

View File

@@ -50,7 +50,6 @@ enum ExportTarget {
All,
}
/// Command for exporting data from the GreptimeDB.
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
/// Server address to connect

View File

@@ -40,7 +40,6 @@ enum ImportTarget {
All,
}
/// Command to import data from a directory into a GreptimeDB instance.
#[derive(Debug, Default, Parser)]
pub struct ImportCommand {
/// Server address to connect

View File

@@ -13,10 +13,11 @@
// limitations under the License.
mod bench;
mod data;
mod database;
pub mod error;
mod metadata;
mod export;
mod import;
mod meta_snapshot;
use async_trait::async_trait;
use clap::Parser;
@@ -25,8 +26,9 @@ pub use database::DatabaseClient;
use error::Result;
pub use crate::bench::BenchTableMetadataCommand;
pub use crate::data::DataCommand;
pub use crate::metadata::MetadataCommand;
pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand;
pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
#[async_trait]
pub trait Tool: Send + Sync {

View File

@@ -12,38 +12,96 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::Arc;
use async_trait::async_trait;
use clap::{Parser, Subcommand};
use clap::Parser;
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::snapshot::MetadataSnapshotManager;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use crate::error::{InvalidFilePathSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
use crate::metadata::common::StoreConfig;
use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
use crate::Tool;
/// Subcommand for metadata snapshot operations, including saving snapshots, restoring from snapshots, and viewing snapshot information.
#[derive(Subcommand)]
pub enum SnapshotCommand {
/// Save a snapshot of the current metadata state to a specified location.
Save(SaveCommand),
/// Restore metadata from a snapshot.
Restore(RestoreCommand),
/// Explore metadata from a snapshot.
Info(InfoCommand),
#[derive(Debug, Default, Parser)]
struct MetaConnection {
/// The endpoint of store. one of etcd, pg or mysql.
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
store_addrs: Vec<String>,
/// The database backend.
#[clap(long, value_enum)]
backend: Option<BackendImpl>,
#[clap(long, default_value = "")]
store_key_prefix: String,
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[clap(long,default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
meta_table_name: String,
#[clap(long, default_value = "128")]
max_txn_ops: usize,
}
impl SnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
SnapshotCommand::Save(cmd) => cmd.build().await,
SnapshotCommand::Restore(cmd) => cmd.build().await,
SnapshotCommand::Info(cmd) => cmd.build().await,
impl MetaConnection {
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
let max_txn_ops = self.max_txn_ops;
let store_addrs = &self.store_addrs;
if store_addrs.is_empty() {
KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new)
} else {
let kvbackend = match self.backend {
Some(BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]
Some(BackendImpl::PostgresStore) => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
#[cfg(feature = "mysql_kvbackend")]
Some(BackendImpl::MysqlStore) => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
_ => KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new),
};
if self.store_key_prefix.is_empty() {
kvbackend
} else {
let chroot_kvbackend =
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
Ok(Arc::new(chroot_kvbackend))
}
}
}
}
@@ -112,10 +170,10 @@ impl S3Config {
/// It will dump the metadata snapshot to local file or s3 bucket.
/// The snapshot file will be in binary format.
#[derive(Debug, Default, Parser)]
pub struct SaveCommand {
/// The store configuration.
pub struct MetaSnapshotCommand {
/// The connection to the metadata store.
#[clap(flatten)]
store: StoreConfig,
connection: MetaConnection,
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
@@ -138,9 +196,9 @@ fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError>
Ok(object_store)
}
impl SaveCommand {
impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.store.build().await?;
let kvbackend = self.connection.build().await?;
let output_dir = &self.output_dir;
let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
if let Some(store) = object_store {
@@ -160,7 +218,7 @@ impl SaveCommand {
}
}
struct MetaSnapshotTool {
pub struct MetaSnapshotTool {
inner: MetadataSnapshotManager,
target_file: String,
}
@@ -176,16 +234,14 @@ impl Tool for MetaSnapshotTool {
}
}
/// Restore metadata from a snapshot file.
///
/// This command restores the metadata state from a previously saved snapshot.
/// The snapshot can be loaded from either a local file system or an S3 bucket,
/// depending on the provided configuration.
/// Restore metadata snapshot tool.
/// This tool is used to restore metadata snapshot from etcd, pg or mysql.
/// It will restore the metadata snapshot from local file or s3 bucket.
#[derive(Debug, Default, Parser)]
pub struct RestoreCommand {
/// The store configuration.
pub struct MetaRestoreCommand {
/// The connection to the metadata store.
#[clap(flatten)]
store: StoreConfig,
connection: MetaConnection,
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
@@ -199,9 +255,9 @@ pub struct RestoreCommand {
force: bool,
}
impl RestoreCommand {
impl MetaRestoreCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.store.build().await?;
let kvbackend = self.connection.build().await?;
let input_dir = &self.input_dir;
let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
if let Some(store) = object_store {
@@ -223,7 +279,7 @@ impl RestoreCommand {
}
}
struct MetaRestoreTool {
pub struct MetaRestoreTool {
inner: MetadataSnapshotManager,
source_file: String,
force: bool,
@@ -271,93 +327,3 @@ impl Tool for MetaRestoreTool {
}
}
}
/// Explore metadata from a snapshot file.
///
/// This command allows filtering the metadata by a specific key and limiting the number of results.
/// It prints the filtered metadata to the console.
#[derive(Debug, Default, Parser)]
pub struct InfoCommand {
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
/// The name of the target snapshot file. we will add the file extension automatically.
#[clap(long, default_value = "metadata_snapshot")]
file_name: String,
/// The query string to filter the metadata.
#[clap(long, default_value = "*")]
inspect_key: String,
/// The limit of the metadata to query.
#[clap(long)]
limit: Option<usize>,
}
struct MetaInfoTool {
inner: ObjectStore,
source_file: String,
inspect_key: String,
limit: Option<usize>,
}
#[async_trait]
impl Tool for MetaInfoTool {
#[allow(clippy::print_stdout)]
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let result = MetadataSnapshotManager::info(
&self.inner,
&self.source_file,
&self.inspect_key,
self.limit,
)
.await
.map_err(BoxedError::new)?;
for item in result {
println!("{}", item);
}
Ok(())
}
}
impl InfoCommand {
fn decide_object_store_root_for_local_store(
file_path: &str,
) -> Result<(&str, &str), BoxedError> {
let path = Path::new(file_path);
let parent = path
.parent()
.and_then(|p| p.to_str())
.context(InvalidFilePathSnafu { msg: file_path })
.map_err(BoxedError::new)?;
let file_name = path
.file_name()
.and_then(|f| f.to_str())
.context(InvalidFilePathSnafu { msg: file_path })
.map_err(BoxedError::new)?;
let root = if parent.is_empty() { "." } else { parent };
Ok((root, file_name))
}
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
if let Some(store) = object_store {
let tool = MetaInfoTool {
inner: store,
source_file: self.file_name.clone(),
inspect_key: self.inspect_key.clone(),
limit: self.limit,
};
Ok(Box::new(tool))
} else {
let (root, file_name) =
Self::decide_object_store_root_for_local_store(&self.file_name)?;
let object_store = create_local_file_object_store(root)?;
let tool = MetaInfoTool {
inner: object_store,
source_file: file_name.to_string(),
inspect_key: self.inspect_key.clone(),
limit: self.limit,
};
Ok(Box::new(tool))
}
}
}

View File

@@ -1,42 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
mod control;
mod snapshot;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::metadata::control::ControlCommand;
use crate::metadata::snapshot::SnapshotCommand;
use crate::Tool;
/// Command for managing metadata operations, including saving metadata snapshots and restoring metadata from snapshots.
#[derive(Subcommand)]
pub enum MetadataCommand {
#[clap(subcommand)]
Snapshot(SnapshotCommand),
#[clap(subcommand)]
Control(ControlCommand),
}
impl MetadataCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetadataCommand::Snapshot(cmd) => cmd.build().await,
MetadataCommand::Control(cmd) => cmd.build().await,
}
}
}

View File

@@ -1,116 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};
#[derive(Debug, Default, Parser)]
pub(crate) struct StoreConfig {
/// The endpoint of store. one of etcd, postgres or mysql.
///
/// For postgres store, the format is:
/// "password=password dbname=postgres user=postgres host=localhost port=5432"
///
/// For etcd store, the format is:
/// "127.0.0.1:2379"
///
/// For mysql store, the format is:
/// "mysql://user:password@ip:port/dbname"
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
store_addrs: Vec<String>,
/// The maximum number of operations in a transaction. Only used when using [etcd-store].
#[clap(long, default_value = "128")]
max_txn_ops: usize,
/// The metadata store backend.
#[clap(long, value_enum, default_value = "etcd-store")]
backend: BackendImpl,
/// The key prefix of the metadata store.
#[clap(long, default_value = "")]
store_key_prefix: String,
/// The table name in RDS to store metadata. Only used when using [postgres-store] or [mysql-store].
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
meta_table_name: String,
}
impl StoreConfig {
/// Builds a [`KvBackendRef`] from the store configuration.
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
let max_txn_ops = self.max_txn_ops;
let store_addrs = &self.store_addrs;
if store_addrs.is_empty() {
EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
} else {
let kvbackend = match self.backend {
BackendImpl::EtcdStore => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]
BackendImpl::PostgresStore => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
#[cfg(feature = "mysql_kvbackend")]
BackendImpl::MysqlStore => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
BackendImpl::MemoryStore => UnsupportedMemoryBackendSnafu
.fail()
.map_err(BoxedError::new),
};
if self.store_key_prefix.is_empty() {
kvbackend
} else {
let chroot_kvbackend =
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
Ok(Arc::new(chroot_kvbackend))
}
}
}
}

View File

@@ -1,38 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod get;
mod utils;
use clap::Subcommand;
use common_error::ext::BoxedError;
use get::GetCommand;
use crate::Tool;
/// Subcommand for metadata control.
#[derive(Subcommand)]
pub enum ControlCommand {
/// Get the metadata from the metasrv.
#[clap(subcommand)]
Get(GetCommand),
}
impl ControlCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
ControlCommand::Get(cmd) => cmd.build().await,
}
}
}

View File

@@ -1,242 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::min;
use async_trait::async_trait;
use clap::{Parser, Subcommand};
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::RangeRequest;
use futures::TryStreamExt;
use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::utils::{decode_key_value, json_fromatter};
use crate::Tool;
/// Subcommand for get command.
#[derive(Subcommand)]
pub enum GetCommand {
Key(GetKeyCommand),
Table(GetTableCommand),
}
impl GetCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
GetCommand::Key(cmd) => cmd.build().await,
GetCommand::Table(cmd) => cmd.build().await,
}
}
}
/// Get key-value pairs from the metadata store.
#[derive(Debug, Default, Parser)]
pub struct GetKeyCommand {
/// The key to get from the metadata store. If empty, returns all key-value pairs.
#[clap(default_value = "")]
key: String,
/// Whether to perform a prefix query. If true, returns all key-value pairs where the key starts with the given prefix.
#[clap(long, default_value = "false")]
prefix: bool,
/// The maximum number of key-value pairs to return. If 0, returns all key-value pairs.
#[clap(long, default_value = "0")]
limit: u64,
#[clap(flatten)]
store: StoreConfig,
}
impl GetKeyCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.store.build().await?;
Ok(Box::new(GetKeyTool {
kvbackend,
key: self.key.clone(),
prefix: self.prefix,
limit: self.limit,
}))
}
}
struct GetKeyTool {
kvbackend: KvBackendRef,
key: String,
prefix: bool,
limit: u64,
}
#[async_trait]
impl Tool for GetKeyTool {
async fn do_work(&self) -> Result<(), BoxedError> {
let mut req = RangeRequest::default();
if self.prefix {
req = req.with_prefix(self.key.as_bytes());
} else {
req = req.with_key(self.key.as_bytes());
}
let page_size = if self.limit > 0 {
min(self.limit as usize, DEFAULT_PAGE_SIZE)
} else {
DEFAULT_PAGE_SIZE
};
let pagination_stream =
PaginationStream::new(self.kvbackend.clone(), req, page_size, decode_key_value);
let mut stream = Box::pin(pagination_stream.into_stream());
let mut counter = 0;
while let Some((key, value)) = stream.try_next().await.map_err(BoxedError::new)? {
print!("{}\n{}\n", key, value);
counter += 1;
if self.limit > 0 && counter >= self.limit {
break;
}
}
Ok(())
}
}
/// Get table metadata from the metadata store via table id.
#[derive(Debug, Default, Parser)]
pub struct GetTableCommand {
/// Get table metadata by table id.
#[clap(long)]
table_id: Option<u32>,
/// Get table metadata by table name.
#[clap(long)]
table_name: Option<String>,
/// The schema name of the table.
#[clap(long)]
schema_name: Option<String>,
/// Pretty print the output.
#[clap(long, default_value = "false")]
pretty: bool,
#[clap(flatten)]
store: StoreConfig,
}
impl GetTableCommand {
pub fn validate(&self) -> Result<(), BoxedError> {
if self.table_id.is_none() && self.table_name.is_none() {
return Err(BoxedError::new(
InvalidArgumentsSnafu {
msg: "You must specify either --table-id or --table-name.",
}
.build(),
));
}
Ok(())
}
}
struct GetTableTool {
kvbackend: KvBackendRef,
table_id: Option<u32>,
table_name: Option<String>,
schema_name: Option<String>,
pretty: bool,
}
#[async_trait]
impl Tool for GetTableTool {
async fn do_work(&self) -> Result<(), BoxedError> {
let table_metadata_manager = TableMetadataManager::new(self.kvbackend.clone());
let table_name_manager = table_metadata_manager.table_name_manager();
let table_info_manager = table_metadata_manager.table_info_manager();
let table_route_manager = table_metadata_manager.table_route_manager();
let table_id = if let Some(table_name) = &self.table_name {
let catalog = DEFAULT_CATALOG_NAME.to_string();
let schema_name = self
.schema_name
.clone()
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let key = TableNameKey::new(&catalog, &schema_name, table_name);
let Some(table_name) = table_name_manager.get(key).await.map_err(BoxedError::new)?
else {
println!(
"Table({}) not found",
format_full_table_name(&catalog, &schema_name, table_name)
);
return Ok(());
};
table_name.table_id()
} else {
// Safety: we have validated that table_id or table_name is not None
self.table_id.unwrap()
};
let table_info = table_info_manager
.get(table_id)
.await
.map_err(BoxedError::new)?;
if let Some(table_info) = table_info {
println!(
"{}\n{}",
TableInfoKey::new(table_id),
json_fromatter(self.pretty, &*table_info)
);
} else {
println!("Table info not found");
}
let table_route = table_route_manager
.table_route_storage()
.get(table_id)
.await
.map_err(BoxedError::new)?;
if let Some(table_route) = table_route {
println!(
"{}\n{}",
TableRouteKey::new(table_id),
json_fromatter(self.pretty, &table_route)
);
} else {
println!("Table route not found");
}
Ok(())
}
}
impl GetTableCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
self.validate()?;
let kvbackend = self.store.build().await?;
Ok(Box::new(GetTableTool {
kvbackend,
table_id: self.table_id,
table_name: self.table_name.clone(),
schema_name: self.schema_name.clone(),
pretty: self.pretty,
}))
}
}

View File

@@ -1,36 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::error::Result as CommonMetaResult;
use common_meta::rpc::KeyValue;
use serde::Serialize;
/// Decodes a key-value pair into a string.
pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
let key = String::from_utf8_lossy(&kv.key).to_string();
let value = String::from_utf8_lossy(&kv.value).to_string();
Ok((key, value))
}
/// Formats a value as a JSON string.
pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
where
T: Serialize,
{
if pretty {
serde_json::to_string_pretty(value).unwrap()
} else {
serde_json::to_string(value).unwrap()
}
}

View File

@@ -162,23 +162,12 @@ impl Client {
.as_bytes() as usize
}
pub fn make_flight_client(
&self,
send_compression: bool,
accept_compression: bool,
) -> Result<FlightClient> {
pub fn make_flight_client(&self) -> Result<FlightClient> {
let (addr, channel) = self.find_channel()?;
let mut client = FlightServiceClient::new(channel)
let client = FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size());
// todo(hl): support compression methods.
if send_compression {
client = client.send_compressed(CompressionEncoding::Zstd);
}
if accept_compression {
client = client.accept_compressed(CompressionEncoding::Zstd);
}
Ok(FlightClient { addr, client })
}

View File

@@ -49,16 +49,7 @@ impl NodeManager for NodeClients {
async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
let client = self.get_client(datanode).await;
let ChannelConfig {
send_compression,
accept_compression,
..
} = self.channel_manager.config();
Arc::new(RegionRequester::new(
client,
*send_compression,
*accept_compression,
))
Arc::new(RegionRequester::new(client))
}
async fn flownode(&self, flownode: &Peer) -> FlownodeRef {

View File

@@ -287,7 +287,7 @@ impl Database {
let mut request = tonic::Request::new(request);
Self::put_hints(request.metadata_mut(), hints)?;
let mut client = self.client.make_flight_client(false, false)?;
let mut client = self.client.make_flight_client()?;
let response = client.mut_inner().do_get(request).await.or_else(|e| {
let tonic_code = e.code();
@@ -409,7 +409,7 @@ impl Database {
MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?,
);
let mut client = self.client.make_flight_client(false, false)?;
let mut client = self.client.make_flight_client()?;
let response = client.mut_inner().do_put(request).await?;
let response = response
.into_inner()

View File

@@ -46,8 +46,6 @@ use crate::{metrics, Client, Error};
#[derive(Debug)]
pub struct RegionRequester {
client: Client,
send_compression: bool,
accept_compression: bool,
}
#[async_trait]
@@ -91,18 +89,12 @@ impl Datanode for RegionRequester {
}
impl RegionRequester {
pub fn new(client: Client, send_compression: bool, accept_compression: bool) -> Self {
Self {
client,
send_compression,
accept_compression,
}
pub fn new(client: Client) -> Self {
Self { client }
}
pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
let mut flight_client = self
.client
.make_flight_client(self.send_compression, self.accept_compression)?;
let mut flight_client = self.client.make_flight_client()?;
let response = flight_client
.mut_inner()
.do_get(ticket)

View File

@@ -146,7 +146,6 @@ mod tests {
let output_dir = tempfile::tempdir().unwrap();
let cli = cli::Command::parse_from([
"cli",
"data",
"export",
"--addr",
"127.0.0.1:4000",

View File

@@ -323,7 +323,6 @@ impl StartCommand {
cached_meta_backend.clone(),
layered_cache_registry.clone(),
None,
None,
);
let table_metadata_manager =

View File

@@ -20,7 +20,6 @@ use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
@@ -39,7 +38,6 @@ use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::addrs;
use servers::export_metrics::ExportMetricsTask;
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
@@ -344,17 +342,11 @@ impl StartCommand {
let information_extension =
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
let process_manager = Arc::new(ProcessManager::new(
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
Some(meta_client.clone()),
));
let catalog_manager = KvBackendCatalogManager::new(
information_extension,
cached_meta_backend.clone(),
layered_cache_registry.clone(),
None,
Some(process_manager.clone()),
);
let executor = HandlerGroupExecutor::new(vec![
@@ -372,16 +364,12 @@ impl StartCommand {
// frontend to datanode need not timeout.
// Some queries are expected to take long time.
let mut channel_config = ChannelConfig {
let channel_config = ChannelConfig {
timeout: None,
tcp_nodelay: opts.datanode.client.tcp_nodelay,
connect_timeout: Some(opts.datanode.client.connect_timeout),
..Default::default()
};
if opts.grpc.flight_compression.transport_compression() {
channel_config.accept_compression = true;
channel_config.send_compression = true;
}
let client = NodeClients::new(channel_config);
let instance = FrontendBuilder::new(
@@ -391,7 +379,6 @@ impl StartCommand {
catalog_manager,
Arc::new(client),
meta_client,
process_manager,
)
.with_plugin(plugins.clone())
.with_local_cache_invalidator(layered_cache_registry)

View File

@@ -21,7 +21,6 @@ use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::InformationExtension;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::api::v1::meta::RegionRole;
use common_base::readable_size::ReadableSize;
@@ -527,14 +526,11 @@ impl StartCommand {
datanode.region_server(),
procedure_manager.clone(),
));
let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
let catalog_manager = KvBackendCatalogManager::new(
information_extension.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
Some(procedure_manager.clone()),
Some(process_manager.clone()),
);
let table_metadata_manager =
@@ -624,7 +620,6 @@ impl StartCommand {
catalog_manager.clone(),
node_manager.clone(),
ddl_task_executor.clone(),
process_manager,
)
.with_plugin(plugins.clone())
.try_build()
@@ -652,7 +647,7 @@ impl StartCommand {
node_manager,
)
.await
.context(StartFlownodeSnafu)?;
.context(error::StartFlownodeSnafu)?;
flow_streaming_engine.set_frontend_invoker(invoker).await;
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))

View File

@@ -102,8 +102,6 @@ pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
/// id for information_schema.region_statistics
pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
/// id for information_schema.process_list
pub const INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID: u32 = 36;
// ----- End of information_schema tables -----

View File

@@ -7,13 +7,5 @@ license.workspace = true
[dependencies]
async-trait.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
greptime-proto.workspace = true
meta-client.workspace = true
snafu.workspace = true
tonic.workspace = true
[dev-dependencies]
tokio.workspace = true

View File

@@ -27,35 +27,6 @@ pub enum Error {
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to list nodes from metasrv"))]
Meta {
source: Box<meta_client::error::Error>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse process id: {}", s))]
ParseProcessId {
s: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to invoke list process service"))]
ListProcess {
#[snafu(source)]
error: tonic::Status,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to invoke list process service"))]
CreateChannel {
source: common_grpc::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -65,10 +36,6 @@ impl ErrorExt for Error {
use Error::*;
match self {
External { source, .. } => source.status_code(),
Meta { source, .. } => source.status_code(),
ParseProcessId { .. } => StatusCode::InvalidArguments,
ListProcess { .. } => StatusCode::External,
CreateChannel { source, .. } => source.status_code(),
}
}

View File

@@ -12,41 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use snafu::OptionExt;
pub mod error;
pub mod selector;
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DisplayProcessId {
pub server_addr: String,
pub id: u64,
}
impl Display for DisplayProcessId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.server_addr, self.id)
}
}
impl TryFrom<&str> for DisplayProcessId {
type Error = error::Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
let mut split = value.split('/');
let server_addr = split
.next()
.context(error::ParseProcessIdSnafu { s: value })?
.to_string();
let id = split
.next()
.context(error::ParseProcessIdSnafu { s: value })?;
let id = u64::from_str(id)
.ok()
.context(error::ParseProcessIdSnafu { s: value })?;
Ok(DisplayProcessId { server_addr, id })
}
}

View File

@@ -1,98 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
use greptime_proto::v1::frontend::{frontend_client, ListProcessRequest, ListProcessResponse};
use meta_client::MetaClientRef;
use snafu::ResultExt;
use crate::error;
use crate::error::{MetaSnafu, Result};
pub type FrontendClientPtr = Box<dyn FrontendClient>;
#[async_trait::async_trait]
pub trait FrontendClient: Send {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;
}
#[async_trait::async_trait]
impl FrontendClient for frontend_client::FrontendClient<tonic::transport::channel::Channel> {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse> {
let response: ListProcessResponse = frontend_client::FrontendClient::<
tonic::transport::channel::Channel,
>::list_process(self, req)
.await
.context(error::ListProcessSnafu)?
.into_inner();
Ok(response)
}
}
#[async_trait::async_trait]
pub trait FrontendSelector {
async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
where
F: Fn(&NodeInfo) -> bool + Send;
}
#[derive(Debug, Clone)]
pub struct MetaClientSelector {
meta_client: MetaClientRef,
channel_manager: ChannelManager,
}
#[async_trait::async_trait]
impl FrontendSelector for MetaClientSelector {
async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
where
F: Fn(&NodeInfo) -> bool + Send,
{
let nodes = self
.meta_client
.list_nodes(Some(Role::Frontend))
.await
.map_err(Box::new)
.context(MetaSnafu)?;
nodes
.into_iter()
.filter(predicate)
.map(|node| {
let channel = self
.channel_manager
.get(node.peer.addr)
.context(error::CreateChannelSnafu)?;
let client = frontend_client::FrontendClient::new(channel);
Ok(Box::new(client) as FrontendClientPtr)
})
.collect::<Result<Vec<_>>>()
}
}
impl MetaClientSelector {
pub fn new(meta_client: MetaClientRef) -> Self {
let cfg = ChannelConfig::new()
.connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(30));
let channel_manager = ChannelManager::with_config(cfg);
Self {
meta_client,
channel_manager,
}
}
}

View File

@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_query::error::{
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn adjust_signature() -> Signature {
Signature::exact(
vec![
ConcreteDataType::string_datatype(), // flow name
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
ConcreteDataType::uint64_datatype(), // max filter number per query
],
Volatility::Immutable,
)
}
#[admin_fn(
name = AdjustFlowFunction,
display_name = adjust_flow,
sig_fn = adjust_signature,
ret = uint64
)]
pub(crate) async fn adjust_flow(
flow_service_handler: &FlowServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, have: {}",
params.len()
),
}
);
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
(
ValueRef::String(flow_name),
ValueRef::UInt64(min_run_interval),
ValueRef::UInt64(max_filter_num),
) => (flow_name, min_run_interval, max_filter_num),
_ => {
return UnsupportedInputDataTypeSnafu {
function: "adjust_flow",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
}
};
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
let res = flow_service_handler
.adjust(
&catalog_name,
&flow_name,
min_run_interval,
max_filter_num as usize,
query_ctx.clone(),
)
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}

View File

@@ -26,6 +26,7 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use remove_region_follower::RemoveRegionFollowerFunction;
use crate::adjust_flow::AdjustFlowFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;
@@ -43,5 +44,6 @@ impl AdminFunction {
registry.register_async(Arc::new(FlushTableFunction));
registry.register_async(Arc::new(CompactTableFunction));
registry.register_async(Arc::new(FlushFlowFunction));
registry.register_async(Arc::new(AdjustFlowFunction));
}
}

View File

@@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod approximate;
#[cfg(feature = "geo")]
pub mod geo;
pub mod vector;
mod geo_path;
mod hll;
mod uddsketch_state;
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};

View File

@@ -47,7 +47,7 @@ impl GeoPathAccumulator {
Self::default()
}
pub fn uadf_impl() -> AggregateUDF {
pub fn udf_impl() -> AggregateUDF {
create_udaf(
GEO_PATH_NAME,
// Input types: lat, lng, timestamp

View File

@@ -1,32 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::function_registry::FunctionRegistry;
pub(crate) mod hll;
mod uddsketch;
pub(crate) struct ApproximateFunction;
impl ApproximateFunction {
pub fn register(registry: &FunctionRegistry) {
// uddsketch
registry.register_aggr(uddsketch::UddSketchState::state_udf_impl());
registry.register_aggr(uddsketch::UddSketchState::merge_udf_impl());
// hll
registry.register_aggr(hll::HllState::state_udf_impl());
registry.register_aggr(hll::HllState::merge_udf_impl());
}
}

View File

@@ -1,27 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::function_registry::FunctionRegistry;
mod encoding;
mod geo_path;
pub(crate) struct GeoFunction;
impl GeoFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(geo_path::GeoPathAccumulator::uadf_impl());
registry.register_aggr(encoding::JsonPathAccumulator::uadf_impl());
}
}

View File

@@ -1,29 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::aggrs::vector::product::VectorProduct;
use crate::aggrs::vector::sum::VectorSum;
use crate::function_registry::FunctionRegistry;
mod product;
mod sum;
pub(crate) struct VectorFunction;
impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(VectorSum::uadf_impl());
registry.register_aggr(VectorProduct::uadf_impl());
}
}

View File

@@ -12,21 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::{
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use sql::parser::ParserContext;
use snafu::ensure;
use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn flush_signature() -> Signature {
Signature::uniform(
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
fn parse_flush_flow(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
@@ -70,7 +54,6 @@ fn parse_flush_flow(
),
}
);
let ValueRef::String(flow_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "flush_flow",
@@ -78,27 +61,14 @@ fn parse_flush_flow(
}
.fail();
};
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
#[cfg(test)]
@@ -154,10 +124,7 @@ mod test {
("catalog.flow_name", ("catalog", "flow_name")),
];
for (input, expected) in testcases.iter() {
let args = vec![*input];
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
}
}

View File

@@ -1,63 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion_expr::ScalarUDF;
use crate::function::{FunctionContext, FunctionRef};
use crate::scalars::udf::create_udf;
/// A factory for creating `ScalarUDF` that require a function context.
#[derive(Clone)]
pub struct ScalarFunctionFactory {
name: String,
factory: Arc<dyn Fn(FunctionContext) -> ScalarUDF + Send + Sync>,
}
impl ScalarFunctionFactory {
/// Returns the name of the function.
pub fn name(&self) -> &str {
&self.name
}
/// Returns a `ScalarUDF` when given a function context.
pub fn provide(&self, ctx: FunctionContext) -> ScalarUDF {
(self.factory)(ctx)
}
}
impl From<ScalarUDF> for ScalarFunctionFactory {
fn from(df_udf: ScalarUDF) -> Self {
let name = df_udf.name().to_string();
let func = Arc::new(move |_ctx| df_udf.clone());
Self {
name,
factory: func,
}
}
}
impl From<FunctionRef> for ScalarFunctionFactory {
fn from(func: FunctionRef) -> Self {
let name = func.name().to_string();
let func = Arc::new(move |ctx: FunctionContext| {
create_udf(func.clone(), ctx.query_ctx, ctx.state)
});
Self {
name,
factory: func,
}
}
}

View File

@@ -16,14 +16,11 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use datafusion_expr::AggregateUDF;
use once_cell::sync::Lazy;
use crate::admin::AdminFunction;
use crate::aggrs::approximate::ApproximateFunction;
use crate::aggrs::vector::VectorFunction as VectorAggrFunction;
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
use crate::function_factory::ScalarFunctionFactory;
use crate::function::{AsyncFunctionRef, FunctionRef};
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction;
use crate::scalars::hll_count::HllCalcFunction;
@@ -34,19 +31,18 @@ use crate::scalars::matches_term::MatchesTermFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
use crate::scalars::vector::VectorFunction as VectorScalarFunction;
use crate::scalars::vector::VectorFunction;
use crate::system::SystemFunction;
#[derive(Default)]
pub struct FunctionRegistry {
functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
functions: RwLock<HashMap<String, FunctionRef>>,
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
}
impl FunctionRegistry {
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
let func = func.into();
pub fn register(&self, func: FunctionRef) {
let _ = self
.functions
.write()
@@ -54,10 +50,6 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func);
}
pub fn register_scalar(&self, func: impl Function + 'static) {
self.register(Arc::new(func) as FunctionRef);
}
pub fn register_async(&self, func: AsyncFunctionRef) {
let _ = self
.async_functions
@@ -66,14 +58,6 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func);
}
pub fn register_aggr(&self, func: AggregateUDF) {
let _ = self
.aggregate_functions
.write()
.unwrap()
.insert(func.name().to_string(), func);
}
pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> {
self.async_functions.read().unwrap().get(name).cloned()
}
@@ -87,16 +71,27 @@ impl FunctionRegistry {
.collect()
}
#[cfg(test)]
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
let _ = self
.aggregate_functions
.write()
.unwrap()
.insert(func.name(), func);
}
pub fn get_aggr_function(&self, name: &str) -> Option<AggregateFunctionMetaRef> {
self.aggregate_functions.read().unwrap().get(name).cloned()
}
pub fn get_function(&self, name: &str) -> Option<FunctionRef> {
self.functions.read().unwrap().get(name).cloned()
}
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
pub fn functions(&self) -> Vec<FunctionRef> {
self.functions.read().unwrap().values().cloned().collect()
}
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
pub fn aggregate_functions(&self) -> Vec<AggregateFunctionMetaRef> {
self.aggregate_functions
.read()
.unwrap()
@@ -117,6 +112,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
UddSketchCalcFunction::register(&function_registry);
HllCalcFunction::register(&function_registry);
// Aggregate functions
AggregateFunctions::register(&function_registry);
// Full text search function
MatchesFunction::register(&function_registry);
MatchesTermFunction::register(&function_registry);
@@ -129,21 +127,15 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
JsonFunction::register(&function_registry);
// Vector related functions
VectorScalarFunction::register(&function_registry);
VectorAggrFunction::register(&function_registry);
VectorFunction::register(&function_registry);
// Geo functions
#[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry);
#[cfg(feature = "geo")]
crate::aggrs::geo::GeoFunction::register(&function_registry);
// Ip functions
IpFunctions::register(&function_registry);
// Approximate functions
ApproximateFunction::register(&function_registry);
Arc::new(function_registry)
});
@@ -155,11 +147,12 @@ mod tests {
#[test]
fn test_function_registry() {
let registry = FunctionRegistry::default();
let func = Arc::new(TestAndFunction);
assert!(registry.get_function("test_and").is_none());
assert!(registry.scalar_functions().is_empty());
registry.register_scalar(TestAndFunction);
assert!(registry.functions().is_empty());
registry.register(func);
let _ = registry.get_function("test_and").unwrap();
assert_eq!(1, registry.scalar_functions().len());
assert_eq!(1, registry.functions().len());
}
}

View File

@@ -87,6 +87,15 @@ pub trait FlowServiceHandler: Send + Sync {
flow: &str,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
async fn adjust(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
}
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

View File

@@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::error::{InvalidInputTypeSnafu, Result};
use common_error::ext::BoxedError;
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast;
use datatypes::value::ValueRef;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::parser::ParserContext;
/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
})
.map(|v| v.as_u64())
}
pub fn parse_catalog_flow(
flow_name: &str,
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
}

View File

@@ -15,17 +15,17 @@
#![feature(let_chains)]
#![feature(try_blocks)]
mod adjust_flow;
mod admin;
mod flush_flow;
mod macros;
pub mod scalars;
mod system;
pub mod aggrs;
pub mod aggr;
pub mod function;
pub mod function_factory;
pub mod function_registry;
pub mod handlers;
pub mod helper;
pub mod scalars;
pub mod state;
pub mod utils;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod aggregate;
pub(crate) mod date;
pub mod expression;
#[cfg(feature = "geo")]

View File

@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # Deprecate Warning:
//!
//! This module is deprecated and will be removed in the future.
//! All UDAF implementation here are not maintained and should
//! not be used before they are refactored into the `src/aggr`
//! version.
use std::sync::Arc;
use common_query::logical_plan::AggregateFunctionCreatorRef;
use crate::function_registry::FunctionRegistry;
use crate::scalars::vector::product::VectorProductCreator;
use crate::scalars::vector::sum::VectorSumCreator;
/// A function creates `AggregateFunctionCreator`.
/// "Aggregator" *is* AggregatorFunction. Since the later one is long, we named an short alias for it.
/// The two names might be used interchangeably.
type AggregatorCreatorFunction = Arc<dyn Fn() -> AggregateFunctionCreatorRef + Send + Sync>;
/// `AggregateFunctionMeta` dynamically creates AggregateFunctionCreator.
#[derive(Clone)]
pub struct AggregateFunctionMeta {
name: String,
args_count: u8,
creator: AggregatorCreatorFunction,
}
pub type AggregateFunctionMetaRef = Arc<AggregateFunctionMeta>;
impl AggregateFunctionMeta {
pub fn new(name: &str, args_count: u8, creator: AggregatorCreatorFunction) -> Self {
Self {
name: name.to_string(),
args_count,
creator,
}
}
pub fn name(&self) -> String {
self.name.to_string()
}
pub fn args_count(&self) -> u8 {
self.args_count
}
pub fn create(&self) -> AggregateFunctionCreatorRef {
(self.creator)()
}
}
pub(crate) struct AggregateFunctions;
impl AggregateFunctions {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_sum",
1,
Arc::new(|| Arc::new(VectorSumCreator::default())),
)));
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_product",
1,
Arc::new(|| Arc::new(VectorProductCreator::default())),
)));
#[cfg(feature = "geo")]
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"json_encode_path",
3,
Arc::new(|| Arc::new(super::geo::encoding::JsonPathEncodeFunctionCreator::default())),
)));
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
mod date_add;
mod date_format;
mod date_sub;
@@ -26,8 +27,8 @@ pub(crate) struct DateFunction;
impl DateFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(DateAddFunction);
registry.register_scalar(DateSubFunction);
registry.register_scalar(DateFormatFunction);
registry.register(Arc::new(DateAddFunction));
registry.register(Arc::new(DateSubFunction));
registry.register(Arc::new(DateFormatFunction));
}
}

View File

@@ -17,6 +17,8 @@ mod ctx;
mod is_null;
mod unary;
use std::sync::Arc;
pub use binary::scalar_binary_op;
pub use ctx::EvalContext;
pub use unary::scalar_unary_op;
@@ -28,6 +30,6 @@ pub(crate) struct ExpressionFunction;
impl ExpressionFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(IsNullFunction);
registry.register(Arc::new(IsNullFunction));
}
}

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
pub(crate) mod encoding;
mod geohash;
mod h3;
pub(crate) mod helpers;
mod helpers;
mod measure;
mod relation;
mod s2;
@@ -27,57 +29,57 @@ pub(crate) struct GeoFunctions;
impl GeoFunctions {
pub fn register(registry: &FunctionRegistry) {
// geohash
registry.register_scalar(geohash::GeohashFunction);
registry.register_scalar(geohash::GeohashNeighboursFunction);
registry.register(Arc::new(geohash::GeohashFunction));
registry.register(Arc::new(geohash::GeohashNeighboursFunction));
// h3 index
registry.register_scalar(h3::H3LatLngToCell);
registry.register_scalar(h3::H3LatLngToCellString);
registry.register(Arc::new(h3::H3LatLngToCell));
registry.register(Arc::new(h3::H3LatLngToCellString));
// h3 index inspection
registry.register_scalar(h3::H3CellBase);
registry.register_scalar(h3::H3CellIsPentagon);
registry.register_scalar(h3::H3StringToCell);
registry.register_scalar(h3::H3CellToString);
registry.register_scalar(h3::H3CellCenterLatLng);
registry.register_scalar(h3::H3CellResolution);
registry.register(Arc::new(h3::H3CellBase));
registry.register(Arc::new(h3::H3CellIsPentagon));
registry.register(Arc::new(h3::H3StringToCell));
registry.register(Arc::new(h3::H3CellToString));
registry.register(Arc::new(h3::H3CellCenterLatLng));
registry.register(Arc::new(h3::H3CellResolution));
// h3 hierarchical grid
registry.register_scalar(h3::H3CellCenterChild);
registry.register_scalar(h3::H3CellParent);
registry.register_scalar(h3::H3CellToChildren);
registry.register_scalar(h3::H3CellToChildrenSize);
registry.register_scalar(h3::H3CellToChildPos);
registry.register_scalar(h3::H3ChildPosToCell);
registry.register_scalar(h3::H3CellContains);
registry.register(Arc::new(h3::H3CellCenterChild));
registry.register(Arc::new(h3::H3CellParent));
registry.register(Arc::new(h3::H3CellToChildren));
registry.register(Arc::new(h3::H3CellToChildrenSize));
registry.register(Arc::new(h3::H3CellToChildPos));
registry.register(Arc::new(h3::H3ChildPosToCell));
registry.register(Arc::new(h3::H3CellContains));
// h3 grid traversal
registry.register_scalar(h3::H3GridDisk);
registry.register_scalar(h3::H3GridDiskDistances);
registry.register_scalar(h3::H3GridDistance);
registry.register_scalar(h3::H3GridPathCells);
registry.register(Arc::new(h3::H3GridDisk));
registry.register(Arc::new(h3::H3GridDiskDistances));
registry.register(Arc::new(h3::H3GridDistance));
registry.register(Arc::new(h3::H3GridPathCells));
// h3 measurement
registry.register_scalar(h3::H3CellDistanceSphereKm);
registry.register_scalar(h3::H3CellDistanceEuclideanDegree);
registry.register(Arc::new(h3::H3CellDistanceSphereKm));
registry.register(Arc::new(h3::H3CellDistanceEuclideanDegree));
// s2
registry.register_scalar(s2::S2LatLngToCell);
registry.register_scalar(s2::S2CellLevel);
registry.register_scalar(s2::S2CellToToken);
registry.register_scalar(s2::S2CellParent);
registry.register(Arc::new(s2::S2LatLngToCell));
registry.register(Arc::new(s2::S2CellLevel));
registry.register(Arc::new(s2::S2CellToToken));
registry.register(Arc::new(s2::S2CellParent));
// spatial data type
registry.register_scalar(wkt::LatLngToPointWkt);
registry.register(Arc::new(wkt::LatLngToPointWkt));
// spatial relation
registry.register_scalar(relation::STContains);
registry.register_scalar(relation::STWithin);
registry.register_scalar(relation::STIntersects);
registry.register(Arc::new(relation::STContains));
registry.register(Arc::new(relation::STWithin));
registry.register(Arc::new(relation::STIntersects));
// spatial measure
registry.register_scalar(measure::STDistance);
registry.register_scalar(measure::STDistanceSphere);
registry.register_scalar(measure::STArea);
registry.register(Arc::new(measure::STDistance));
registry.register(Arc::new(measure::STDistanceSphere));
registry.register(Arc::new(measure::STArea));
}
}

View File

@@ -19,12 +19,9 @@ use common_error::status_code::StatusCode;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{self, InvalidInputStateSnafu, Result};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use common_time::Timestamp;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::VectorRef;
@@ -50,16 +47,6 @@ impl JsonPathAccumulator {
timestamp_type,
}
}
/// Create a new `AggregateUDF` for the `json_encode_path` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"json_encode_path".to_string(),
3,
Arc::new(JsonPathEncodeFunctionCreator::default()),
)
.into()
}
}
impl Accumulator for JsonPathAccumulator {

View File

@@ -37,7 +37,7 @@ macro_rules! ensure_columns_len {
};
}
pub(crate) use ensure_columns_len;
pub(super) use ensure_columns_len;
macro_rules! ensure_columns_n {
($columns:ident, $n:literal) => {
@@ -58,7 +58,7 @@ macro_rules! ensure_columns_n {
};
}
pub(crate) use ensure_columns_n;
pub(super) use ensure_columns_n;
macro_rules! ensure_and_coerce {
($compare:expr, $coerce:expr) => {{
@@ -72,4 +72,4 @@ macro_rules! ensure_and_coerce {
}};
}
pub(crate) use ensure_and_coerce;
pub(super) use ensure_and_coerce;

View File

@@ -16,6 +16,7 @@
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
@@ -26,7 +27,7 @@ use datatypes::vectors::{BinaryVector, MutableVector, UInt64VectorBuilder, Vecto
use hyperloglogplus::HyperLogLog;
use snafu::OptionExt;
use crate::aggrs::approximate::hll::HllStateType;
use crate::aggr::HllStateType;
use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry;
@@ -43,7 +44,7 @@ pub struct HllCalcFunction;
impl HllCalcFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(HllCalcFunction);
registry.register(Arc::new(HllCalcFunction));
}
}
@@ -116,8 +117,6 @@ impl Function for HllCalcFunction {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::BinaryVector;
use super::*;

View File

@@ -17,6 +17,8 @@ mod ipv4;
mod ipv6;
mod range;
use std::sync::Arc;
use cidr::{Ipv4ToCidr, Ipv6ToCidr};
use ipv4::{Ipv4NumToString, Ipv4StringToNum};
use ipv6::{Ipv6NumToString, Ipv6StringToNum};
@@ -29,15 +31,15 @@ pub(crate) struct IpFunctions;
impl IpFunctions {
pub fn register(registry: &FunctionRegistry) {
// Register IPv4 functions
registry.register_scalar(Ipv4NumToString);
registry.register_scalar(Ipv4StringToNum);
registry.register_scalar(Ipv4ToCidr);
registry.register_scalar(Ipv4InRange);
registry.register(Arc::new(Ipv4NumToString));
registry.register(Arc::new(Ipv4StringToNum));
registry.register(Arc::new(Ipv4ToCidr));
registry.register(Arc::new(Ipv4InRange));
// Register IPv6 functions
registry.register_scalar(Ipv6NumToString);
registry.register_scalar(Ipv6StringToNum);
registry.register_scalar(Ipv6ToCidr);
registry.register_scalar(Ipv6InRange);
registry.register(Arc::new(Ipv6NumToString));
registry.register(Arc::new(Ipv6StringToNum));
registry.register(Arc::new(Ipv6ToCidr));
registry.register(Arc::new(Ipv6InRange));
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
pub mod json_get;
mod json_is;
mod json_path_exists;
@@ -32,23 +33,23 @@ pub(crate) struct JsonFunction;
impl JsonFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(JsonToStringFunction);
registry.register_scalar(ParseJsonFunction);
registry.register(Arc::new(JsonToStringFunction));
registry.register(Arc::new(ParseJsonFunction));
registry.register_scalar(JsonGetInt);
registry.register_scalar(JsonGetFloat);
registry.register_scalar(JsonGetString);
registry.register_scalar(JsonGetBool);
registry.register(Arc::new(JsonGetInt));
registry.register(Arc::new(JsonGetFloat));
registry.register(Arc::new(JsonGetString));
registry.register(Arc::new(JsonGetBool));
registry.register_scalar(JsonIsNull);
registry.register_scalar(JsonIsInt);
registry.register_scalar(JsonIsFloat);
registry.register_scalar(JsonIsString);
registry.register_scalar(JsonIsBool);
registry.register_scalar(JsonIsArray);
registry.register_scalar(JsonIsObject);
registry.register(Arc::new(JsonIsNull));
registry.register(Arc::new(JsonIsInt));
registry.register(Arc::new(JsonIsFloat));
registry.register(Arc::new(JsonIsString));
registry.register(Arc::new(JsonIsBool));
registry.register(Arc::new(JsonIsArray));
registry.register(Arc::new(JsonIsObject));
registry.register_scalar(json_path_exists::JsonPathExistsFunction);
registry.register_scalar(json_path_match::JsonPathMatchFunction);
registry.register(Arc::new(json_path_exists::JsonPathExistsFunction));
registry.register(Arc::new(json_path_match::JsonPathMatchFunction));
}
}

View File

@@ -38,11 +38,11 @@ use crate::function_registry::FunctionRegistry;
///
/// Usage: matches(`<col>`, `<pattern>`) -> boolean
#[derive(Clone, Debug, Default)]
pub struct MatchesFunction;
pub(crate) struct MatchesFunction;
impl MatchesFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(MatchesFunction);
registry.register(Arc::new(MatchesFunction));
}
}

View File

@@ -77,7 +77,7 @@ pub struct MatchesTermFunction;
impl MatchesTermFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(MatchesTermFunction);
registry.register(Arc::new(MatchesTermFunction));
}
}

View File

@@ -18,6 +18,7 @@ mod pow;
mod rate;
use std::fmt;
use std::sync::Arc;
pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction};
use common_query::error::{GeneralDataFusionSnafu, Result};
@@ -38,13 +39,13 @@ pub(crate) struct MathFunction;
impl MathFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ModuloFunction);
registry.register_scalar(PowFunction);
registry.register_scalar(RateFunction);
registry.register_scalar(RangeFunction);
registry.register_scalar(ClampFunction);
registry.register_scalar(ClampMinFunction);
registry.register_scalar(ClampMaxFunction);
registry.register(Arc::new(ModuloFunction));
registry.register(Arc::new(PowFunction));
registry.register(Arc::new(RateFunction));
registry.register(Arc::new(RangeFunction));
registry.register(Arc::new(ClampFunction));
registry.register(Arc::new(ClampMinFunction));
registry.register(Arc::new(ClampMaxFunction));
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
mod to_unixtime;
use to_unixtime::ToUnixtimeFunction;
@@ -22,6 +23,6 @@ pub(crate) struct TimestampFunction;
impl TimestampFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ToUnixtimeFunction);
registry.register(Arc::new(ToUnixtimeFunction));
}
}

View File

@@ -16,6 +16,7 @@
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
@@ -43,7 +44,7 @@ pub struct UddSketchCalcFunction;
impl UddSketchCalcFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(UddSketchCalcFunction);
registry.register(Arc::new(UddSketchCalcFunction));
}
}

View File

@@ -17,8 +17,10 @@ mod distance;
mod elem_product;
mod elem_sum;
pub mod impl_conv;
pub(crate) mod product;
mod scalar_add;
mod scalar_mul;
pub(crate) mod sum;
mod vector_add;
mod vector_dim;
mod vector_div;
@@ -28,34 +30,37 @@ mod vector_norm;
mod vector_sub;
mod vector_subvector;
use std::sync::Arc;
use crate::function_registry::FunctionRegistry;
pub(crate) struct VectorFunction;
impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
// conversion
registry.register_scalar(convert::ParseVectorFunction);
registry.register_scalar(convert::VectorToStringFunction);
registry.register(Arc::new(convert::ParseVectorFunction));
registry.register(Arc::new(convert::VectorToStringFunction));
// distance
registry.register_scalar(distance::CosDistanceFunction);
registry.register_scalar(distance::DotProductFunction);
registry.register_scalar(distance::L2SqDistanceFunction);
registry.register(Arc::new(distance::CosDistanceFunction));
registry.register(Arc::new(distance::DotProductFunction));
registry.register(Arc::new(distance::L2SqDistanceFunction));
// scalar calculation
registry.register_scalar(scalar_add::ScalarAddFunction);
registry.register_scalar(scalar_mul::ScalarMulFunction);
registry.register(Arc::new(scalar_add::ScalarAddFunction));
registry.register(Arc::new(scalar_mul::ScalarMulFunction));
// vector calculation
registry.register_scalar(vector_add::VectorAddFunction);
registry.register_scalar(vector_sub::VectorSubFunction);
registry.register_scalar(vector_mul::VectorMulFunction);
registry.register_scalar(vector_div::VectorDivFunction);
registry.register_scalar(vector_norm::VectorNormFunction);
registry.register_scalar(vector_dim::VectorDimFunction);
registry.register_scalar(vector_kth_elem::VectorKthElemFunction);
registry.register_scalar(vector_subvector::VectorSubvectorFunction);
registry.register_scalar(elem_sum::ElemSumFunction);
registry.register_scalar(elem_product::ElemProductFunction);
registry.register(Arc::new(vector_add::VectorAddFunction));
registry.register(Arc::new(vector_sub::VectorSubFunction));
registry.register(Arc::new(vector_mul::VectorMulFunction));
registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register(Arc::new(vector_norm::VectorNormFunction));
registry.register(Arc::new(vector_dim::VectorDimFunction));
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
registry.register(Arc::new(elem_sum::ElemSumFunction));
registry.register(Arc::new(elem_product::ElemProductFunction));
}
}

View File

@@ -16,11 +16,8 @@ use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector};
@@ -78,16 +75,6 @@ impl AggregateFunctionCreator for VectorProductCreator {
}
impl VectorProduct {
/// Create a new `AggregateUDF` for the `vec_product` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_product".to_string(),
1,
Arc::new(VectorProductCreator::default()),
)
.into()
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.product.get_or_insert_with(|| {
OVector::from_iterator_generic(Dyn(len), Const::<1>, (0..len).map(|_| 1.0))

View File

@@ -16,11 +16,8 @@ use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector};
@@ -28,7 +25,6 @@ use snafu::ensure;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
/// The accumulator for the `vec_sum` aggregate function.
#[derive(Debug, Default)]
pub struct VectorSum {
sum: Option<OVector<f32, Dyn>>,
@@ -78,16 +74,6 @@ impl AggregateFunctionCreator for VectorSumCreator {
}
impl VectorSum {
/// Create a new `AggregateUDF` for the `vec_sum` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_sum".to_string(),
1,
Arc::new(VectorSumCreator::default()),
)
.into()
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.sum
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))

View File

@@ -148,6 +148,17 @@ impl FunctionState {
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
async fn adjust(
&self,
_catalog: &str,
_flow: &str,
_min_run_interval_secs: u64,
_max_filter_num_per_query: usize,
_ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
}
Self {

View File

@@ -36,13 +36,13 @@ pub(crate) struct SystemFunction;
impl SystemFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(BuildFunction);
registry.register_scalar(VersionFunction);
registry.register_scalar(CurrentSchemaFunction);
registry.register_scalar(DatabaseFunction);
registry.register_scalar(SessionUserFunction);
registry.register_scalar(ReadPreferenceFunction);
registry.register_scalar(TimezoneFunction);
registry.register(Arc::new(BuildFunction));
registry.register(Arc::new(VersionFunction));
registry.register(Arc::new(CurrentSchemaFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(SessionUserFunction));
registry.register(Arc::new(ReadPreferenceFunction));
registry.register(Arc::new(TimezoneFunction));
registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);
}

View File

@@ -16,6 +16,8 @@ mod pg_get_userbyid;
mod table_is_visible;
mod version;
use std::sync::Arc;
use pg_get_userbyid::PGGetUserByIdFunction;
use table_is_visible::PGTableIsVisibleFunction;
use version::PGVersionFunction;
@@ -33,8 +35,8 @@ pub(super) struct PGCatalogFunction;
impl PGCatalogFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(PGTableIsVisibleFunction);
registry.register_scalar(PGGetUserByIdFunction);
registry.register_scalar(PGVersionFunction);
registry.register(Arc::new(PGTableIsVisibleFunction));
registry.register(Arc::new(PGGetUserByIdFunction));
registry.register(Arc::new(PGVersionFunction));
}
}

View File

@@ -296,8 +296,6 @@ pub struct ChannelConfig {
pub max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
pub send_compression: bool,
pub accept_compression: bool,
}
impl Default for ChannelConfig {
@@ -318,8 +316,6 @@ impl Default for ChannelConfig {
client_tls: None,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
send_compression: false,
accept_compression: false,
}
}
}
@@ -570,8 +566,6 @@ mod tests {
client_tls: None,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
send_compression: false,
accept_compression: false,
},
default_cfg
);
@@ -616,8 +610,6 @@ mod tests {
}),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
send_compression: false,
accept_compression: false,
},
cfg
);

View File

@@ -64,19 +64,6 @@ impl Default for FlightEncoder {
}
impl FlightEncoder {
/// Creates new [FlightEncoder] with compression disabled.
pub fn with_compression_disabled() -> Self {
let write_options = writer::IpcWriteOptions::default()
.try_with_compression(None)
.unwrap();
Self {
write_options,
data_gen: writer::IpcDataGenerator::default(),
dictionary_tracker: writer::DictionaryTracker::new(false),
}
}
pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
match flight_message {
FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(),

View File

@@ -1001,7 +1001,7 @@ impl ErrorExt for Error {
}
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
RdsTransactionRetryFailed { .. } => StatusCode::Internal,
DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
}
}

View File

@@ -70,12 +70,11 @@ impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
}
.build()
})?;
let captures =
VIEW_INFO_KEY_PATTERN
.captures(key)
.with_context(|| InvalidViewInfoSnafu {
err_msg: format!("Invalid ViewInfoKey '{key}'"),
})?;
let captures = VIEW_INFO_KEY_PATTERN
.captures(key)
.context(InvalidViewInfoSnafu {
err_msg: format!("Invalid ViewInfoKey '{key}'"),
})?;
// Safety: pass the regex check above
let view_id = captures[1].parse::<TableId>().unwrap();
Ok(ViewInfoKey { view_id })

View File

@@ -14,7 +14,6 @@
pub mod file;
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};
use std::time::Instant;
@@ -272,49 +271,6 @@ impl MetadataSnapshotManager {
Ok((filename.to_string(), num_keyvalues as u64))
}
fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
format!("{} => {}", key, value)
}
pub async fn info(
object_store: &ObjectStore,
file_path: &str,
query_str: &str,
limit: Option<usize>,
) -> Result<Vec<String>> {
let path = Path::new(file_path);
let file_name = path
.file_name()
.and_then(|s| s.to_str())
.context(InvalidFilePathSnafu { file_path })?;
let filename = FileName::try_from(file_name)?;
let data = object_store
.read(file_path)
.await
.context(ReadObjectSnafu { file_path })?;
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
let metadata_content = document.into_metadata_content()?.values();
let mut results = Vec::with_capacity(limit.unwrap_or(256));
for kv in metadata_content {
let key_str = String::from_utf8_lossy(&kv.key);
if let Some(prefix) = query_str.strip_suffix('*') {
if key_str.starts_with(prefix) {
let value_str = String::from_utf8_lossy(&kv.value);
results.push(Self::format_output(key_str, value_str));
}
} else if key_str == query_str {
let value_str = String::from_utf8_lossy(&kv.value);
results.push(Self::format_output(key_str, value_str));
}
if results.len() == limit.unwrap_or(usize::MAX) {
break;
}
}
Ok(results)
}
}
#[cfg(test)]

View File

@@ -111,11 +111,6 @@ impl MetadataContent {
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
self.values.into_iter()
}
/// Returns the key-value pairs as a vector.
pub fn values(self) -> Vec<KeyValue> {
self.values
}
}
/// The key-value pair of the backup file.

View File

@@ -372,7 +372,6 @@ impl DatanodeBuilder {
opts.max_concurrent_queries,
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
Duration::from_millis(100),
opts.grpc.flight_compression,
);
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;

View File

@@ -50,7 +50,6 @@ use query::QueryEngineRef;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
use servers::grpc::FlightCompression;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
@@ -81,7 +80,6 @@ use crate::event_listener::RegionServerEventListenerRef;
#[derive(Clone)]
pub struct RegionServer {
inner: Arc<RegionServerInner>,
flight_compression: FlightCompression,
}
pub struct RegionStat {
@@ -95,7 +93,6 @@ impl RegionServer {
query_engine: QueryEngineRef,
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
flight_compression: FlightCompression,
) -> Self {
Self::with_table_provider(
query_engine,
@@ -104,7 +101,6 @@ impl RegionServer {
Arc::new(DummyTableProviderFactory),
0,
Duration::from_millis(0),
flight_compression,
)
}
@@ -115,7 +111,6 @@ impl RegionServer {
table_provider_factory: TableProviderFactoryRef,
max_concurrent_queries: usize,
concurrent_query_limiter_timeout: Duration,
flight_compression: FlightCompression,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(
@@ -128,7 +123,6 @@ impl RegionServer {
concurrent_query_limiter_timeout,
),
)),
flight_compression,
}
}
@@ -542,11 +536,7 @@ impl FlightCraft for RegionServer {
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
.await?;
let stream = Box::pin(FlightRecordBatchStream::new(
result,
tracing_context,
self.flight_compression,
));
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
Ok(Response::new(stream))
}
}

View File

@@ -19,16 +19,16 @@ use std::time::Duration;
use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::Output;
use common_runtime::runtime::{BuilderBuild, RuntimeTrait};
use common_runtime::Runtime;
use datafusion_expr::{AggregateUDF, LogicalPlan};
use datafusion_expr::LogicalPlan;
use query::dataframe::DataFrame;
use query::planner::LogicalPlanner;
use query::query_engine::{DescribeResult, QueryEngineState};
use query::{QueryEngine, QueryEngineContext};
use servers::grpc::FlightCompression;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
@@ -76,9 +76,9 @@ impl QueryEngine for MockQueryEngine {
unimplemented!()
}
fn register_aggregate_function(&self, _func: AggregateUDF) {}
fn register_aggregate_function(&self, _func: AggregateFunctionMetaRef) {}
fn register_scalar_function(&self, _func: ScalarFunctionFactory) {}
fn register_function(&self, _func: FunctionRef) {}
fn read_table(&self, _table: TableRef) -> query::error::Result<DataFrame> {
unimplemented!()
@@ -98,7 +98,6 @@ pub fn mock_region_server() -> RegionServer {
Arc::new(MockQueryEngine),
Runtime::builder().build().unwrap(),
Box::new(NoopRegionServerEventListener),
FlightCompression::default(),
)
}

View File

@@ -326,14 +326,6 @@ impl Value {
}
}
/// Cast Value to [Duration]. Return None if value is not a valid duration data type.
pub fn as_duration(&self) -> Option<Duration> {
match self {
Value::Duration(d) => Some(*d),
_ => None,
}
}
/// Returns the logical type of the value.
pub fn logical_type_id(&self) -> LogicalTypeId {
match self {

View File

@@ -61,6 +61,7 @@ prost.workspace = true
query.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
servers.workspace = true
session.workspace = true
smallvec.workspace = true

View File

@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use api::v1::flow::{
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
@@ -32,6 +32,7 @@ use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
@@ -809,6 +810,25 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
..Default::default()
})
}
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
#[derive(Debug, Serialize, Deserialize)]
struct Options {
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
}
let options: Options = serde_json::from_str(&options).with_context(|_| {
common_meta::error::DeserializeFromJsonSnafu { input: options }
})?;
self.batching_engine
.adjust_flow(
flow_id.unwrap().id as u64,
options.min_run_interval_secs,
options.max_filter_num_per_query,
)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(Default::default())
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
@@ -841,93 +861,6 @@ fn to_meta_err(
}
}
#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for StreamingEngine {
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
let query_ctx = request
.header
.and_then(|h| h.query_context)
.map(|ctx| ctx.into());
match request.body {
Some(flow_request::Body::Create(CreateRequest {
flow_id: Some(task_id),
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_after,
comment,
sql,
flow_options,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
sink_table_name.catalog_name,
sink_table_name.schema_name,
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: Some(comment),
sql: sql.clone(),
flow_options,
query_ctx,
};
let ret = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
.into_iter()
.collect_vec(),
..Default::default()
})
}
Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(flow_id),
})) => {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
flow_id: Some(flow_id),
})) => {
let row = self
.flush_flow_inner(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(FlowResponse {
affected_flows: vec![flow_id],
affected_rows: row as u64,
..Default::default()
})
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
self.handle_inserts_inner(request)
.await
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
}
impl FlowEngine for StreamingEngine {
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
self.create_flow_inner(args).await

View File

@@ -388,6 +388,20 @@ impl BatchingEngine {
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
self.tasks.read().await.contains_key(&flow_id)
}
pub async fn adjust_flow(
&self,
flow_id: FlowId,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
) -> Result<(), Error> {
let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
task.adjust(min_run_interval_secs, max_filter_num_per_query);
Ok(())
}
}
impl FlowEngine for BatchingEngine {

View File

@@ -14,8 +14,9 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant, SystemTime};
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use itertools::Itertools;
use meta_client::client::MetaClient;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
GRPC_MAX_RETRIES,
};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::{Error, FlowAuthHeader};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
use crate::{Error, FlowAuthHeader, FlowId};
/// Just like [`GrpcQueryHandler`] but use BoxedError
///
@@ -74,6 +76,105 @@ impl<
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
/// Statistics about running query on this frontend from flownode
#[derive(Debug, Default, Clone)]
struct FrontendStat {
/// The query for flow id has been running since this timestamp
since: HashMap<FlowId, Instant>,
/// The average query time for each flow id
/// This is used to calculate the average query time for each flow id
past_query_avg: HashMap<FlowId, (usize, Duration)>,
}
#[derive(Debug, Default, Clone)]
pub struct FrontendStats {
/// The statistics for each flow id
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
}
impl FrontendStats {
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
let stat = stats.entry(frontend_addr.to_string()).or_default();
stat.since.insert(flow_id, Instant::now());
FrontendStatsGuard {
stats: self.stats.clone(),
frontend_addr: frontend_addr.to_string(),
cur: flow_id,
}
}
/// return frontend addrs sorted by load, from lightest to heaviest
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
pub fn sort_by_load(&self) -> Vec<String> {
let stats = self.stats.lock().expect("Failed to lock frontend stats");
let fe_load_factor = stats
.iter()
.map(|(node_addr, stat)| {
// total expected avg running time for all currently running queries
let total_expect_avg_run_time = stat
.since
.keys()
.map(|f| {
let (count, total_duration) =
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
if *count == 0 {
0.0
} else {
total_duration.as_secs_f64() / *count as f64
}
})
.sum::<f64>();
let total_cur_running_time = stat
.since
.values()
.map(|since| since.elapsed().as_secs_f64())
.sum::<f64>();
(
node_addr.to_string(),
total_expect_avg_run_time + total_cur_running_time,
)
})
.sorted_by(|(_, load_a), (_, load_b)| {
load_a
.partial_cmp(load_b)
.unwrap_or(std::cmp::Ordering::Equal)
})
.collect::<Vec<_>>();
debug!("Frontend load factor: {:?}", fe_load_factor);
for (node_addr, load) in &fe_load_factor {
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
.with_label_values(&[&node_addr.to_string()])
.observe(*load);
}
fe_load_factor
.into_iter()
.map(|(addr, _)| addr)
.collect::<Vec<_>>()
}
}
pub struct FrontendStatsGuard {
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
frontend_addr: String,
cur: FlowId,
}
impl Drop for FrontendStatsGuard {
fn drop(&mut self) {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
if let Some(since) = stat.since.remove(&self.cur) {
let elapsed = since.elapsed();
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
*count += 1;
*total_duration += elapsed;
}
}
}
}
/// A simple frontend client able to execute sql using grpc protocol
///
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
@@ -83,6 +184,7 @@ pub enum FrontendClient {
meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager,
auth: Option<FlowAuthHeader>,
fe_stats: FrontendStats,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
@@ -114,6 +216,7 @@ impl FrontendClient {
ChannelManager::with_config(cfg)
},
auth,
fe_stats: Default::default(),
}
}
@@ -183,7 +286,7 @@ impl FrontendClient {
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
/// and is able to process query
async fn get_random_active_frontend(
pub(crate) async fn get_random_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -192,6 +295,7 @@ impl FrontendClient {
meta_client: _,
chnl_mgr,
auth,
fe_stats,
} = self
else {
return UnexpectedSnafu {
@@ -208,8 +312,21 @@ impl FrontendClient {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// shuffle the frontends to avoid always pick the same one
frontends.shuffle(&mut rng());
let node_addrs_by_load = fe_stats.sort_by_load();
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
let addr2load = node_addrs_by_load
.iter()
.enumerate()
.map(|(i, id)| (id.clone(), i + 1))
.collect::<HashMap<_, _>>();
// sort frontends by load, from lightest to heaviest
frontends.sort_by(|(_, a), (_, b)| {
// if not even in stats, treat as 0 load since never been queried
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
load_a.cmp(load_b)
});
debug!("Frontend nodes sorted by load: {:?}", frontends);
// found node with maximum last_activity_ts
for (_, node_info) in frontends
@@ -257,6 +374,7 @@ impl FrontendClient {
create: CreateTableExpr,
catalog: &str,
schema: &str,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
self.handle(
Request::Ddl(api::v1::DdlRequest {
@@ -264,7 +382,8 @@ impl FrontendClient {
}),
catalog,
schema,
&mut None,
None,
task,
)
.await
}
@@ -275,15 +394,31 @@ impl FrontendClient {
req: api::v1::greptime_request::Request,
catalog: &str,
schema: &str,
peer_desc: &mut Option<PeerDesc>,
use_peer: Option<Peer>,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
FrontendClient::Distributed {
fe_stats, chnl_mgr, ..
} => {
let db = if let Some(peer) = use_peer {
DatabaseWithPeer::new(
Database::new(
catalog,
schema,
Client::with_manager_and_urls(
chnl_mgr.clone(),
vec![peer.addr.clone()],
),
),
peer,
)
} else {
self.get_random_active_frontend(catalog, schema).await?
};
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
db.database
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)

View File

@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
};
use crate::{Error, FlowId};
@@ -52,6 +53,13 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// Slow Query metrics update task handle
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds
pub(crate) min_run_interval: Option<u64>,
/// max filter number per query
pub(crate) max_filter_num: Option<usize>,
}
impl TaskState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
@@ -63,6 +71,9 @@ impl TaskState {
exec_state: ExecState::Idle,
shutdown_rx,
task_handle: None,
slow_query_metric_task: None,
min_run_interval: None,
max_filter_num: None,
}
}
@@ -87,20 +98,17 @@ impl TaskState {
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
time_window_size: &Option<Duration>,
_time_window_size: &Option<Duration>,
max_timeout: Option<Duration>,
) -> Instant {
let last_duration = max_timeout
let next_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration)
.max(MIN_REFRESH_DURATION);
let next_duration = time_window_size
.map(|t| {
let half = t / 2;
half.max(last_duration)
})
.unwrap_or(last_duration);
.max(
self.min_run_interval
.map(Duration::from_secs)
.unwrap_or(MIN_REFRESH_DURATION),
);
// if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() {
@@ -243,9 +251,19 @@ impl DirtyTimeWindows {
};
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(first_nth.len() as f64);
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(self.windows.len() as f64);
let full_time_range = first_nth
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
@@ -257,7 +275,10 @@ impl DirtyTimeWindows {
})
.num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(full_time_range);
let mut expr_lst = vec![];

View File

@@ -61,7 +61,8 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::{Error, FlowId};
@@ -81,6 +82,14 @@ pub struct TaskConfig {
query_type: QueryType,
}
impl TaskConfig {
pub fn time_window_size(&self) -> Option<Duration> {
self.time_window_expr
.as_ref()
.and_then(|expr| *expr.time_window_size())
}
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
@@ -144,6 +153,12 @@ impl BatchingTask {
})
}
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
let mut state = self.state.write().unwrap();
state.min_run_interval = Some(min_run_interval_secs);
state.max_filter_num = Some(max_filter_num_per_query);
}
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
///
/// useful for flush_flow to flush dirty time windows range
@@ -280,7 +295,7 @@ impl BatchingTask {
let catalog = &self.config.sink_table_name[0];
let schema = &self.config.sink_table_name[1];
frontend_client
.create(expr.clone(), catalog, schema)
.create(expr.clone(), catalog, schema, Some(self))
.await?;
Ok(())
}
@@ -328,11 +343,53 @@ impl BatchingTask {
})?;
let plan = expanded_plan;
let mut peer_desc = None;
let db = frontend_client
.get_random_active_frontend(catalog, schema)
.await?;
let peer_desc = db.peer.clone();
let (tx, mut rx) = oneshot::channel();
let peer_inner = peer_desc.clone();
let window_size_pretty = format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
);
let inner_window_size_pretty = window_size_pretty.clone();
let flow_id = self.config.flow_id;
let slow_query_metric_task = tokio::task::spawn(async move {
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.add(1.0);
while rx.try_recv() == Err(TryRecvError::Empty) {
// sleep for a while before next update
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.sub(1.0);
});
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.start_timer();
// hack and special handling the insert logical plan
@@ -361,10 +418,12 @@ impl BatchingTask {
};
frontend_client
.handle(req, catalog, schema, &mut peer_desc)
.handle(req, catalog, schema, Some(db.peer), Some(self))
.await
};
// signaling the slow query metric task to stop
let _ = tx.send(());
let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res {
debug!(
@@ -387,7 +446,12 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_desc.unwrap_or_default().to_string(),
&peer_desc.to_string(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.observe(elapsed.as_secs_f64());
}
@@ -580,19 +644,20 @@ impl BatchingTask {
),
})?;
let expr = self
.state
.write()
.unwrap()
.dirty_time_windows
.gen_filter_exprs(
let expr = {
let mut state = self.state.write().unwrap();
let max_window_cnt = state
.max_filter_num
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
state.dirty_time_windows.gen_filter_exprs(
&col_name,
Some(l),
window_size,
DirtyTimeWindows::MAX_FILTER_NUM,
max_window_cnt,
self.config.flow_id,
Some(self),
)?;
)?
};
debug!(
"Flow id={:?}, Generated filter expr: {:?}",

View File

@@ -31,22 +31,37 @@ lazy_static! {
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_query_time_secs",
"flow batching engine query time(seconds)",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)",
&["flow_id", "peer"],
"flow batching engine slow query(seconds), updated after query finished",
&["flow_id", "peer", "time_window_granularity"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_real_time_slow_query_number",
"flow batching engine real time slow query number, updated in real time",
&["flow_id", "peer", "time_window_granularity"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count",
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt",
"flow batching engine query time window count",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
@@ -54,7 +69,15 @@ lazy_static! {
register_histogram_vec!(
"greptime_flow_batching_engine_query_time_range_secs",
"flow batching engine query time range(seconds)",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_guess_fe_load",
"flow batching engine guessed frontend load",
&["fe_addr"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();

View File

@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_function::function::{FunctionContext, FunctionRef};
use common_function::function::FunctionContext;
use datafusion_substrait::extensions::Extensions;
use datatypes::data_type::ConcreteDataType as CDT;
use query::QueryEngine;
@@ -108,13 +108,9 @@ impl FunctionExtensions {
/// register flow-specific functions to the query engine
pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) {
let tumble_fn = Arc::new(TumbleFunction::new("tumble")) as FunctionRef;
let tumble_start_fn = Arc::new(TumbleFunction::new(TUMBLE_START)) as FunctionRef;
let tumble_end_fn = Arc::new(TumbleFunction::new(TUMBLE_END)) as FunctionRef;
engine.register_scalar_function(tumble_fn.into());
engine.register_scalar_function(tumble_start_fn.into());
engine.register_scalar_function(tumble_end_fn.into());
engine.register_function(Arc::new(TumbleFunction::new("tumble")));
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_START)));
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_END)));
}
#[derive(Debug)]

View File

@@ -25,7 +25,6 @@ common-catalog.workspace = true
common-config.workspace = true
common-datasource.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-function.workspace = true
common-grpc.workspace = true
common-macro.workspace = true

Some files were not shown because too many files have changed in this diff Show More