mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 15:40:02 +00:00
Compare commits
34 Commits
flow/adjus
...
feat/bulk-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97e9b97a57 | ||
|
|
7fc74e2928 | ||
|
|
77f20ede7a | ||
|
|
ced018fce0 | ||
|
|
41dacff283 | ||
|
|
94a14b6da7 | ||
|
|
6ad3a32cb2 | ||
|
|
ac00314578 | ||
|
|
2f08bee08f | ||
|
|
8ebb31cdcd | ||
|
|
f4f8d65a39 | ||
|
|
b31990e881 | ||
|
|
6da633e70d | ||
|
|
9633e794c7 | ||
|
|
eaf1e1198f | ||
|
|
505bf25505 | ||
|
|
f1b29ece3c | ||
|
|
74df12e8c0 | ||
|
|
be6a5d2da8 | ||
|
|
7468a8ab2a | ||
|
|
5bb0466ff2 | ||
|
|
f6db419afd | ||
|
|
05b708ed2e | ||
|
|
f4c3950f57 | ||
|
|
88c4409df4 | ||
|
|
c10b8f8474 | ||
|
|
041b683a8d | ||
|
|
03bb6e4f28 | ||
|
|
09e5a6580f | ||
|
|
f9f905ae14 | ||
|
|
1d53dd26ae | ||
|
|
01796c9cc0 | ||
|
|
9469a8f8f2 | ||
|
|
2fabe346a1 |
@@ -10,13 +10,13 @@ inputs:
|
||||
meta-replicas:
|
||||
default: 2
|
||||
description: "Number of Metasrv replicas"
|
||||
image-registry:
|
||||
image-registry:
|
||||
default: "docker.io"
|
||||
description: "Image registry"
|
||||
image-repository:
|
||||
image-repository:
|
||||
default: "greptime/greptimedb"
|
||||
description: "Image repository"
|
||||
image-tag:
|
||||
image-tag:
|
||||
default: "latest"
|
||||
description: 'Image tag'
|
||||
etcd-endpoints:
|
||||
@@ -32,12 +32,12 @@ runs:
|
||||
steps:
|
||||
- name: Install GreptimeDB operator
|
||||
uses: nick-fields/retry@v3
|
||||
with:
|
||||
with:
|
||||
timeout_minutes: 3
|
||||
max_attempts: 3
|
||||
shell: bash
|
||||
command: |
|
||||
helm repo add greptime https://greptimeteam.github.io/helm-charts/
|
||||
helm repo add greptime https://greptimeteam.github.io/helm-charts/
|
||||
helm repo update
|
||||
helm upgrade \
|
||||
--install \
|
||||
@@ -48,10 +48,10 @@ runs:
|
||||
--wait-for-jobs
|
||||
- name: Install GreptimeDB cluster
|
||||
shell: bash
|
||||
run: |
|
||||
run: |
|
||||
helm upgrade \
|
||||
--install my-greptimedb \
|
||||
--set meta.etcdEndpoints=${{ inputs.etcd-endpoints }} \
|
||||
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
|
||||
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
|
||||
--set image.registry=${{ inputs.image-registry }} \
|
||||
--set image.repository=${{ inputs.image-repository }} \
|
||||
@@ -72,7 +72,7 @@ runs:
|
||||
- name: Wait for GreptimeDB
|
||||
shell: bash
|
||||
run: |
|
||||
while true; do
|
||||
while true; do
|
||||
PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
|
||||
if [ "$PHASE" == "Running" ]; then
|
||||
echo "Cluster is ready"
|
||||
@@ -86,10 +86,10 @@ runs:
|
||||
- name: Print GreptimeDB info
|
||||
if: always()
|
||||
shell: bash
|
||||
run: |
|
||||
run: |
|
||||
kubectl get all --show-labels -n my-greptimedb
|
||||
- name: Describe Nodes
|
||||
if: always()
|
||||
shell: bash
|
||||
run: |
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
|
||||
4
.github/scripts/deploy-greptimedb.sh
vendored
4
.github/scripts/deploy-greptimedb.sh
vendored
@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {
|
||||
|
||||
helm install "$cluster_name" greptime/greptimedb-cluster \
|
||||
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
|
||||
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
|
||||
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
|
||||
-n "$install_namespace"
|
||||
|
||||
# Wait for greptimedb cluster to be ready.
|
||||
@@ -103,7 +103,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
|
||||
|
||||
helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
|
||||
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
|
||||
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
|
||||
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
|
||||
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
|
||||
--set storage.s3.region="$AWS_REGION" \
|
||||
--set storage.s3.root="$DATA_ROOT" \
|
||||
|
||||
@@ -30,7 +30,7 @@ update_helm_charts_version() {
|
||||
|
||||
# Commit the changes.
|
||||
git add .
|
||||
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git push origin $BRANCH_NAME
|
||||
|
||||
# Create a Pull Request.
|
||||
|
||||
@@ -26,7 +26,7 @@ update_homebrew_greptime_version() {
|
||||
|
||||
# Commit the changes.
|
||||
git add .
|
||||
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git push origin $BRANCH_NAME
|
||||
|
||||
# Create a Pull Request.
|
||||
|
||||
92
Cargo.lock
generated
92
Cargo.lock
generated
@@ -1621,8 +1621,10 @@ dependencies = [
|
||||
"cache",
|
||||
"catalog",
|
||||
"chrono",
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
"common-error",
|
||||
"common-frontend",
|
||||
"common-macro",
|
||||
"common-meta",
|
||||
"common-procedure",
|
||||
@@ -2293,8 +2295,14 @@ version = "0.15.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-error",
|
||||
"common-grpc",
|
||||
"common-macro",
|
||||
"common-meta",
|
||||
"greptime-proto",
|
||||
"meta-client",
|
||||
"snafu 0.8.5",
|
||||
"tokio",
|
||||
"tonic 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3252,7 +3260,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -3303,7 +3311,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-catalog"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"async-trait",
|
||||
@@ -3323,7 +3331,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-catalog-listing"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-schema 54.3.1",
|
||||
@@ -3346,7 +3354,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3371,7 +3379,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-common-runtime"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"log",
|
||||
"tokio",
|
||||
@@ -3380,12 +3388,12 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-doc"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
|
||||
[[package]]
|
||||
name = "datafusion-execution"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"dashmap",
|
||||
@@ -3403,7 +3411,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-expr"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"chrono",
|
||||
@@ -3423,7 +3431,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-expr-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"datafusion-common",
|
||||
@@ -3434,7 +3442,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-buffer 54.3.1",
|
||||
@@ -3463,7 +3471,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-aggregate"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3484,7 +3492,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-aggregate-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3496,7 +3504,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-nested"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -3518,7 +3526,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-table"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"async-trait",
|
||||
@@ -3533,7 +3541,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-window"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"datafusion-common",
|
||||
"datafusion-doc",
|
||||
@@ -3549,7 +3557,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-functions-window-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"datafusion-common",
|
||||
"datafusion-physical-expr-common",
|
||||
@@ -3558,7 +3566,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-macros"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"datafusion-expr",
|
||||
"quote",
|
||||
@@ -3568,7 +3576,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-optimizer"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"chrono",
|
||||
@@ -3586,7 +3594,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-expr"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3609,7 +3617,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-expr-common"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3622,7 +3630,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-optimizer"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-schema 54.3.1",
|
||||
@@ -3643,7 +3651,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-physical-plan"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"arrow 54.2.1",
|
||||
@@ -3673,7 +3681,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-sql"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -3691,7 +3699,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "datafusion-substrait"
|
||||
version = "45.0.0"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
|
||||
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
|
||||
dependencies = [
|
||||
"async-recursion",
|
||||
"async-trait",
|
||||
@@ -4701,6 +4709,7 @@ dependencies = [
|
||||
"common-config",
|
||||
"common-datasource",
|
||||
"common-error",
|
||||
"common-frontend",
|
||||
"common-function",
|
||||
"common-grpc",
|
||||
"common-macro",
|
||||
@@ -4747,6 +4756,7 @@ dependencies = [
|
||||
"substrait 0.15.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"toml 0.8.19",
|
||||
"tonic 0.12.3",
|
||||
"tower 0.5.2",
|
||||
@@ -5133,7 +5143,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17971523673f4fbc982510d3c9d6647ff642e16f#17971523673f4fbc982510d3c9d6647ff642e16f"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"serde",
|
||||
@@ -7240,6 +7250,7 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"itertools 0.14.0",
|
||||
"lazy_static",
|
||||
"mito-codec",
|
||||
"mito2",
|
||||
"mur3",
|
||||
"object-store",
|
||||
@@ -7305,6 +7316,29 @@ dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mito-codec"
|
||||
version = "0.15.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"bytes",
|
||||
"common-base",
|
||||
"common-decimal",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-recordbatch",
|
||||
"common-telemetry",
|
||||
"common-time",
|
||||
"datafusion-common",
|
||||
"datafusion-expr",
|
||||
"datatypes",
|
||||
"memcomparable",
|
||||
"paste",
|
||||
"serde",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mito2"
|
||||
version = "0.15.0"
|
||||
@@ -7347,6 +7381,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"log-store",
|
||||
"memcomparable",
|
||||
"mito-codec",
|
||||
"moka",
|
||||
"object-store",
|
||||
"parquet",
|
||||
@@ -8386,6 +8421,7 @@ dependencies = [
|
||||
"common-catalog",
|
||||
"common-datasource",
|
||||
"common-error",
|
||||
"common-frontend",
|
||||
"common-function",
|
||||
"common-grpc",
|
||||
"common-grpc-expr",
|
||||
@@ -8885,9 +8921,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pgwire"
|
||||
version = "0.30.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ec79ee18e6cafde8698885646780b967ecc905120798b8359dd0da64f9688e89"
|
||||
version = "0.30.2"
|
||||
source = "git+https://github.com/sunng87/pgwire?rev=127573d997228cfb70c7699881c568eae8131270#127573d997228cfb70c7699881c568eae8131270"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -11135,6 +11170,7 @@ dependencies = [
|
||||
"common-catalog",
|
||||
"common-config",
|
||||
"common-error",
|
||||
"common-frontend",
|
||||
"common-grpc",
|
||||
"common-macro",
|
||||
"common-mem-prof",
|
||||
|
||||
22
Cargo.toml
22
Cargo.toml
@@ -49,6 +49,7 @@ members = [
|
||||
"src/meta-client",
|
||||
"src/meta-srv",
|
||||
"src/metric-engine",
|
||||
"src/mito-codec",
|
||||
"src/mito2",
|
||||
"src/object-store",
|
||||
"src/operator",
|
||||
@@ -116,15 +117,15 @@ clap = { version = "4.4", features = ["derive"] }
|
||||
config = "0.13.0"
|
||||
crossbeam-utils = "0.8"
|
||||
dashmap = "6.1"
|
||||
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
deadpool = "0.12"
|
||||
deadpool-postgres = "0.14"
|
||||
derive_builder = "0.20"
|
||||
@@ -133,7 +134,7 @@ etcd-client = "0.14"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17971523673f4fbc982510d3c9d6647ff642e16f" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
@@ -274,6 +275,7 @@ log-store = { path = "src/log-store" }
|
||||
meta-client = { path = "src/meta-client" }
|
||||
meta-srv = { path = "src/meta-srv" }
|
||||
metric-engine = { path = "src/metric-engine" }
|
||||
mito-codec = { path = "src/mito-codec" }
|
||||
mito2 = { path = "src/mito2" }
|
||||
object-store = { path = "src/object-store" }
|
||||
operator = { path = "src/operator" }
|
||||
|
||||
@@ -232,6 +232,7 @@
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
||||
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
|
||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||
@@ -404,6 +405,7 @@
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
|
||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||
|
||||
@@ -44,6 +44,13 @@ runtime_size = 8
|
||||
max_recv_message_size = "512MB"
|
||||
## The maximum send message size for gRPC server.
|
||||
max_send_message_size = "512MB"
|
||||
## Compression mode for datanode side Arrow IPC service. Available options:
|
||||
## - `none`: disable all compression
|
||||
## - `transport`: only enable gRPC transport compression (zstd)
|
||||
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||
## - `all`: enable all compression.
|
||||
## Default to `none`
|
||||
flight_compression = "arrow_ipc"
|
||||
|
||||
## gRPC server TLS options, see `mysql.tls` section.
|
||||
[grpc.tls]
|
||||
|
||||
@@ -54,6 +54,13 @@ bind_addr = "127.0.0.1:4001"
|
||||
server_addr = "127.0.0.1:4001"
|
||||
## The number of server worker threads.
|
||||
runtime_size = 8
|
||||
## Compression mode for frontend side Arrow IPC service. Available options:
|
||||
## - `none`: disable all compression
|
||||
## - `transport`: only enable gRPC transport compression (zstd)
|
||||
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||
## - `all`: enable all compression.
|
||||
## Default to `none`
|
||||
flight_compression = "arrow_ipc"
|
||||
|
||||
## gRPC server TLS options, see `mysql.tls` section.
|
||||
[grpc.tls]
|
||||
|
||||
@@ -9,7 +9,7 @@ We highly recommend using the self-monitoring feature provided by [GreptimeDB Op
|
||||
- **Metrics Dashboards**
|
||||
|
||||
- `dashboards/metrics/cluster/dashboard.json`: The Grafana dashboard for the GreptimeDB cluster. Read the [dashboard.md](./dashboards/metrics/cluster/dashboard.md) for more details.
|
||||
|
||||
|
||||
- `dashboards/metrics/standalone/dashboard.json`: The Grafana dashboard for the standalone GreptimeDB instance. **It's generated from the `cluster/dashboard.json` by removing the instance filter through the `make dashboards` command**. Read the [dashboard.md](./dashboards/metrics/standalone/dashboard.md) for more details.
|
||||
|
||||
- **Logs Dashboard**
|
||||
@@ -83,7 +83,7 @@ If you use the [Helm Chart](https://github.com/GreptimeTeam/helm-charts) to depl
|
||||
- `monitoring.enabled=true`: Deploys a standalone GreptimeDB instance dedicated to monitoring the cluster;
|
||||
- `grafana.enabled=true`: Deploys Grafana and automatically imports the monitoring dashboard;
|
||||
|
||||
The standalone GreptimeDB instance will collect metrics from your cluster, and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/nightly/user-guide/deployments/deploy-on-kubernetes/getting-started).
|
||||
The standalone GreptimeDB instance will collect metrics from your cluster, and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/user-guide/deployments-administration/deploy-on-kubernetes/getting-started).
|
||||
|
||||
### Self-host Prometheus and import dashboards manually
|
||||
|
||||
|
||||
@@ -17,8 +17,10 @@ arrow-schema.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
bytes.workspace = true
|
||||
common-base.workspace = true
|
||||
common-catalog.workspace = true
|
||||
common-error.workspace = true
|
||||
common-frontend.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-meta.workspace = true
|
||||
common-procedure.workspace = true
|
||||
|
||||
@@ -277,6 +277,26 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to invoke frontend services"))]
|
||||
InvokeFrontend {
|
||||
source: common_frontend::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Meta client is not provided"))]
|
||||
MetaClientMissing {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to find frontend node: {}", addr))]
|
||||
FrontendNotFound {
|
||||
addr: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
impl Error {
|
||||
@@ -345,6 +365,10 @@ impl ErrorExt for Error {
|
||||
Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
|
||||
source.status_code()
|
||||
}
|
||||
Error::InvokeFrontend { source, .. } => source.status_code(),
|
||||
Error::FrontendNotFound { .. } | Error::MetaClientMissing { .. } => {
|
||||
StatusCode::Unexpected
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -51,6 +51,7 @@ use crate::error::{
|
||||
};
|
||||
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
|
||||
use crate::kvbackend::TableCacheRef;
|
||||
use crate::process_manager::ProcessManagerRef;
|
||||
use crate::system_schema::pg_catalog::PGCatalogProvider;
|
||||
use crate::system_schema::SystemSchemaProvider;
|
||||
use crate::CatalogManager;
|
||||
@@ -84,6 +85,7 @@ impl KvBackendCatalogManager {
|
||||
backend: KvBackendRef,
|
||||
cache_registry: LayeredCacheRegistryRef,
|
||||
procedure_manager: Option<ProcedureManagerRef>,
|
||||
process_manager: Option<ProcessManagerRef>,
|
||||
) -> Arc<Self> {
|
||||
Arc::new_cyclic(|me| Self {
|
||||
information_extension,
|
||||
@@ -102,12 +104,14 @@ impl KvBackendCatalogManager {
|
||||
DEFAULT_CATALOG_NAME.to_string(),
|
||||
me.clone(),
|
||||
Arc::new(FlowMetadataManager::new(backend.clone())),
|
||||
process_manager.clone(),
|
||||
)),
|
||||
pg_catalog_provider: Arc::new(PGCatalogProvider::new(
|
||||
DEFAULT_CATALOG_NAME.to_string(),
|
||||
me.clone(),
|
||||
)),
|
||||
backend,
|
||||
process_manager,
|
||||
},
|
||||
cache_registry,
|
||||
procedure_manager,
|
||||
@@ -419,6 +423,7 @@ struct SystemCatalog {
|
||||
information_schema_provider: Arc<InformationSchemaProvider>,
|
||||
pg_catalog_provider: Arc<PGCatalogProvider>,
|
||||
backend: KvBackendRef,
|
||||
process_manager: Option<ProcessManagerRef>,
|
||||
}
|
||||
|
||||
impl SystemCatalog {
|
||||
@@ -486,6 +491,7 @@ impl SystemCatalog {
|
||||
catalog.to_string(),
|
||||
self.catalog_manager.clone(),
|
||||
Arc::new(FlowMetadataManager::new(self.backend.clone())),
|
||||
self.process_manager.clone(),
|
||||
))
|
||||
});
|
||||
information_schema_provider.table(table_name)
|
||||
|
||||
@@ -40,6 +40,7 @@ pub mod information_schema {
|
||||
pub use crate::system_schema::information_schema::*;
|
||||
}
|
||||
|
||||
pub mod process_manager;
|
||||
pub mod table_source;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -356,6 +356,7 @@ impl MemoryCatalogManager {
|
||||
catalog,
|
||||
Arc::downgrade(self) as Weak<dyn CatalogManager>,
|
||||
Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
|
||||
None, // we don't need ProcessManager on regions server.
|
||||
);
|
||||
let information_schema = information_schema_provider.tables().clone();
|
||||
|
||||
|
||||
@@ -34,4 +34,20 @@ lazy_static! {
|
||||
register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap();
|
||||
pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram =
|
||||
register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap();
|
||||
|
||||
/// Count of running process in each catalog.
|
||||
pub static ref PROCESS_LIST_COUNT: IntGaugeVec = register_int_gauge_vec!(
|
||||
"greptime_process_list_count",
|
||||
"Running process count per catalog",
|
||||
&["catalog"]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
/// Count of killed process in each catalog.
|
||||
pub static ref PROCESS_KILL_COUNT: IntCounterVec = register_int_counter_vec!(
|
||||
"greptime_process_kill_count",
|
||||
"Completed kill process requests count",
|
||||
&["catalog"]
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
488
src/catalog/src/process_manager.rs
Normal file
488
src/catalog/src/process_manager.rs
Normal file
@@ -0,0 +1,488 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
|
||||
use common_base::cancellation::CancellationHandle;
|
||||
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
|
||||
use common_telemetry::{debug, info};
|
||||
use common_time::util::current_time_millis;
|
||||
use meta_client::MetaClientRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::error;
|
||||
use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};
|
||||
|
||||
pub type ProcessManagerRef = Arc<ProcessManager>;
|
||||
|
||||
/// Query process manager.
|
||||
pub struct ProcessManager {
|
||||
/// Local frontend server address,
|
||||
server_addr: String,
|
||||
/// Next process id for local queries.
|
||||
next_id: AtomicU64,
|
||||
/// Running process per catalog.
|
||||
catalogs: RwLock<HashMap<String, HashMap<u64, CancellableProcess>>>,
|
||||
/// Frontend selector to locate frontend nodes.
|
||||
frontend_selector: Option<MetaClientSelector>,
|
||||
}
|
||||
|
||||
impl ProcessManager {
|
||||
/// Create a [ProcessManager] instance with server address and kv client.
|
||||
pub fn new(server_addr: String, meta_client: Option<MetaClientRef>) -> Self {
|
||||
let frontend_selector = meta_client.map(MetaClientSelector::new);
|
||||
Self {
|
||||
server_addr,
|
||||
next_id: Default::default(),
|
||||
catalogs: Default::default(),
|
||||
frontend_selector,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ProcessManager {
|
||||
/// Registers a submitted query. Use the provided id if present.
|
||||
#[must_use]
|
||||
pub fn register_query(
|
||||
self: &Arc<Self>,
|
||||
catalog: String,
|
||||
schemas: Vec<String>,
|
||||
query: String,
|
||||
client: String,
|
||||
id: Option<u64>,
|
||||
) -> Ticket {
|
||||
let id = id.unwrap_or_else(|| self.next_id.fetch_add(1, Ordering::Relaxed));
|
||||
let process = ProcessInfo {
|
||||
id,
|
||||
catalog: catalog.clone(),
|
||||
schemas,
|
||||
query,
|
||||
start_timestamp: current_time_millis(),
|
||||
client,
|
||||
frontend: self.server_addr.clone(),
|
||||
};
|
||||
let cancellation_handle = Arc::new(CancellationHandle::default());
|
||||
let cancellable_process = CancellableProcess::new(cancellation_handle.clone(), process);
|
||||
|
||||
self.catalogs
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(catalog.clone())
|
||||
.or_default()
|
||||
.insert(id, cancellable_process);
|
||||
|
||||
Ticket {
|
||||
catalog,
|
||||
manager: self.clone(),
|
||||
id,
|
||||
cancellation_handle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates the next process id.
|
||||
pub fn next_id(&self) -> u64 {
|
||||
self.next_id.fetch_add(1, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// De-register a query from process list.
|
||||
pub fn deregister_query(&self, catalog: String, id: u64) {
|
||||
if let Entry::Occupied(mut o) = self.catalogs.write().unwrap().entry(catalog) {
|
||||
let process = o.get_mut().remove(&id);
|
||||
debug!("Deregister process: {:?}", process);
|
||||
if o.get().is_empty() {
|
||||
o.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// List local running processes in given catalog.
|
||||
pub fn local_processes(&self, catalog: Option<&str>) -> error::Result<Vec<ProcessInfo>> {
|
||||
let catalogs = self.catalogs.read().unwrap();
|
||||
let result = if let Some(catalog) = catalog {
|
||||
if let Some(catalogs) = catalogs.get(catalog) {
|
||||
catalogs.values().map(|p| p.process.clone()).collect()
|
||||
} else {
|
||||
vec![]
|
||||
}
|
||||
} else {
|
||||
catalogs
|
||||
.values()
|
||||
.flat_map(|v| v.values().map(|p| p.process.clone()))
|
||||
.collect()
|
||||
};
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn list_all_processes(
|
||||
&self,
|
||||
catalog: Option<&str>,
|
||||
) -> error::Result<Vec<ProcessInfo>> {
|
||||
let mut processes = vec![];
|
||||
if let Some(remote_frontend_selector) = self.frontend_selector.as_ref() {
|
||||
let frontends = remote_frontend_selector
|
||||
.select(|node| node.peer.addr != self.server_addr)
|
||||
.await
|
||||
.context(error::InvokeFrontendSnafu)?;
|
||||
for mut f in frontends {
|
||||
processes.extend(
|
||||
f.list_process(ListProcessRequest {
|
||||
catalog: catalog.unwrap_or_default().to_string(),
|
||||
})
|
||||
.await
|
||||
.context(error::InvokeFrontendSnafu)?
|
||||
.processes,
|
||||
);
|
||||
}
|
||||
}
|
||||
processes.extend(self.local_processes(catalog)?);
|
||||
Ok(processes)
|
||||
}
|
||||
|
||||
/// Kills query with provided catalog and id.
|
||||
pub async fn kill_process(
|
||||
&self,
|
||||
server_addr: String,
|
||||
catalog: String,
|
||||
id: u64,
|
||||
) -> error::Result<bool> {
|
||||
if server_addr == self.server_addr {
|
||||
if let Some(catalogs) = self.catalogs.write().unwrap().get_mut(&catalog) {
|
||||
if let Some(process) = catalogs.remove(&id) {
|
||||
process.handle.cancel();
|
||||
info!(
|
||||
"Killed process, catalog: {}, id: {:?}",
|
||||
process.process.catalog, process.process.id
|
||||
);
|
||||
PROCESS_KILL_COUNT.with_label_values(&[&catalog]).inc();
|
||||
Ok(true)
|
||||
} else {
|
||||
debug!("Failed to kill process, id not found: {}", id);
|
||||
Ok(false)
|
||||
}
|
||||
} else {
|
||||
debug!("Failed to kill process, catalog not found: {}", catalog);
|
||||
Ok(false)
|
||||
}
|
||||
} else {
|
||||
let mut nodes = self
|
||||
.frontend_selector
|
||||
.as_ref()
|
||||
.context(error::MetaClientMissingSnafu)?
|
||||
.select(|node| node.peer.addr == server_addr)
|
||||
.await
|
||||
.context(error::InvokeFrontendSnafu)?;
|
||||
ensure!(
|
||||
!nodes.is_empty(),
|
||||
error::FrontendNotFoundSnafu { addr: server_addr }
|
||||
);
|
||||
|
||||
let request = KillProcessRequest {
|
||||
server_addr,
|
||||
catalog,
|
||||
process_id: id,
|
||||
};
|
||||
nodes[0]
|
||||
.kill_process(request)
|
||||
.await
|
||||
.context(error::InvokeFrontendSnafu)?;
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Ticket {
|
||||
pub(crate) catalog: String,
|
||||
pub(crate) manager: ProcessManagerRef,
|
||||
pub(crate) id: u64,
|
||||
pub cancellation_handle: Arc<CancellationHandle>,
|
||||
}
|
||||
|
||||
impl Drop for Ticket {
|
||||
fn drop(&mut self) {
|
||||
self.manager
|
||||
.deregister_query(std::mem::take(&mut self.catalog), self.id);
|
||||
}
|
||||
}
|
||||
|
||||
struct CancellableProcess {
|
||||
handle: Arc<CancellationHandle>,
|
||||
process: ProcessInfo,
|
||||
}
|
||||
|
||||
impl Drop for CancellableProcess {
|
||||
fn drop(&mut self) {
|
||||
PROCESS_LIST_COUNT
|
||||
.with_label_values(&[&self.process.catalog])
|
||||
.dec();
|
||||
}
|
||||
}
|
||||
|
||||
impl CancellableProcess {
|
||||
fn new(handle: Arc<CancellationHandle>, process: ProcessInfo) -> Self {
|
||||
PROCESS_LIST_COUNT
|
||||
.with_label_values(&[&process.catalog])
|
||||
.inc();
|
||||
Self { handle, process }
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for CancellableProcess {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("CancellableProcess")
|
||||
.field("cancelled", &self.handle.is_cancelled())
|
||||
.field("process", &self.process)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::process_manager::ProcessManager;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_register_query() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
let ticket = process_manager.clone().register_query(
|
||||
"public".to_string(),
|
||||
vec!["test".to_string()],
|
||||
"SELECT * FROM table".to_string(),
|
||||
"".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let running_processes = process_manager.local_processes(None).unwrap();
|
||||
assert_eq!(running_processes.len(), 1);
|
||||
assert_eq!(&running_processes[0].frontend, "127.0.0.1:8000");
|
||||
assert_eq!(running_processes[0].id, ticket.id);
|
||||
assert_eq!(&running_processes[0].query, "SELECT * FROM table");
|
||||
|
||||
drop(ticket);
|
||||
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_register_query_with_custom_id() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
let custom_id = 12345;
|
||||
|
||||
let ticket = process_manager.clone().register_query(
|
||||
"public".to_string(),
|
||||
vec!["test".to_string()],
|
||||
"SELECT * FROM table".to_string(),
|
||||
"client1".to_string(),
|
||||
Some(custom_id),
|
||||
);
|
||||
|
||||
assert_eq!(ticket.id, custom_id);
|
||||
|
||||
let running_processes = process_manager.local_processes(None).unwrap();
|
||||
assert_eq!(running_processes.len(), 1);
|
||||
assert_eq!(running_processes[0].id, custom_id);
|
||||
assert_eq!(&running_processes[0].client, "client1");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multiple_queries_same_catalog() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
|
||||
let ticket1 = process_manager.clone().register_query(
|
||||
"public".to_string(),
|
||||
vec!["schema1".to_string()],
|
||||
"SELECT * FROM table1".to_string(),
|
||||
"client1".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let ticket2 = process_manager.clone().register_query(
|
||||
"public".to_string(),
|
||||
vec!["schema2".to_string()],
|
||||
"SELECT * FROM table2".to_string(),
|
||||
"client2".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let running_processes = process_manager.local_processes(Some("public")).unwrap();
|
||||
assert_eq!(running_processes.len(), 2);
|
||||
|
||||
// Verify both processes are present
|
||||
let ids: Vec<u64> = running_processes.iter().map(|p| p.id).collect();
|
||||
assert!(ids.contains(&ticket1.id));
|
||||
assert!(ids.contains(&ticket2.id));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multiple_catalogs() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
|
||||
let _ticket1 = process_manager.clone().register_query(
|
||||
"catalog1".to_string(),
|
||||
vec!["schema1".to_string()],
|
||||
"SELECT * FROM table1".to_string(),
|
||||
"client1".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let _ticket2 = process_manager.clone().register_query(
|
||||
"catalog2".to_string(),
|
||||
vec!["schema2".to_string()],
|
||||
"SELECT * FROM table2".to_string(),
|
||||
"client2".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
// Test listing processes for specific catalog
|
||||
let catalog1_processes = process_manager.local_processes(Some("catalog1")).unwrap();
|
||||
assert_eq!(catalog1_processes.len(), 1);
|
||||
assert_eq!(&catalog1_processes[0].catalog, "catalog1");
|
||||
|
||||
let catalog2_processes = process_manager.local_processes(Some("catalog2")).unwrap();
|
||||
assert_eq!(catalog2_processes.len(), 1);
|
||||
assert_eq!(&catalog2_processes[0].catalog, "catalog2");
|
||||
|
||||
// Test listing all processes
|
||||
let all_processes = process_manager.local_processes(None).unwrap();
|
||||
assert_eq!(all_processes.len(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_deregister_query() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
|
||||
let ticket = process_manager.clone().register_query(
|
||||
"public".to_string(),
|
||||
vec!["test".to_string()],
|
||||
"SELECT * FROM table".to_string(),
|
||||
"client1".to_string(),
|
||||
None,
|
||||
);
|
||||
assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
|
||||
process_manager.deregister_query("public".to_string(), ticket.id);
|
||||
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancellation_handle() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
|
||||
let ticket = process_manager.clone().register_query(
|
||||
"public".to_string(),
|
||||
vec!["test".to_string()],
|
||||
"SELECT * FROM table".to_string(),
|
||||
"client1".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
assert!(!ticket.cancellation_handle.is_cancelled());
|
||||
ticket.cancellation_handle.cancel();
|
||||
assert!(ticket.cancellation_handle.is_cancelled());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_kill_local_process() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
|
||||
let ticket = process_manager.clone().register_query(
|
||||
"public".to_string(),
|
||||
vec!["test".to_string()],
|
||||
"SELECT * FROM table".to_string(),
|
||||
"client1".to_string(),
|
||||
None,
|
||||
);
|
||||
assert!(!ticket.cancellation_handle.is_cancelled());
|
||||
let killed = process_manager
|
||||
.kill_process(
|
||||
"127.0.0.1:8000".to_string(),
|
||||
"public".to_string(),
|
||||
ticket.id,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(killed);
|
||||
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_kill_nonexistent_process() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
let killed = process_manager
|
||||
.kill_process("127.0.0.1:8000".to_string(), "public".to_string(), 999)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!killed);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_kill_process_nonexistent_catalog() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
let killed = process_manager
|
||||
.kill_process("127.0.0.1:8000".to_string(), "nonexistent".to_string(), 1)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!killed);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_process_info_fields() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
|
||||
let _ticket = process_manager.clone().register_query(
|
||||
"test_catalog".to_string(),
|
||||
vec!["schema1".to_string(), "schema2".to_string()],
|
||||
"SELECT COUNT(*) FROM users WHERE age > 18".to_string(),
|
||||
"test_client".to_string(),
|
||||
Some(42),
|
||||
);
|
||||
|
||||
let processes = process_manager.local_processes(None).unwrap();
|
||||
assert_eq!(processes.len(), 1);
|
||||
|
||||
let process = &processes[0];
|
||||
assert_eq!(process.id, 42);
|
||||
assert_eq!(&process.catalog, "test_catalog");
|
||||
assert_eq!(process.schemas, vec!["schema1", "schema2"]);
|
||||
assert_eq!(&process.query, "SELECT COUNT(*) FROM users WHERE age > 18");
|
||||
assert_eq!(&process.client, "test_client");
|
||||
assert_eq!(&process.frontend, "127.0.0.1:8000");
|
||||
assert!(process.start_timestamp > 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ticket_drop_deregisters_process() {
|
||||
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
|
||||
|
||||
{
|
||||
let _ticket = process_manager.clone().register_query(
|
||||
"public".to_string(),
|
||||
vec!["test".to_string()],
|
||||
"SELECT * FROM table".to_string(),
|
||||
"client1".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
// Process should be registered
|
||||
assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
|
||||
} // ticket goes out of scope here
|
||||
|
||||
// Process should be automatically deregistered
|
||||
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@ mod information_memory_table;
|
||||
pub mod key_column_usage;
|
||||
mod partitions;
|
||||
mod procedure_info;
|
||||
mod process_list;
|
||||
pub mod region_peers;
|
||||
mod region_statistics;
|
||||
mod runtime_metrics;
|
||||
@@ -42,6 +43,7 @@ use common_recordbatch::SendableRecordBatchStream;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use lazy_static::lazy_static;
|
||||
use paste::paste;
|
||||
use process_list::InformationSchemaProcessList;
|
||||
use store_api::storage::{ScanRequest, TableId};
|
||||
use table::metadata::TableType;
|
||||
use table::TableRef;
|
||||
@@ -50,6 +52,7 @@ use views::InformationSchemaViews;
|
||||
|
||||
use self::columns::InformationSchemaColumns;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::process_manager::ProcessManagerRef;
|
||||
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
|
||||
use crate::system_schema::information_schema::flows::InformationSchemaFlows;
|
||||
use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
|
||||
@@ -113,6 +116,7 @@ macro_rules! setup_memory_table {
|
||||
pub struct InformationSchemaProvider {
|
||||
catalog_name: String,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
process_manager: Option<ProcessManagerRef>,
|
||||
flow_metadata_manager: Arc<FlowMetadataManager>,
|
||||
tables: HashMap<String, TableRef>,
|
||||
}
|
||||
@@ -207,6 +211,10 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
|
||||
self.catalog_manager.clone(),
|
||||
),
|
||||
) as _),
|
||||
PROCESS_LIST => self
|
||||
.process_manager
|
||||
.as_ref()
|
||||
.map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -217,11 +225,13 @@ impl InformationSchemaProvider {
|
||||
catalog_name: String,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
flow_metadata_manager: Arc<FlowMetadataManager>,
|
||||
process_manager: Option<ProcessManagerRef>,
|
||||
) -> Self {
|
||||
let mut provider = Self {
|
||||
catalog_name,
|
||||
catalog_manager,
|
||||
flow_metadata_manager,
|
||||
process_manager,
|
||||
tables: HashMap::new(),
|
||||
};
|
||||
|
||||
@@ -277,6 +287,9 @@ impl InformationSchemaProvider {
|
||||
self.build_table(TABLE_CONSTRAINTS).unwrap(),
|
||||
);
|
||||
tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
|
||||
if let Some(process_list) = self.build_table(PROCESS_LIST) {
|
||||
tables.insert(PROCESS_LIST.to_string(), process_list);
|
||||
}
|
||||
// Add memory tables
|
||||
for name in MEMORY_TABLES.iter() {
|
||||
tables.insert((*name).to_string(), self.build_table(name).expect(name));
|
||||
|
||||
189
src/catalog/src/system_schema/information_schema/process_list.rs
Normal file
189
src/catalog/src/system_schema/information_schema/process_list.rs
Normal file
@@ -0,0 +1,189 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_catalog::consts::INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_frontend::DisplayProcessId;
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
|
||||
use common_time::util::current_time_millis;
|
||||
use common_time::{Duration, Timestamp};
|
||||
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
|
||||
use datatypes::prelude::ConcreteDataType as CDT;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
DurationMillisecondVectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
|
||||
VectorRef,
|
||||
};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::{ScanRequest, TableId};
|
||||
|
||||
use crate::error::{self, InternalSnafu};
|
||||
use crate::information_schema::Predicates;
|
||||
use crate::process_manager::ProcessManagerRef;
|
||||
use crate::system_schema::information_schema::InformationTable;
|
||||
|
||||
/// Column names of `information_schema.process_list`
|
||||
const ID: &str = "id";
|
||||
const CATALOG: &str = "catalog";
|
||||
const SCHEMAS: &str = "schemas";
|
||||
const QUERY: &str = "query";
|
||||
const CLIENT: &str = "client";
|
||||
const FRONTEND: &str = "frontend";
|
||||
const START_TIMESTAMP: &str = "start_timestamp";
|
||||
const ELAPSED_TIME: &str = "elapsed_time";
|
||||
|
||||
/// `information_schema.process_list` table implementation that tracks running
|
||||
/// queries in current cluster.
|
||||
pub struct InformationSchemaProcessList {
|
||||
schema: SchemaRef,
|
||||
process_manager: ProcessManagerRef,
|
||||
}
|
||||
|
||||
impl InformationSchemaProcessList {
|
||||
pub fn new(process_manager: ProcessManagerRef) -> Self {
|
||||
Self {
|
||||
schema: Self::schema(),
|
||||
process_manager,
|
||||
}
|
||||
}
|
||||
|
||||
fn schema() -> SchemaRef {
|
||||
Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new(ID, CDT::string_datatype(), false),
|
||||
ColumnSchema::new(CATALOG, CDT::string_datatype(), false),
|
||||
ColumnSchema::new(SCHEMAS, CDT::string_datatype(), false),
|
||||
ColumnSchema::new(QUERY, CDT::string_datatype(), false),
|
||||
ColumnSchema::new(CLIENT, CDT::string_datatype(), false),
|
||||
ColumnSchema::new(FRONTEND, CDT::string_datatype(), false),
|
||||
ColumnSchema::new(
|
||||
START_TIMESTAMP,
|
||||
CDT::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
ColumnSchema::new(ELAPSED_TIME, CDT::duration_millisecond_datatype(), false),
|
||||
]))
|
||||
}
|
||||
}
|
||||
|
||||
impl InformationTable for InformationSchemaProcessList {
|
||||
fn table_id(&self) -> TableId {
|
||||
INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID
|
||||
}
|
||||
|
||||
fn table_name(&self) -> &'static str {
|
||||
"process_list"
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn to_stream(&self, request: ScanRequest) -> error::Result<SendableRecordBatchStream> {
|
||||
let process_manager = self.process_manager.clone();
|
||||
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
|
||||
self.schema.arrow_schema().clone(),
|
||||
futures::stream::once(async move {
|
||||
make_process_list(process_manager, request)
|
||||
.await
|
||||
.map(RecordBatch::into_df_record_batch)
|
||||
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
|
||||
}),
|
||||
));
|
||||
|
||||
Ok(Box::pin(
|
||||
RecordBatchStreamAdapter::try_new(stream)
|
||||
.map_err(BoxedError::new)
|
||||
.context(InternalSnafu)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Build running process list.
|
||||
async fn make_process_list(
|
||||
process_manager: ProcessManagerRef,
|
||||
request: ScanRequest,
|
||||
) -> error::Result<RecordBatch> {
|
||||
let predicates = Predicates::from_scan_request(&Some(request));
|
||||
let current_time = current_time_millis();
|
||||
// todo(hl): find a way to extract user catalog to filter queries from other users.
|
||||
let queries = process_manager.list_all_processes(None).await?;
|
||||
|
||||
let mut id_builder = StringVectorBuilder::with_capacity(queries.len());
|
||||
let mut catalog_builder = StringVectorBuilder::with_capacity(queries.len());
|
||||
let mut schemas_builder = StringVectorBuilder::with_capacity(queries.len());
|
||||
let mut query_builder = StringVectorBuilder::with_capacity(queries.len());
|
||||
let mut client_builder = StringVectorBuilder::with_capacity(queries.len());
|
||||
let mut frontend_builder = StringVectorBuilder::with_capacity(queries.len());
|
||||
let mut start_time_builder = TimestampMillisecondVectorBuilder::with_capacity(queries.len());
|
||||
let mut elapsed_time_builder = DurationMillisecondVectorBuilder::with_capacity(queries.len());
|
||||
|
||||
for process in queries {
|
||||
let display_id = DisplayProcessId {
|
||||
server_addr: process.frontend.to_string(),
|
||||
id: process.id,
|
||||
}
|
||||
.to_string();
|
||||
let schemas = process.schemas.join(",");
|
||||
let id = Value::from(display_id);
|
||||
let catalog = Value::from(process.catalog);
|
||||
let schemas = Value::from(schemas);
|
||||
let query = Value::from(process.query);
|
||||
let client = Value::from(process.client);
|
||||
let frontend = Value::from(process.frontend);
|
||||
let start_timestamp = Value::from(Timestamp::new_millisecond(process.start_timestamp));
|
||||
let elapsed_time = Value::from(Duration::new_millisecond(
|
||||
current_time - process.start_timestamp,
|
||||
));
|
||||
let row = [
|
||||
(ID, &id),
|
||||
(CATALOG, &catalog),
|
||||
(SCHEMAS, &schemas),
|
||||
(QUERY, &query),
|
||||
(CLIENT, &client),
|
||||
(FRONTEND, &frontend),
|
||||
(START_TIMESTAMP, &start_timestamp),
|
||||
(ELAPSED_TIME, &elapsed_time),
|
||||
];
|
||||
if predicates.eval(&row) {
|
||||
id_builder.push(id.as_string().as_deref());
|
||||
catalog_builder.push(catalog.as_string().as_deref());
|
||||
schemas_builder.push(schemas.as_string().as_deref());
|
||||
query_builder.push(query.as_string().as_deref());
|
||||
client_builder.push(client.as_string().as_deref());
|
||||
frontend_builder.push(frontend.as_string().as_deref());
|
||||
start_time_builder.push(start_timestamp.as_timestamp().map(|t| t.value().into()));
|
||||
elapsed_time_builder.push(elapsed_time.as_duration().map(|d| d.value().into()));
|
||||
}
|
||||
}
|
||||
|
||||
RecordBatch::new(
|
||||
InformationSchemaProcessList::schema(),
|
||||
vec![
|
||||
Arc::new(id_builder.finish()) as VectorRef,
|
||||
Arc::new(catalog_builder.finish()) as VectorRef,
|
||||
Arc::new(schemas_builder.finish()) as VectorRef,
|
||||
Arc::new(query_builder.finish()) as VectorRef,
|
||||
Arc::new(client_builder.finish()) as VectorRef,
|
||||
Arc::new(frontend_builder.finish()) as VectorRef,
|
||||
Arc::new(start_time_builder.finish()) as VectorRef,
|
||||
Arc::new(elapsed_time_builder.finish()) as VectorRef,
|
||||
],
|
||||
)
|
||||
.context(error::CreateRecordBatchSnafu)
|
||||
}
|
||||
@@ -47,3 +47,4 @@ pub const VIEWS: &str = "views";
|
||||
pub const FLOWS: &str = "flows";
|
||||
pub const PROCEDURE_INFO: &str = "procedure_info";
|
||||
pub const REGION_STATISTICS: &str = "region_statistics";
|
||||
pub const PROCESS_LIST: &str = "process_list";
|
||||
|
||||
@@ -328,6 +328,7 @@ mod tests {
|
||||
backend.clone(),
|
||||
layered_cache_registry,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let table_metadata_manager = TableMetadataManager::new(backend);
|
||||
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
|
||||
|
||||
@@ -58,6 +58,7 @@ where
|
||||
info!("{desc}, average operation cost: {cost:.2} ms");
|
||||
}
|
||||
|
||||
/// Command to benchmark table metadata operations.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct BenchTableMetadataCommand {
|
||||
#[clap(long)]
|
||||
|
||||
39
src/cli/src/data.rs
Normal file
39
src/cli/src/data.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod export;
|
||||
mod import;
|
||||
|
||||
use clap::Subcommand;
|
||||
use common_error::ext::BoxedError;
|
||||
|
||||
use crate::data::export::ExportCommand;
|
||||
use crate::data::import::ImportCommand;
|
||||
use crate::Tool;
|
||||
|
||||
/// Command for data operations including exporting data from and importing data into GreptimeDB.
|
||||
#[derive(Subcommand)]
|
||||
pub enum DataCommand {
|
||||
Export(ExportCommand),
|
||||
Import(ImportCommand),
|
||||
}
|
||||
|
||||
impl DataCommand {
|
||||
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
DataCommand::Export(cmd) => cmd.build().await,
|
||||
DataCommand::Import(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -50,6 +50,7 @@ enum ExportTarget {
|
||||
All,
|
||||
}
|
||||
|
||||
/// Command for exporting data from the GreptimeDB.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct ExportCommand {
|
||||
/// Server address to connect
|
||||
@@ -40,6 +40,7 @@ enum ImportTarget {
|
||||
All,
|
||||
}
|
||||
|
||||
/// Command to import data from a directory into a GreptimeDB instance.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct ImportCommand {
|
||||
/// Server address to connect
|
||||
@@ -30,6 +30,7 @@ pub enum Error {
|
||||
location: Location,
|
||||
msg: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create default catalog and schema"))]
|
||||
InitMetadata {
|
||||
#[snafu(implicit)]
|
||||
@@ -228,19 +229,41 @@ pub enum Error {
|
||||
#[snafu(source)]
|
||||
error: ObjectStoreError,
|
||||
},
|
||||
|
||||
#[snafu(display("S3 config need be set"))]
|
||||
S3ConfigNotSet {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Output directory not set"))]
|
||||
OutputDirNotSet {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("KV backend not set: {}", backend))]
|
||||
KvBackendNotSet {
|
||||
backend: String,
|
||||
|
||||
#[snafu(display("Empty store addresses"))]
|
||||
EmptyStoreAddrs {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Unsupported memory backend"))]
|
||||
UnsupportedMemoryBackend {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("File path invalid: {}", msg))]
|
||||
InvalidFilePath {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid arguments: {}", msg))]
|
||||
InvalidArguments {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
@@ -262,6 +285,9 @@ impl ErrorExt for Error {
|
||||
| Error::ConnectEtcd { .. }
|
||||
| Error::CreateDir { .. }
|
||||
| Error::EmptyResult { .. }
|
||||
| Error::InvalidFilePath { .. }
|
||||
| Error::UnsupportedMemoryBackend { .. }
|
||||
| Error::InvalidArguments { .. }
|
||||
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::StartProcedureManager { source, .. }
|
||||
@@ -282,7 +308,7 @@ impl ErrorExt for Error {
|
||||
Error::OpenDal { .. } => StatusCode::Internal,
|
||||
Error::S3ConfigNotSet { .. }
|
||||
| Error::OutputDirNotSet { .. }
|
||||
| Error::KvBackendNotSet { .. } => StatusCode::InvalidArguments,
|
||||
| Error::EmptyStoreAddrs { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::BuildRuntime { source, .. } => source.status_code(),
|
||||
|
||||
|
||||
@@ -13,11 +13,10 @@
|
||||
// limitations under the License.
|
||||
|
||||
mod bench;
|
||||
mod data;
|
||||
mod database;
|
||||
pub mod error;
|
||||
mod export;
|
||||
mod import;
|
||||
mod meta_snapshot;
|
||||
mod metadata;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
@@ -26,9 +25,8 @@ pub use database::DatabaseClient;
|
||||
use error::Result;
|
||||
|
||||
pub use crate::bench::BenchTableMetadataCommand;
|
||||
pub use crate::export::ExportCommand;
|
||||
pub use crate::import::ImportCommand;
|
||||
pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
|
||||
pub use crate::data::DataCommand;
|
||||
pub use crate::metadata::MetadataCommand;
|
||||
|
||||
#[async_trait]
|
||||
pub trait Tool: Send + Sync {
|
||||
|
||||
42
src/cli/src/metadata.rs
Normal file
42
src/cli/src/metadata.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod common;
|
||||
mod control;
|
||||
mod snapshot;
|
||||
|
||||
use clap::Subcommand;
|
||||
use common_error::ext::BoxedError;
|
||||
|
||||
use crate::metadata::control::ControlCommand;
|
||||
use crate::metadata::snapshot::SnapshotCommand;
|
||||
use crate::Tool;
|
||||
|
||||
/// Command for managing metadata operations, including saving metadata snapshots and restoring metadata from snapshots.
|
||||
#[derive(Subcommand)]
|
||||
pub enum MetadataCommand {
|
||||
#[clap(subcommand)]
|
||||
Snapshot(SnapshotCommand),
|
||||
#[clap(subcommand)]
|
||||
Control(ControlCommand),
|
||||
}
|
||||
|
||||
impl MetadataCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
MetadataCommand::Snapshot(cmd) => cmd.build().await,
|
||||
MetadataCommand::Control(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
116
src/cli/src/metadata/common.rs
Normal file
116
src/cli/src/metadata/common.rs
Normal file
@@ -0,0 +1,116 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use clap::Parser;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::kv_backend::chroot::ChrootKvBackend;
|
||||
use common_meta::kv_backend::etcd::EtcdStore;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use meta_srv::bootstrap::create_etcd_client;
|
||||
use meta_srv::metasrv::BackendImpl;
|
||||
|
||||
use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};
|
||||
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub(crate) struct StoreConfig {
|
||||
/// The endpoint of store. one of etcd, postgres or mysql.
|
||||
///
|
||||
/// For postgres store, the format is:
|
||||
/// "password=password dbname=postgres user=postgres host=localhost port=5432"
|
||||
///
|
||||
/// For etcd store, the format is:
|
||||
/// "127.0.0.1:2379"
|
||||
///
|
||||
/// For mysql store, the format is:
|
||||
/// "mysql://user:password@ip:port/dbname"
|
||||
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
|
||||
store_addrs: Vec<String>,
|
||||
|
||||
/// The maximum number of operations in a transaction. Only used when using [etcd-store].
|
||||
#[clap(long, default_value = "128")]
|
||||
max_txn_ops: usize,
|
||||
|
||||
/// The metadata store backend.
|
||||
#[clap(long, value_enum, default_value = "etcd-store")]
|
||||
backend: BackendImpl,
|
||||
|
||||
/// The key prefix of the metadata store.
|
||||
#[clap(long, default_value = "")]
|
||||
store_key_prefix: String,
|
||||
|
||||
/// The table name in RDS to store metadata. Only used when using [postgres-store] or [mysql-store].
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
#[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
|
||||
meta_table_name: String,
|
||||
}
|
||||
|
||||
impl StoreConfig {
|
||||
/// Builds a [`KvBackendRef`] from the store configuration.
|
||||
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
|
||||
let max_txn_ops = self.max_txn_ops;
|
||||
let store_addrs = &self.store_addrs;
|
||||
if store_addrs.is_empty() {
|
||||
EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
|
||||
} else {
|
||||
let kvbackend = match self.backend {
|
||||
BackendImpl::EtcdStore => {
|
||||
let etcd_client = create_etcd_client(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
|
||||
}
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
BackendImpl::PostgresStore => {
|
||||
let table_name = &self.meta_table_name;
|
||||
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
|
||||
pool,
|
||||
table_name,
|
||||
max_txn_ops,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
}
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
BackendImpl::MysqlStore => {
|
||||
let table_name = &self.meta_table_name;
|
||||
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
|
||||
pool,
|
||||
table_name,
|
||||
max_txn_ops,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
}
|
||||
BackendImpl::MemoryStore => UnsupportedMemoryBackendSnafu
|
||||
.fail()
|
||||
.map_err(BoxedError::new),
|
||||
};
|
||||
if self.store_key_prefix.is_empty() {
|
||||
kvbackend
|
||||
} else {
|
||||
let chroot_kvbackend =
|
||||
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
|
||||
Ok(Arc::new(chroot_kvbackend))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
38
src/cli/src/metadata/control.rs
Normal file
38
src/cli/src/metadata/control.rs
Normal file
@@ -0,0 +1,38 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod get;
|
||||
mod utils;
|
||||
|
||||
use clap::Subcommand;
|
||||
use common_error::ext::BoxedError;
|
||||
use get::GetCommand;
|
||||
|
||||
use crate::Tool;
|
||||
|
||||
/// Subcommand for metadata control.
|
||||
#[derive(Subcommand)]
|
||||
pub enum ControlCommand {
|
||||
/// Get the metadata from the metasrv.
|
||||
#[clap(subcommand)]
|
||||
Get(GetCommand),
|
||||
}
|
||||
|
||||
impl ControlCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
ControlCommand::Get(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
242
src/cli/src/metadata/control/get.rs
Normal file
242
src/cli/src/metadata/control/get.rs
Normal file
@@ -0,0 +1,242 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::cmp::min;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::{Parser, Subcommand};
|
||||
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::key::table_info::TableInfoKey;
|
||||
use common_meta::key::table_name::TableNameKey;
|
||||
use common_meta::key::table_route::TableRouteKey;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use futures::TryStreamExt;
|
||||
|
||||
use crate::error::InvalidArgumentsSnafu;
|
||||
use crate::metadata::common::StoreConfig;
|
||||
use crate::metadata::control::utils::{decode_key_value, json_fromatter};
|
||||
use crate::Tool;
|
||||
|
||||
/// Subcommand for get command.
|
||||
#[derive(Subcommand)]
|
||||
pub enum GetCommand {
|
||||
Key(GetKeyCommand),
|
||||
Table(GetTableCommand),
|
||||
}
|
||||
|
||||
impl GetCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
GetCommand::Key(cmd) => cmd.build().await,
|
||||
GetCommand::Table(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get key-value pairs from the metadata store.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct GetKeyCommand {
|
||||
/// The key to get from the metadata store. If empty, returns all key-value pairs.
|
||||
#[clap(default_value = "")]
|
||||
key: String,
|
||||
|
||||
/// Whether to perform a prefix query. If true, returns all key-value pairs where the key starts with the given prefix.
|
||||
#[clap(long, default_value = "false")]
|
||||
prefix: bool,
|
||||
|
||||
/// The maximum number of key-value pairs to return. If 0, returns all key-value pairs.
|
||||
#[clap(long, default_value = "0")]
|
||||
limit: u64,
|
||||
|
||||
#[clap(flatten)]
|
||||
store: StoreConfig,
|
||||
}
|
||||
|
||||
impl GetKeyCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let kvbackend = self.store.build().await?;
|
||||
Ok(Box::new(GetKeyTool {
|
||||
kvbackend,
|
||||
key: self.key.clone(),
|
||||
prefix: self.prefix,
|
||||
limit: self.limit,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
struct GetKeyTool {
|
||||
kvbackend: KvBackendRef,
|
||||
key: String,
|
||||
prefix: bool,
|
||||
limit: u64,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for GetKeyTool {
|
||||
async fn do_work(&self) -> Result<(), BoxedError> {
|
||||
let mut req = RangeRequest::default();
|
||||
if self.prefix {
|
||||
req = req.with_prefix(self.key.as_bytes());
|
||||
} else {
|
||||
req = req.with_key(self.key.as_bytes());
|
||||
}
|
||||
let page_size = if self.limit > 0 {
|
||||
min(self.limit as usize, DEFAULT_PAGE_SIZE)
|
||||
} else {
|
||||
DEFAULT_PAGE_SIZE
|
||||
};
|
||||
let pagination_stream =
|
||||
PaginationStream::new(self.kvbackend.clone(), req, page_size, decode_key_value);
|
||||
let mut stream = Box::pin(pagination_stream.into_stream());
|
||||
let mut counter = 0;
|
||||
|
||||
while let Some((key, value)) = stream.try_next().await.map_err(BoxedError::new)? {
|
||||
print!("{}\n{}\n", key, value);
|
||||
counter += 1;
|
||||
if self.limit > 0 && counter >= self.limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Get table metadata from the metadata store via table id.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct GetTableCommand {
|
||||
/// Get table metadata by table id.
|
||||
#[clap(long)]
|
||||
table_id: Option<u32>,
|
||||
|
||||
/// Get table metadata by table name.
|
||||
#[clap(long)]
|
||||
table_name: Option<String>,
|
||||
|
||||
/// The schema name of the table.
|
||||
#[clap(long)]
|
||||
schema_name: Option<String>,
|
||||
|
||||
/// Pretty print the output.
|
||||
#[clap(long, default_value = "false")]
|
||||
pretty: bool,
|
||||
|
||||
#[clap(flatten)]
|
||||
store: StoreConfig,
|
||||
}
|
||||
|
||||
impl GetTableCommand {
|
||||
pub fn validate(&self) -> Result<(), BoxedError> {
|
||||
if self.table_id.is_none() && self.table_name.is_none() {
|
||||
return Err(BoxedError::new(
|
||||
InvalidArgumentsSnafu {
|
||||
msg: "You must specify either --table-id or --table-name.",
|
||||
}
|
||||
.build(),
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct GetTableTool {
|
||||
kvbackend: KvBackendRef,
|
||||
table_id: Option<u32>,
|
||||
table_name: Option<String>,
|
||||
schema_name: Option<String>,
|
||||
pretty: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for GetTableTool {
|
||||
async fn do_work(&self) -> Result<(), BoxedError> {
|
||||
let table_metadata_manager = TableMetadataManager::new(self.kvbackend.clone());
|
||||
let table_name_manager = table_metadata_manager.table_name_manager();
|
||||
let table_info_manager = table_metadata_manager.table_info_manager();
|
||||
let table_route_manager = table_metadata_manager.table_route_manager();
|
||||
|
||||
let table_id = if let Some(table_name) = &self.table_name {
|
||||
let catalog = DEFAULT_CATALOG_NAME.to_string();
|
||||
let schema_name = self
|
||||
.schema_name
|
||||
.clone()
|
||||
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
|
||||
let key = TableNameKey::new(&catalog, &schema_name, table_name);
|
||||
|
||||
let Some(table_name) = table_name_manager.get(key).await.map_err(BoxedError::new)?
|
||||
else {
|
||||
println!(
|
||||
"Table({}) not found",
|
||||
format_full_table_name(&catalog, &schema_name, table_name)
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
table_name.table_id()
|
||||
} else {
|
||||
// Safety: we have validated that table_id or table_name is not None
|
||||
self.table_id.unwrap()
|
||||
};
|
||||
|
||||
let table_info = table_info_manager
|
||||
.get(table_id)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
if let Some(table_info) = table_info {
|
||||
println!(
|
||||
"{}\n{}",
|
||||
TableInfoKey::new(table_id),
|
||||
json_fromatter(self.pretty, &*table_info)
|
||||
);
|
||||
} else {
|
||||
println!("Table info not found");
|
||||
}
|
||||
|
||||
let table_route = table_route_manager
|
||||
.table_route_storage()
|
||||
.get(table_id)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
if let Some(table_route) = table_route {
|
||||
println!(
|
||||
"{}\n{}",
|
||||
TableRouteKey::new(table_id),
|
||||
json_fromatter(self.pretty, &table_route)
|
||||
);
|
||||
} else {
|
||||
println!("Table route not found");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl GetTableCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
self.validate()?;
|
||||
let kvbackend = self.store.build().await?;
|
||||
Ok(Box::new(GetTableTool {
|
||||
kvbackend,
|
||||
table_id: self.table_id,
|
||||
table_name: self.table_name.clone(),
|
||||
schema_name: self.schema_name.clone(),
|
||||
pretty: self.pretty,
|
||||
}))
|
||||
}
|
||||
}
|
||||
36
src/cli/src/metadata/control/utils.rs
Normal file
36
src/cli/src/metadata/control/utils.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::error::Result as CommonMetaResult;
|
||||
use common_meta::rpc::KeyValue;
|
||||
use serde::Serialize;
|
||||
|
||||
/// Decodes a key-value pair into a string.
|
||||
pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
|
||||
let key = String::from_utf8_lossy(&kv.key).to_string();
|
||||
let value = String::from_utf8_lossy(&kv.value).to_string();
|
||||
Ok((key, value))
|
||||
}
|
||||
|
||||
/// Formats a value as a JSON string.
|
||||
pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
if pretty {
|
||||
serde_json::to_string_pretty(value).unwrap()
|
||||
} else {
|
||||
serde_json::to_string(value).unwrap()
|
||||
}
|
||||
}
|
||||
@@ -12,96 +12,38 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::path::Path;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
use clap::{Parser, Subcommand};
|
||||
use common_base::secrets::{ExposeSecret, SecretString};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::kv_backend::chroot::ChrootKvBackend;
|
||||
use common_meta::kv_backend::etcd::EtcdStore;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::snapshot::MetadataSnapshotManager;
|
||||
use meta_srv::bootstrap::create_etcd_client;
|
||||
use meta_srv::metasrv::BackendImpl;
|
||||
use object_store::services::{Fs, S3};
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
|
||||
use crate::error::{InvalidFilePathSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
|
||||
use crate::metadata::common::StoreConfig;
|
||||
use crate::Tool;
|
||||
#[derive(Debug, Default, Parser)]
|
||||
struct MetaConnection {
|
||||
/// The endpoint of store. one of etcd, pg or mysql.
|
||||
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
|
||||
store_addrs: Vec<String>,
|
||||
/// The database backend.
|
||||
#[clap(long, value_enum)]
|
||||
backend: Option<BackendImpl>,
|
||||
#[clap(long, default_value = "")]
|
||||
store_key_prefix: String,
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
#[clap(long,default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
|
||||
meta_table_name: String,
|
||||
#[clap(long, default_value = "128")]
|
||||
max_txn_ops: usize,
|
||||
|
||||
/// Subcommand for metadata snapshot operations, including saving snapshots, restoring from snapshots, and viewing snapshot information.
|
||||
#[derive(Subcommand)]
|
||||
pub enum SnapshotCommand {
|
||||
/// Save a snapshot of the current metadata state to a specified location.
|
||||
Save(SaveCommand),
|
||||
/// Restore metadata from a snapshot.
|
||||
Restore(RestoreCommand),
|
||||
/// Explore metadata from a snapshot.
|
||||
Info(InfoCommand),
|
||||
}
|
||||
|
||||
impl MetaConnection {
|
||||
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
|
||||
let max_txn_ops = self.max_txn_ops;
|
||||
let store_addrs = &self.store_addrs;
|
||||
if store_addrs.is_empty() {
|
||||
KvBackendNotSetSnafu { backend: "all" }
|
||||
.fail()
|
||||
.map_err(BoxedError::new)
|
||||
} else {
|
||||
let kvbackend = match self.backend {
|
||||
Some(BackendImpl::EtcdStore) => {
|
||||
let etcd_client = create_etcd_client(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
|
||||
}
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
Some(BackendImpl::PostgresStore) => {
|
||||
let table_name = &self.meta_table_name;
|
||||
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
|
||||
pool,
|
||||
table_name,
|
||||
max_txn_ops,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
}
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
Some(BackendImpl::MysqlStore) => {
|
||||
let table_name = &self.meta_table_name;
|
||||
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
|
||||
pool,
|
||||
table_name,
|
||||
max_txn_ops,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
}
|
||||
_ => KvBackendNotSetSnafu { backend: "all" }
|
||||
.fail()
|
||||
.map_err(BoxedError::new),
|
||||
};
|
||||
if self.store_key_prefix.is_empty() {
|
||||
kvbackend
|
||||
} else {
|
||||
let chroot_kvbackend =
|
||||
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
|
||||
Ok(Arc::new(chroot_kvbackend))
|
||||
}
|
||||
impl SnapshotCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
SnapshotCommand::Save(cmd) => cmd.build().await,
|
||||
SnapshotCommand::Restore(cmd) => cmd.build().await,
|
||||
SnapshotCommand::Info(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -170,10 +112,10 @@ impl S3Config {
|
||||
/// It will dump the metadata snapshot to local file or s3 bucket.
|
||||
/// The snapshot file will be in binary format.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaSnapshotCommand {
|
||||
/// The connection to the metadata store.
|
||||
pub struct SaveCommand {
|
||||
/// The store configuration.
|
||||
#[clap(flatten)]
|
||||
connection: MetaConnection,
|
||||
store: StoreConfig,
|
||||
/// The s3 config.
|
||||
#[clap(flatten)]
|
||||
s3_config: S3Config,
|
||||
@@ -196,9 +138,9 @@ fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError>
|
||||
Ok(object_store)
|
||||
}
|
||||
|
||||
impl MetaSnapshotCommand {
|
||||
impl SaveCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let kvbackend = self.connection.build().await?;
|
||||
let kvbackend = self.store.build().await?;
|
||||
let output_dir = &self.output_dir;
|
||||
let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
|
||||
if let Some(store) = object_store {
|
||||
@@ -218,7 +160,7 @@ impl MetaSnapshotCommand {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MetaSnapshotTool {
|
||||
struct MetaSnapshotTool {
|
||||
inner: MetadataSnapshotManager,
|
||||
target_file: String,
|
||||
}
|
||||
@@ -234,14 +176,16 @@ impl Tool for MetaSnapshotTool {
|
||||
}
|
||||
}
|
||||
|
||||
/// Restore metadata snapshot tool.
|
||||
/// This tool is used to restore metadata snapshot from etcd, pg or mysql.
|
||||
/// It will restore the metadata snapshot from local file or s3 bucket.
|
||||
/// Restore metadata from a snapshot file.
|
||||
///
|
||||
/// This command restores the metadata state from a previously saved snapshot.
|
||||
/// The snapshot can be loaded from either a local file system or an S3 bucket,
|
||||
/// depending on the provided configuration.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaRestoreCommand {
|
||||
/// The connection to the metadata store.
|
||||
pub struct RestoreCommand {
|
||||
/// The store configuration.
|
||||
#[clap(flatten)]
|
||||
connection: MetaConnection,
|
||||
store: StoreConfig,
|
||||
/// The s3 config.
|
||||
#[clap(flatten)]
|
||||
s3_config: S3Config,
|
||||
@@ -255,9 +199,9 @@ pub struct MetaRestoreCommand {
|
||||
force: bool,
|
||||
}
|
||||
|
||||
impl MetaRestoreCommand {
|
||||
impl RestoreCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let kvbackend = self.connection.build().await?;
|
||||
let kvbackend = self.store.build().await?;
|
||||
let input_dir = &self.input_dir;
|
||||
let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
|
||||
if let Some(store) = object_store {
|
||||
@@ -279,7 +223,7 @@ impl MetaRestoreCommand {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MetaRestoreTool {
|
||||
struct MetaRestoreTool {
|
||||
inner: MetadataSnapshotManager,
|
||||
source_file: String,
|
||||
force: bool,
|
||||
@@ -327,3 +271,93 @@ impl Tool for MetaRestoreTool {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Explore metadata from a snapshot file.
|
||||
///
|
||||
/// This command allows filtering the metadata by a specific key and limiting the number of results.
|
||||
/// It prints the filtered metadata to the console.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct InfoCommand {
|
||||
/// The s3 config.
|
||||
#[clap(flatten)]
|
||||
s3_config: S3Config,
|
||||
/// The name of the target snapshot file. we will add the file extension automatically.
|
||||
#[clap(long, default_value = "metadata_snapshot")]
|
||||
file_name: String,
|
||||
/// The query string to filter the metadata.
|
||||
#[clap(long, default_value = "*")]
|
||||
inspect_key: String,
|
||||
/// The limit of the metadata to query.
|
||||
#[clap(long)]
|
||||
limit: Option<usize>,
|
||||
}
|
||||
|
||||
struct MetaInfoTool {
|
||||
inner: ObjectStore,
|
||||
source_file: String,
|
||||
inspect_key: String,
|
||||
limit: Option<usize>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for MetaInfoTool {
|
||||
#[allow(clippy::print_stdout)]
|
||||
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
|
||||
let result = MetadataSnapshotManager::info(
|
||||
&self.inner,
|
||||
&self.source_file,
|
||||
&self.inspect_key,
|
||||
self.limit,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
for item in result {
|
||||
println!("{}", item);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl InfoCommand {
|
||||
fn decide_object_store_root_for_local_store(
|
||||
file_path: &str,
|
||||
) -> Result<(&str, &str), BoxedError> {
|
||||
let path = Path::new(file_path);
|
||||
let parent = path
|
||||
.parent()
|
||||
.and_then(|p| p.to_str())
|
||||
.context(InvalidFilePathSnafu { msg: file_path })
|
||||
.map_err(BoxedError::new)?;
|
||||
let file_name = path
|
||||
.file_name()
|
||||
.and_then(|f| f.to_str())
|
||||
.context(InvalidFilePathSnafu { msg: file_path })
|
||||
.map_err(BoxedError::new)?;
|
||||
let root = if parent.is_empty() { "." } else { parent };
|
||||
Ok((root, file_name))
|
||||
}
|
||||
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
|
||||
if let Some(store) = object_store {
|
||||
let tool = MetaInfoTool {
|
||||
inner: store,
|
||||
source_file: self.file_name.clone(),
|
||||
inspect_key: self.inspect_key.clone(),
|
||||
limit: self.limit,
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
} else {
|
||||
let (root, file_name) =
|
||||
Self::decide_object_store_root_for_local_store(&self.file_name)?;
|
||||
let object_store = create_local_file_object_store(root)?;
|
||||
let tool = MetaInfoTool {
|
||||
inner: object_store,
|
||||
source_file: file_name.to_string(),
|
||||
inspect_key: self.inspect_key.clone(),
|
||||
limit: self.limit,
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -162,12 +162,23 @@ impl Client {
|
||||
.as_bytes() as usize
|
||||
}
|
||||
|
||||
pub fn make_flight_client(&self) -> Result<FlightClient> {
|
||||
pub fn make_flight_client(
|
||||
&self,
|
||||
send_compression: bool,
|
||||
accept_compression: bool,
|
||||
) -> Result<FlightClient> {
|
||||
let (addr, channel) = self.find_channel()?;
|
||||
|
||||
let client = FlightServiceClient::new(channel)
|
||||
let mut client = FlightServiceClient::new(channel)
|
||||
.max_decoding_message_size(self.max_grpc_recv_message_size())
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size());
|
||||
// todo(hl): support compression methods.
|
||||
if send_compression {
|
||||
client = client.send_compressed(CompressionEncoding::Zstd);
|
||||
}
|
||||
if accept_compression {
|
||||
client = client.accept_compressed(CompressionEncoding::Zstd);
|
||||
}
|
||||
|
||||
Ok(FlightClient { addr, client })
|
||||
}
|
||||
|
||||
@@ -49,7 +49,16 @@ impl NodeManager for NodeClients {
|
||||
async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
|
||||
let client = self.get_client(datanode).await;
|
||||
|
||||
Arc::new(RegionRequester::new(client))
|
||||
let ChannelConfig {
|
||||
send_compression,
|
||||
accept_compression,
|
||||
..
|
||||
} = self.channel_manager.config();
|
||||
Arc::new(RegionRequester::new(
|
||||
client,
|
||||
*send_compression,
|
||||
*accept_compression,
|
||||
))
|
||||
}
|
||||
|
||||
async fn flownode(&self, flownode: &Peer) -> FlownodeRef {
|
||||
|
||||
@@ -287,7 +287,7 @@ impl Database {
|
||||
let mut request = tonic::Request::new(request);
|
||||
Self::put_hints(request.metadata_mut(), hints)?;
|
||||
|
||||
let mut client = self.client.make_flight_client()?;
|
||||
let mut client = self.client.make_flight_client(false, false)?;
|
||||
|
||||
let response = client.mut_inner().do_get(request).await.or_else(|e| {
|
||||
let tonic_code = e.code();
|
||||
@@ -409,7 +409,7 @@ impl Database {
|
||||
MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?,
|
||||
);
|
||||
|
||||
let mut client = self.client.make_flight_client()?;
|
||||
let mut client = self.client.make_flight_client(false, false)?;
|
||||
let response = client.mut_inner().do_put(request).await?;
|
||||
let response = response
|
||||
.into_inner()
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
|
||||
use api::v1::region::InsertRequests;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::node_manager::Flownode;
|
||||
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(
|
||||
&self,
|
||||
req: DirtyWindowRequest,
|
||||
) -> common_meta::error::Result<FlowResponse> {
|
||||
self.handle_mark_window_dirty(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowRequester {
|
||||
@@ -91,4 +101,20 @@ impl FlowRequester {
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||
let (addr, mut client) = self.client.raw_flow_client()?;
|
||||
let response = client
|
||||
.handle_mark_dirty_time_window(DirtyWindowRequests {
|
||||
requests: vec![req],
|
||||
})
|
||||
.await
|
||||
.or_else(|e| {
|
||||
let code = e.code();
|
||||
let err: crate::error::Error = e.into();
|
||||
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
|
||||
})?
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +46,8 @@ use crate::{metrics, Client, Error};
|
||||
#[derive(Debug)]
|
||||
pub struct RegionRequester {
|
||||
client: Client,
|
||||
send_compression: bool,
|
||||
accept_compression: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -89,12 +91,18 @@ impl Datanode for RegionRequester {
|
||||
}
|
||||
|
||||
impl RegionRequester {
|
||||
pub fn new(client: Client) -> Self {
|
||||
Self { client }
|
||||
pub fn new(client: Client, send_compression: bool, accept_compression: bool) -> Self {
|
||||
Self {
|
||||
client,
|
||||
send_compression,
|
||||
accept_compression,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
|
||||
let mut flight_client = self.client.make_flight_client()?;
|
||||
let mut flight_client = self
|
||||
.client
|
||||
.make_flight_client(self.send_compression, self.accept_compression)?;
|
||||
let response = flight_client
|
||||
.mut_inner()
|
||||
.do_get(ticket)
|
||||
|
||||
@@ -146,6 +146,7 @@ mod tests {
|
||||
let output_dir = tempfile::tempdir().unwrap();
|
||||
let cli = cli::Command::parse_from([
|
||||
"cli",
|
||||
"data",
|
||||
"export",
|
||||
"--addr",
|
||||
"127.0.0.1:4000",
|
||||
|
||||
@@ -323,6 +323,7 @@ impl StartCommand {
|
||||
cached_meta_backend.clone(),
|
||||
layered_cache_registry.clone(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let table_metadata_manager =
|
||||
|
||||
@@ -20,6 +20,7 @@ use async_trait::async_trait;
|
||||
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
|
||||
use catalog::information_extension::DistributedInformationExtension;
|
||||
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
|
||||
use catalog::process_manager::ProcessManager;
|
||||
use clap::Parser;
|
||||
use client::client_manager::NodeClients;
|
||||
use common_base::Plugins;
|
||||
@@ -38,6 +39,7 @@ use frontend::heartbeat::HeartbeatTask;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::server::Services;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use servers::addrs;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -342,11 +344,17 @@ impl StartCommand {
|
||||
|
||||
let information_extension =
|
||||
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
|
||||
|
||||
let process_manager = Arc::new(ProcessManager::new(
|
||||
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
|
||||
Some(meta_client.clone()),
|
||||
));
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
information_extension,
|
||||
cached_meta_backend.clone(),
|
||||
layered_cache_registry.clone(),
|
||||
None,
|
||||
Some(process_manager.clone()),
|
||||
);
|
||||
|
||||
let executor = HandlerGroupExecutor::new(vec![
|
||||
@@ -364,12 +372,16 @@ impl StartCommand {
|
||||
|
||||
// frontend to datanode need not timeout.
|
||||
// Some queries are expected to take long time.
|
||||
let channel_config = ChannelConfig {
|
||||
let mut channel_config = ChannelConfig {
|
||||
timeout: None,
|
||||
tcp_nodelay: opts.datanode.client.tcp_nodelay,
|
||||
connect_timeout: Some(opts.datanode.client.connect_timeout),
|
||||
..Default::default()
|
||||
};
|
||||
if opts.grpc.flight_compression.transport_compression() {
|
||||
channel_config.accept_compression = true;
|
||||
channel_config.send_compression = true;
|
||||
}
|
||||
let client = NodeClients::new(channel_config);
|
||||
|
||||
let instance = FrontendBuilder::new(
|
||||
@@ -379,6 +391,7 @@ impl StartCommand {
|
||||
catalog_manager,
|
||||
Arc::new(client),
|
||||
meta_client,
|
||||
process_manager,
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.with_local_cache_invalidator(layered_cache_registry)
|
||||
|
||||
@@ -21,6 +21,7 @@ use async_trait::async_trait;
|
||||
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
|
||||
use catalog::information_schema::InformationExtension;
|
||||
use catalog::kvbackend::KvBackendCatalogManager;
|
||||
use catalog::process_manager::ProcessManager;
|
||||
use clap::Parser;
|
||||
use client::api::v1::meta::RegionRole;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
@@ -526,11 +527,14 @@ impl StartCommand {
|
||||
datanode.region_server(),
|
||||
procedure_manager.clone(),
|
||||
));
|
||||
|
||||
let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
information_extension.clone(),
|
||||
kv_backend.clone(),
|
||||
layered_cache_registry.clone(),
|
||||
Some(procedure_manager.clone()),
|
||||
Some(process_manager.clone()),
|
||||
);
|
||||
|
||||
let table_metadata_manager =
|
||||
@@ -620,6 +624,7 @@ impl StartCommand {
|
||||
catalog_manager.clone(),
|
||||
node_manager.clone(),
|
||||
ddl_task_executor.clone(),
|
||||
process_manager,
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.try_build()
|
||||
@@ -647,7 +652,7 @@ impl StartCommand {
|
||||
node_manager,
|
||||
)
|
||||
.await
|
||||
.context(error::StartFlownodeSnafu)?;
|
||||
.context(StartFlownodeSnafu)?;
|
||||
flow_streaming_engine.set_frontend_invoker(invoker).await;
|
||||
|
||||
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use cmd::options::GreptimeOptions;
|
||||
@@ -58,12 +57,7 @@ fn test_load_datanode_example_config() {
|
||||
metadata_cache_tti: Duration::from_secs(300),
|
||||
}),
|
||||
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
|
||||
dir: Some(
|
||||
Path::new(DEFAULT_DATA_HOME)
|
||||
.join(WAL_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
),
|
||||
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
|
||||
sync_period: Some(Duration::from_secs(10)),
|
||||
recovery_parallelism: 2,
|
||||
..Default::default()
|
||||
@@ -86,10 +80,7 @@ fn test_load_datanode_example_config() {
|
||||
],
|
||||
logging: LoggingOptions {
|
||||
level: Some("info".to_string()),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
@@ -132,10 +123,7 @@ fn test_load_frontend_example_config() {
|
||||
}),
|
||||
logging: LoggingOptions {
|
||||
level: Some("info".to_string()),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
@@ -182,10 +170,7 @@ fn test_load_metasrv_example_config() {
|
||||
..Default::default()
|
||||
},
|
||||
logging: LoggingOptions {
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
|
||||
level: Some("info".to_string()),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
@@ -220,12 +205,7 @@ fn test_load_standalone_example_config() {
|
||||
component: StandaloneOptions {
|
||||
default_timezone: Some("UTC".to_string()),
|
||||
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
|
||||
dir: Some(
|
||||
Path::new(DEFAULT_DATA_HOME)
|
||||
.join(WAL_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
),
|
||||
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
|
||||
sync_period: Some(Duration::from_secs(10)),
|
||||
recovery_parallelism: 2,
|
||||
..Default::default()
|
||||
@@ -248,10 +228,7 @@ fn test_load_standalone_example_config() {
|
||||
},
|
||||
logging: LoggingOptions {
|
||||
level: Some("info".to_string()),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
|
||||
240
src/common/base/src/cancellation.rs
Normal file
240
src/common/base/src/cancellation.rs
Normal file
@@ -0,0 +1,240 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! [CancellationHandle] is used to compose with manual implementation of [futures::future::Future]
|
||||
//! or [futures::stream::Stream] to facilitate cancellation.
|
||||
//! See example in [frontend::stream_wrapper::CancellableStreamWrapper] and [CancellableFuture].
|
||||
|
||||
use std::fmt::{Debug, Display, Formatter};
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures::task::AtomicWaker;
|
||||
use pin_project::pin_project;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct CancellationHandle {
|
||||
waker: AtomicWaker,
|
||||
cancelled: AtomicBool,
|
||||
}
|
||||
|
||||
impl Debug for CancellationHandle {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("CancellationHandle")
|
||||
.field("cancelled", &self.is_cancelled())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl CancellationHandle {
|
||||
pub fn waker(&self) -> &AtomicWaker {
|
||||
&self.waker
|
||||
}
|
||||
|
||||
/// Cancels a future or stream.
|
||||
pub fn cancel(&self) {
|
||||
if self
|
||||
.cancelled
|
||||
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
self.waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
/// Is this handle cancelled.
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
self.cancelled.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CancellableFuture<T> {
|
||||
#[pin]
|
||||
fut: T,
|
||||
handle: Arc<CancellationHandle>,
|
||||
}
|
||||
|
||||
impl<T> CancellableFuture<T> {
|
||||
pub fn new(fut: T, handle: Arc<CancellationHandle>) -> Self {
|
||||
Self { fut, handle }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Future for CancellableFuture<T>
|
||||
where
|
||||
T: Future,
|
||||
{
|
||||
type Output = Result<T::Output, Cancelled>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.as_mut().project();
|
||||
// Check if the task has been aborted
|
||||
if this.handle.is_cancelled() {
|
||||
return Poll::Ready(Err(Cancelled));
|
||||
}
|
||||
|
||||
if let Poll::Ready(x) = this.fut.poll(cx) {
|
||||
return Poll::Ready(Ok(x));
|
||||
}
|
||||
|
||||
this.handle.waker().register(cx.waker());
|
||||
if this.handle.is_cancelled() {
|
||||
return Poll::Ready(Err(Cancelled));
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct Cancelled;
|
||||
|
||||
impl Display for Cancelled {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "Future has been cancelled")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::time::{sleep, timeout};
|
||||
|
||||
use crate::cancellation::{CancellableFuture, CancellationHandle, Cancelled};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancellable_future_completes_normally() {
|
||||
let handle = Arc::new(CancellationHandle::default());
|
||||
let future = async { 42 };
|
||||
let cancellable = CancellableFuture::new(future, handle);
|
||||
|
||||
let result = cancellable.await;
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), 42);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancellable_future_cancelled_before_start() {
|
||||
let handle = Arc::new(CancellationHandle::default());
|
||||
handle.cancel();
|
||||
|
||||
let future = async { 42 };
|
||||
let cancellable = CancellableFuture::new(future, handle);
|
||||
|
||||
let result = cancellable.await;
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(result.unwrap_err(), Cancelled));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancellable_future_cancelled_during_execution() {
|
||||
let handle = Arc::new(CancellationHandle::default());
|
||||
let handle_clone = handle.clone();
|
||||
|
||||
// Create a future that sleeps for a long time
|
||||
let future = async {
|
||||
sleep(Duration::from_secs(10)).await;
|
||||
42
|
||||
};
|
||||
let cancellable = CancellableFuture::new(future, handle);
|
||||
|
||||
// Cancel the future after a short delay
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
handle_clone.cancel();
|
||||
});
|
||||
|
||||
let result = cancellable.await;
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(result.unwrap_err(), Cancelled));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancellable_future_completes_before_cancellation() {
|
||||
let handle = Arc::new(CancellationHandle::default());
|
||||
let handle_clone = handle.clone();
|
||||
|
||||
// Create a future that completes quickly
|
||||
let future = async {
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
42
|
||||
};
|
||||
let cancellable = CancellableFuture::new(future, handle);
|
||||
|
||||
// Try to cancel after the future should have completed
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
handle_clone.cancel();
|
||||
});
|
||||
|
||||
let result = cancellable.await;
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), 42);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancellation_handle_is_cancelled() {
|
||||
let handle = CancellationHandle::default();
|
||||
assert!(!handle.is_cancelled());
|
||||
|
||||
handle.cancel();
|
||||
assert!(handle.is_cancelled());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multiple_cancellable_futures_with_same_handle() {
|
||||
let handle = Arc::new(CancellationHandle::default());
|
||||
|
||||
let future1 = CancellableFuture::new(async { 1 }, handle.clone());
|
||||
let future2 = CancellableFuture::new(async { 2 }, handle.clone());
|
||||
|
||||
// Cancel before starting
|
||||
handle.cancel();
|
||||
|
||||
let (result1, result2) = tokio::join!(future1, future2);
|
||||
|
||||
assert!(result1.is_err());
|
||||
assert!(result2.is_err());
|
||||
assert!(matches!(result1.unwrap_err(), Cancelled));
|
||||
assert!(matches!(result2.unwrap_err(), Cancelled));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancellable_future_with_timeout() {
|
||||
let handle = Arc::new(CancellationHandle::default());
|
||||
let future = async {
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
42
|
||||
};
|
||||
let cancellable = CancellableFuture::new(future, handle.clone());
|
||||
|
||||
// Use timeout to ensure the test doesn't hang
|
||||
let result = timeout(Duration::from_millis(100), cancellable).await;
|
||||
|
||||
// Should timeout because the future takes 1 second but we timeout after 100ms
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancelled_display() {
|
||||
let cancelled = Cancelled;
|
||||
assert_eq!(format!("{}", cancelled), "Future has been cancelled");
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
pub mod bit_vec;
|
||||
pub mod bytes;
|
||||
pub mod cancellation;
|
||||
pub mod plugins;
|
||||
pub mod range_read;
|
||||
#[allow(clippy::all)]
|
||||
|
||||
@@ -102,6 +102,8 @@ pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
|
||||
pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
|
||||
/// id for information_schema.region_statistics
|
||||
pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
|
||||
/// id for information_schema.process_list
|
||||
pub const INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID: u32 = 36;
|
||||
|
||||
// ----- End of information_schema tables -----
|
||||
|
||||
|
||||
@@ -7,5 +7,13 @@ license.workspace = true
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
common-error.workspace = true
|
||||
common-grpc.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-meta.workspace = true
|
||||
greptime-proto.workspace = true
|
||||
meta-client.workspace = true
|
||||
snafu.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -27,6 +27,35 @@ pub enum Error {
|
||||
location: Location,
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list nodes from metasrv"))]
|
||||
Meta {
|
||||
source: Box<meta_client::error::Error>,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse process id: {}", s))]
|
||||
ParseProcessId {
|
||||
s: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to invoke frontend service"))]
|
||||
InvokeFrontend {
|
||||
#[snafu(source)]
|
||||
error: tonic::Status,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to invoke list process service"))]
|
||||
CreateChannel {
|
||||
source: common_grpc::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -36,6 +65,10 @@ impl ErrorExt for Error {
|
||||
use Error::*;
|
||||
match self {
|
||||
External { source, .. } => source.status_code(),
|
||||
Meta { source, .. } => source.status_code(),
|
||||
ParseProcessId { .. } => StatusCode::InvalidArguments,
|
||||
InvokeFrontend { .. } => StatusCode::Unexpected,
|
||||
CreateChannel { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,4 +12,41 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::str::FromStr;
|
||||
|
||||
use snafu::OptionExt;
|
||||
|
||||
pub mod error;
|
||||
pub mod selector;
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub struct DisplayProcessId {
|
||||
pub server_addr: String,
|
||||
pub id: u64,
|
||||
}
|
||||
|
||||
impl Display for DisplayProcessId {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}/{}", self.server_addr, self.id)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for DisplayProcessId {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self, Self::Error> {
|
||||
let mut split = value.split('/');
|
||||
let server_addr = split
|
||||
.next()
|
||||
.context(error::ParseProcessIdSnafu { s: value })?
|
||||
.to_string();
|
||||
let id = split
|
||||
.next()
|
||||
.context(error::ParseProcessIdSnafu { s: value })?;
|
||||
let id = u64::from_str(id)
|
||||
.ok()
|
||||
.context(error::ParseProcessIdSnafu { s: value })?;
|
||||
Ok(DisplayProcessId { server_addr, id })
|
||||
}
|
||||
}
|
||||
|
||||
112
src/common/frontend/src/selector.rs
Normal file
112
src/common/frontend/src/selector.rs
Normal file
@@ -0,0 +1,112 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
|
||||
use greptime_proto::v1::frontend::{
|
||||
frontend_client, KillProcessRequest, KillProcessResponse, ListProcessRequest,
|
||||
ListProcessResponse,
|
||||
};
|
||||
use meta_client::MetaClientRef;
|
||||
use snafu::ResultExt;
|
||||
use tonic::Response;
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{MetaSnafu, Result};
|
||||
|
||||
pub type FrontendClientPtr = Box<dyn FrontendClient>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait FrontendClient: Send {
|
||||
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;
|
||||
|
||||
async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FrontendClient for frontend_client::FrontendClient<tonic::transport::channel::Channel> {
|
||||
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse> {
|
||||
frontend_client::FrontendClient::<tonic::transport::channel::Channel>::list_process(
|
||||
self, req,
|
||||
)
|
||||
.await
|
||||
.context(error::InvokeFrontendSnafu)
|
||||
.map(Response::into_inner)
|
||||
}
|
||||
|
||||
async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse> {
|
||||
frontend_client::FrontendClient::<tonic::transport::channel::Channel>::kill_process(
|
||||
self, req,
|
||||
)
|
||||
.await
|
||||
.context(error::InvokeFrontendSnafu)
|
||||
.map(Response::into_inner)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait FrontendSelector {
|
||||
async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
|
||||
where
|
||||
F: Fn(&NodeInfo) -> bool + Send;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MetaClientSelector {
|
||||
meta_client: MetaClientRef,
|
||||
channel_manager: ChannelManager,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FrontendSelector for MetaClientSelector {
|
||||
async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
|
||||
where
|
||||
F: Fn(&NodeInfo) -> bool + Send,
|
||||
{
|
||||
let nodes = self
|
||||
.meta_client
|
||||
.list_nodes(Some(Role::Frontend))
|
||||
.await
|
||||
.map_err(Box::new)
|
||||
.context(MetaSnafu)?;
|
||||
|
||||
nodes
|
||||
.into_iter()
|
||||
.filter(predicate)
|
||||
.map(|node| {
|
||||
let channel = self
|
||||
.channel_manager
|
||||
.get(node.peer.addr)
|
||||
.context(error::CreateChannelSnafu)?;
|
||||
let client = frontend_client::FrontendClient::new(channel);
|
||||
Ok(Box::new(client) as FrontendClientPtr)
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()
|
||||
}
|
||||
}
|
||||
|
||||
impl MetaClientSelector {
|
||||
pub fn new(meta_client: MetaClientRef) -> Self {
|
||||
let cfg = ChannelConfig::new()
|
||||
.connect_timeout(Duration::from_secs(30))
|
||||
.timeout(Duration::from_secs(30));
|
||||
let channel_manager = ChannelManager::with_config(cfg);
|
||||
Self {
|
||||
meta_client,
|
||||
channel_manager,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -23,7 +23,8 @@ use std::sync::Arc;
|
||||
|
||||
use build::BuildFunction;
|
||||
use database::{
|
||||
CurrentSchemaFunction, DatabaseFunction, ReadPreferenceFunction, SessionUserFunction,
|
||||
ConnectionIdFunction, CurrentSchemaFunction, DatabaseFunction, PgBackendPidFunction,
|
||||
ReadPreferenceFunction, SessionUserFunction,
|
||||
};
|
||||
use pg_catalog::PGCatalogFunction;
|
||||
use procedure_state::ProcedureStateFunction;
|
||||
@@ -42,6 +43,8 @@ impl SystemFunction {
|
||||
registry.register_scalar(DatabaseFunction);
|
||||
registry.register_scalar(SessionUserFunction);
|
||||
registry.register_scalar(ReadPreferenceFunction);
|
||||
registry.register_scalar(PgBackendPidFunction);
|
||||
registry.register_scalar(ConnectionIdFunction);
|
||||
registry.register_scalar(TimezoneFunction);
|
||||
registry.register_async(Arc::new(ProcedureStateFunction));
|
||||
PGCatalogFunction::register(registry);
|
||||
|
||||
@@ -18,7 +18,8 @@ use std::sync::Arc;
|
||||
use common_query::error::Result;
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
use datatypes::prelude::{ConcreteDataType, ScalarVector};
|
||||
use datatypes::vectors::{StringVector, VectorRef};
|
||||
use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};
|
||||
use derive_more::Display;
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
@@ -32,10 +33,20 @@ pub struct SessionUserFunction;
|
||||
|
||||
pub struct ReadPreferenceFunction;
|
||||
|
||||
#[derive(Display)]
|
||||
#[display("{}", self.name())]
|
||||
pub struct PgBackendPidFunction;
|
||||
|
||||
#[derive(Display)]
|
||||
#[display("{}", self.name())]
|
||||
pub struct ConnectionIdFunction;
|
||||
|
||||
const DATABASE_FUNCTION_NAME: &str = "database";
|
||||
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
|
||||
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
|
||||
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
|
||||
const PG_BACKEND_PID: &str = "pg_backend_pid";
|
||||
const CONNECTION_ID: &str = "connection_id";
|
||||
|
||||
impl Function for DatabaseFunction {
|
||||
fn name(&self) -> &str {
|
||||
@@ -117,6 +128,46 @@ impl Function for ReadPreferenceFunction {
|
||||
}
|
||||
}
|
||||
|
||||
impl Function for PgBackendPidFunction {
|
||||
fn name(&self) -> &str {
|
||||
PG_BACKEND_PID
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::uint64_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::nullary(Volatility::Immutable)
|
||||
}
|
||||
|
||||
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
let pid = func_ctx.query_ctx.process_id();
|
||||
|
||||
Ok(Arc::new(UInt64Vector::from_slice([pid])) as _)
|
||||
}
|
||||
}
|
||||
|
||||
impl Function for ConnectionIdFunction {
|
||||
fn name(&self) -> &str {
|
||||
CONNECTION_ID
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::uint64_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::nullary(Volatility::Immutable)
|
||||
}
|
||||
|
||||
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
let pid = func_ctx.query_ctx.process_id();
|
||||
|
||||
Ok(Arc::new(UInt64Vector::from_slice([pid])) as _)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for DatabaseFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "DATABASE")
|
||||
|
||||
@@ -296,6 +296,8 @@ pub struct ChannelConfig {
|
||||
pub max_recv_message_size: ReadableSize,
|
||||
// Max gRPC sending(encoding) message size
|
||||
pub max_send_message_size: ReadableSize,
|
||||
pub send_compression: bool,
|
||||
pub accept_compression: bool,
|
||||
}
|
||||
|
||||
impl Default for ChannelConfig {
|
||||
@@ -316,6 +318,8 @@ impl Default for ChannelConfig {
|
||||
client_tls: None,
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
send_compression: false,
|
||||
accept_compression: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -566,6 +570,8 @@ mod tests {
|
||||
client_tls: None,
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
send_compression: false,
|
||||
accept_compression: false,
|
||||
},
|
||||
default_cfg
|
||||
);
|
||||
@@ -610,6 +616,8 @@ mod tests {
|
||||
}),
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
send_compression: false,
|
||||
accept_compression: false,
|
||||
},
|
||||
cfg
|
||||
);
|
||||
|
||||
@@ -64,6 +64,7 @@ impl Default for FlightEncoder {
|
||||
}
|
||||
|
||||
impl FlightEncoder {
|
||||
/// Creates new [FlightEncoder] with compression disabled.
|
||||
pub fn with_compression_disabled() -> Self {
|
||||
let write_options = writer::IpcWriteOptions::default()
|
||||
.try_with_compression(None)
|
||||
|
||||
@@ -1001,7 +1001,7 @@ impl ErrorExt for Error {
|
||||
}
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
RdsTransactionRetryFailed { .. } => StatusCode::Internal,
|
||||
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
|
||||
DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -70,11 +70,12 @@ impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let captures = VIEW_INFO_KEY_PATTERN
|
||||
.captures(key)
|
||||
.context(InvalidViewInfoSnafu {
|
||||
err_msg: format!("Invalid ViewInfoKey '{key}'"),
|
||||
})?;
|
||||
let captures =
|
||||
VIEW_INFO_KEY_PATTERN
|
||||
.captures(key)
|
||||
.with_context(|| InvalidViewInfoSnafu {
|
||||
err_msg: format!("Invalid ViewInfoKey '{key}'"),
|
||||
})?;
|
||||
// Safety: pass the regex check above
|
||||
let view_id = captures[1].parse::<TableId>().unwrap();
|
||||
Ok(ViewInfoKey { view_id })
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
|
||||
use api::v1::region::{InsertRequests, RegionRequest};
|
||||
pub use common_base::AffectedRows;
|
||||
use common_query::request::QueryRequest;
|
||||
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
|
||||
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
|
||||
|
||||
/// Handles requests to mark time window as dirty.
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
|
||||
}
|
||||
|
||||
pub type FlownodeRef = Arc<dyn Flownode>;
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
pub mod file;
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Instant;
|
||||
@@ -271,6 +272,49 @@ impl MetadataSnapshotManager {
|
||||
|
||||
Ok((filename.to_string(), num_keyvalues as u64))
|
||||
}
|
||||
|
||||
fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
|
||||
format!("{} => {}", key, value)
|
||||
}
|
||||
|
||||
pub async fn info(
|
||||
object_store: &ObjectStore,
|
||||
file_path: &str,
|
||||
query_str: &str,
|
||||
limit: Option<usize>,
|
||||
) -> Result<Vec<String>> {
|
||||
let path = Path::new(file_path);
|
||||
|
||||
let file_name = path
|
||||
.file_name()
|
||||
.and_then(|s| s.to_str())
|
||||
.context(InvalidFilePathSnafu { file_path })?;
|
||||
|
||||
let filename = FileName::try_from(file_name)?;
|
||||
let data = object_store
|
||||
.read(file_path)
|
||||
.await
|
||||
.context(ReadObjectSnafu { file_path })?;
|
||||
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
|
||||
let metadata_content = document.into_metadata_content()?.values();
|
||||
let mut results = Vec::with_capacity(limit.unwrap_or(256));
|
||||
for kv in metadata_content {
|
||||
let key_str = String::from_utf8_lossy(&kv.key);
|
||||
if let Some(prefix) = query_str.strip_suffix('*') {
|
||||
if key_str.starts_with(prefix) {
|
||||
let value_str = String::from_utf8_lossy(&kv.value);
|
||||
results.push(Self::format_output(key_str, value_str));
|
||||
}
|
||||
} else if key_str == query_str {
|
||||
let value_str = String::from_utf8_lossy(&kv.value);
|
||||
results.push(Self::format_output(key_str, value_str));
|
||||
}
|
||||
if results.len() == limit.unwrap_or(usize::MAX) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(results)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -111,6 +111,11 @@ impl MetadataContent {
|
||||
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
|
||||
self.values.into_iter()
|
||||
}
|
||||
|
||||
/// Returns the key-value pairs as a vector.
|
||||
pub fn values(self) -> Vec<KeyValue> {
|
||||
self.values
|
||||
}
|
||||
}
|
||||
|
||||
/// The key-value pair of the backup file.
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
|
||||
use api::v1::region::{InsertRequests, RegionRequest};
|
||||
pub use common_base::AffectedRows;
|
||||
use common_query::request::QueryRequest;
|
||||
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
|
||||
) -> Result<FlowResponse> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_req: DirtyWindowRequest,
|
||||
) -> Result<FlowResponse> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
/// A mock struct implements [NodeManager] only implement the `datanode` method.
|
||||
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
|
||||
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
|
||||
self.handler.handle_inserts(&self.peer, requests).await
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||
self.handler.handle_mark_window_dirty(&self.peer, req).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -173,6 +173,7 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Stream timeout"))]
|
||||
StreamTimeout {
|
||||
#[snafu(implicit)]
|
||||
@@ -180,6 +181,7 @@ pub enum Error {
|
||||
#[snafu(source)]
|
||||
error: tokio::time::error::Elapsed,
|
||||
},
|
||||
|
||||
#[snafu(display("RecordBatch slice index overflow: {visit_index} > {size}"))]
|
||||
RecordBatchSliceIndexOverflow {
|
||||
#[snafu(implicit)]
|
||||
@@ -187,6 +189,12 @@ pub enum Error {
|
||||
size: usize,
|
||||
visit_index: usize,
|
||||
},
|
||||
|
||||
#[snafu(display("Stream has been cancelled"))]
|
||||
StreamCancelled {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
@@ -221,6 +229,8 @@ impl ErrorExt for Error {
|
||||
}
|
||||
|
||||
Error::StreamTimeout { .. } => StatusCode::Cancelled,
|
||||
|
||||
Error::StreamCancelled { .. } => StatusCode::Cancelled,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -372,6 +372,7 @@ impl DatanodeBuilder {
|
||||
opts.max_concurrent_queries,
|
||||
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
|
||||
Duration::from_millis(100),
|
||||
opts.grpc.flight_compression,
|
||||
);
|
||||
|
||||
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
|
||||
|
||||
@@ -50,6 +50,7 @@ use query::QueryEngineRef;
|
||||
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
|
||||
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
|
||||
use servers::grpc::region_server::RegionServerHandler;
|
||||
use servers::grpc::FlightCompression;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::metric_engine_consts::{
|
||||
@@ -80,6 +81,7 @@ use crate::event_listener::RegionServerEventListenerRef;
|
||||
#[derive(Clone)]
|
||||
pub struct RegionServer {
|
||||
inner: Arc<RegionServerInner>,
|
||||
flight_compression: FlightCompression,
|
||||
}
|
||||
|
||||
pub struct RegionStat {
|
||||
@@ -93,6 +95,7 @@ impl RegionServer {
|
||||
query_engine: QueryEngineRef,
|
||||
runtime: Runtime,
|
||||
event_listener: RegionServerEventListenerRef,
|
||||
flight_compression: FlightCompression,
|
||||
) -> Self {
|
||||
Self::with_table_provider(
|
||||
query_engine,
|
||||
@@ -101,6 +104,7 @@ impl RegionServer {
|
||||
Arc::new(DummyTableProviderFactory),
|
||||
0,
|
||||
Duration::from_millis(0),
|
||||
flight_compression,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -111,6 +115,7 @@ impl RegionServer {
|
||||
table_provider_factory: TableProviderFactoryRef,
|
||||
max_concurrent_queries: usize,
|
||||
concurrent_query_limiter_timeout: Duration,
|
||||
flight_compression: FlightCompression,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(RegionServerInner::new(
|
||||
@@ -123,6 +128,7 @@ impl RegionServer {
|
||||
concurrent_query_limiter_timeout,
|
||||
),
|
||||
)),
|
||||
flight_compression,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -536,7 +542,11 @@ impl FlightCraft for RegionServer {
|
||||
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
|
||||
.await?;
|
||||
|
||||
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
|
||||
let stream = Box::pin(FlightRecordBatchStream::new(
|
||||
result,
|
||||
tracing_context,
|
||||
self.flight_compression,
|
||||
));
|
||||
Ok(Response::new(stream))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ use query::dataframe::DataFrame;
|
||||
use query::planner::LogicalPlanner;
|
||||
use query::query_engine::{DescribeResult, QueryEngineState};
|
||||
use query::{QueryEngine, QueryEngineContext};
|
||||
use servers::grpc::FlightCompression;
|
||||
use session::context::QueryContextRef;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{
|
||||
@@ -97,6 +98,7 @@ pub fn mock_region_server() -> RegionServer {
|
||||
Arc::new(MockQueryEngine),
|
||||
Runtime::builder().build().unwrap(),
|
||||
Box::new(NoopRegionServerEventListener),
|
||||
FlightCompression::default(),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -326,6 +326,14 @@ impl Value {
|
||||
}
|
||||
}
|
||||
|
||||
/// Cast Value to [Duration]. Return None if value is not a valid duration data type.
|
||||
pub fn as_duration(&self) -> Option<Duration> {
|
||||
match self {
|
||||
Value::Duration(d) => Some(*d),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the logical type of the value.
|
||||
pub fn logical_type_id(&self) -> LogicalTypeId {
|
||||
match self {
|
||||
|
||||
@@ -316,7 +316,7 @@ impl StreamingEngine {
|
||||
);
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["out"])
|
||||
.with_label_values(&["out-streaming"])
|
||||
.inc_by(total_rows as u64);
|
||||
|
||||
let now = self.tick_manager.tick();
|
||||
|
||||
@@ -31,6 +31,7 @@ use common_runtime::JoinHandle;
|
||||
use common_telemetry::{error, info, trace, warn};
|
||||
use datatypes::value::Value;
|
||||
use futures::TryStreamExt;
|
||||
use greptime_proto::v1::flow::DirtyWindowRequest;
|
||||
use itertools::Itertools;
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||
@@ -46,7 +47,7 @@ use crate::error::{
|
||||
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
||||
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
||||
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
|
||||
use crate::repr::{self, DiffRow};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -689,6 +690,9 @@ impl FlowEngine for FlowDualEngine {
|
||||
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
||||
let mut to_batch_engine = request.requests;
|
||||
|
||||
let mut batching_row_cnt = 0;
|
||||
let mut streaming_row_cnt = 0;
|
||||
|
||||
{
|
||||
// not locking this, or recover flows will be starved when also handling flow inserts
|
||||
let src_table2flow = self.src_table2flow.read().await;
|
||||
@@ -698,9 +702,11 @@ impl FlowEngine for FlowDualEngine {
|
||||
let is_in_stream = src_table2flow.in_stream(table_id);
|
||||
let is_in_batch = src_table2flow.in_batch(table_id);
|
||||
if is_in_stream {
|
||||
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||
to_stream_engine.push(req.clone());
|
||||
}
|
||||
if is_in_batch {
|
||||
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||
return true;
|
||||
}
|
||||
if !is_in_batch && !is_in_stream {
|
||||
@@ -713,6 +719,14 @@ impl FlowEngine for FlowDualEngine {
|
||||
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
||||
}
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["in-streaming"])
|
||||
.inc_by(streaming_row_cnt as u64);
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["in-batching"])
|
||||
.inc_by(batching_row_cnt as u64);
|
||||
|
||||
let streaming_engine = self.streaming_engine.clone();
|
||||
let stream_handler: JoinHandle<Result<(), Error>> =
|
||||
common_runtime::spawn_global(async move {
|
||||
@@ -819,6 +833,10 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
|
||||
@@ -926,6 +944,10 @@ impl common_meta::node_manager::Flownode for StreamingEngine {
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for StreamingEngine {
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::ddl::create_flow::FlowType;
|
||||
@@ -29,8 +30,7 @@ use common_telemetry::{debug, info};
|
||||
use common_time::TimeToLive;
|
||||
use query::QueryEngineRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::TableId;
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
use tokio::sync::{oneshot, RwLock};
|
||||
|
||||
use crate::batching_mode::frontend_client::FrontendClient;
|
||||
@@ -42,6 +42,7 @@ use crate::error::{
|
||||
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
|
||||
UnexpectedSnafu, UnsupportedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
|
||||
use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
||||
|
||||
/// Batching mode Engine, responsible for driving all the batching mode tasks
|
||||
@@ -77,6 +78,116 @@ impl BatchingEngine {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_mark_dirty_time_window(
|
||||
&self,
|
||||
reqs: DirtyWindowRequests,
|
||||
) -> Result<FlowResponse, Error> {
|
||||
let table_info_mgr = self.table_meta.table_info_manager();
|
||||
|
||||
let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
|
||||
for r in reqs.requests {
|
||||
let tid = TableId::from(r.table_id);
|
||||
let entry = group_by_table_id.entry(tid).or_default();
|
||||
entry.extend(r.timestamps);
|
||||
}
|
||||
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
|
||||
let table_infos =
|
||||
table_info_mgr
|
||||
.batch_get(&tids)
|
||||
.await
|
||||
.with_context(|_| TableNotFoundMetaSnafu {
|
||||
msg: format!("Failed to get table info for table ids: {:?}", tids),
|
||||
})?;
|
||||
|
||||
let group_by_table_name = group_by_table_id
|
||||
.into_iter()
|
||||
.filter_map(|(id, timestamps)| {
|
||||
let table_name = table_infos.get(&id).map(|info| info.table_name());
|
||||
let Some(table_name) = table_name else {
|
||||
warn!("Failed to get table infos for table id: {:?}", id);
|
||||
return None;
|
||||
};
|
||||
let table_name = [
|
||||
table_name.catalog_name,
|
||||
table_name.schema_name,
|
||||
table_name.table_name,
|
||||
];
|
||||
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
|
||||
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
|
||||
.data_type
|
||||
.as_timestamp()
|
||||
.unwrap()
|
||||
.unit();
|
||||
Some((table_name, (timestamps, time_index_unit)))
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let group_by_table_name = Arc::new(group_by_table_name);
|
||||
|
||||
let mut handles = Vec::new();
|
||||
let tasks = self.tasks.read().await;
|
||||
|
||||
for (_flow_id, task) in tasks.iter() {
|
||||
let src_table_names = &task.config.source_table_names;
|
||||
|
||||
if src_table_names
|
||||
.iter()
|
||||
.all(|name| !group_by_table_name.contains_key(name))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let group_by_table_name = group_by_table_name.clone();
|
||||
let task = task.clone();
|
||||
|
||||
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
|
||||
let src_table_names = &task.config.source_table_names;
|
||||
let mut all_dirty_windows = vec![];
|
||||
for src_table_name in src_table_names {
|
||||
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
|
||||
let Some(expr) = &task.config.time_window_expr else {
|
||||
continue;
|
||||
};
|
||||
for timestamp in timestamps {
|
||||
let align_start = expr
|
||||
.eval(common_time::Timestamp::new(*timestamp, *unit))?
|
||||
.0
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Failed to eval start value",
|
||||
})?;
|
||||
all_dirty_windows.push(align_start);
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut state = task.state.write().unwrap();
|
||||
let flow_id_label = task.config.flow_id.to_string();
|
||||
for timestamp in all_dirty_windows {
|
||||
state.dirty_time_windows.add_window(timestamp, None);
|
||||
}
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
|
||||
.with_label_values(&[&flow_id_label])
|
||||
.set(state.dirty_time_windows.len() as f64);
|
||||
Ok(())
|
||||
});
|
||||
handles.push(handle);
|
||||
}
|
||||
drop(tasks);
|
||||
for handle in handles {
|
||||
match handle.await {
|
||||
Err(e) => {
|
||||
warn!("Failed to handle inserts: {e}");
|
||||
}
|
||||
Ok(Ok(())) => (),
|
||||
Ok(Err(e)) => {
|
||||
warn!("Failed to handle inserts: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
pub async fn handle_inserts_inner(
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
|
||||
@@ -156,6 +156,11 @@ impl DirtyTimeWindows {
|
||||
self.windows.clear();
|
||||
}
|
||||
|
||||
/// Number of dirty windows.
|
||||
pub fn len(&self) -> usize {
|
||||
self.windows.len()
|
||||
}
|
||||
|
||||
/// Generate all filter expressions consuming all time windows
|
||||
///
|
||||
/// there is two limits:
|
||||
|
||||
@@ -61,7 +61,9 @@ use crate::error::{
|
||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
|
||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
|
||||
METRIC_FLOW_ROWS,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -371,6 +373,9 @@ impl BatchingTask {
|
||||
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
|
||||
elapsed
|
||||
);
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
|
||||
.inc_by(*affected_rows as _);
|
||||
} else if let Err(err) = &res {
|
||||
warn!(
|
||||
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
|
||||
@@ -410,6 +415,7 @@ impl BatchingTask {
|
||||
engine: QueryEngineRef,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
) {
|
||||
let flow_id_str = self.config.flow_id.to_string();
|
||||
loop {
|
||||
// first check if shutdown signal is received
|
||||
// if so, break the loop
|
||||
@@ -427,6 +433,9 @@ impl BatchingTask {
|
||||
Err(TryRecvError::Empty) => (),
|
||||
}
|
||||
}
|
||||
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
|
||||
let new_query = match self.gen_insert_plan(&engine).await {
|
||||
Ok(new_query) => new_query,
|
||||
@@ -473,6 +482,9 @@ impl BatchingTask {
|
||||
}
|
||||
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
|
||||
Err(err) => {
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
match new_query {
|
||||
Some(query) => {
|
||||
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
||||
|
||||
@@ -58,11 +58,32 @@ lazy_static! {
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW: GaugeVec =
|
||||
register_gauge_vec!(
|
||||
"greptime_flow_batching_engine_bulk_mark_time_window",
|
||||
"flow batching engine query time window count marked by bulk inserts",
|
||||
&["flow_id"],
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
|
||||
register_int_counter_vec!(
|
||||
"greptime_flow_batching_start_query_count",
|
||||
"flow batching engine started query count",
|
||||
&["flow_id"],
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
|
||||
register_int_counter_vec!(
|
||||
"greptime_flow_batching_error_count",
|
||||
"flow batching engine error count per flow id",
|
||||
&["flow_id"],
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
||||
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
||||
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
||||
"greptime_flow_processed_rows",
|
||||
"Count of rows flowing through the system",
|
||||
"Count of rows flowing through the system.",
|
||||
&["direction"]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::DirtyWindowRequests;
|
||||
use api::v1::{RowDeleteRequests, RowInsertRequests};
|
||||
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
|
||||
use catalog::CatalogManagerRef;
|
||||
@@ -136,6 +137,18 @@ impl flow_server::Flow for FlowService {
|
||||
.map(Response::new)
|
||||
.map_err(to_status_with_last_err)
|
||||
}
|
||||
|
||||
async fn handle_mark_dirty_time_window(
|
||||
&self,
|
||||
reqs: Request<DirtyWindowRequests>,
|
||||
) -> Result<Response<FlowResponse>, Status> {
|
||||
self.dual_engine
|
||||
.batching_engine()
|
||||
.handle_mark_dirty_time_window(reqs.into_inner())
|
||||
.await
|
||||
.map(Response::new)
|
||||
.map_err(to_status_with_last_err)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -578,6 +591,7 @@ impl FrontendInvoker {
|
||||
layered_cache_registry.clone(),
|
||||
inserter.clone(),
|
||||
table_route_cache,
|
||||
None,
|
||||
));
|
||||
|
||||
let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);
|
||||
|
||||
@@ -25,6 +25,7 @@ common-catalog.workspace = true
|
||||
common-config.workspace = true
|
||||
common-datasource.workspace = true
|
||||
common-error.workspace = true
|
||||
common-frontend.workspace = true
|
||||
common-function.workspace = true
|
||||
common-grpc.workspace = true
|
||||
common-macro.workspace = true
|
||||
@@ -69,6 +70,7 @@ store-api.workspace = true
|
||||
substrait.workspace = true
|
||||
table.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
|
||||
@@ -357,6 +357,12 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Query has been cancelled"))]
|
||||
Cancelled {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -435,6 +441,8 @@ impl ErrorExt for Error {
|
||||
Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
|
||||
|
||||
Error::DataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
|
||||
|
||||
Error::Cancelled { .. } => StatusCode::Cancelled,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -30,8 +30,10 @@ use std::time::SystemTime;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use catalog::process_manager::ProcessManagerRef;
|
||||
use catalog::CatalogManagerRef;
|
||||
use client::OutputData;
|
||||
use common_base::cancellation::CancellableFuture;
|
||||
use common_base::Plugins;
|
||||
use common_config::KvBackendConfig;
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
@@ -80,6 +82,7 @@ use crate::error::{
|
||||
};
|
||||
use crate::limiter::LimiterRef;
|
||||
use crate::slow_query_recorder::SlowQueryRecorder;
|
||||
use crate::stream_wrapper::CancellableStreamWrapper;
|
||||
|
||||
/// The frontend instance contains necessary components, and implements many
|
||||
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
|
||||
@@ -96,6 +99,7 @@ pub struct Instance {
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
slow_query_recorder: Option<SlowQueryRecorder>,
|
||||
limiter: Option<LimiterRef>,
|
||||
process_manager: ProcessManagerRef,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
@@ -153,6 +157,10 @@ impl Instance {
|
||||
pub fn inserter(&self) -> &InserterRef {
|
||||
&self.inserter
|
||||
}
|
||||
|
||||
pub fn process_manager(&self) -> &ProcessManagerRef {
|
||||
&self.process_manager
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
|
||||
@@ -172,50 +180,79 @@ impl Instance {
|
||||
None
|
||||
};
|
||||
|
||||
let output = match stmt {
|
||||
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
|
||||
// TODO: remove this when format is supported in datafusion
|
||||
if let Statement::Explain(explain) = &stmt {
|
||||
if let Some(format) = explain.format() {
|
||||
query_ctx.set_explain_format(format.to_string());
|
||||
let ticket = self.process_manager.register_query(
|
||||
query_ctx.current_catalog().to_string(),
|
||||
vec![query_ctx.current_schema()],
|
||||
stmt.to_string(),
|
||||
query_ctx.conn_info().to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let query_fut = async {
|
||||
match stmt {
|
||||
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
|
||||
// TODO: remove this when format is supported in datafusion
|
||||
if let Statement::Explain(explain) = &stmt {
|
||||
if let Some(format) = explain.format() {
|
||||
query_ctx.set_explain_format(format.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
let stmt = QueryStatement::Sql(stmt);
|
||||
let plan = self
|
||||
.statement_executor
|
||||
.plan(&stmt, query_ctx.clone())
|
||||
.await?;
|
||||
|
||||
let QueryStatement::Sql(stmt) = stmt else {
|
||||
unreachable!()
|
||||
};
|
||||
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
|
||||
self.statement_executor
|
||||
.exec_plan(plan, query_ctx)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
Statement::Tql(tql) => {
|
||||
let plan = self
|
||||
.statement_executor
|
||||
.plan_tql(tql.clone(), &query_ctx)
|
||||
.await?;
|
||||
|
||||
let stmt = QueryStatement::Sql(stmt);
|
||||
let plan = self
|
||||
.statement_executor
|
||||
.plan(&stmt, query_ctx.clone())
|
||||
.await?;
|
||||
|
||||
let QueryStatement::Sql(stmt) = stmt else {
|
||||
unreachable!()
|
||||
};
|
||||
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
|
||||
|
||||
self.statement_executor.exec_plan(plan, query_ctx).await
|
||||
}
|
||||
Statement::Tql(tql) => {
|
||||
let plan = self
|
||||
.statement_executor
|
||||
.plan_tql(tql.clone(), &query_ctx)
|
||||
.await?;
|
||||
|
||||
query_interceptor.pre_execute(
|
||||
&Statement::Tql(tql),
|
||||
Some(&plan),
|
||||
query_ctx.clone(),
|
||||
)?;
|
||||
|
||||
self.statement_executor.exec_plan(plan, query_ctx).await
|
||||
}
|
||||
_ => {
|
||||
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
|
||||
|
||||
self.statement_executor.execute_sql(stmt, query_ctx).await
|
||||
query_interceptor.pre_execute(
|
||||
&Statement::Tql(tql),
|
||||
Some(&plan),
|
||||
query_ctx.clone(),
|
||||
)?;
|
||||
self.statement_executor
|
||||
.exec_plan(plan, query_ctx)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
_ => {
|
||||
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
|
||||
self.statement_executor
|
||||
.execute_sql(stmt, query_ctx)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
output.context(TableOperationSnafu)
|
||||
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
|
||||
.await
|
||||
.map_err(|_| error::CancelledSnafu.build())?
|
||||
.map(|output| {
|
||||
let Output { meta, data } = output;
|
||||
|
||||
let data = match data {
|
||||
OutputData::Stream(stream) => {
|
||||
OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
|
||||
}
|
||||
other => other,
|
||||
};
|
||||
Output { data, meta }
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -577,6 +614,8 @@ pub fn check_permission(
|
||||
}
|
||||
// cursor operations are always allowed once it's created
|
||||
Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
|
||||
// User can only kill process in their own catalog.
|
||||
Statement::Kill(_) => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
|
||||
use catalog::process_manager::ProcessManagerRef;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_base::Plugins;
|
||||
use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
|
||||
@@ -54,9 +55,11 @@ pub struct FrontendBuilder {
|
||||
node_manager: NodeManagerRef,
|
||||
plugins: Option<Plugins>,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
process_manager: ProcessManagerRef,
|
||||
}
|
||||
|
||||
impl FrontendBuilder {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
options: FrontendOptions,
|
||||
kv_backend: KvBackendRef,
|
||||
@@ -64,6 +67,7 @@ impl FrontendBuilder {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
node_manager: NodeManagerRef,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
process_manager: ProcessManagerRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
options,
|
||||
@@ -74,6 +78,7 @@ impl FrontendBuilder {
|
||||
node_manager,
|
||||
plugins: None,
|
||||
procedure_executor,
|
||||
process_manager,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,7 +100,7 @@ impl FrontendBuilder {
|
||||
let kv_backend = self.kv_backend;
|
||||
let node_manager = self.node_manager;
|
||||
let plugins = self.plugins.unwrap_or_default();
|
||||
|
||||
let process_manager = self.process_manager;
|
||||
let table_route_cache: TableRouteCacheRef =
|
||||
self.layered_cache_registry
|
||||
.get()
|
||||
@@ -175,6 +180,7 @@ impl FrontendBuilder {
|
||||
local_cache_invalidator,
|
||||
inserter.clone(),
|
||||
table_route_cache,
|
||||
Some(process_manager.clone()),
|
||||
));
|
||||
|
||||
let pipeline_operator = Arc::new(PipelineOperator::new(
|
||||
@@ -216,6 +222,7 @@ impl FrontendBuilder {
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
|
||||
slow_query_recorder,
|
||||
limiter,
|
||||
process_manager,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
use table::table_name::TableName;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
|
||||
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {
|
||||
|
||||
async fn put_record_batch(
|
||||
&self,
|
||||
table: &TableName,
|
||||
table_id: &mut Option<TableId>,
|
||||
table_name: &TableName,
|
||||
table_ref: &mut Option<TableRef>,
|
||||
decoder: &mut FlightDecoder,
|
||||
data: FlightData,
|
||||
) -> Result<AffectedRows> {
|
||||
let table_id = if let Some(table_id) = table_id {
|
||||
*table_id
|
||||
let table = if let Some(table) = table_ref {
|
||||
table.clone()
|
||||
} else {
|
||||
let table = self
|
||||
.catalog_manager()
|
||||
.table(
|
||||
&table.catalog_name,
|
||||
&table.schema_name,
|
||||
&table.table_name,
|
||||
&table_name.catalog_name,
|
||||
&table_name.schema_name,
|
||||
&table_name.table_name,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
})?;
|
||||
let id = table.table_info().table_id();
|
||||
*table_id = Some(id);
|
||||
id
|
||||
*table_ref = Some(table.clone());
|
||||
table
|
||||
};
|
||||
|
||||
self.inserter
|
||||
.handle_bulk_insert(table_id, decoder, data)
|
||||
.handle_bulk_insert(table, decoder, data)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
|
||||
@@ -23,3 +23,4 @@ pub(crate) mod metrics;
|
||||
pub mod server;
|
||||
pub mod service_config;
|
||||
pub(crate) mod slow_query_recorder;
|
||||
mod stream_wrapper;
|
||||
|
||||
@@ -20,6 +20,7 @@ use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use servers::error::Error as ServerError;
|
||||
use servers::grpc::builder::GrpcServerBuilder;
|
||||
use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
|
||||
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
||||
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
|
||||
use servers::http::event::LogValidatorRef;
|
||||
@@ -154,13 +155,17 @@ where
|
||||
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
|
||||
user_provider.clone(),
|
||||
runtime,
|
||||
opts.grpc.flight_compression,
|
||||
);
|
||||
|
||||
let frontend_grpc_handler =
|
||||
FrontendGrpcHandler::new(self.instance.process_manager().clone());
|
||||
let grpc_server = builder
|
||||
.database_handler(greptime_request_handler.clone())
|
||||
.prometheus_handler(self.instance.clone(), user_provider.clone())
|
||||
.otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone()))
|
||||
.flight_handler(Arc::new(greptime_request_handler))
|
||||
.frontend_grpc_handler(frontend_grpc_handler)
|
||||
.build();
|
||||
Ok(grpc_server)
|
||||
}
|
||||
|
||||
367
src/frontend/src/stream_wrapper.rs
Normal file
367
src/frontend/src/stream_wrapper.rs
Normal file
@@ -0,0 +1,367 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use catalog::process_manager::Ticket;
|
||||
use common_recordbatch::adapter::RecordBatchMetrics;
|
||||
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use futures::Stream;
|
||||
|
||||
pub struct CancellableStreamWrapper {
|
||||
inner: SendableRecordBatchStream,
|
||||
ticket: Ticket,
|
||||
}
|
||||
|
||||
impl Unpin for CancellableStreamWrapper {}
|
||||
|
||||
impl CancellableStreamWrapper {
|
||||
pub fn new(stream: SendableRecordBatchStream, ticket: Ticket) -> Self {
|
||||
Self {
|
||||
inner: stream,
|
||||
ticket,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for CancellableStreamWrapper {
|
||||
type Item = common_recordbatch::error::Result<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = &mut *self;
|
||||
if this.ticket.cancellation_handle.is_cancelled() {
|
||||
return Poll::Ready(Some(common_recordbatch::error::StreamCancelledSnafu.fail()));
|
||||
}
|
||||
|
||||
if let Poll::Ready(res) = Pin::new(&mut this.inner).poll_next(cx) {
|
||||
return Poll::Ready(res);
|
||||
}
|
||||
|
||||
// on pending, register cancellation waker.
|
||||
this.ticket.cancellation_handle.waker().register(cx.waker());
|
||||
// check if canceled again.
|
||||
if this.ticket.cancellation_handle.is_cancelled() {
|
||||
return Poll::Ready(Some(common_recordbatch::error::StreamCancelledSnafu.fail()));
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordBatchStream for CancellableStreamWrapper {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.inner.schema()
|
||||
}
|
||||
|
||||
fn output_ordering(&self) -> Option<&[OrderOption]> {
|
||||
self.inner.output_ordering()
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
self.inner.metrics()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use catalog::process_manager::ProcessManager;
|
||||
use common_recordbatch::adapter::RecordBatchMetrics;
|
||||
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::vectors::Int32Vector;
|
||||
use futures::{Stream, StreamExt};
|
||||
use tokio::time::{sleep, timeout};
|
||||
|
||||
use super::CancellableStreamWrapper;
|
||||
|
||||
// Mock stream for testing
|
||||
struct MockRecordBatchStream {
|
||||
schema: SchemaRef,
|
||||
batches: Vec<common_recordbatch::error::Result<RecordBatch>>,
|
||||
current: usize,
|
||||
delay: Option<Duration>,
|
||||
}
|
||||
|
||||
impl MockRecordBatchStream {
|
||||
fn new(batches: Vec<common_recordbatch::error::Result<RecordBatch>>) -> Self {
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
"test_col",
|
||||
ConcreteDataType::int32_datatype(),
|
||||
false,
|
||||
)]));
|
||||
|
||||
Self {
|
||||
schema,
|
||||
batches,
|
||||
current: 0,
|
||||
delay: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn with_delay(mut self, delay: Duration) -> Self {
|
||||
self.delay = Some(delay);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for MockRecordBatchStream {
|
||||
type Item = common_recordbatch::error::Result<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if let Some(delay) = self.delay {
|
||||
// Simulate async delay
|
||||
let waker = cx.waker().clone();
|
||||
let delay_clone = delay;
|
||||
tokio::spawn(async move {
|
||||
sleep(delay_clone).await;
|
||||
waker.wake();
|
||||
});
|
||||
self.delay = None; // Only delay once
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
if self.current >= self.batches.len() {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
|
||||
let batch = self.batches[self.current].as_ref().unwrap().clone();
|
||||
self.current += 1;
|
||||
Poll::Ready(Some(Ok(batch)))
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordBatchStream for MockRecordBatchStream {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn output_ordering(&self) -> Option<&[OrderOption]> {
|
||||
None
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn create_test_batch() -> RecordBatch {
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
"test_col",
|
||||
ConcreteDataType::int32_datatype(),
|
||||
false,
|
||||
)]));
|
||||
RecordBatch::new(
|
||||
schema,
|
||||
vec![Arc::new(Int32Vector::from_values(0..3)) as VectorRef],
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stream_completes_normally() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch.clone())]);
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
let result = cancellable_stream.next().await;
|
||||
assert!(result.is_some());
|
||||
assert!(result.unwrap().is_ok());
|
||||
|
||||
let end_result = cancellable_stream.next().await;
|
||||
assert!(end_result.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stream_cancelled_before_start() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch)]);
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
// Cancel before creating the wrapper
|
||||
ticket.cancellation_handle.cancel();
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
let result = cancellable_stream.next().await;
|
||||
assert!(result.is_some());
|
||||
assert!(result.unwrap().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stream_cancelled_during_execution() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream =
|
||||
MockRecordBatchStream::new(vec![Ok(batch)]).with_delay(Duration::from_millis(100));
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
let cancellation_handle = ticket.cancellation_handle.clone();
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
// Cancel after a short delay
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
cancellation_handle.cancel();
|
||||
});
|
||||
|
||||
let result = cancellable_stream.next().await;
|
||||
assert!(result.is_some());
|
||||
assert!(result.unwrap().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stream_completes_before_cancellation() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch.clone())]);
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
let cancellation_handle = ticket.cancellation_handle.clone();
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
// Try to cancel after the stream should have completed
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
cancellation_handle.cancel();
|
||||
});
|
||||
|
||||
let result = cancellable_stream.next().await;
|
||||
assert!(result.is_some());
|
||||
assert!(result.unwrap().is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multiple_batches() {
|
||||
let batch1 = create_test_batch();
|
||||
let batch2 = create_test_batch();
|
||||
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch1), Ok(batch2)]);
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
// First batch
|
||||
let result1 = cancellable_stream.next().await;
|
||||
assert!(result1.is_some());
|
||||
assert!(result1.unwrap().is_ok());
|
||||
|
||||
// Second batch
|
||||
let result2 = cancellable_stream.next().await;
|
||||
assert!(result2.is_some());
|
||||
assert!(result2.unwrap().is_ok());
|
||||
|
||||
// End of stream
|
||||
let end_result = cancellable_stream.next().await;
|
||||
assert!(end_result.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_record_batch_stream_methods() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch)]);
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
// Test schema method
|
||||
let schema = cancellable_stream.schema();
|
||||
assert_eq!(schema.column_schemas().len(), 1);
|
||||
assert_eq!(schema.column_schemas()[0].name, "test_col");
|
||||
|
||||
// Test output_ordering method
|
||||
assert!(cancellable_stream.output_ordering().is_none());
|
||||
|
||||
// Test metrics method
|
||||
assert!(cancellable_stream.metrics().is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancellation_during_pending_poll() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream =
|
||||
MockRecordBatchStream::new(vec![Ok(batch)]).with_delay(Duration::from_millis(200));
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
let cancellation_handle = ticket.cancellation_handle.clone();
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
// Cancel while the stream is pending
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
cancellation_handle.cancel();
|
||||
});
|
||||
|
||||
let result = timeout(Duration::from_millis(300), cancellable_stream.next()).await;
|
||||
assert!(result.is_ok());
|
||||
let stream_result = result.unwrap();
|
||||
assert!(stream_result.is_some());
|
||||
assert!(stream_result.unwrap().is_err());
|
||||
}
|
||||
}
|
||||
@@ -27,6 +27,7 @@ futures-util.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
itertools.workspace = true
|
||||
lazy_static = "1.4"
|
||||
mito-codec.workspace = true
|
||||
mito2.workspace = true
|
||||
mur3 = "0.1"
|
||||
object-store.workspace = true
|
||||
|
||||
@@ -115,7 +115,7 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Failed to encode primary key"))]
|
||||
EncodePrimaryKey {
|
||||
source: mito2::error::Error,
|
||||
source: mito_codec::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::hash::Hash;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
|
||||
use datatypes::value::ValueRef;
|
||||
use mito2::row_converter::SparsePrimaryKeyCodec;
|
||||
use mito_codec::row_converter::SparsePrimaryKeyCodec;
|
||||
use smallvec::SmallVec;
|
||||
use snafu::ResultExt;
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
|
||||
30
src/mito-codec/Cargo.toml
Normal file
30
src/mito-codec/Cargo.toml
Normal file
@@ -0,0 +1,30 @@
|
||||
[package]
|
||||
name = "mito-codec"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
bytes.workspace = true
|
||||
common-base.workspace = true
|
||||
common-decimal.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
datatypes.workspace = true
|
||||
memcomparable = "0.2"
|
||||
paste.workspace = true
|
||||
serde.workspace = true
|
||||
snafu.workspace = true
|
||||
store-api.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
datafusion-common.workspace = true
|
||||
datafusion-expr.workspace = true
|
||||
95
src/mito-codec/src/error.rs
Normal file
95
src/mito-codec/src/error.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
/// Error definitions for mito encoding.
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display("Row value mismatches field data type"))]
|
||||
FieldTypeMismatch {
|
||||
// Box the source to reduce the size of the error.
|
||||
#[snafu(source(from(datatypes::error::Error, Box::new)))]
|
||||
source: Box<datatypes::error::Error>,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to serialize field"))]
|
||||
SerializeField {
|
||||
#[snafu(source)]
|
||||
error: memcomparable::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Data type: {} does not support serialization/deserialization",
|
||||
data_type,
|
||||
))]
|
||||
NotSupportedField {
|
||||
data_type: ConcreteDataType,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to deserialize field"))]
|
||||
DeserializeField {
|
||||
#[snafu(source)]
|
||||
error: memcomparable::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Operation not supported: {}", err_msg))]
|
||||
UnsupportedOperation {
|
||||
err_msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Encode null value"))]
|
||||
IndexEncodeNull {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
use Error::*;
|
||||
|
||||
match self {
|
||||
FieldTypeMismatch { source, .. } => source.status_code(),
|
||||
SerializeField { .. } | DeserializeField { .. } | IndexEncodeNull { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
NotSupportedField { .. } | UnsupportedOperation { .. } => StatusCode::Unsupported,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Index codec utilities.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -47,7 +49,7 @@ impl IndexValueCodec {
|
||||
) -> Result<()> {
|
||||
ensure!(!value.is_null(), IndexEncodeNullSnafu);
|
||||
|
||||
if matches!(field.data_type, ConcreteDataType::String(_)) {
|
||||
if matches!(field.data_type(), ConcreteDataType::String(_)) {
|
||||
let value = value
|
||||
.as_string()
|
||||
.context(FieldTypeMismatchSnafu)?
|
||||
@@ -31,7 +31,7 @@ pub struct KeyValues {
|
||||
///
|
||||
/// This mutation must be a valid mutation and rows in the mutation
|
||||
/// must not be `None`.
|
||||
pub(crate) mutation: Mutation,
|
||||
pub mutation: Mutation,
|
||||
/// Key value read helper.
|
||||
helper: SparseReadRowHelper,
|
||||
/// Primary key encoding hint.
|
||||
@@ -333,8 +333,7 @@ mod tests {
|
||||
use api::v1::{self, ColumnDataType, SemanticType};
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::i64_value;
|
||||
use crate::test_util::meta_util::TestRegionMetadataBuilder;
|
||||
use crate::test_util::{i64_value, TestRegionMetadataBuilder};
|
||||
|
||||
const TS_NAME: &str = "ts";
|
||||
const START_SEQ: SequenceNumber = 100;
|
||||
24
src/mito-codec/src/lib.rs
Normal file
24
src/mito-codec/src/lib.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Codec utilities for the Mito protocol.
|
||||
|
||||
pub mod error;
|
||||
pub mod index;
|
||||
pub mod key_values;
|
||||
pub mod primary_key_filter;
|
||||
pub mod row_converter;
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod test_util;
|
||||
@@ -19,12 +19,17 @@ use api::v1::SemanticType;
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use datatypes::value::Value;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::partition_tree::partition::Partition;
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
|
||||
|
||||
/// Returns true if this is a partition column for metrics in the memtable.
|
||||
pub fn is_partition_column(name: &str) -> bool {
|
||||
name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct PrimaryKeyFilterInner {
|
||||
metadata: RegionMetadataRef,
|
||||
@@ -42,7 +47,7 @@ impl PrimaryKeyFilterInner {
|
||||
|
||||
let mut result = true;
|
||||
for filter in self.filters.iter() {
|
||||
if Partition::is_partition_column(filter.column_name()) {
|
||||
if is_partition_column(filter.column_name()) {
|
||||
continue;
|
||||
}
|
||||
let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
|
||||
@@ -149,9 +154,8 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use datafusion::logical_expr::BinaryExpr;
|
||||
use datafusion_common::{Column, ScalarValue};
|
||||
use datafusion_expr::{Expr, Operator};
|
||||
use datafusion_expr::{BinaryExpr, Expr, Operator};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::ValueRef;
|
||||
164
src/mito-codec/src/row_converter.rs
Normal file
164
src/mito-codec/src/row_converter.rs
Normal file
@@ -0,0 +1,164 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod dense;
|
||||
pub mod sparse;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
pub use dense::{DensePrimaryKeyCodec, SortField};
|
||||
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::key_values::KeyValue;
|
||||
|
||||
/// Row value encoder/decoder.
|
||||
pub trait PrimaryKeyCodecExt {
|
||||
/// Encodes rows to bytes.
|
||||
/// # Note
|
||||
/// Ensure the length of row iterator matches the length of fields.
|
||||
fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
|
||||
where
|
||||
I: Iterator<Item = ValueRef<'a>>,
|
||||
{
|
||||
let mut buffer = Vec::new();
|
||||
self.encode_to_vec(row, &mut buffer)?;
|
||||
Ok(buffer)
|
||||
}
|
||||
|
||||
/// Encodes rows to specific vec.
|
||||
/// # Note
|
||||
/// Ensure the length of row iterator matches the length of fields.
|
||||
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
|
||||
where
|
||||
I: Iterator<Item = ValueRef<'a>>;
|
||||
}
|
||||
|
||||
pub trait PrimaryKeyFilter: Send + Sync {
|
||||
/// Returns true if the primary key matches the filter.
|
||||
fn matches(&mut self, pk: &[u8]) -> bool;
|
||||
}
|
||||
|
||||
/// Composite values decoded from primary key bytes.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum CompositeValues {
|
||||
Dense(Vec<(ColumnId, Value)>),
|
||||
Sparse(SparseValues),
|
||||
}
|
||||
|
||||
impl CompositeValues {
|
||||
/// Extends the composite values with the given values.
|
||||
pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
|
||||
match self {
|
||||
CompositeValues::Dense(dense_values) => {
|
||||
for (column_id, value) in values {
|
||||
dense_values.push((*column_id, value.clone()));
|
||||
}
|
||||
}
|
||||
CompositeValues::Sparse(sprase_value) => {
|
||||
for (column_id, value) in values {
|
||||
sprase_value.insert(*column_id, value.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
impl CompositeValues {
|
||||
pub fn into_sparse(self) -> SparseValues {
|
||||
match self {
|
||||
CompositeValues::Sparse(v) => v,
|
||||
_ => panic!("CompositeValues is not sparse"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_dense(self) -> Vec<Value> {
|
||||
match self {
|
||||
CompositeValues::Dense(v) => v.into_iter().map(|(_, v)| v).collect(),
|
||||
_ => panic!("CompositeValues is not dense"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait PrimaryKeyCodec: Send + Sync + Debug {
|
||||
/// Encodes a key value to bytes.
|
||||
fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
|
||||
|
||||
/// Encodes values to bytes.
|
||||
fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
|
||||
|
||||
/// Encodes values to bytes.
|
||||
fn encode_value_refs(
|
||||
&self,
|
||||
values: &[(ColumnId, ValueRef)],
|
||||
buffer: &mut Vec<u8>,
|
||||
) -> Result<()>;
|
||||
|
||||
/// Returns the number of fields in the primary key.
|
||||
fn num_fields(&self) -> Option<usize>;
|
||||
|
||||
/// Returns a primary key filter factory.
|
||||
fn primary_key_filter(
|
||||
&self,
|
||||
metadata: &RegionMetadataRef,
|
||||
filters: Arc<Vec<SimpleFilterEvaluator>>,
|
||||
) -> Box<dyn PrimaryKeyFilter>;
|
||||
|
||||
/// Returns the estimated size of the primary key.
|
||||
fn estimated_size(&self) -> Option<usize> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Returns the encoding type of the primary key.
|
||||
fn encoding(&self) -> PrimaryKeyEncoding;
|
||||
|
||||
/// Decodes the primary key from the given bytes.
|
||||
///
|
||||
/// Returns a [`CompositeValues`] that follows the primary key ordering.
|
||||
fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
|
||||
|
||||
/// Decode the leftmost value from bytes.
|
||||
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
|
||||
}
|
||||
|
||||
/// Builds a primary key codec from region metadata.
|
||||
pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
|
||||
let fields = region_metadata.primary_key_columns().map(|col| {
|
||||
(
|
||||
col.column_id,
|
||||
SortField::new(col.column_schema.data_type.clone()),
|
||||
)
|
||||
});
|
||||
build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
|
||||
}
|
||||
|
||||
/// Builds a primary key codec from region metadata.
|
||||
pub fn build_primary_key_codec_with_fields(
|
||||
encoding: PrimaryKeyEncoding,
|
||||
fields: impl Iterator<Item = (ColumnId, SortField)>,
|
||||
) -> Arc<dyn PrimaryKeyCodec> {
|
||||
match encoding {
|
||||
PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
|
||||
PrimaryKeyEncoding::Sparse => {
|
||||
Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -35,15 +35,16 @@ use store_api::storage::ColumnId;
|
||||
use crate::error::{
|
||||
self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
|
||||
};
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::DensePrimaryKeyFilter;
|
||||
use crate::key_values::KeyValue;
|
||||
use crate::primary_key_filter::DensePrimaryKeyFilter;
|
||||
use crate::row_converter::{
|
||||
CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter,
|
||||
};
|
||||
|
||||
/// Field to serialize and deserialize value in memcomparable format.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct SortField {
|
||||
pub(crate) data_type: ConcreteDataType,
|
||||
data_type: ConcreteDataType,
|
||||
}
|
||||
|
||||
impl SortField {
|
||||
@@ -51,6 +52,11 @@ impl SortField {
|
||||
Self { data_type }
|
||||
}
|
||||
|
||||
/// Returns the data type of the field.
|
||||
pub fn data_type(&self) -> &ConcreteDataType {
|
||||
&self.data_type
|
||||
}
|
||||
|
||||
pub fn estimated_size(&self) -> usize {
|
||||
match &self.data_type {
|
||||
ConcreteDataType::Boolean(_) => 2,
|
||||
@@ -75,10 +81,9 @@ impl SortField {
|
||||
| ConcreteDataType::Dictionary(_) => 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SortField {
|
||||
pub(crate) fn serialize(
|
||||
/// Serialize a value to the serializer.
|
||||
pub fn serialize(
|
||||
&self,
|
||||
serializer: &mut Serializer<&mut Vec<u8>>,
|
||||
value: &ValueRef,
|
||||
@@ -163,7 +168,8 @@ impl SortField {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
|
||||
/// Deserialize a value from the deserializer.
|
||||
pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
|
||||
macro_rules! deserialize_and_build_value {
|
||||
(
|
||||
$self: ident;
|
||||
@@ -525,7 +531,7 @@ mod tests {
|
||||
let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
|
||||
decoded.push(value);
|
||||
}
|
||||
assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
|
||||
assert_eq!(data_types.len(), offsets.len(), "offsets: {offsets:?}");
|
||||
assert_eq!(decoded, row);
|
||||
}
|
||||
}
|
||||
@@ -27,8 +27,8 @@ use store_api::storage::consts::ReservedColumnId;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
|
||||
use crate::key_values::KeyValue;
|
||||
use crate::primary_key_filter::SparsePrimaryKeyFilter;
|
||||
use crate::row_converter::dense::SortField;
|
||||
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
|
||||
|
||||
@@ -205,7 +205,7 @@ impl SparsePrimaryKeyCodec {
|
||||
}
|
||||
|
||||
/// Returns the offset of the given column id in the given primary key.
|
||||
pub(crate) fn has_column(
|
||||
pub fn has_column(
|
||||
&self,
|
||||
pk: &[u8],
|
||||
offsets_map: &mut HashMap<u32, usize>,
|
||||
@@ -233,12 +233,7 @@ impl SparsePrimaryKeyCodec {
|
||||
}
|
||||
|
||||
/// Decode value at `offset` in `pk`.
|
||||
pub(crate) fn decode_value_at(
|
||||
&self,
|
||||
pk: &[u8],
|
||||
offset: usize,
|
||||
column_id: ColumnId,
|
||||
) -> Result<Value> {
|
||||
pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
|
||||
let mut deserializer = Deserializer::new(pk);
|
||||
deserializer.advance(offset);
|
||||
// Safety: checked by `has_column`
|
||||
@@ -300,6 +295,40 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
|
||||
}
|
||||
}
|
||||
|
||||
/// Field with column id.
|
||||
pub struct FieldWithId {
|
||||
pub field: SortField,
|
||||
pub column_id: ColumnId,
|
||||
}
|
||||
|
||||
/// A special encoder for memtable.
|
||||
pub struct SparseEncoder {
|
||||
fields: Vec<FieldWithId>,
|
||||
}
|
||||
|
||||
impl SparseEncoder {
|
||||
pub fn new(fields: Vec<FieldWithId>) -> Self {
|
||||
Self { fields }
|
||||
}
|
||||
|
||||
pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
|
||||
where
|
||||
I: Iterator<Item = ValueRef<'a>>,
|
||||
{
|
||||
let mut serializer = Serializer::new(buffer);
|
||||
for (value, field) in row.zip(self.fields.iter()) {
|
||||
if !value.is_null() {
|
||||
field
|
||||
.column_id
|
||||
.serialize(&mut serializer)
|
||||
.context(SerializeFieldSnafu)?;
|
||||
field.field.serialize(&mut serializer, &value)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
@@ -12,8 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Utilities to create a [RegionMetadata](store_api::metadata::RegionMetadata).
|
||||
//! Test utilities for mito codec.
|
||||
|
||||
use api::greptime_proto::v1;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::SemanticType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
@@ -105,3 +107,10 @@ impl TestRegionMetadataBuilder {
|
||||
builder.build().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates value for i64.
|
||||
pub fn i64_value(data: i64) -> v1::Value {
|
||||
v1::Value {
|
||||
value_data: Some(ValueData::I64Value(data)),
|
||||
}
|
||||
}
|
||||
@@ -48,6 +48,7 @@ itertools.workspace = true
|
||||
lazy_static = "1.4"
|
||||
log-store = { workspace = true }
|
||||
memcomparable = "0.2"
|
||||
mito-codec.workspace = true
|
||||
moka = { workspace = true, features = ["sync", "future"] }
|
||||
object-store.workspace = true
|
||||
parquet = { workspace = true, features = ["async"] }
|
||||
@@ -82,6 +83,7 @@ common-test-util.workspace = true
|
||||
criterion = "0.4"
|
||||
dotenv.workspace = true
|
||||
log-store.workspace = true
|
||||
mito-codec = { workspace = true, features = ["testing"] }
|
||||
object-store = { workspace = true, features = ["services-memory"] }
|
||||
rskafka.workspace = true
|
||||
rstest.workspace = true
|
||||
|
||||
@@ -25,8 +25,8 @@ use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
|
||||
use mito2::memtable::time_series::TimeSeriesMemtable;
|
||||
use mito2::memtable::{KeyValues, Memtable};
|
||||
use mito2::region::options::MergeMode;
|
||||
use mito2::row_converter::DensePrimaryKeyCodec;
|
||||
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
|
||||
use mito_codec::row_converter::DensePrimaryKeyCodec;
|
||||
use rand::rngs::ThreadRng;
|
||||
use rand::seq::IndexedRandom;
|
||||
use rand::Rng;
|
||||
|
||||
@@ -18,8 +18,9 @@
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_base::BitVec;
|
||||
use common_time::Timestamp;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
|
||||
use crate::sst::file::FileHandle;
|
||||
use crate::sst::file::{FileHandle, FileId};
|
||||
|
||||
/// Default max compaction output file size when not specified.
|
||||
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
|
||||
@@ -125,17 +126,68 @@ pub trait Item: Ranged + Clone {
|
||||
fn size(&self) -> usize;
|
||||
}
|
||||
|
||||
impl Ranged for FileHandle {
|
||||
type BoundType = Timestamp;
|
||||
/// A group of files that are created by the same compaction task.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FileGroup {
|
||||
files: SmallVec<[FileHandle; 2]>,
|
||||
size: usize,
|
||||
num_rows: usize,
|
||||
min_timestamp: Timestamp,
|
||||
max_timestamp: Timestamp,
|
||||
}
|
||||
|
||||
fn range(&self) -> (Self::BoundType, Self::BoundType) {
|
||||
self.time_range()
|
||||
impl FileGroup {
|
||||
pub(crate) fn new_with_file(file: FileHandle) -> Self {
|
||||
let size = file.size() as usize;
|
||||
let (min_timestamp, max_timestamp) = file.time_range();
|
||||
let num_rows = file.num_rows();
|
||||
Self {
|
||||
files: smallvec![file],
|
||||
size,
|
||||
num_rows,
|
||||
min_timestamp,
|
||||
max_timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn num_rows(&self) -> usize {
|
||||
self.num_rows
|
||||
}
|
||||
|
||||
pub(crate) fn add_file(&mut self, file: FileHandle) {
|
||||
self.size += file.size() as usize;
|
||||
self.num_rows += file.num_rows();
|
||||
let (min_timestamp, max_timestamp) = file.time_range();
|
||||
self.min_timestamp = self.min_timestamp.min(min_timestamp);
|
||||
self.max_timestamp = self.max_timestamp.max(max_timestamp);
|
||||
self.files.push(file);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn files(&self) -> &[FileHandle] {
|
||||
&self.files[..]
|
||||
}
|
||||
|
||||
pub(crate) fn file_ids(&self) -> SmallVec<[FileId; 2]> {
|
||||
SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
|
||||
}
|
||||
|
||||
pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
|
||||
self.files.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
impl Item for FileHandle {
|
||||
impl Ranged for FileGroup {
|
||||
type BoundType = Timestamp;
|
||||
|
||||
fn range(&self) -> (Self::BoundType, Self::BoundType) {
|
||||
(self.min_timestamp, self.max_timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
impl Item for FileGroup {
|
||||
fn size(&self) -> usize {
|
||||
self.size() as usize
|
||||
self.size
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
use common_time::Timestamp;
|
||||
|
||||
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
|
||||
@@ -23,6 +25,23 @@ pub fn new_file_handle(
|
||||
start_ts_millis: i64,
|
||||
end_ts_millis: i64,
|
||||
level: Level,
|
||||
) -> FileHandle {
|
||||
new_file_handle_with_sequence(
|
||||
file_id,
|
||||
start_ts_millis,
|
||||
end_ts_millis,
|
||||
level,
|
||||
start_ts_millis as u64,
|
||||
)
|
||||
}
|
||||
|
||||
/// Test util to create file handles.
|
||||
pub fn new_file_handle_with_sequence(
|
||||
file_id: FileId,
|
||||
start_ts_millis: i64,
|
||||
end_ts_millis: i64,
|
||||
level: Level,
|
||||
sequence: u64,
|
||||
) -> FileHandle {
|
||||
let file_purger = new_noop_file_purger();
|
||||
FileHandle::new(
|
||||
@@ -39,7 +58,7 @@ pub fn new_file_handle(
|
||||
index_file_size: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
sequence: None,
|
||||
sequence: NonZeroU64::new(sequence),
|
||||
},
|
||||
file_purger,
|
||||
)
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::Debug;
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_telemetry::info;
|
||||
@@ -26,7 +27,9 @@ use store_api::storage::RegionId;
|
||||
use crate::compaction::buckets::infer_time_bucket;
|
||||
use crate::compaction::compactor::CompactionRegion;
|
||||
use crate::compaction::picker::{Picker, PickerOutput};
|
||||
use crate::compaction::run::{find_sorted_runs, merge_seq_files, reduce_runs};
|
||||
use crate::compaction::run::{
|
||||
find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
|
||||
};
|
||||
use crate::compaction::{get_expired_ssts, CompactionOutput};
|
||||
use crate::sst::file::{overlaps, FileHandle, Level};
|
||||
use crate::sst::version::LevelMeta;
|
||||
@@ -60,7 +63,8 @@ impl TwcsPicker {
|
||||
if files.files.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let sorted_runs = find_sorted_runs(&mut files.files);
|
||||
let mut files_to_merge: Vec<_> = files.files().cloned().collect();
|
||||
let sorted_runs = find_sorted_runs(&mut files_to_merge);
|
||||
let found_runs = sorted_runs.len();
|
||||
// We only remove deletion markers if we found less than 2 runs and not in append mode.
|
||||
// because after compaction there will be no overlapping files.
|
||||
@@ -90,7 +94,7 @@ impl TwcsPicker {
|
||||
);
|
||||
output.push(CompactionOutput {
|
||||
output_level: LEVEL_COMPACTED, // always compact to l1
|
||||
inputs,
|
||||
inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
|
||||
filter_deleted,
|
||||
output_time_range: None, // we do not enforce output time range in twcs compactions.
|
||||
});
|
||||
@@ -109,21 +113,21 @@ fn log_pick_result(
|
||||
file_num: usize,
|
||||
max_output_file_size: Option<u64>,
|
||||
filter_deleted: bool,
|
||||
inputs: &[FileHandle],
|
||||
inputs: &[FileGroup],
|
||||
) {
|
||||
let input_file_str: Vec<String> = inputs
|
||||
.iter()
|
||||
.map(|f| {
|
||||
let range = f.time_range();
|
||||
let range = f.range();
|
||||
let start = range.0.to_iso8601_string();
|
||||
let end = range.1.to_iso8601_string();
|
||||
let num_rows = f.num_rows();
|
||||
format!(
|
||||
"SST{{id: {}, range: ({}, {}), size: {}, num rows: {} }}",
|
||||
f.file_id(),
|
||||
"FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
|
||||
f.file_ids(),
|
||||
start,
|
||||
end,
|
||||
ReadableSize(f.size()),
|
||||
ReadableSize(f.size() as u64),
|
||||
num_rows
|
||||
)
|
||||
})
|
||||
@@ -198,7 +202,9 @@ impl Picker for TwcsPicker {
|
||||
struct Window {
|
||||
start: Timestamp,
|
||||
end: Timestamp,
|
||||
files: Vec<FileHandle>,
|
||||
// Mapping from file sequence to file groups. Files with the same sequence is considered
|
||||
// created from the same compaction task.
|
||||
files: HashMap<Option<NonZeroU64>, FileGroup>,
|
||||
time_window: i64,
|
||||
overlapping: bool,
|
||||
}
|
||||
@@ -207,10 +213,11 @@ impl Window {
|
||||
/// Creates a new [Window] with given file.
|
||||
fn new_with_file(file: FileHandle) -> Self {
|
||||
let (start, end) = file.time_range();
|
||||
let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
|
||||
Self {
|
||||
start,
|
||||
end,
|
||||
files: vec![file],
|
||||
files,
|
||||
time_window: 0,
|
||||
overlapping: false,
|
||||
}
|
||||
@@ -226,7 +233,19 @@ impl Window {
|
||||
let (start, end) = file.time_range();
|
||||
self.start = self.start.min(start);
|
||||
self.end = self.end.max(end);
|
||||
self.files.push(file);
|
||||
|
||||
match self.files.entry(file.meta_ref().sequence) {
|
||||
Entry::Occupied(mut o) => {
|
||||
o.get_mut().add_file(file);
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(FileGroup::new_with_file(file));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn files(&self) -> impl Iterator<Item = &FileGroup> {
|
||||
self.files.values()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -311,7 +330,7 @@ mod tests {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use super::*;
|
||||
use crate::compaction::test_util::new_file_handle;
|
||||
use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
|
||||
use crate::sst::file::{FileId, Level};
|
||||
|
||||
#[test]
|
||||
@@ -371,7 +390,9 @@ mod tests {
|
||||
.iter(),
|
||||
3,
|
||||
);
|
||||
assert_eq!(5, windows.get(&0).unwrap().files.len());
|
||||
let fgs = &windows.get(&0).unwrap().files;
|
||||
assert_eq!(1, fgs.len());
|
||||
assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
|
||||
|
||||
let files = [FileId::random(); 3];
|
||||
let windows = assign_to_windows(
|
||||
@@ -385,15 +406,56 @@ mod tests {
|
||||
);
|
||||
assert_eq!(
|
||||
files[0],
|
||||
windows.get(&0).unwrap().files.first().unwrap().file_id()
|
||||
windows.get(&0).unwrap().files().next().unwrap().files()[0].file_id()
|
||||
);
|
||||
assert_eq!(
|
||||
files[1],
|
||||
windows.get(&3).unwrap().files.first().unwrap().file_id()
|
||||
windows.get(&3).unwrap().files().next().unwrap().files()[0].file_id()
|
||||
);
|
||||
assert_eq!(
|
||||
files[2],
|
||||
windows.get(&12).unwrap().files.first().unwrap().file_id()
|
||||
windows.get(&12).unwrap().files().next().unwrap().files()[0].file_id()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_assign_file_groups_to_windows() {
|
||||
let files = [
|
||||
FileId::random(),
|
||||
FileId::random(),
|
||||
FileId::random(),
|
||||
FileId::random(),
|
||||
];
|
||||
let windows = assign_to_windows(
|
||||
[
|
||||
new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
|
||||
new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
|
||||
new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
|
||||
new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
|
||||
]
|
||||
.iter(),
|
||||
3,
|
||||
);
|
||||
assert_eq!(windows.len(), 1);
|
||||
let fgs = &windows.get(&0).unwrap().files;
|
||||
assert_eq!(2, fgs.len());
|
||||
assert_eq!(
|
||||
fgs.get(&NonZeroU64::new(1))
|
||||
.unwrap()
|
||||
.files()
|
||||
.iter()
|
||||
.map(|f| f.file_id())
|
||||
.collect::<HashSet<_>>(),
|
||||
[files[0], files[1]].into_iter().collect()
|
||||
);
|
||||
assert_eq!(
|
||||
fgs.get(&NonZeroU64::new(2))
|
||||
.unwrap()
|
||||
.files()
|
||||
.iter()
|
||||
.map(|f| f.file_id())
|
||||
.collect::<HashSet<_>>(),
|
||||
[files[2], files[3]].into_iter().collect()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -408,8 +470,22 @@ mod tests {
|
||||
];
|
||||
files[0].set_compacting(true);
|
||||
files[2].set_compacting(true);
|
||||
let windows = assign_to_windows(files.iter(), 3);
|
||||
assert_eq!(3, windows.get(&0).unwrap().files.len());
|
||||
let mut windows = assign_to_windows(files.iter(), 3);
|
||||
let window0 = windows.remove(&0).unwrap();
|
||||
assert_eq!(1, window0.files.len());
|
||||
let candidates = window0
|
||||
.files
|
||||
.into_values()
|
||||
.flat_map(|fg| fg.into_files())
|
||||
.map(|f| f.file_id())
|
||||
.collect::<HashSet<_>>();
|
||||
assert_eq!(candidates.len(), 3);
|
||||
assert_eq!(
|
||||
candidates,
|
||||
[files[1].file_id(), files[3].file_id(), files[4].file_id()]
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>()
|
||||
);
|
||||
}
|
||||
|
||||
/// (Window value, overlapping, files' time ranges in window)
|
||||
@@ -438,9 +514,11 @@ mod tests {
|
||||
let mut file_ranges = actual_window
|
||||
.files
|
||||
.iter()
|
||||
.map(|f| {
|
||||
let (s, e) = f.time_range();
|
||||
(s.value(), e.value())
|
||||
.flat_map(|(_, f)| {
|
||||
f.files().iter().map(|f| {
|
||||
let (s, e) = f.time_range();
|
||||
(s.value(), e.value())
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
|
||||
@@ -607,10 +685,10 @@ mod tests {
|
||||
CompactionPickerTestCase {
|
||||
window_size: 3,
|
||||
input_files: [
|
||||
new_file_handle(file_ids[0], -2000, -3, 0),
|
||||
new_file_handle(file_ids[1], -3000, -100, 0),
|
||||
new_file_handle(file_ids[2], 0, 2999, 0), //active windows
|
||||
new_file_handle(file_ids[3], 50, 2998, 0), //active windows
|
||||
new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
|
||||
new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
|
||||
new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
|
||||
new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
|
||||
]
|
||||
.to_vec(),
|
||||
expected_outputs: vec![
|
||||
@@ -636,11 +714,11 @@ mod tests {
|
||||
CompactionPickerTestCase {
|
||||
window_size: 3,
|
||||
input_files: [
|
||||
new_file_handle(file_ids[0], -2000, -3, 0),
|
||||
new_file_handle(file_ids[1], -3000, -100, 0),
|
||||
new_file_handle(file_ids[2], 0, 2999, 0),
|
||||
new_file_handle(file_ids[3], 50, 2998, 0),
|
||||
new_file_handle(file_ids[4], 11, 2990, 0),
|
||||
new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
|
||||
new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
|
||||
new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
|
||||
new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
|
||||
new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
|
||||
]
|
||||
.to_vec(),
|
||||
expected_outputs: vec![
|
||||
@@ -655,6 +733,27 @@ mod tests {
|
||||
],
|
||||
}
|
||||
.check();
|
||||
|
||||
// Case 3:
|
||||
// A compaction may split output into several files that have overlapping time ranges and same sequence,
|
||||
// we should treat these files as one FileGroup.
|
||||
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
|
||||
CompactionPickerTestCase {
|
||||
window_size: 3,
|
||||
input_files: [
|
||||
new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
|
||||
new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
|
||||
new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
|
||||
new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
|
||||
new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
|
||||
]
|
||||
.to_vec(),
|
||||
expected_outputs: vec![ExpectedOutput {
|
||||
input_files: vec![0, 1, 4],
|
||||
output_level: 1,
|
||||
}],
|
||||
}
|
||||
.check();
|
||||
}
|
||||
|
||||
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
|
||||
|
||||
@@ -42,6 +42,13 @@ use crate::worker::WorkerId;
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display("Unexpected data type"))]
|
||||
DataTypeMismatch {
|
||||
source: datatypes::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("External error, context: {}", context))]
|
||||
External {
|
||||
source: BoxedError,
|
||||
@@ -291,35 +298,6 @@ pub enum Error {
|
||||
#[snafu(display("Failed to write region"))]
|
||||
WriteGroup { source: Arc<Error> },
|
||||
|
||||
#[snafu(display("Row value mismatches field data type"))]
|
||||
FieldTypeMismatch { source: datatypes::error::Error },
|
||||
|
||||
#[snafu(display("Failed to serialize field"))]
|
||||
SerializeField {
|
||||
#[snafu(source)]
|
||||
error: memcomparable::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Data type: {} does not support serialization/deserialization",
|
||||
data_type,
|
||||
))]
|
||||
NotSupportedField {
|
||||
data_type: ConcreteDataType,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to deserialize field"))]
|
||||
DeserializeField {
|
||||
#[snafu(source)]
|
||||
error: memcomparable::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))]
|
||||
InvalidParquet {
|
||||
file: String,
|
||||
@@ -1028,6 +1006,20 @@ pub enum Error {
|
||||
location: Location,
|
||||
source: common_grpc::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to encode"))]
|
||||
Encode {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: mito_codec::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to decode"))]
|
||||
Decode {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: mito_codec::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -1052,6 +1044,7 @@ impl ErrorExt for Error {
|
||||
use Error::*;
|
||||
|
||||
match self {
|
||||
DataTypeMismatch { source, .. } => source.status_code(),
|
||||
OpenDal { .. } | ReadParquet { .. } => StatusCode::StorageUnavailable,
|
||||
WriteWal { source, .. } | ReadWal { source, .. } | DeleteWal { source, .. } => {
|
||||
source.status_code()
|
||||
@@ -1095,7 +1088,6 @@ impl ErrorExt for Error {
|
||||
| BiErrors { .. }
|
||||
| StopScheduler { .. }
|
||||
| ComputeVector { .. }
|
||||
| SerializeField { .. }
|
||||
| EncodeMemtable { .. }
|
||||
| CreateDir { .. }
|
||||
| ReadDataPart { .. }
|
||||
@@ -1107,9 +1099,7 @@ impl ErrorExt for Error {
|
||||
|
||||
WriteParquet { .. } => StatusCode::StorageUnavailable,
|
||||
WriteGroup { source, .. } => source.status_code(),
|
||||
FieldTypeMismatch { source, .. } => source.status_code(),
|
||||
NotSupportedField { .. } => StatusCode::Unsupported,
|
||||
DeserializeField { .. } | EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
|
||||
EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
|
||||
InvalidBatch { .. } => StatusCode::InvalidArguments,
|
||||
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
|
||||
ConvertVector { source, .. } => source.status_code(),
|
||||
@@ -1181,7 +1171,9 @@ impl ErrorExt for Error {
|
||||
ScanSeries { source, .. } => source.status_code(),
|
||||
|
||||
ScanMultiTimes { .. } => StatusCode::InvalidArguments,
|
||||
Error::ConvertBulkWalEntry { source, .. } => source.status_code(),
|
||||
ConvertBulkWalEntry { source, .. } => source.status_code(),
|
||||
|
||||
Encode { source, .. } | Decode { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,6 @@ pub mod read;
|
||||
pub mod region;
|
||||
mod region_write_ctx;
|
||||
pub mod request;
|
||||
pub mod row_converter;
|
||||
pub mod schedule;
|
||||
pub mod sst;
|
||||
mod time_provider;
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user