Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-23 14:40:01 +00:00

Compare commits: 2 commits, feature/df...v0.14.2

| Author | SHA1 | Date |
|---|---|---|
| | 5c9cbb5f4c | |
| | e2df38d0d1 | |
.github/scripts/update-dev-builder-version.sh (vendored, executable file, 37 lines)
@@ -0,0 +1,37 @@
#!/bin/bash

DEV_BUILDER_IMAGE_TAG=$1

update_dev_builder_version() {
    if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
        echo "Error: Should specify the dev-builder image tag"
        exit 1
    fi

    # Configure Git configs.
    git config --global user.email greptimedb-ci@greptime.com
    git config --global user.name greptimedb-ci

    # Checkout a new branch.
    BRANCH_NAME="ci/update-dev-builder-$(date +%Y%m%d%H%M%S)"
    git checkout -b $BRANCH_NAME

    # Update the dev-builder image tag in the Makefile.
    gsed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile

    # Commit the changes.
    git add Makefile
    git commit -m "ci: update dev-builder image tag"
    git push origin $BRANCH_NAME

    # Create a Pull Request.
    gh pr create \
        --title "ci: update dev-builder image tag" \
        --body "This PR updates the dev-builder image tag" \
        --base main \
        --head $BRANCH_NAME \
        --reviewer zyy17 \
        --reviewer daviderli614
}

update_dev_builder_version
@@ -24,11 +24,19 @@ on:
|
||||
description: Release dev-builder-android image
|
||||
required: false
|
||||
default: false
|
||||
update_dev_builder_image_tag:
|
||||
type: boolean
|
||||
description: Update the DEV_BUILDER_IMAGE_TAG in Makefile and create a PR
|
||||
required: false
|
||||
default: false
|
||||
|
||||
jobs:
|
||||
release-dev-builder-images:
|
||||
name: Release dev builder images
|
||||
if: ${{ inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }} # Only manually trigger this job.
|
||||
# The jobs are triggered by the following events:
|
||||
# 1. Manually triggered workflow_dispatch event
|
||||
# 2. Push event when the PR that modifies the `rust-toolchain.toml` or `docker/dev-builder/**` is merged to main
|
||||
if: ${{ github.event_name == 'push' || inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }}
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
version: ${{ steps.set-version.outputs.version }}
|
||||
@@ -57,9 +65,9 @@ jobs:
|
||||
version: ${{ env.VERSION }}
|
||||
dockerhub-image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
dockerhub-image-registry-token: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||
build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image }}
|
||||
build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image }}
|
||||
build-dev-builder-android: ${{ inputs.release_dev_builder_android_image }}
|
||||
build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
|
||||
build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
|
||||
build-dev-builder-android: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
|
||||
|
||||
release-dev-builder-images-ecr:
|
||||
name: Release dev builder images to AWS ECR
|
||||
@@ -85,7 +93,7 @@ jobs:
|
||||
|
||||
- name: Push dev-builder-ubuntu image
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_ubuntu_image }}
|
||||
if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
|
||||
env:
|
||||
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
|
||||
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
|
||||
@@ -106,7 +114,7 @@ jobs:
|
||||
|
||||
- name: Push dev-builder-centos image
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_centos_image }}
|
||||
if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
|
||||
env:
|
||||
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
|
||||
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
|
||||
@@ -127,7 +135,7 @@ jobs:
|
||||
|
||||
- name: Push dev-builder-android image
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_android_image }}
|
||||
if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
|
||||
env:
|
||||
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
|
||||
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
|
||||
@@ -162,7 +170,7 @@ jobs:
|
||||
|
||||
- name: Push dev-builder-ubuntu image
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_ubuntu_image }}
|
||||
if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
|
||||
env:
|
||||
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
|
||||
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
|
||||
@@ -176,7 +184,7 @@ jobs:
|
||||
|
||||
- name: Push dev-builder-centos image
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_centos_image }}
|
||||
if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
|
||||
env:
|
||||
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
|
||||
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
|
||||
@@ -190,7 +198,7 @@ jobs:
|
||||
|
||||
- name: Push dev-builder-android image
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_android_image }}
|
||||
if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
|
||||
env:
|
||||
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
|
||||
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
|
||||
@@ -201,3 +209,24 @@ jobs:
|
||||
quay.io/skopeo/stable:latest \
|
||||
copy -a docker://docker.io/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION \
|
||||
docker://$ACR_IMAGE_REGISTRY/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION
|
||||
|
||||
update-dev-builder-image-tag:
|
||||
name: Update dev-builder image tag
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
if: ${{ github.event_name == 'push' || inputs.update_dev_builder_image_tag }}
|
||||
needs: [
|
||||
release-dev-builder-images
|
||||
]
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Update dev-builder image tag
|
||||
shell: bash
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
./.github/scripts/update-dev-builder-version.sh ${{ needs.release-dev-builder-images.outputs.version }}
|
||||
|
||||
Cargo.lock (generated, 146 changed lines)
@@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
|
||||
|
||||
[[package]]
|
||||
name = "api"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-decimal",
|
||||
@@ -915,7 +915,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "auth"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -1537,7 +1537,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cache"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"catalog",
|
||||
"common-error",
|
||||
@@ -1561,7 +1561,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
|
||||
[[package]]
|
||||
name = "catalog"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow 54.2.1",
|
||||
@@ -1874,7 +1874,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
|
||||
|
||||
[[package]]
|
||||
name = "cli"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
@@ -1917,7 +1917,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.14.0",
|
||||
"substrait 0.14.2",
|
||||
"table",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
@@ -1926,7 +1926,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "client"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -1955,7 +1955,7 @@ dependencies = [
|
||||
"rand 0.9.0",
|
||||
"serde_json",
|
||||
"snafu 0.8.5",
|
||||
"substrait 0.14.0",
|
||||
"substrait 0.14.2",
|
||||
"substrait 0.37.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -1996,7 +1996,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cmd"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
@@ -2056,7 +2056,7 @@ dependencies = [
|
||||
"similar-asserts",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.14.0",
|
||||
"substrait 0.14.2",
|
||||
"table",
|
||||
"temp-env",
|
||||
"tempfile",
|
||||
@@ -2102,7 +2102,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
|
||||
|
||||
[[package]]
|
||||
name = "common-base"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"async-trait",
|
||||
@@ -2124,11 +2124,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-catalog"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
|
||||
[[package]]
|
||||
name = "common-config"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -2153,7 +2153,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-datasource"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-schema 54.3.1",
|
||||
@@ -2190,7 +2190,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-decimal"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"bigdecimal 0.4.8",
|
||||
"common-error",
|
||||
@@ -2203,7 +2203,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-error"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"common-macro",
|
||||
"http 1.1.0",
|
||||
@@ -2214,7 +2214,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-frontend"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-error",
|
||||
@@ -2224,7 +2224,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-function"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -2277,7 +2277,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-greptimedb-telemetry"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-runtime",
|
||||
@@ -2294,7 +2294,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -2325,7 +2325,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc-expr"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"common-base",
|
||||
@@ -2344,7 +2344,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-macro"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-query",
|
||||
@@ -2358,7 +2358,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-mem-prof"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -2371,7 +2371,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-meta"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"api",
|
||||
@@ -2432,7 +2432,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-options"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"common-grpc",
|
||||
"humantime-serde",
|
||||
@@ -2441,11 +2441,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-plugins"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
|
||||
[[package]]
|
||||
name = "common-pprof"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -2457,7 +2457,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -2484,7 +2484,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure-test"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-procedure",
|
||||
@@ -2493,7 +2493,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-query"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -2519,7 +2519,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-recordbatch"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-error",
|
||||
@@ -2539,7 +2539,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-runtime"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.19",
|
||||
@@ -2569,14 +2569,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-session"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"strum 0.27.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-telemetry"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"backtrace",
|
||||
@@ -2604,7 +2604,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-test-util"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"client",
|
||||
"common-query",
|
||||
@@ -2616,7 +2616,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-time"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"chrono",
|
||||
@@ -2634,7 +2634,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-version"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"build-data",
|
||||
"const_format",
|
||||
@@ -2644,7 +2644,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-wal"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -3572,7 +3572,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datanode"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -3624,7 +3624,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.14.0",
|
||||
"substrait 0.14.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"toml 0.8.19",
|
||||
@@ -3633,7 +3633,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datatypes"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -4259,7 +4259,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "file-engine"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -4382,7 +4382,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
|
||||
|
||||
[[package]]
|
||||
name = "flow"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow 54.2.1",
|
||||
@@ -4444,7 +4444,7 @@ dependencies = [
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"strum 0.27.1",
|
||||
"substrait 0.14.0",
|
||||
"substrait 0.14.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"tonic 0.12.3",
|
||||
@@ -4499,7 +4499,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
|
||||
|
||||
[[package]]
|
||||
name = "frontend"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -4556,7 +4556,7 @@ dependencies = [
|
||||
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
|
||||
"store-api",
|
||||
"strfmt",
|
||||
"substrait 0.14.0",
|
||||
"substrait 0.14.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"toml 0.8.19",
|
||||
@@ -5795,7 +5795,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "index"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"asynchronous-codec",
|
||||
@@ -6605,7 +6605,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "log-query"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"common-error",
|
||||
@@ -6617,7 +6617,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log-store"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -6911,7 +6911,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-client"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6939,7 +6939,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-srv"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -7029,7 +7029,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "metric-engine"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -7118,7 +7118,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mito2"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -7824,7 +7824,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "object-store"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
@@ -8119,7 +8119,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "operator"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -8168,7 +8168,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
|
||||
"store-api",
|
||||
"substrait 0.14.0",
|
||||
"substrait 0.14.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -8423,7 +8423,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "partition"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -8705,7 +8705,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pipeline"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -8847,7 +8847,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "plugins"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"auth",
|
||||
"clap 4.5.19",
|
||||
@@ -9127,7 +9127,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "promql"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"async-trait",
|
||||
@@ -9373,7 +9373,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "puffin"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-compression 0.4.13",
|
||||
"async-trait",
|
||||
@@ -9414,7 +9414,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "query"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -9480,7 +9480,7 @@ dependencies = [
|
||||
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
|
||||
"statrs",
|
||||
"store-api",
|
||||
"substrait 0.14.0",
|
||||
"substrait 0.14.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -10830,7 +10830,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "servers"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -10950,7 +10950,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "session"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -11275,7 +11275,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sql"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"chrono",
|
||||
@@ -11330,7 +11330,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sqlness-runner"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.19",
|
||||
@@ -11649,7 +11649,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "store-api"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -11798,7 +11798,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "substrait"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -11978,7 +11978,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "table"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -12229,7 +12229,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
|
||||
|
||||
[[package]]
|
||||
name = "tests-fuzz"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"async-trait",
|
||||
@@ -12273,7 +12273,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tests-integration"
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -12340,7 +12340,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlx",
|
||||
"store-api",
|
||||
"substrait 0.14.0",
|
||||
"substrait 0.14.2",
|
||||
"table",
|
||||
"tempfile",
|
||||
"time",
|
||||
|
||||
@@ -68,7 +68,7 @@ members = [
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.14.0"
|
||||
version = "0.14.2"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
|
||||
@@ -162,7 +162,7 @@ paste = "1.0"
|
||||
pin-project = "1.0"
|
||||
prometheus = { version = "0.13.3", features = ["process"] }
|
||||
promql-parser = { version = "0.5.1", features = ["ser"] }
|
||||
prost = "0.13"
|
||||
prost = { version = "0.13", features = ["no-recursion-limit"] }
|
||||
raft-engine = { version = "0.4.1", default-features = false }
|
||||
rand = "0.9"
|
||||
ratelimit = "0.10"
|
||||
|
||||
@@ -36,8 +36,8 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
|
||||
use common_query::Output;
|
||||
use common_recordbatch::error::ExternalSnafu;
|
||||
use common_recordbatch::RecordBatchStreamWrapper;
|
||||
use common_telemetry::error;
|
||||
use common_telemetry::tracing_context::W3cTrace;
|
||||
use common_telemetry::{error, warn};
|
||||
use futures::future;
|
||||
use futures_util::{Stream, StreamExt, TryStreamExt};
|
||||
use prost::Message;
|
||||
@@ -192,6 +192,36 @@ impl Database {
|
||||
from_grpc_response(response)
|
||||
}
|
||||
|
||||
/// Retries if the connection fails. `max_retries` is the maximum number of retries, so the total
/// wait time is at most `max_retries * GRPC_CONN_TIMEOUT`.
|
||||
pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result<u32> {
|
||||
let mut client = make_database_client(&self.client)?.inner;
|
||||
let mut retries = 0;
|
||||
let request = self.to_rpc_request(request);
|
||||
loop {
|
||||
let raw_response = client.handle(request.clone()).await;
|
||||
match (raw_response, retries < max_retries) {
|
||||
(Ok(resp), _) => return from_grpc_response(resp.into_inner()),
|
||||
(Err(err), true) => {
|
||||
// determine if the error is retryable
|
||||
if is_grpc_retryable(&err) {
|
||||
// retry
|
||||
retries += 1;
|
||||
warn!("Retrying {} times with error = {:?}", retries, err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
(Err(err), false) => {
|
||||
error!(
|
||||
"Failed to send request to grpc handle after {} retries, error = {:?}",
|
||||
retries, err
|
||||
);
|
||||
return Err(err.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
|
||||
GreptimeRequest {
|
||||
@@ -368,6 +398,11 @@ impl Database {
|
||||
}
|
||||
}
|
||||
|
||||
/// By the gRPC standard, only `Unavailable` is retryable; see: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc
|
||||
pub fn is_grpc_retryable(err: &tonic::Status) -> bool {
|
||||
matches!(err.code(), tonic::Code::Unavailable)
|
||||
}
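For illustration, here is a minimal, self-contained sketch of the retry pattern that `handle_with_retry` and `is_grpc_retryable` implement: retry the call only while the error is classified as retryable (gRPC `Unavailable`) and the retry budget is not exhausted. The `fake_rpc` helper and its failure behaviour are hypothetical stand-ins for the real gRPC call; `tokio` and `tonic` are assumed as dependencies.

```rust
use tonic::{Code, Status};

/// Mirrors the gRPC guidance above: only `Unavailable` is worth retrying.
fn is_retryable(err: &Status) -> bool {
    matches!(err.code(), Code::Unavailable)
}

/// Hypothetical stand-in for a gRPC call that fails twice before succeeding.
async fn fake_rpc(attempt: u32) -> Result<String, Status> {
    if attempt < 2 {
        Err(Status::unavailable("server is restarting"))
    } else {
        Ok("response".to_string())
    }
}

async fn call_with_retry(max_retries: u32) -> Result<String, Status> {
    let mut retries = 0;
    loop {
        match (fake_rpc(retries).await, retries < max_retries) {
            // Success: return immediately, regardless of remaining budget.
            (Ok(resp), _) => return Ok(resp),
            // Retryable failure with budget left: bump the counter and try again.
            (Err(err), true) if is_retryable(&err) => {
                retries += 1;
                eprintln!("retrying ({retries}) after error: {err}");
            }
            // Non-retryable failure or budget exhausted: surface the error.
            (Err(err), _) => return Err(err),
        }
    }
}

#[tokio::main]
async fn main() {
    match call_with_retry(3).await {
        Ok(resp) => println!("got: {resp}"),
        Err(err) => println!("gave up: {err}"),
    }
}
```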
|
||||
|
||||
#[derive(Default, Debug, Clone)]
|
||||
struct FlightContext {
|
||||
auth_header: Option<AuthHeader>,
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -131,7 +132,7 @@ impl SubCommand {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Parser)]
|
||||
#[derive(Default, Parser)]
|
||||
pub struct StartCommand {
|
||||
/// The address to bind the gRPC server.
|
||||
#[clap(long, alias = "bind-addr")]
|
||||
@@ -171,6 +172,27 @@ pub struct StartCommand {
|
||||
backend: Option<BackendImpl>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for StartCommand {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("StartCommand")
|
||||
.field("rpc_bind_addr", &self.rpc_bind_addr)
|
||||
.field("rpc_server_addr", &self.rpc_server_addr)
|
||||
.field("store_addrs", &self.sanitize_store_addrs())
|
||||
.field("config_file", &self.config_file)
|
||||
.field("selector", &self.selector)
|
||||
.field("use_memory_store", &self.use_memory_store)
|
||||
.field("enable_region_failover", &self.enable_region_failover)
|
||||
.field("http_addr", &self.http_addr)
|
||||
.field("http_timeout", &self.http_timeout)
|
||||
.field("env_prefix", &self.env_prefix)
|
||||
.field("data_home", &self.data_home)
|
||||
.field("store_key_prefix", &self.store_key_prefix)
|
||||
.field("max_txn_ops", &self.max_txn_ops)
|
||||
.field("backend", &self.backend)
|
||||
.finish()
|
||||
}
|
||||
}
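The hand-written `Debug` impl above exists so that `store_addrs` is sanitized before it reaches logs. Below is a standalone sketch of the same redact-on-Debug pattern; the `Config` type and `redact` helper are simplified, hypothetical stand-ins for `StartCommand` and `sanitize_connection_string`.

```rust
use std::fmt;

struct Config {
    endpoint: String,
    // Connection strings that may embed credentials.
    store_addrs: Vec<String>,
}

fn redact(addr: &str) -> String {
    // Keep only what follows the credentials, mimicking sanitize_connection_string.
    match addr.find('@') {
        Some(at) => addr[at + 1..].to_string(),
        None => addr.to_string(),
    }
}

impl fmt::Debug for Config {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Config")
            .field("endpoint", &self.endpoint)
            .field(
                "store_addrs",
                &self.store_addrs.iter().map(|a| redact(a)).collect::<Vec<_>>(),
            )
            .finish()
    }
}

fn main() {
    let cfg = Config {
        endpoint: "127.0.0.1:3002".to_string(),
        store_addrs: vec!["mysql://user:secret@10.0.0.1:3306/meta".to_string()],
    };
    // Prints the addresses without the embedded credentials.
    println!("{cfg:?}");
}
```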
|
||||
|
||||
impl StartCommand {
|
||||
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
|
||||
let mut opts = MetasrvOptions::load_layered_options(
|
||||
@@ -184,6 +206,15 @@ impl StartCommand {
|
||||
Ok(opts)
|
||||
}
|
||||
|
||||
fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
|
||||
self.store_addrs.as_ref().map(|addrs| {
|
||||
addrs
|
||||
.iter()
|
||||
.map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
// The precedence order is: cli > config file > environment variables > default values.
|
||||
fn merge_with_cli_options(
|
||||
&self,
|
||||
|
||||
@@ -13,10 +13,8 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
mod greatest;
|
||||
mod to_unixtime;
|
||||
|
||||
use greatest::GreatestFunction;
|
||||
use to_unixtime::ToUnixtimeFunction;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
@@ -26,6 +24,5 @@ pub(crate) struct TimestampFunction;
|
||||
impl TimestampFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(ToUnixtimeFunction));
|
||||
registry.register(Arc::new(GreatestFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,328 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{self};
|
||||
|
||||
use common_query::error::{
|
||||
self, ArrowComputeSnafu, InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
use datafusion::arrow::compute::kernels::cmp::gt;
|
||||
use datatypes::arrow::array::AsArray;
|
||||
use datatypes::arrow::compute::cast;
|
||||
use datatypes::arrow::compute::kernels::zip;
|
||||
use datatypes::arrow::datatypes::{
|
||||
DataType as ArrowDataType, Date32Type, TimeUnit, TimestampMicrosecondType,
|
||||
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
|
||||
};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::TimestampType;
|
||||
use datatypes::vectors::{Helper, VectorRef};
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct GreatestFunction;
|
||||
|
||||
const NAME: &str = "greatest";
|
||||
|
||||
macro_rules! gt_time_types {
|
||||
($ty: ident, $columns:expr) => {{
|
||||
let column1 = $columns[0].to_arrow_array();
|
||||
let column2 = $columns[1].to_arrow_array();
|
||||
|
||||
let column1 = column1.as_primitive::<$ty>();
|
||||
let column2 = column2.as_primitive::<$ty>();
|
||||
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
|
||||
|
||||
let result = zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
|
||||
Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)
|
||||
}};
|
||||
}
|
||||
|
||||
impl Function for GreatestFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
ensure!(
|
||||
input_types.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly two, have: {}",
|
||||
input_types.len()
|
||||
)
|
||||
}
|
||||
);
|
||||
|
||||
match &input_types[0] {
|
||||
ConcreteDataType::String(_) => Ok(ConcreteDataType::timestamp_millisecond_datatype()),
|
||||
ConcreteDataType::Date(_) => Ok(ConcreteDataType::date_datatype()),
|
||||
ConcreteDataType::Timestamp(ts_type) => Ok(ConcreteDataType::Timestamp(*ts_type)),
|
||||
_ => UnsupportedInputDataTypeSnafu {
|
||||
function: NAME,
|
||||
datatypes: input_types,
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::uniform(
|
||||
2,
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::date_datatype(),
|
||||
ConcreteDataType::timestamp_nanosecond_datatype(),
|
||||
ConcreteDataType::timestamp_microsecond_datatype(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
ConcreteDataType::timestamp_second_datatype(),
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly two, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
match columns[0].data_type() {
|
||||
ConcreteDataType::String(_) => {
|
||||
let column1 = cast(
|
||||
&columns[0].to_arrow_array(),
|
||||
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
)
|
||||
.context(ArrowComputeSnafu)?;
|
||||
let column1 = column1.as_primitive::<TimestampMillisecondType>();
|
||||
let column2 = cast(
|
||||
&columns[1].to_arrow_array(),
|
||||
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
)
|
||||
.context(ArrowComputeSnafu)?;
|
||||
let column2 = column2.as_primitive::<TimestampMillisecondType>();
|
||||
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
|
||||
let result =
|
||||
zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
|
||||
Ok(Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)?)
|
||||
}
|
||||
ConcreteDataType::Date(_) => gt_time_types!(Date32Type, columns),
|
||||
ConcreteDataType::Timestamp(ts_type) => match ts_type {
|
||||
TimestampType::Second(_) => gt_time_types!(TimestampSecondType, columns),
|
||||
TimestampType::Millisecond(_) => {
|
||||
gt_time_types!(TimestampMillisecondType, columns)
|
||||
}
|
||||
TimestampType::Microsecond(_) => {
|
||||
gt_time_types!(TimestampMicrosecondType, columns)
|
||||
}
|
||||
TimestampType::Nanosecond(_) => {
|
||||
gt_time_types!(TimestampNanosecondType, columns)
|
||||
}
|
||||
},
|
||||
_ => UnsupportedInputDataTypeSnafu {
|
||||
function: NAME,
|
||||
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for GreatestFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "GREATEST")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::{Date, Timestamp};
|
||||
use datatypes::types::{
|
||||
DateType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
|
||||
TimestampSecondType,
|
||||
};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
DateVector, StringVector, TimestampMicrosecondVector, TimestampMillisecondVector,
|
||||
TimestampNanosecondVector, TimestampSecondVector, Vector,
|
||||
};
|
||||
use paste::paste;
|
||||
|
||||
use super::*;
|
||||
#[test]
|
||||
fn test_greatest_takes_string_vector() {
|
||||
let function = GreatestFunction;
|
||||
assert_eq!(
|
||||
function
|
||||
.return_type(&[
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::string_datatype()
|
||||
])
|
||||
.unwrap(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype()
|
||||
);
|
||||
let columns = vec![
|
||||
Arc::new(StringVector::from(vec![
|
||||
"1970-01-01".to_string(),
|
||||
"2012-12-23".to_string(),
|
||||
])) as _,
|
||||
Arc::new(StringVector::from(vec![
|
||||
"2001-02-01".to_string(),
|
||||
"1999-01-01".to_string(),
|
||||
])) as _,
|
||||
];
|
||||
|
||||
let result = function
|
||||
.eval(&FunctionContext::default(), &columns)
|
||||
.unwrap();
|
||||
let result = result
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondVector>()
|
||||
.unwrap();
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(
|
||||
result.get(0),
|
||||
Value::Timestamp(Timestamp::from_str("2001-02-01 00:00:00", None).unwrap())
|
||||
);
|
||||
assert_eq!(
|
||||
result.get(1),
|
||||
Value::Timestamp(Timestamp::from_str("2012-12-23 00:00:00", None).unwrap())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_greatest_takes_date_vector() {
|
||||
let function = GreatestFunction;
|
||||
assert_eq!(
|
||||
function
|
||||
.return_type(&[
|
||||
ConcreteDataType::date_datatype(),
|
||||
ConcreteDataType::date_datatype()
|
||||
])
|
||||
.unwrap(),
|
||||
ConcreteDataType::Date(DateType)
|
||||
);
|
||||
|
||||
let columns = vec![
|
||||
Arc::new(DateVector::from_slice(vec![-1, 2])) as _,
|
||||
Arc::new(DateVector::from_slice(vec![0, 1])) as _,
|
||||
];
|
||||
|
||||
let result = function
|
||||
.eval(&FunctionContext::default(), &columns)
|
||||
.unwrap();
|
||||
let result = result.as_any().downcast_ref::<DateVector>().unwrap();
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(
|
||||
result.get(0),
|
||||
Value::Date(Date::from_str_utc("1970-01-01").unwrap())
|
||||
);
|
||||
assert_eq!(
|
||||
result.get(1),
|
||||
Value::Date(Date::from_str_utc("1970-01-03").unwrap())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_greatest_takes_datetime_vector() {
|
||||
let function = GreatestFunction;
|
||||
assert_eq!(
|
||||
function
|
||||
.return_type(&[
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype()
|
||||
])
|
||||
.unwrap(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype()
|
||||
);
|
||||
|
||||
let columns = vec![
|
||||
Arc::new(TimestampMillisecondVector::from_slice(vec![-1, 2])) as _,
|
||||
Arc::new(TimestampMillisecondVector::from_slice(vec![0, 1])) as _,
|
||||
];
|
||||
|
||||
let result = function
|
||||
.eval(&FunctionContext::default(), &columns)
|
||||
.unwrap();
|
||||
let result = result
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondVector>()
|
||||
.unwrap();
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(
|
||||
result.get(0),
|
||||
Value::Timestamp(Timestamp::from_str("1970-01-01 00:00:00", None).unwrap())
|
||||
);
|
||||
assert_eq!(
|
||||
result.get(1),
|
||||
Value::Timestamp(Timestamp::from_str("1970-01-01 00:00:00.002", None).unwrap())
|
||||
);
|
||||
}
|
||||
|
||||
macro_rules! test_timestamp {
|
||||
($type: expr,$unit: ident) => {
|
||||
paste! {
|
||||
#[test]
|
||||
fn [<test_greatest_takes_ $unit:lower _vector>]() {
|
||||
let function = GreatestFunction;
|
||||
assert_eq!(
|
||||
function.return_type(&[$type, $type]).unwrap(),
|
||||
ConcreteDataType::Timestamp(TimestampType::$unit([<Timestamp $unit Type>]))
|
||||
);
|
||||
|
||||
let columns = vec![
|
||||
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![-1, 2])) as _,
|
||||
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![0, 1])) as _,
|
||||
];
|
||||
|
||||
let result = function.eval(&FunctionContext::default(), &columns).unwrap();
|
||||
let result = result.as_any().downcast_ref::<[<Timestamp $unit Vector>]>().unwrap();
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(
|
||||
result.get(0),
|
||||
Value::Timestamp(Timestamp::new(0, TimeUnit::$unit))
|
||||
);
|
||||
assert_eq!(
|
||||
result.get(1),
|
||||
Value::Timestamp(Timestamp::new(2, TimeUnit::$unit))
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test_timestamp!(
|
||||
ConcreteDataType::timestamp_nanosecond_datatype(),
|
||||
Nanosecond
|
||||
);
|
||||
test_timestamp!(
|
||||
ConcreteDataType::timestamp_microsecond_datatype(),
|
||||
Microsecond
|
||||
);
|
||||
test_timestamp!(
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
Millisecond
|
||||
);
|
||||
test_timestamp!(ConcreteDataType::timestamp_second_datatype(), Second);
|
||||
}
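The file above (the old `greatest` UDF) is deleted by this diff. For reference, its core technique, building an elementwise maximum from the `gt` comparison kernel plus the `zip` selection kernel, can be reproduced standalone with arrow-rs. A hedged sketch follows; the crate version and module paths may differ from the ones this repository vendors.

```rust
use arrow::array::{ArrayRef, BooleanArray, Int64Array};
use arrow::compute::kernels::cmp::gt;
use arrow::compute::kernels::zip::zip;
use arrow::error::ArrowError;

/// Elementwise maximum of two arrays: where `a > b` take the value from `a`, otherwise from `b`.
fn elementwise_max(a: &Int64Array, b: &Int64Array) -> Result<ArrayRef, ArrowError> {
    // Boolean mask marking the rows where `a` is strictly greater.
    let mask: BooleanArray = gt(a, b)?;
    // Select from `a` where the mask is true, from `b` otherwise.
    zip(&mask, a, b)
}

fn main() -> Result<(), ArrowError> {
    let a = Int64Array::from(vec![-1, 2, 7]);
    let b = Int64Array::from(vec![0, 1, 9]);
    let max = elementwise_max(&a, &b)?;
    println!("{max:?}"); // [0, 2, 9]
    Ok(())
}
```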
|
||||
@@ -35,7 +35,7 @@ pub mod memory;
|
||||
pub mod rds;
|
||||
pub mod test;
|
||||
pub mod txn;
|
||||
|
||||
pub mod util;
|
||||
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
|
||||
|
||||
#[async_trait]
|
||||
|
||||
src/common/meta/src/kv_backend/util.rs (new normal file, 85 lines)
@@ -0,0 +1,85 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/// Removes sensitive information like passwords from connection strings.
|
||||
///
|
||||
/// This function sanitizes connection strings by removing credentials:
|
||||
/// - For URL format (mysql://user:password@host:port/db): Removes everything before '@'
|
||||
/// - For parameter format (host=localhost password=secret): Removes the password parameter
|
||||
/// - For URL format without credentials (mysql://host:port/db): Removes the protocol prefix
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `conn_str` - The connection string to sanitize
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A sanitized version of the connection string with sensitive information removed
|
||||
pub fn sanitize_connection_string(conn_str: &str) -> String {
|
||||
// Case 1: URL format with credentials (mysql://user:password@host:port/db)
|
||||
// Extract everything after the '@' symbol
|
||||
if let Some(at_pos) = conn_str.find('@') {
|
||||
return conn_str[at_pos + 1..].to_string();
|
||||
}
|
||||
|
||||
// Case 2: Parameter format with password (host=localhost password=secret dbname=mydb)
|
||||
// Filter out any parameter that starts with "password="
|
||||
if conn_str.contains("password=") {
|
||||
return conn_str
|
||||
.split_whitespace()
|
||||
.filter(|param| !param.starts_with("password="))
|
||||
.collect::<Vec<_>>()
|
||||
.join(" ");
|
||||
}
|
||||
|
||||
// Case 3: URL format without credentials (mysql://host:port/db)
|
||||
// Extract everything after the protocol prefix
|
||||
if let Some(host_part) = conn_str.split("://").nth(1) {
|
||||
return host_part.to_string();
|
||||
}
|
||||
|
||||
// Case 4: Already sanitized or unknown format
|
||||
// Return as is
|
||||
conn_str.to_string()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_connection_string() {
|
||||
// Test URL format with username/password
|
||||
let conn_str = "mysql://user:password123@localhost:3306/db";
|
||||
assert_eq!(sanitize_connection_string(conn_str), "localhost:3306/db");
|
||||
|
||||
// Test URL format without credentials
|
||||
let conn_str = "mysql://localhost:3306/db";
|
||||
assert_eq!(sanitize_connection_string(conn_str), "localhost:3306/db");
|
||||
|
||||
// Test parameter format with password
|
||||
let conn_str = "host=localhost port=5432 user=postgres password=secret dbname=mydb";
|
||||
assert_eq!(
|
||||
sanitize_connection_string(conn_str),
|
||||
"host=localhost port=5432 user=postgres dbname=mydb"
|
||||
);
|
||||
|
||||
// Test parameter format without password
|
||||
let conn_str = "host=localhost port=5432 user=postgres dbname=mydb";
|
||||
assert_eq!(
|
||||
sanitize_connection_string(conn_str),
|
||||
"host=localhost port=5432 user=postgres dbname=mydb"
|
||||
);
|
||||
}
|
||||
}
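A brief usage sketch of the new helper as a call site might use it before logging connection strings; it assumes the workspace crate `common-meta` is available as a dependency, and the sample addresses are purely illustrative.

```rust
// Assumes the `common-meta` crate from this workspace as a dependency.
use common_meta::kv_backend::util::sanitize_connection_string;

fn main() {
    for addr in [
        "mysql://user:password123@localhost:3306/db",
        "host=localhost port=5432 user=postgres password=secret dbname=mydb",
        "etcd-0:2379",
    ] {
        // Safe to log: credentials are stripped, unknown formats pass through unchanged.
        println!("{addr} -> {}", sanitize_connection_string(addr));
    }
}
```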
|
||||
@@ -398,45 +398,46 @@ impl DatanodeBuilder {
|
||||
schema_metadata_manager: SchemaMetadataManagerRef,
|
||||
plugins: Plugins,
|
||||
) -> Result<Vec<RegionEngineRef>> {
|
||||
let mut engines = vec![];
|
||||
let mut metric_engine_config = opts.region_engine.iter().find_map(|c| match c {
|
||||
RegionEngineConfig::Metric(config) => Some(config.clone()),
|
||||
_ => None,
|
||||
});
|
||||
let mut metric_engine_config = metric_engine::config::EngineConfig::default();
|
||||
let mut mito_engine_config = MitoConfig::default();
|
||||
let mut file_engine_config = file_engine::config::EngineConfig::default();
|
||||
|
||||
for engine in &opts.region_engine {
|
||||
match engine {
|
||||
RegionEngineConfig::Mito(config) => {
|
||||
let mito_engine = Self::build_mito_engine(
|
||||
opts,
|
||||
object_store_manager.clone(),
|
||||
config.clone(),
|
||||
schema_metadata_manager.clone(),
|
||||
plugins.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let metric_engine = MetricEngine::try_new(
|
||||
mito_engine.clone(),
|
||||
metric_engine_config.take().unwrap_or_default(),
|
||||
)
|
||||
.context(BuildMetricEngineSnafu)?;
|
||||
engines.push(Arc::new(mito_engine) as _);
|
||||
engines.push(Arc::new(metric_engine) as _);
|
||||
mito_engine_config = config.clone();
|
||||
}
|
||||
RegionEngineConfig::File(config) => {
|
||||
let engine = FileRegionEngine::new(
|
||||
config.clone(),
|
||||
object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
|
||||
);
|
||||
engines.push(Arc::new(engine) as _);
|
||||
file_engine_config = config.clone();
|
||||
}
|
||||
RegionEngineConfig::Metric(_) => {
|
||||
// Already handled in `build_mito_engine`.
|
||||
RegionEngineConfig::Metric(metric_config) => {
|
||||
metric_engine_config = metric_config.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(engines)
|
||||
|
||||
let mito_engine = Self::build_mito_engine(
|
||||
opts,
|
||||
object_store_manager.clone(),
|
||||
mito_engine_config,
|
||||
schema_metadata_manager.clone(),
|
||||
plugins.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
|
||||
.context(BuildMetricEngineSnafu)?;
|
||||
|
||||
let file_engine = FileRegionEngine::new(
|
||||
file_engine_config,
|
||||
object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
|
||||
);
|
||||
|
||||
Ok(vec![
|
||||
Arc::new(mito_engine) as _,
|
||||
Arc::new(metric_engine) as _,
|
||||
Arc::new(file_engine) as _,
|
||||
])
|
||||
}
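The refactored builder above first collects one config per engine kind (falling back to defaults) in a single pass over `opts.region_engine`, then builds the mito, metric, and file engines from those configs. Here is a minimal standalone sketch of that collect-then-build pattern; the config types are simplified stand-ins, not the real GreptimeDB ones.

```rust
#[derive(Clone, Debug, Default)]
struct MitoConfig { workers: usize }

#[derive(Clone, Debug, Default)]
struct MetricConfig { flush_interval_secs: u64 }

#[derive(Clone, Debug, Default)]
struct FileConfig { root: String }

#[derive(Clone, Debug)]
enum RegionEngineConfig {
    Mito(MitoConfig),
    Metric(MetricConfig),
    File(FileConfig),
}

fn build_engines(configs: &[RegionEngineConfig]) {
    // Start from defaults and let explicit entries override them; the last entry
    // of each kind wins, matching a single pass over the options list.
    let mut mito = MitoConfig::default();
    let mut metric = MetricConfig::default();
    let mut file = FileConfig::default();

    for config in configs {
        match config {
            RegionEngineConfig::Mito(c) => mito = c.clone(),
            RegionEngineConfig::Metric(c) => metric = c.clone(),
            RegionEngineConfig::File(c) => file = c.clone(),
        }
    }

    // In the real builder these feed MitoEngine, MetricEngine and FileRegionEngine.
    println!("mito = {mito:?}, metric = {metric:?}, file = {file:?}");
}

fn main() {
    build_engines(&[
        RegionEngineConfig::Mito(MitoConfig { workers: 8 }),
        RegionEngineConfig::Metric(MetricConfig { flush_interval_secs: 30 }),
    ]);
}
```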
|
||||
|
||||
/// Builds [MitoEngine] according to options.
|
||||
|
||||
@@ -37,11 +37,12 @@ use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
use crate::adapter::{CreateFlowArgs, StreamingEngine};
|
||||
use crate::batching_mode::engine::BatchingEngine;
|
||||
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
|
||||
use crate::engine::FlowEngine;
|
||||
use crate::error::{
|
||||
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
|
||||
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, SyncCheckTaskSnafu,
|
||||
UnexpectedSnafu,
|
||||
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu,
|
||||
SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
||||
use crate::repr::{self, DiffRow};
|
||||
@@ -81,6 +82,11 @@ impl FlowDualEngine {
|
||||
}
|
||||
}
|
||||
|
||||
/// Determine if the engine is in distributed mode
|
||||
pub fn is_distributed(&self) -> bool {
|
||||
self.streaming_engine.node_id.is_some()
|
||||
}
|
||||
|
||||
pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
|
||||
self.streaming_engine.clone()
|
||||
}
|
||||
@@ -89,6 +95,39 @@ impl FlowDualEngine {
|
||||
self.batching_engine.clone()
|
||||
}
|
||||
|
||||
/// In distributed mode, scans periodically (every 1s) until an available frontend is found or the
/// timeout elapses; in standalone mode, returns immediately.
/// Note that this function returns as soon as any frontend appears in the cluster info.
|
||||
async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
|
||||
if !self.is_distributed() {
|
||||
return Ok(());
|
||||
}
|
||||
let frontend_client = self.batching_engine().frontend_client.clone();
|
||||
let sleep_duration = std::time::Duration::from_millis(1_000);
|
||||
let now = std::time::Instant::now();
|
||||
loop {
|
||||
let frontend_list = frontend_client.scan_for_frontend().await?;
|
||||
if !frontend_list.is_empty() {
|
||||
let fe_list = frontend_list
|
||||
.iter()
|
||||
.map(|(_, info)| &info.peer.addr)
|
||||
.collect::<Vec<_>>();
|
||||
info!("Available frontend found: {:?}", fe_list);
|
||||
return Ok(());
|
||||
}
|
||||
let elapsed = now.elapsed();
|
||||
tokio::time::sleep(sleep_duration).await;
|
||||
info!("Waiting for available frontend, elapsed={:?}", elapsed);
|
||||
if elapsed >= timeout {
|
||||
return NoAvailableFrontendSnafu {
|
||||
timeout,
|
||||
context: "No available frontend found in cluster info",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
}
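`wait_for_available_frontend` is essentially a poll-until-deadline loop. Below is a self-contained sketch of the same shape, polling a condition at a fixed interval until it holds or a timeout elapses; the `condition` closure is a hypothetical stand-in for `scan_for_frontend`, and `tokio` is assumed as a dependency.

```rust
use std::time::{Duration, Instant};

/// Polls `condition` every `interval` until it returns true or `timeout` elapses.
async fn wait_until<F>(mut condition: F, interval: Duration, timeout: Duration) -> Result<(), String>
where
    F: FnMut() -> bool,
{
    let start = Instant::now();
    loop {
        if condition() {
            return Ok(());
        }
        if start.elapsed() >= timeout {
            return Err(format!("condition not met within {timeout:?}"));
        }
        tokio::time::sleep(interval).await;
    }
}

#[tokio::main]
async fn main() {
    // Pretend the "frontend" becomes available on the third poll.
    let mut polls = 0;
    let res = wait_until(
        || {
            polls += 1;
            polls >= 3
        },
        Duration::from_millis(100),
        Duration::from_secs(1),
    )
    .await;
    println!("{res:?}");
}
```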
|
||||
|
||||
/// Try to sync with the check task. This is only used in drop flow & flush flow, so a flow id is required.
///
/// The sync is needed to make sure flush flow actually gets called.
|
||||
@@ -338,18 +377,36 @@ struct ConsistentCheckTask {
|
||||
|
||||
impl ConsistentCheckTask {
|
||||
async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
|
||||
// first do recover flows
|
||||
engine.check_flow_consistent(true, false).await?;
|
||||
|
||||
let inner = engine.clone();
|
||||
let engine = engine.clone();
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
|
||||
let (trigger_tx, mut trigger_rx) =
|
||||
tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
|
||||
let handle = common_runtime::spawn_global(async move {
|
||||
// First, wait until an available frontend is found.
|
||||
if let Err(err) = engine
|
||||
.wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
|
||||
.await
|
||||
{
|
||||
warn!("No frontend is available yet:\n {err:?}");
|
||||
}
|
||||
|
||||
// Then recover flows; if recovery fails, keep retrying.
|
||||
let mut recover_retry = 0;
|
||||
while let Err(err) = engine.check_flow_consistent(true, false).await {
|
||||
recover_retry += 1;
|
||||
error!(
|
||||
"Failed to recover flows:\n {err:?}, retry {} in {}s",
|
||||
recover_retry,
|
||||
MIN_REFRESH_DURATION.as_secs()
|
||||
);
|
||||
tokio::time::sleep(MIN_REFRESH_DURATION).await;
|
||||
}
|
||||
|
||||
// Then keep checking flow consistency, with configurable allow_create and allow_drop.
|
||||
let (mut allow_create, mut allow_drop) = (false, false);
|
||||
let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
|
||||
loop {
|
||||
if let Err(err) = inner.check_flow_consistent(allow_create, allow_drop).await {
|
||||
if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
|
||||
error!(err; "Failed to check flow consistent");
|
||||
}
|
||||
if let Some(done) = ret_signal.take() {
|
||||
@@ -534,7 +591,12 @@ impl FlowEngine for FlowDualEngine {
|
||||
match flow_type {
|
||||
Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
|
||||
Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
|
||||
None => Ok(0),
|
||||
None => {
|
||||
warn!(
|
||||
"Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
|
||||
);
|
||||
Ok(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,10 +31,19 @@ pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(
|
||||
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
|
||||
|
||||
/// The minimum duration between two query executions by a batching mode task.
|
||||
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
|
||||
pub const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
|
||||
|
||||
/// Grpc connection timeout
|
||||
const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Grpc max retry number
|
||||
const GRPC_MAX_RETRIES: u32 = 3;
|
||||
|
||||
/// Timeout for the flownode to wait for an available frontend.
/// If no available frontend is found after `FRONTEND_SCAN_TIMEOUT` has elapsed, an error is
/// returned, which should prevent the flownode from starting.
|
||||
pub const FRONTEND_SCAN_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Frontend activity timeout.
/// If a frontend has been down (not sending heartbeats) for more than `FRONTEND_ACTIVITY_TIMEOUT`,
/// it is removed from the list the flownode uses to connect.
|
||||
pub const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
@@ -49,7 +49,8 @@ use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
||||
pub struct BatchingEngine {
|
||||
tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
|
||||
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
/// Frontend client for insert requests.
|
||||
pub(crate) frontend_client: Arc<FrontendClient>,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
//! Frontend client used to run a flow as a batching task, i.e. a time-window-aware normal query triggered on every tick configured by the user.
|
||||
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::CreateTableExpr;
|
||||
@@ -26,15 +27,17 @@ use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use common_query::Output;
|
||||
use common_telemetry::warn;
|
||||
use itertools::Itertools;
|
||||
use meta_client::client::MetaClient;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::batching_mode::{
|
||||
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, GRPC_CONN_TIMEOUT, GRPC_MAX_RETRIES,
|
||||
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
|
||||
GRPC_MAX_RETRIES,
|
||||
};
|
||||
use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu};
|
||||
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
|
||||
use crate::Error;
|
||||
|
||||
/// Just like [`GrpcQueryHandler`] but use BoxedError
|
||||
@@ -127,10 +130,24 @@ impl DatabaseWithPeer {
|
||||
fn new(database: Database, peer: Peer) -> Self {
|
||||
Self { database, peer }
|
||||
}
|
||||
|
||||
/// Try sending a "SELECT 1" to the database
|
||||
async fn try_select_one(&self) -> Result<(), Error> {
|
||||
// Note: use `sql` here so that `SELECT 1` returns one row.
|
||||
let _ = self
|
||||
.database
|
||||
.sql("SELECT 1")
|
||||
.await
|
||||
.with_context(|_| InvalidRequestSnafu {
|
||||
context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FrontendClient {
|
||||
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
|
||||
/// Scans for available frontends from metadata.
|
||||
pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
|
||||
let Self::Distributed { meta_client, .. } = self else {
|
||||
return Ok(vec![]);
|
||||
};
|
||||
@@ -160,8 +177,8 @@ impl FrontendClient {
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Get the database with max `last_activity_ts`
|
||||
async fn get_last_active_frontend(
|
||||
/// Get the database with the maximum `last_activity_ts` that is able to process queries.
|
||||
async fn get_latest_active_frontend(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
@@ -177,22 +194,50 @@ impl FrontendClient {
|
||||
.fail();
|
||||
};
|
||||
|
||||
let frontends = self.scan_for_frontend().await?;
|
||||
let mut peer = None;
|
||||
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
|
||||
interval.tick().await;
|
||||
for retry in 0..GRPC_MAX_RETRIES {
|
||||
let frontends = self.scan_for_frontend().await?;
|
||||
let now_in_ms = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64;
|
||||
|
||||
if let Some((_, val)) = frontends.iter().max_by_key(|(_, val)| val.last_activity_ts) {
|
||||
peer = Some(val.peer.clone());
|
||||
// Iterate nodes in descending order of last_activity_ts.
|
||||
for (_, node_info) in frontends
|
||||
.iter()
|
||||
.sorted_by_key(|(_, node_info)| node_info.last_activity_ts)
|
||||
.rev()
|
||||
// Filter out frontends that have been down for longer than FRONTEND_ACTIVITY_TIMEOUT (1 min).
|
||||
.filter(|(_, node_info)| {
|
||||
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
|
||||
> now_in_ms
|
||||
})
|
||||
{
|
||||
let addr = &node_info.peer.addr;
|
||||
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
|
||||
let database = Database::new(catalog, schema, client);
|
||||
let db = DatabaseWithPeer::new(database, node_info.peer.clone());
|
||||
match db.try_select_one().await {
|
||||
Ok(_) => return Ok(db),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to connect to frontend {} on retry={}: \n{e:?}",
|
||||
addr, retry
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
// no available frontend
|
||||
// sleep and retry
|
||||
interval.tick().await;
|
||||
}
|
||||
|
||||
let Some(peer) = peer else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("No frontend available: {:?}", frontends),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![peer.addr.clone()]);
|
||||
let database = Database::new(catalog, schema, client);
|
||||
Ok(DatabaseWithPeer::new(database, peer))
|
||||
NoAvailableFrontendSnafu {
|
||||
timeout: GRPC_CONN_TIMEOUT,
|
||||
context: "No available frontend found that is able to process query",
|
||||
}
|
||||
.fail()
|
||||
}
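For reference, the selection logic above amounts to: order the discovered frontends by `last_activity_ts` descending, drop the ones that have been inactive longer than the activity timeout, and take the first one that answers a `SELECT 1` probe. A minimal, self-contained sketch of the ordering step (the struct and the timeout constant are illustrative stand-ins, not the real `NodeInfo` type):

use std::time::Duration;

// Illustrative stand-in for the node metadata; only the fields used here.
struct Candidate {
    addr: String,
    last_activity_ts: i64, // milliseconds since the Unix epoch
}

// Assumed value for this sketch only.
const ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60);

/// Return candidate addresses, most recently active first, skipping stale nodes.
fn ranked_candidates(mut nodes: Vec<Candidate>, now_ms: i64) -> Vec<String> {
    nodes.retain(|n| n.last_activity_ts + ACTIVITY_TIMEOUT.as_millis() as i64 > now_ms);
    nodes.sort_by_key(|n| std::cmp::Reverse(n.last_activity_ts));
    nodes.into_iter().map(|n| n.addr).collect()
}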
|
||||
|
||||
pub async fn create(
|
||||
@@ -222,38 +267,18 @@ impl FrontendClient {
|
||||
) -> Result<u32, Error> {
|
||||
match self {
|
||||
FrontendClient::Distributed { .. } => {
|
||||
let db = self.get_last_active_frontend(catalog, schema).await?;
|
||||
let db = self.get_latest_active_frontend(catalog, schema).await?;
|
||||
|
||||
*peer_desc = Some(PeerDesc::Dist {
|
||||
peer: db.peer.clone(),
|
||||
});
|
||||
|
||||
let mut retry = 0;
|
||||
|
||||
loop {
|
||||
let ret = db.database.handle(req.clone()).await.with_context(|_| {
|
||||
InvalidRequestSnafu {
|
||||
context: format!("Failed to handle request: {:?}", req),
|
||||
}
|
||||
});
|
||||
if let Err(err) = ret {
|
||||
if retry < GRPC_MAX_RETRIES {
|
||||
retry += 1;
|
||||
warn!(
|
||||
"Failed to send request to grpc handle at Peer={:?}, retry = {}, error = {:?}",
|
||||
db.peer, retry, err
|
||||
);
|
||||
continue;
|
||||
} else {
|
||||
common_telemetry::error!(
|
||||
"Failed to send request to grpc handle at Peer={:?} after {} retries, error = {:?}",
|
||||
db.peer, retry, err
|
||||
);
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
db.database
|
||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
||||
.await
|
||||
.with_context(|_| InvalidRequestSnafu {
|
||||
context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
|
||||
})
|
||||
}
|
||||
FrontendClient::Standalone { database_client } => {
|
||||
let ctx = QueryContextBuilder::default()
|
||||
|
||||
@@ -61,6 +61,16 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"No available frontend found after timeout: {timeout:?}, context: {context}"
|
||||
))]
|
||||
NoAvailableFrontend {
|
||||
timeout: std::time::Duration,
|
||||
context: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("External error"))]
|
||||
External {
|
||||
source: BoxedError,
|
||||
@@ -296,7 +306,8 @@ impl ErrorExt for Error {
|
||||
Self::Eval { .. }
|
||||
| Self::JoinTask { .. }
|
||||
| Self::Datafusion { .. }
|
||||
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
|
||||
| Self::InsertIntoFlow { .. }
|
||||
| Self::NoAvailableFrontend { .. } => StatusCode::Internal,
|
||||
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
|
||||
Self::TableNotFound { .. }
|
||||
| Self::TableNotFoundMeta { .. }
|
||||
|
||||
@@ -172,6 +172,8 @@ impl FlownodeServer {
|
||||
}
|
||||
|
||||
/// Start the background task for streaming computation.
|
||||
///
|
||||
/// Should be called only after the heartbeat is established, so that cluster info is available
|
||||
async fn start_workers(&self) -> Result<(), Error> {
|
||||
let manager_ref = self.inner.flow_service.dual_engine.clone();
|
||||
let handle = manager_ref
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
pub mod builder;
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::fmt::{self, Display};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::Duration;
|
||||
@@ -96,7 +96,7 @@ pub enum BackendImpl {
|
||||
MysqlStore,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct MetasrvOptions {
|
||||
/// The address the server listens on.
|
||||
@@ -166,6 +166,47 @@ pub struct MetasrvOptions {
|
||||
pub node_max_idle_time: Duration,
|
||||
}
|
||||
|
||||
impl fmt::Debug for MetasrvOptions {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let mut debug_struct = f.debug_struct("MetasrvOptions");
|
||||
debug_struct
|
||||
.field("bind_addr", &self.bind_addr)
|
||||
.field("server_addr", &self.server_addr)
|
||||
.field("store_addrs", &self.sanitize_store_addrs())
|
||||
.field("selector", &self.selector)
|
||||
.field("use_memory_store", &self.use_memory_store)
|
||||
.field("enable_region_failover", &self.enable_region_failover)
|
||||
.field(
|
||||
"allow_region_failover_on_local_wal",
|
||||
&self.allow_region_failover_on_local_wal,
|
||||
)
|
||||
.field("http", &self.http)
|
||||
.field("logging", &self.logging)
|
||||
.field("procedure", &self.procedure)
|
||||
.field("failure_detector", &self.failure_detector)
|
||||
.field("datanode", &self.datanode)
|
||||
.field("enable_telemetry", &self.enable_telemetry)
|
||||
.field("data_home", &self.data_home)
|
||||
.field("wal", &self.wal)
|
||||
.field("export_metrics", &self.export_metrics)
|
||||
.field("store_key_prefix", &self.store_key_prefix)
|
||||
.field("max_txn_ops", &self.max_txn_ops)
|
||||
.field("flush_stats_factor", &self.flush_stats_factor)
|
||||
.field("tracing", &self.tracing)
|
||||
.field("backend", &self.backend);
|
||||
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
debug_struct.field("meta_table_name", &self.meta_table_name);
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
|
||||
|
||||
debug_struct
|
||||
.field("node_max_idle_time", &self.node_max_idle_time)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
|
||||
|
||||
impl Default for MetasrvOptions {
|
||||
@@ -249,6 +290,13 @@ impl MetasrvOptions {
|
||||
common_telemetry::debug!("detect local IP is not supported on Android");
|
||||
}
|
||||
}
|
||||
|
||||
fn sanitize_store_addrs(&self) -> Vec<String> {
|
||||
self.store_addrs
|
||||
.iter()
|
||||
.map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
|
||||
.collect()
|
||||
}
|
||||
}
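The hand-written `Debug` implementation lets `store_addrs` go through `sanitize_connection_string` before the options are logged, instead of printing raw connection strings. As a rough illustration of that kind of redaction (the helper below is a simplified stand-in, not the actual `common_meta::kv_backend::util` function):

/// Simplified stand-in: drop the `user:password@` part of a connection string.
fn redact_credentials(addr: &str) -> String {
    match (addr.find("://"), addr.rfind('@')) {
        (Some(scheme_end), Some(at)) if at > scheme_end + 3 => {
            format!("{}{}", &addr[..scheme_end + 3], &addr[at + 1..])
        }
        (None, Some(at)) => addr[at + 1..].to_string(),
        _ => addr.to_string(),
    }
}

fn main() {
    assert_eq!(
        redact_credentials("mysql://user:secret@127.0.0.1:3306/meta"),
        "mysql://127.0.0.1:3306/meta"
    );
}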
|
||||
|
||||
pub struct MetasrvInfo {
|
||||
|
||||
@@ -302,7 +302,10 @@ impl PartitionTreeMemtable {
|
||||
fn update_stats(&self, metrics: &WriteMetrics) {
|
||||
// Only let the tracker track value bytes.
|
||||
self.alloc_tracker.on_allocation(metrics.value_bytes);
|
||||
metrics.update_timestamp_range(&self.max_timestamp, &self.min_timestamp);
|
||||
self.max_timestamp
|
||||
.fetch_max(metrics.max_ts, Ordering::SeqCst);
|
||||
self.min_timestamp
|
||||
.fetch_min(metrics.min_ts, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,8 +14,6 @@
|
||||
|
||||
//! Internal metrics of the memtable.
|
||||
|
||||
use std::sync::atomic::{AtomicI64, Ordering};
|
||||
|
||||
/// Metrics of writing memtables.
|
||||
pub(crate) struct WriteMetrics {
|
||||
/// Size allocated by keys.
|
||||
@@ -28,51 +26,6 @@ pub(crate) struct WriteMetrics {
|
||||
pub(crate) max_ts: i64,
|
||||
}
|
||||
|
||||
impl WriteMetrics {
|
||||
/// Update the min/max timestamp range according to current write metric.
|
||||
pub(crate) fn update_timestamp_range(&self, prev_max_ts: &AtomicI64, prev_min_ts: &AtomicI64) {
|
||||
loop {
|
||||
let current_min = prev_min_ts.load(Ordering::Relaxed);
|
||||
if self.min_ts >= current_min {
|
||||
break;
|
||||
}
|
||||
|
||||
let Err(updated) = prev_min_ts.compare_exchange(
|
||||
current_min,
|
||||
self.min_ts,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
) else {
|
||||
break;
|
||||
};
|
||||
|
||||
if updated == self.min_ts {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let current_max = prev_max_ts.load(Ordering::Relaxed);
|
||||
if self.max_ts <= current_max {
|
||||
break;
|
||||
}
|
||||
|
||||
let Err(updated) = prev_max_ts.compare_exchange(
|
||||
current_max,
|
||||
self.max_ts,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
) else {
|
||||
break;
|
||||
};
|
||||
|
||||
if updated == self.max_ts {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
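Worth noting: the compare-and-swap loops removed here are behaviorally equivalent to the `AtomicI64::fetch_min` / `fetch_max` calls that the memtables now issue directly (see the memtable hunks above); those methods keep a running minimum/maximum atomically in a single call. A minimal sketch of that pattern:

use std::sync::atomic::{AtomicI64, Ordering};

/// Keep a running min/max timestamp across concurrent writers.
fn record_timestamp(min_ts: &AtomicI64, max_ts: &AtomicI64, ts: i64) {
    // fetch_min/fetch_max retry internally, replacing the hand-written CAS loops.
    min_ts.fetch_min(ts, Ordering::SeqCst);
    max_ts.fetch_max(ts, Ordering::SeqCst);
}

fn main() {
    let min = AtomicI64::new(i64::MAX);
    let max = AtomicI64::new(i64::MIN);
    for ts in [5, -3, 42] {
        record_timestamp(&min, &max, ts);
    }
    assert_eq!(min.load(Ordering::SeqCst), -3);
    assert_eq!(max.load(Ordering::SeqCst), 42);
}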
|
||||
|
||||
impl Default for WriteMetrics {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
|
||||
@@ -147,7 +147,8 @@ impl TimeSeriesMemtable {
|
||||
fn update_stats(&self, stats: WriteMetrics) {
|
||||
self.alloc_tracker
|
||||
.on_allocation(stats.key_bytes + stats.value_bytes);
|
||||
stats.update_timestamp_range(&self.max_timestamp, &self.min_timestamp);
|
||||
self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
|
||||
self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
|
||||
|
||||
@@ -322,13 +322,10 @@ impl ScanRegion {
|
||||
let memtables: Vec<_> = memtables
|
||||
.into_iter()
|
||||
.filter(|mem| {
|
||||
if mem.is_empty() {
|
||||
// check if memtable is empty by reading stats.
|
||||
let Some((start, end)) = mem.stats().time_range() else {
|
||||
return false;
|
||||
}
|
||||
let stats = mem.stats();
|
||||
// Safety: the memtable is not empty.
|
||||
let (start, end) = stats.time_range().unwrap();
|
||||
|
||||
};
|
||||
// The time range of the memtable is inclusive.
|
||||
let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
|
||||
memtable_range.intersects(&time_range)
|
||||
|
||||
@@ -134,6 +134,7 @@ impl WriteFormat {
|
||||
|
||||
/// Helper for reading the SST format.
|
||||
pub struct ReadFormat {
|
||||
/// The metadata stored in the SST.
|
||||
metadata: RegionMetadataRef,
|
||||
/// SST file schema.
|
||||
arrow_schema: SchemaRef,
|
||||
@@ -305,17 +306,23 @@ impl ReadFormat {
|
||||
&self,
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column_id: ColumnId,
|
||||
) -> Option<ArrayRef> {
|
||||
let column = self.metadata.column_by_id(column_id)?;
|
||||
) -> StatValues {
|
||||
let Some(column) = self.metadata.column_by_id(column_id) else {
|
||||
// No such column in the SST.
|
||||
return StatValues::NoColumn;
|
||||
};
|
||||
match column.semantic_type {
|
||||
SemanticType::Tag => self.tag_values(row_groups, column, true),
|
||||
SemanticType::Field => {
|
||||
let index = self.field_id_to_index.get(&column_id)?;
|
||||
Self::column_values(row_groups, column, *index, true)
|
||||
// Safety: the column is a field, so it must exist in `field_id_to_index`.
|
||||
let index = self.field_id_to_index.get(&column_id).unwrap();
|
||||
let stats = Self::column_values(row_groups, column, *index, true);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
SemanticType::Timestamp => {
|
||||
let index = self.time_index_position();
|
||||
Self::column_values(row_groups, column, index, true)
|
||||
let stats = Self::column_values(row_groups, column, index, true);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -325,17 +332,23 @@ impl ReadFormat {
|
||||
&self,
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column_id: ColumnId,
|
||||
) -> Option<ArrayRef> {
|
||||
let column = self.metadata.column_by_id(column_id)?;
|
||||
) -> StatValues {
|
||||
let Some(column) = self.metadata.column_by_id(column_id) else {
|
||||
// No such column in the SST.
|
||||
return StatValues::NoColumn;
|
||||
};
|
||||
match column.semantic_type {
|
||||
SemanticType::Tag => self.tag_values(row_groups, column, false),
|
||||
SemanticType::Field => {
|
||||
let index = self.field_id_to_index.get(&column_id)?;
|
||||
Self::column_values(row_groups, column, *index, false)
|
||||
// Safety: the column is a field, so it must exist in `field_id_to_index`.
|
||||
let index = self.field_id_to_index.get(&column_id).unwrap();
|
||||
let stats = Self::column_values(row_groups, column, *index, false);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
SemanticType::Timestamp => {
|
||||
let index = self.time_index_position();
|
||||
Self::column_values(row_groups, column, index, false)
|
||||
let stats = Self::column_values(row_groups, column, index, false);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -345,17 +358,23 @@ impl ReadFormat {
|
||||
&self,
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column_id: ColumnId,
|
||||
) -> Option<ArrayRef> {
|
||||
let column = self.metadata.column_by_id(column_id)?;
|
||||
) -> StatValues {
|
||||
let Some(column) = self.metadata.column_by_id(column_id) else {
|
||||
// No such column in the SST.
|
||||
return StatValues::NoColumn;
|
||||
};
|
||||
match column.semantic_type {
|
||||
SemanticType::Tag => None,
|
||||
SemanticType::Tag => StatValues::NoStats,
|
||||
SemanticType::Field => {
|
||||
let index = self.field_id_to_index.get(&column_id)?;
|
||||
Self::column_null_counts(row_groups, *index)
|
||||
// Safety: the column is a field, so it must exist in `field_id_to_index`.
|
||||
let index = self.field_id_to_index.get(&column_id).unwrap();
|
||||
let stats = Self::column_null_counts(row_groups, *index);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
SemanticType::Timestamp => {
|
||||
let index = self.time_index_position();
|
||||
Self::column_null_counts(row_groups, index)
|
||||
let stats = Self::column_null_counts(row_groups, index);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -390,8 +409,7 @@ impl ReadFormat {
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column: &ColumnMetadata,
|
||||
is_min: bool,
|
||||
) -> Option<ArrayRef> {
|
||||
let primary_key_encoding = self.metadata.primary_key_encoding;
|
||||
) -> StatValues {
|
||||
let is_first_tag = self
|
||||
.metadata
|
||||
.primary_key
|
||||
@@ -400,9 +418,28 @@ impl ReadFormat {
|
||||
.unwrap_or(false);
|
||||
if !is_first_tag {
|
||||
// Only the min-max of the first tag is available in the primary key.
|
||||
return None;
|
||||
return StatValues::NoStats;
|
||||
}
|
||||
|
||||
StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
|
||||
}
|
||||
|
||||
/// Returns min/max values of the first tag.
|
||||
/// Returns None if the tag does not have statistics.
|
||||
fn first_tag_values(
|
||||
&self,
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column: &ColumnMetadata,
|
||||
is_min: bool,
|
||||
) -> Option<ArrayRef> {
|
||||
debug_assert!(self
|
||||
.metadata
|
||||
.primary_key
|
||||
.first()
|
||||
.map(|id| *id == column.column_id)
|
||||
.unwrap_or(false));
|
||||
|
||||
let primary_key_encoding = self.metadata.primary_key_encoding;
|
||||
let converter = build_primary_key_codec_with_fields(
|
||||
primary_key_encoding,
|
||||
[(
|
||||
@@ -452,6 +489,7 @@ impl ReadFormat {
|
||||
}
|
||||
|
||||
/// Returns min/max values of specific non-tag columns.
|
||||
/// Returns None if the column does not have statistics.
|
||||
fn column_values(
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column: &ColumnMetadata,
|
||||
@@ -544,6 +582,29 @@ impl ReadFormat {
|
||||
}
|
||||
}
|
||||
|
||||
/// Values of column statistics of the SST.
|
||||
///
|
||||
/// It also distinguishes between the case where a column is not found and
|
||||
/// the case where the column exists but has no statistics.
|
||||
pub enum StatValues {
|
||||
/// Values of each row group.
|
||||
Values(ArrayRef),
|
||||
/// No such column.
|
||||
NoColumn,
|
||||
/// Column exists but has no statistics.
|
||||
NoStats,
|
||||
}
|
||||
|
||||
impl StatValues {
|
||||
/// Creates a new `StatValues` instance from optional statistics.
|
||||
pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
|
||||
match stats {
|
||||
Some(stats) => StatValues::Values(stats),
|
||||
None => StatValues::NoStats,
|
||||
}
|
||||
}
|
||||
}
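A short sketch of how a caller consumes the three-way distinction, mirroring the pruning code further below: a missing column can fall back to default-value statistics, while a column without statistics simply disables pruning. The `ArrayRef` alias is a stand-in so the sketch is self-contained (the real type is Arrow's `ArrayRef`):

use std::sync::Arc;

// Stand-in so the sketch compiles on its own; the real ArrayRef is an Arrow array.
type ArrayRef = Arc<Vec<i64>>;

enum StatValues {
    Values(ArrayRef),
    NoColumn,
    NoStats,
}

/// Resolve statistics for pruning, with a fallback for columns missing from the SST.
fn resolve(stat: StatValues, default_values: impl Fn() -> Option<ArrayRef>) -> Option<ArrayRef> {
    match stat {
        StatValues::Values(values) => Some(values),
        StatValues::NoColumn => default_values(),
        StatValues::NoStats => None,
    }
}

fn main() {
    let fallback = || Some(Arc::new(vec![0_i64]));
    assert!(resolve(StatValues::NoColumn, fallback).is_some());
}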
|
||||
|
||||
#[cfg(test)]
|
||||
impl ReadFormat {
|
||||
/// Creates a helper with existing `metadata` and all columns.
|
||||
|
||||
@@ -25,7 +25,7 @@ use parquet::file::metadata::RowGroupMetaData;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::sst::parquet::format::ReadFormat;
|
||||
use crate::sst::parquet::format::{ReadFormat, StatValues};
|
||||
|
||||
/// Statistics for pruning row groups.
|
||||
pub(crate) struct RowGroupPruningStats<'a, T> {
|
||||
@@ -100,16 +100,18 @@ impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_,
|
||||
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
|
||||
let column_id = self.column_id_to_prune(&column.name)?;
|
||||
match self.read_format.min_values(self.row_groups, column_id) {
|
||||
Some(values) => Some(values),
|
||||
None => self.compat_default_value(&column.name),
|
||||
StatValues::Values(values) => Some(values),
|
||||
StatValues::NoColumn => self.compat_default_value(&column.name),
|
||||
StatValues::NoStats => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
|
||||
let column_id = self.column_id_to_prune(&column.name)?;
|
||||
match self.read_format.max_values(self.row_groups, column_id) {
|
||||
Some(values) => Some(values),
|
||||
None => self.compat_default_value(&column.name),
|
||||
StatValues::Values(values) => Some(values),
|
||||
StatValues::NoColumn => self.compat_default_value(&column.name),
|
||||
StatValues::NoStats => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,10 +120,12 @@ impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_,
|
||||
}
|
||||
|
||||
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
|
||||
let Some(column_id) = self.column_id_to_prune(&column.name) else {
|
||||
return self.compat_null_count(&column.name);
|
||||
};
|
||||
self.read_format.null_counts(self.row_groups, column_id)
|
||||
let column_id = self.column_id_to_prune(&column.name)?;
|
||||
match self.read_format.null_counts(self.row_groups, column_id) {
|
||||
StatValues::Values(values) => Some(values),
|
||||
StatValues::NoColumn => self.compat_null_count(&column.name),
|
||||
StatValues::NoStats => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
|
||||
|
||||
@@ -37,7 +37,7 @@ use common_meta::rpc::ddl::{
|
||||
};
|
||||
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
|
||||
use common_query::Output;
|
||||
use common_telemetry::{debug, info, tracing};
|
||||
use common_telemetry::{debug, info, tracing, warn};
|
||||
use common_time::Timezone;
|
||||
use datafusion_common::tree_node::TreeNodeVisitor;
|
||||
use datafusion_expr::LogicalPlan;
|
||||
@@ -369,7 +369,7 @@ impl StatementExecutor {
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let flow_type = self
|
||||
.determine_flow_type(&expr.sql, query_context.clone())
|
||||
.determine_flow_type(&expr, query_context.clone())
|
||||
.await?;
|
||||
info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
|
||||
|
||||
@@ -398,9 +398,49 @@ impl StatementExecutor {
|
||||
/// Determine the flow type based on the SQL query
|
||||
///
|
||||
/// If the query contains aggregation or DISTINCT, it is a batching flow; otherwise it is a streaming flow
|
||||
async fn determine_flow_type(&self, sql: &str, query_ctx: QueryContextRef) -> Result<FlowType> {
|
||||
async fn determine_flow_type(
|
||||
&self,
|
||||
expr: &CreateFlowExpr,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<FlowType> {
|
||||
// first check whether any source table's TTL is instant; if so, force streaming mode
|
||||
for src_table_name in &expr.source_table_names {
|
||||
let table = self
|
||||
.catalog_manager()
|
||||
.table(
|
||||
&src_table_name.catalog_name,
|
||||
&src_table_name.schema_name,
|
||||
&src_table_name.table_name,
|
||||
Some(&query_ctx),
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: format_full_table_name(
|
||||
&src_table_name.catalog_name,
|
||||
&src_table_name.schema_name,
|
||||
&src_table_name.table_name,
|
||||
),
|
||||
})?;
|
||||
|
||||
// a source table with instant TTL can only be handled in streaming mode
|
||||
if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
|
||||
warn!(
|
||||
"Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
|
||||
format_full_table_name(
|
||||
&src_table_name.catalog_name,
|
||||
&src_table_name.schema_name,
|
||||
&src_table_name.table_name
|
||||
),
|
||||
expr.flow_name
|
||||
);
|
||||
return Ok(FlowType::Streaming);
|
||||
}
|
||||
}
|
||||
|
||||
let engine = &self.query_engine;
|
||||
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
|
||||
let stmt = QueryLanguageParser::parse_sql(&expr.sql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let plan = engine
|
||||
|
||||
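To summarize the decision order introduced above: any source table with an instant TTL forces streaming mode, and only then does the SQL itself (aggregation / DISTINCT) decide between batching and streaming. A hedged, self-contained sketch of that predicate, with the enums as hypothetical stand-ins for the real `FlowType` and TTL types:

#[derive(Debug, PartialEq)]
enum FlowType {
    Streaming,
    Batching,
}

#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Ttl {
    Instant,
    Duration(std::time::Duration),
}

/// Any instant-TTL source forces streaming; otherwise the query shape decides.
fn decide_flow_type(source_ttls: &[Option<Ttl>], query_has_aggregation: bool) -> FlowType {
    if source_ttls.iter().any(|t| matches!(t, Some(Ttl::Instant))) {
        return FlowType::Streaming;
    }
    if query_has_aggregation {
        FlowType::Batching
    } else {
        FlowType::Streaming
    }
}

fn main() {
    assert_eq!(decide_flow_type(&[Some(Ttl::Instant)], true), FlowType::Streaming);
    assert_eq!(decide_flow_type(&[None], true), FlowType::Batching);
}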
@@ -8,7 +8,7 @@ CREATE TABLE distinct_basic (
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- should fail
|
||||
-- should fall back to streaming mode
|
||||
-- SQLNESS REPLACE id=\d+ id=REDACTED
|
||||
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
|
||||
SELECT
|
||||
@@ -16,9 +16,151 @@ SELECT
|
||||
FROM
|
||||
distinct_basic;
|
||||
|
||||
Error: 3001(EngineExecuteQuery), Unsupported: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval
|
||||
Affected Rows: 0
|
||||
|
||||
ALTER TABLE distinct_basic SET 'ttl' = '5s';
|
||||
-- flow_options should have a flow_type:streaming
|
||||
-- since source table's ttl=instant
|
||||
SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
+---------------------+---------------------------+
|
||||
| flow_name | options |
|
||||
+---------------------+---------------------------+
|
||||
| test_distinct_basic | {"flow_type":"streaming"} |
|
||||
+---------------------+---------------------------+
|
||||
|
||||
SHOW CREATE TABLE distinct_basic;
|
||||
|
||||
+----------------+-----------------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+----------------+-----------------------------------------------------------+
|
||||
| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( |
|
||||
| | "number" INT NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("number") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | ttl = 'instant' |
|
||||
| | ) |
|
||||
+----------------+-----------------------------------------------------------+
|
||||
|
||||
SHOW CREATE TABLE out_distinct_basic;
|
||||
|
||||
+--------------------+---------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+--------------------+---------------------------------------------------+
|
||||
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
|
||||
| | "dis" INT NULL, |
|
||||
| | "update_at" TIMESTAMP(3) NULL, |
|
||||
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
|
||||
| | TIME INDEX ("__ts_placeholder"), |
|
||||
| | PRIMARY KEY ("dis") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | |
|
||||
+--------------------+---------------------------------------------------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
(22, "2021-07-01 00:00:00.600");
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
+-----------------------------------------+
|
||||
| ADMIN FLUSH_FLOW('test_distinct_basic') |
|
||||
+-----------------------------------------+
|
||||
| FLOW_FLUSHED |
|
||||
+-----------------------------------------+
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
out_distinct_basic;
|
||||
|
||||
+-----+
|
||||
| dis |
|
||||
+-----+
|
||||
| 20 |
|
||||
| 22 |
|
||||
+-----+
|
||||
|
||||
SELECT number FROM distinct_basic;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
-- SQLNESS SLEEP 6s
|
||||
ADMIN FLUSH_TABLE('distinct_basic');
|
||||
|
||||
+-------------------------------------+
|
||||
| ADMIN FLUSH_TABLE('distinct_basic') |
|
||||
+-------------------------------------+
|
||||
| 0 |
|
||||
+-------------------------------------+
|
||||
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
(23, "2021-07-01 00:00:01.600");
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
+-----------------------------------------+
|
||||
| ADMIN FLUSH_FLOW('test_distinct_basic') |
|
||||
+-----------------------------------------+
|
||||
| FLOW_FLUSHED |
|
||||
+-----------------------------------------+
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
out_distinct_basic;
|
||||
|
||||
+-----+
|
||||
| dis |
|
||||
+-----+
|
||||
| 20 |
|
||||
| 22 |
|
||||
| 23 |
|
||||
+-----+
|
||||
|
||||
SELECT number FROM distinct_basic;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
DROP FLOW test_distinct_basic;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE distinct_basic;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE out_distinct_basic;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- test ttl = 5s
|
||||
CREATE TABLE distinct_basic (
|
||||
number INT,
|
||||
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY(number),
|
||||
TIME INDEX(ts)
|
||||
)WITH ('ttl' = '5s');
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -30,7 +172,26 @@ FROM
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- flow_options should have a flow_type:batching
|
||||
-- since source table's ttl is not instant
|
||||
SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
+---------------------+--------------------------+
|
||||
| flow_name | options |
|
||||
+---------------------+--------------------------+
|
||||
| test_distinct_basic | {"flow_type":"batching"} |
|
||||
+---------------------+--------------------------+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
+----------+
|
||||
| Int64(1) |
|
||||
+----------+
|
||||
| 1 |
|
||||
+----------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
@@ -130,41 +291,6 @@ ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
| FLOW_FLUSHED |
|
||||
+-----------------------------------------+
|
||||
|
||||
SHOW CREATE TABLE distinct_basic;
|
||||
|
||||
+----------------+-----------------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+----------------+-----------------------------------------------------------+
|
||||
| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( |
|
||||
| | "number" INT NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("number") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | ttl = '5s' |
|
||||
| | ) |
|
||||
+----------------+-----------------------------------------------------------+
|
||||
|
||||
SHOW CREATE TABLE out_distinct_basic;
|
||||
|
||||
+--------------------+---------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+--------------------+---------------------------------------------------+
|
||||
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
|
||||
| | "dis" INT NULL, |
|
||||
| | "update_at" TIMESTAMP(3) NULL, |
|
||||
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
|
||||
| | TIME INDEX ("__ts_placeholder"), |
|
||||
| | PRIMARY KEY ("dis") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | |
|
||||
+--------------------+---------------------------------------------------+
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
|
||||
@@ -6,7 +6,7 @@ CREATE TABLE distinct_basic (
|
||||
TIME INDEX(ts)
|
||||
)WITH ('ttl' = 'instant');
|
||||
|
||||
-- should fail
|
||||
-- should fall back to streaming mode
|
||||
-- SQLNESS REPLACE id=\d+ id=REDACTED
|
||||
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
|
||||
SELECT
|
||||
@@ -14,7 +14,61 @@ SELECT
|
||||
FROM
|
||||
distinct_basic;
|
||||
|
||||
ALTER TABLE distinct_basic SET 'ttl' = '5s';
|
||||
-- flow_options should have a flow_type:streaming
|
||||
-- since source table's ttl=instant
|
||||
SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
SHOW CREATE TABLE distinct_basic;
|
||||
|
||||
SHOW CREATE TABLE out_distinct_basic;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
(22, "2021-07-01 00:00:00.600");
|
||||
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
out_distinct_basic;
|
||||
|
||||
SELECT number FROM distinct_basic;
|
||||
|
||||
-- SQLNESS SLEEP 6s
|
||||
ADMIN FLUSH_TABLE('distinct_basic');
|
||||
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
(23, "2021-07-01 00:00:01.600");
|
||||
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
out_distinct_basic;
|
||||
|
||||
SELECT number FROM distinct_basic;
|
||||
|
||||
DROP FLOW test_distinct_basic;
|
||||
DROP TABLE distinct_basic;
|
||||
DROP TABLE out_distinct_basic;
|
||||
|
||||
-- test ttl = 5s
|
||||
CREATE TABLE distinct_basic (
|
||||
number INT,
|
||||
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY(number),
|
||||
TIME INDEX(ts)
|
||||
)WITH ('ttl' = '5s');
|
||||
|
||||
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
|
||||
SELECT
|
||||
@@ -22,7 +76,14 @@ SELECT
|
||||
FROM
|
||||
distinct_basic;
|
||||
|
||||
-- flow_options should have a flow_type:batching
|
||||
-- since source table's ttl is not instant
|
||||
SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
@@ -55,10 +116,6 @@ VALUES
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
SHOW CREATE TABLE distinct_basic;
|
||||
|
||||
SHOW CREATE TABLE out_distinct_basic;
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
|
||||
@@ -44,6 +44,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
|
||||
+----------------------------------------+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
+----------+
|
||||
| Int64(1) |
|
||||
+----------+
|
||||
| 1 |
|
||||
+----------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
SHOW CREATE TABLE out_num_cnt_basic;
|
||||
|
||||
+-------------------+--------------------------------------------------+
|
||||
@@ -101,6 +110,16 @@ GROUP BY
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
+----------+
|
||||
| Int64(1) |
|
||||
+----------+
|
||||
| 1 |
|
||||
+----------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
SHOW CREATE TABLE out_num_cnt_basic;
|
||||
|
||||
+-------------------+--------------------------------------------------+
|
||||
@@ -118,6 +137,15 @@ SHOW CREATE TABLE out_num_cnt_basic;
|
||||
+-------------------+--------------------------------------------------+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
+----------+
|
||||
| Int64(1) |
|
||||
+----------+
|
||||
| 1 |
|
||||
+----------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
SHOW CREATE FLOW test_numbers_basic;
|
||||
|
||||
+--------------------+---------------------------------------------------------------------------------------+
|
||||
|
||||
@@ -20,6 +20,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
|
||||
ADMIN FLUSH_FLOW('test_numbers_basic');
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
SHOW CREATE TABLE out_num_cnt_basic;
|
||||
|
||||
SHOW CREATE FLOW test_numbers_basic;
|
||||
@@ -44,10 +47,16 @@ FROM
|
||||
numbers_input_basic
|
||||
GROUP BY
|
||||
ts;
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
SHOW CREATE TABLE out_num_cnt_basic;
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
SHOW CREATE FLOW test_numbers_basic;
|
||||
|
||||
SHOW CREATE TABLE out_num_cnt_basic;
|
||||
|
||||
@@ -62,6 +62,15 @@ SHOW CREATE TABLE out_num_cnt_basic;
|
||||
+-------------------+--------------------------------------------------+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
+----------+
|
||||
| Int64(1) |
|
||||
+----------+
|
||||
| 1 |
|
||||
+----------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
numbers_input_basic
|
||||
VALUES
|
||||
@@ -206,6 +215,15 @@ SHOW CREATE TABLE out_basic;
|
||||
+-----------+---------------------------------------------+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
+----------+
|
||||
| Int64(1) |
|
||||
+----------+
|
||||
| 1 |
|
||||
+----------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
input_basic
|
||||
VALUES
|
||||
@@ -306,6 +324,15 @@ ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
+-----------------------------------------+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
+----------+
|
||||
| Int64(1) |
|
||||
+----------+
|
||||
| 1 |
|
||||
+----------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
@@ -1665,6 +1692,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
|
||||
+----------------------------------------+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
+----------+
|
||||
| Int64(1) |
|
||||
+----------+
|
||||
| 1 |
|
||||
+----------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
numbers_input_basic
|
||||
VALUES
|
||||
|
||||
@@ -24,6 +24,9 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
|
||||
SHOW CREATE TABLE out_num_cnt_basic;
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
numbers_input_basic
|
||||
VALUES
|
||||
@@ -91,6 +94,9 @@ FROM
|
||||
SHOW CREATE TABLE out_basic;
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
input_basic
|
||||
VALUES
|
||||
@@ -130,6 +136,9 @@ SHOW CREATE TABLE out_distinct_basic;
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
@@ -788,6 +797,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
|
||||
ADMIN FLUSH_FLOW('test_numbers_basic');
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
numbers_input_basic
|
||||
VALUES
|
||||
|
||||
@@ -730,10 +730,21 @@ SELECT key FROM api_stats;
|
||||
+-----+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
+----------+
|
||||
| Int64(1) |
|
||||
+----------+
|
||||
| 1 |
|
||||
+----------+
|
||||
|
||||
-- SQLNESS SLEEP 5s
|
||||
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
-- wait longer so the flownode has time to recover flows
|
||||
-- SQLNESS SLEEP 5s
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('api_stats_flow');
|
||||
|
||||
|
||||
@@ -399,8 +399,13 @@ ADMIN FLUSH_FLOW('api_stats_flow');
|
||||
SELECT key FROM api_stats;
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
-- SQLNESS SLEEP 5s
|
||||
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
|
||||
|
||||
-- wait longer so the flownode has time to recover flows
|
||||
-- SQLNESS SLEEP 5s
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('api_stats_flow');
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ ADMIN FLUSH_FLOW('calc_access_log_10s');
|
||||
+-----------------------------------------+
|
||||
|
||||
-- query should return 3 rows
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
SELECT "url", time_window FROM access_log_10s
|
||||
ORDER BY
|
||||
time_window;
|
||||
@@ -63,6 +64,7 @@ ORDER BY
|
||||
+------------+---------------------+
|
||||
|
||||
-- use hll_count to query the approximate data in access_log_10s
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
SELECT "url", time_window, hll_count(state) FROM access_log_10s
|
||||
ORDER BY
|
||||
time_window;
|
||||
@@ -76,6 +78,7 @@ ORDER BY
|
||||
+------------+---------------------+---------------------------------+
|
||||
|
||||
-- further, we can aggregate the 10-second data into per-minute buckets by using hll_merge to merge the 10-second hyperloglog states
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
SELECT
|
||||
"url",
|
||||
date_bin('1 minute'::INTERVAL, time_window) AS time_window_1m,
|
||||
@@ -91,8 +94,8 @@ ORDER BY
|
||||
+------------+---------------------+------------+
|
||||
| url | time_window_1m | uv_per_min |
|
||||
+------------+---------------------+------------+
|
||||
| /not_found | 2025-03-04T00:00:00 | 1 |
|
||||
| /dashboard | 2025-03-04T00:00:00 | 3 |
|
||||
| /not_found | 2025-03-04T00:00:00 | 1 |
|
||||
+------------+---------------------+------------+
|
||||
|
||||
DROP FLOW calc_access_log_10s;
|
||||
|
||||
@@ -36,16 +36,19 @@ INSERT INTO access_log VALUES
|
||||
ADMIN FLUSH_FLOW('calc_access_log_10s');
|
||||
|
||||
-- query should return 3 rows
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
SELECT "url", time_window FROM access_log_10s
|
||||
ORDER BY
|
||||
time_window;
|
||||
|
||||
-- use hll_count to query the approximate data in access_log_10s
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
SELECT "url", time_window, hll_count(state) FROM access_log_10s
|
||||
ORDER BY
|
||||
time_window;
|
||||
|
||||
-- further, we can aggregate the 10-second data into per-minute buckets by using hll_merge to merge the 10-second hyperloglog states
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
SELECT
|
||||
"url",
|
||||
date_bin('1 minute'::INTERVAL, time_window) AS time_window_1m,
|
||||
|
||||
@@ -263,6 +263,15 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM
|
||||
|
||||
-- make sure it stays the same after recovery
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
+----------+
|
||||
| Int64(1) |
|
||||
+----------+
|
||||
| 1 |
|
||||
+----------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
||||
|
||||
+---------------------+---------------+-------------------------------------------------------------+------------------------------------+
|
||||
|
||||
@@ -108,6 +108,9 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM
|
||||
|
||||
-- make sure it stays the same after recovery
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
|
||||
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ select GREATEST('1999-01-30', '2023-03-01');
|
||||
+-------------------------------------------------+
|
||||
| greatest(Utf8("1999-01-30"),Utf8("2023-03-01")) |
|
||||
+-------------------------------------------------+
|
||||
| 2023-03-01T00:00:00 |
|
||||
| 2023-03-01 |
|
||||
+-------------------------------------------------+
|
||||
|
||||
select GREATEST('2000-02-11'::Date, '2020-12-30'::Date);
|
||||
|
||||
158
tests/cases/standalone/common/select/prune_pk.result
Normal file
@@ -0,0 +1,158 @@
|
||||
CREATE TABLE IF NOT EXISTS `test_multi_pk_filter` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `flag` INT NULL, `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`, `flag`) ) ENGINE=mito;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:00:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 0, 421, '2023-05-15 10:05:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'dev', 1, 356, '2023-05-15 10:10:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
ADMIN FLUSH_TABLE('test_multi_pk_filter');
|
||||
|
||||
+-------------------------------------------+
|
||||
| ADMIN FLUSH_TABLE('test_multi_pk_filter') |
|
||||
+-------------------------------------------+
|
||||
| 0 |
|
||||
+-------------------------------------------+
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'dev', 1, 412, '2023-05-15 10:15:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'dev', 1, 298, '2023-05-15 10:20:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:25:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 1, 5874, '2023-05-15 10:30:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
ADMIN FLUSH_TABLE('test_multi_pk_filter');
|
||||
|
||||
+-------------------------------------------+
|
||||
| ADMIN FLUSH_TABLE('test_multi_pk_filter') |
|
||||
+-------------------------------------------+
|
||||
| 0 |
|
||||
+-------------------------------------------+
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 1, 6132, '2023-05-15 10:35:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'testing', 1, 1287, '2023-05-15 10:40:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'testing', 1, 1432, '2023-05-15 10:45:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'testing', 1, 1056, '2023-05-15 10:50:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
|
||||
greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2'
|
||||
ORDER BY greptime_timestamp;
|
||||
|
||||
+---------------------+---------------+------------+-------+
|
||||
| greptime_timestamp | namespace | env | total |
|
||||
+---------------------+---------------+------------+-------+
|
||||
| 2023-05-15T10:00:00 | thermostat_v2 | production | 5289 |
|
||||
| 2023-05-15T10:10:00 | thermostat_v2 | dev | 356 |
|
||||
| 2023-05-15T10:15:00 | thermostat_v2 | dev | 412 |
|
||||
| 2023-05-15T10:20:00 | thermostat_v2 | dev | 298 |
|
||||
| 2023-05-15T10:25:00 | thermostat_v2 | production | 5289 |
|
||||
| 2023-05-15T10:30:00 | thermostat_v2 | production | 5874 |
|
||||
| 2023-05-15T10:35:00 | thermostat_v2 | production | 6132 |
|
||||
| 2023-05-15T10:40:00 | thermostat_v2 | testing | 1287 |
|
||||
| 2023-05-15T10:45:00 | thermostat_v2 | testing | 1432 |
|
||||
| 2023-05-15T10:50:00 | thermostat_v2 | testing | 1056 |
|
||||
+---------------------+---------------+------------+-------+
|
||||
|
||||
SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
|
||||
greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2' AND env='dev'
|
||||
ORDER BY greptime_timestamp;
|
||||
|
||||
+---------------------+---------------+-----+-------+
|
||||
| greptime_timestamp | namespace | env | total |
|
||||
+---------------------+---------------+-----+-------+
|
||||
| 2023-05-15T10:10:00 | thermostat_v2 | dev | 356 |
|
||||
| 2023-05-15T10:15:00 | thermostat_v2 | dev | 412 |
|
||||
| 2023-05-15T10:20:00 | thermostat_v2 | dev | 298 |
|
||||
+---------------------+---------------+-----+-------+
|
||||
|
||||
DROP TABLE test_multi_pk_filter;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE IF NOT EXISTS `test_multi_pk_null` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`) ) ENGINE=mito;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO test_multi_pk_null
|
||||
(namespace, env, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 5289, '2023-05-15 10:00:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO test_multi_pk_null
|
||||
(namespace, env, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 421, '2023-05-15 10:05:00');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
ADMIN FLUSH_TABLE('test_multi_pk_null');
|
||||
|
||||
+-----------------------------------------+
|
||||
| ADMIN FLUSH_TABLE('test_multi_pk_null') |
|
||||
+-----------------------------------------+
|
||||
| 0 |
|
||||
+-----------------------------------------+
|
||||
|
||||
SELECT * FROM test_multi_pk_null WHERE env IS NOT NULL;
|
||||
|
||||
+---------------+------------+-------+---------------------+
|
||||
| namespace | env | total | greptime_timestamp |
|
||||
+---------------+------------+-------+---------------------+
|
||||
| thermostat_v2 | production | 5289 | 2023-05-15T10:00:00 |
|
||||
| thermostat_v2 | production | 421 | 2023-05-15T10:05:00 |
|
||||
+---------------+------------+-------+---------------------+
|
||||
|
||||
DROP TABLE test_multi_pk_null;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
66
tests/cases/standalone/common/select/prune_pk.sql
Normal file
@@ -0,0 +1,66 @@
|
||||
CREATE TABLE IF NOT EXISTS `test_multi_pk_filter` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `flag` INT NULL, `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`, `flag`) ) ENGINE=mito;
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:00:00');
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 0, 421, '2023-05-15 10:05:00');
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'dev', 1, 356, '2023-05-15 10:10:00');
|
||||
|
||||
ADMIN FLUSH_TABLE('test_multi_pk_filter');
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'dev', 1, 412, '2023-05-15 10:15:00');
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'dev', 1, 298, '2023-05-15 10:20:00');
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:25:00');
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 1, 5874, '2023-05-15 10:30:00');
|
||||
|
||||
ADMIN FLUSH_TABLE('test_multi_pk_filter');
|
||||
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 1, 6132, '2023-05-15 10:35:00');
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'testing', 1, 1287, '2023-05-15 10:40:00');
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'testing', 1, 1432, '2023-05-15 10:45:00');
|
||||
INSERT INTO test_multi_pk_filter
|
||||
(namespace, env, flag, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'testing', 1, 1056, '2023-05-15 10:50:00');
|
||||
|
||||
SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
|
||||
greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2'
|
||||
ORDER BY greptime_timestamp;
|
||||
|
||||
SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
|
||||
greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2' AND env='dev'
|
||||
ORDER BY greptime_timestamp;
|
||||
|
||||
DROP TABLE test_multi_pk_filter;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS `test_multi_pk_null` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`) ) ENGINE=mito;
|
||||
|
||||
INSERT INTO test_multi_pk_null
|
||||
(namespace, env, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 5289, '2023-05-15 10:00:00');
|
||||
INSERT INTO test_multi_pk_null
|
||||
(namespace, env, total, greptime_timestamp)
|
||||
VALUES ('thermostat_v2', 'production', 421, '2023-05-15 10:05:00');
|
||||
|
||||
ADMIN FLUSH_TABLE('test_multi_pk_null');
|
||||
|
||||
SELECT * FROM test_multi_pk_null WHERE env IS NOT NULL;
|
||||
|
||||
DROP TABLE test_multi_pk_null;
|
||||