From 5c9cbb5f4c364bb4e64c3095bb0d630e786dccd4 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 2 May 2025 00:20:01 +0800 Subject: [PATCH] chore: bump version to 0.14.2 (#6032) * chore: only retry when retry-able in flow (#5987) * chore: only retry when retry-able * chore: revert dbg change * refactor: per review * fix: check for available frontend first * docs: more explain&longer timeout&feat: more retry at every level&try send select 1 * fix: use `sql` method for "SELECT 1" * fix: also put recover flows in spawned task and a dead loop * test: update transient error in flow rebuild test * chore: sleep after sqlness sleep * chore: add a warning * chore: wait even more time after reboot * fix: sanitize_connection_string (#6012) * fix: disable recursion limit in prost (#6010) Signed-off-by: Ruihang Xia * ci: fix the bugs of release-dev-builder-images and add update-dev-builder-image-tag (#6009) * fix: the dev-builder release job is not triggered by merged event * ci: add update-dev-builder-image-tag * fix: always create mito engine (#6018) * fix: force streaming mode for instant source table (#6031) * fix: force streaming mode for instant source table * tests: sqlness test&refactor: get table * refactor: per review * chore: bump version to 0.14.2 --------- Signed-off-by: Ruihang Xia Co-authored-by: jeremyhi Co-authored-by: Ruihang Xia Co-authored-by: zyy17 Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --- .github/scripts/update-dev-builder-version.sh | 37 ++++ .../workflows/release-dev-builder-images.yaml | 49 ++++- Cargo.lock | 146 ++++++------- Cargo.toml | 4 +- src/client/src/database.rs | 37 +++- src/cmd/src/metasrv.rs | 33 ++- src/common/meta/src/kv_backend.rs | 2 +- src/common/meta/src/kv_backend/util.rs | 85 ++++++++ src/datanode/src/datanode.rs | 59 ++--- src/flow/src/adapter/flownode_impl.rs | 78 ++++++- src/flow/src/batching_mode.rs | 11 +- src/flow/src/batching_mode/engine.rs | 3 +- src/flow/src/batching_mode/frontend_client.rs | 115 ++++++---- src/flow/src/error.rs | 13 +- src/flow/src/server.rs | 2 + src/meta-srv/src/metasrv.rs | 52 ++++- src/operator/src/statement/ddl.rs | 48 ++++- .../common/flow/flow_advance_ttl.result | 202 ++++++++++++++---- .../common/flow/flow_advance_ttl.sql | 69 +++++- .../common/flow/flow_auto_sink_table.result | 28 +++ .../common/flow/flow_auto_sink_table.sql | 9 + .../standalone/common/flow/flow_basic.result | 36 ++++ .../standalone/common/flow/flow_basic.sql | 12 ++ .../common/flow/flow_rebuild.result | 11 + .../standalone/common/flow/flow_rebuild.sql | 5 + .../common/flow/show_create_flow.result | 9 + .../common/flow/show_create_flow.sql | 3 + 27 files changed, 935 insertions(+), 223 deletions(-) create mode 100755 .github/scripts/update-dev-builder-version.sh create mode 100644 src/common/meta/src/kv_backend/util.rs diff --git a/.github/scripts/update-dev-builder-version.sh b/.github/scripts/update-dev-builder-version.sh new file mode 100755 index 0000000000..2983d76794 --- /dev/null +++ b/.github/scripts/update-dev-builder-version.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +DEV_BUILDER_IMAGE_TAG=$1 + +update_dev_builder_version() { + if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then + echo "Error: Should specify the dev-builder image tag" + exit 1 + fi + + # Configure Git configs. + git config --global user.email greptimedb-ci@greptime.com + git config --global user.name greptimedb-ci + + # Checkout a new branch. + BRANCH_NAME="ci/update-dev-builder-$(date +%Y%m%d%H%M%S)" + git checkout -b $BRANCH_NAME + + # Update the dev-builder image tag in the Makefile. + gsed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile + + # Commit the changes. + git add Makefile + git commit -m "ci: update dev-builder image tag" + git push origin $BRANCH_NAME + + # Create a Pull Request. + gh pr create \ + --title "ci: update dev-builder image tag" \ + --body "This PR updates the dev-builder image tag" \ + --base main \ + --head $BRANCH_NAME \ + --reviewer zyy17 \ + --reviewer daviderli614 +} + +update_dev_builder_version diff --git a/.github/workflows/release-dev-builder-images.yaml b/.github/workflows/release-dev-builder-images.yaml index 1abb359ba7..e6a33dcceb 100644 --- a/.github/workflows/release-dev-builder-images.yaml +++ b/.github/workflows/release-dev-builder-images.yaml @@ -24,11 +24,19 @@ on: description: Release dev-builder-android image required: false default: false + update_dev_builder_image_tag: + type: boolean + description: Update the DEV_BUILDER_IMAGE_TAG in Makefile and create a PR + required: false + default: false jobs: release-dev-builder-images: name: Release dev builder images - if: ${{ inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }} # Only manually trigger this job. + # The jobs are triggered by the following events: + # 1. Manually triggered workflow_dispatch event + # 2. Push event when the PR that modifies the `rust-toolchain.toml` or `docker/dev-builder/**` is merged to main + if: ${{ github.event_name == 'push' || inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }} runs-on: ubuntu-latest outputs: version: ${{ steps.set-version.outputs.version }} @@ -57,9 +65,9 @@ jobs: version: ${{ env.VERSION }} dockerhub-image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }} dockerhub-image-registry-token: ${{ secrets.DOCKERHUB_TOKEN }} - build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image }} - build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image }} - build-dev-builder-android: ${{ inputs.release_dev_builder_android_image }} + build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }} + build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }} + build-dev-builder-android: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }} release-dev-builder-images-ecr: name: Release dev builder images to AWS ECR @@ -85,7 +93,7 @@ jobs: - name: Push dev-builder-ubuntu image shell: bash - if: ${{ inputs.release_dev_builder_ubuntu_image }} + if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }} env: IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }} IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }} @@ -106,7 +114,7 @@ jobs: - name: Push dev-builder-centos image shell: bash - if: ${{ inputs.release_dev_builder_centos_image }} + if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }} env: IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }} IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }} @@ -127,7 +135,7 @@ jobs: - name: Push dev-builder-android image shell: bash - if: ${{ inputs.release_dev_builder_android_image }} + if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }} env: IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }} IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }} @@ -162,7 +170,7 @@ jobs: - name: Push dev-builder-ubuntu image shell: bash - if: ${{ inputs.release_dev_builder_ubuntu_image }} + if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }} env: IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }} IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }} @@ -176,7 +184,7 @@ jobs: - name: Push dev-builder-centos image shell: bash - if: ${{ inputs.release_dev_builder_centos_image }} + if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }} env: IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }} IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }} @@ -190,7 +198,7 @@ jobs: - name: Push dev-builder-android image shell: bash - if: ${{ inputs.release_dev_builder_android_image }} + if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }} env: IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }} IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }} @@ -201,3 +209,24 @@ jobs: quay.io/skopeo/stable:latest \ copy -a docker://docker.io/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION \ docker://$ACR_IMAGE_REGISTRY/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION + + update-dev-builder-image-tag: + name: Update dev-builder image tag + runs-on: ubuntu-latest + permissions: + contents: write + pull-requests: write + if: ${{ github.event_name == 'push' || inputs.update_dev_builder_image_tag }} + needs: [ + release-dev-builder-images + ] + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Update dev-builder image tag + shell: bash + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + ./.github/scripts/update-dev-builder-version.sh ${{ needs.release-dev-builder-images.outputs.version }} diff --git a/Cargo.lock b/Cargo.lock index ac25ab061e..c127c0ab38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" [[package]] name = "api" -version = "0.14.1" +version = "0.14.2" dependencies = [ "common-base", "common-decimal", @@ -915,7 +915,7 @@ dependencies = [ [[package]] name = "auth" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "async-trait", @@ -1537,7 +1537,7 @@ dependencies = [ [[package]] name = "cache" -version = "0.14.1" +version = "0.14.2" dependencies = [ "catalog", "common-error", @@ -1561,7 +1561,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "catalog" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "arrow 54.2.1", @@ -1874,7 +1874,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "cli" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-trait", "auth", @@ -1917,7 +1917,7 @@ dependencies = [ "session", "snafu 0.8.5", "store-api", - "substrait 0.14.1", + "substrait 0.14.2", "table", "tempfile", "tokio", @@ -1926,7 +1926,7 @@ dependencies = [ [[package]] name = "client" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "arc-swap", @@ -1955,7 +1955,7 @@ dependencies = [ "rand 0.9.0", "serde_json", "snafu 0.8.5", - "substrait 0.14.1", + "substrait 0.14.2", "substrait 0.37.3", "tokio", "tokio-stream", @@ -1996,7 +1996,7 @@ dependencies = [ [[package]] name = "cmd" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-trait", "auth", @@ -2056,7 +2056,7 @@ dependencies = [ "similar-asserts", "snafu 0.8.5", "store-api", - "substrait 0.14.1", + "substrait 0.14.2", "table", "temp-env", "tempfile", @@ -2102,7 +2102,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335" [[package]] name = "common-base" -version = "0.14.1" +version = "0.14.2" dependencies = [ "anymap2", "async-trait", @@ -2124,11 +2124,11 @@ dependencies = [ [[package]] name = "common-catalog" -version = "0.14.1" +version = "0.14.2" [[package]] name = "common-config" -version = "0.14.1" +version = "0.14.2" dependencies = [ "common-base", "common-error", @@ -2153,7 +2153,7 @@ dependencies = [ [[package]] name = "common-datasource" -version = "0.14.1" +version = "0.14.2" dependencies = [ "arrow 54.2.1", "arrow-schema 54.3.1", @@ -2190,7 +2190,7 @@ dependencies = [ [[package]] name = "common-decimal" -version = "0.14.1" +version = "0.14.2" dependencies = [ "bigdecimal 0.4.8", "common-error", @@ -2203,7 +2203,7 @@ dependencies = [ [[package]] name = "common-error" -version = "0.14.1" +version = "0.14.2" dependencies = [ "common-macro", "http 1.1.0", @@ -2214,7 +2214,7 @@ dependencies = [ [[package]] name = "common-frontend" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-trait", "common-error", @@ -2224,7 +2224,7 @@ dependencies = [ [[package]] name = "common-function" -version = "0.14.1" +version = "0.14.2" dependencies = [ "ahash 0.8.11", "api", @@ -2277,7 +2277,7 @@ dependencies = [ [[package]] name = "common-greptimedb-telemetry" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-trait", "common-runtime", @@ -2294,7 +2294,7 @@ dependencies = [ [[package]] name = "common-grpc" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "arrow-flight", @@ -2325,7 +2325,7 @@ dependencies = [ [[package]] name = "common-grpc-expr" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "common-base", @@ -2344,7 +2344,7 @@ dependencies = [ [[package]] name = "common-macro" -version = "0.14.1" +version = "0.14.2" dependencies = [ "arc-swap", "common-query", @@ -2358,7 +2358,7 @@ dependencies = [ [[package]] name = "common-mem-prof" -version = "0.14.1" +version = "0.14.2" dependencies = [ "common-error", "common-macro", @@ -2371,7 +2371,7 @@ dependencies = [ [[package]] name = "common-meta" -version = "0.14.1" +version = "0.14.2" dependencies = [ "anymap2", "api", @@ -2432,7 +2432,7 @@ dependencies = [ [[package]] name = "common-options" -version = "0.14.1" +version = "0.14.2" dependencies = [ "common-grpc", "humantime-serde", @@ -2441,11 +2441,11 @@ dependencies = [ [[package]] name = "common-plugins" -version = "0.14.1" +version = "0.14.2" [[package]] name = "common-pprof" -version = "0.14.1" +version = "0.14.2" dependencies = [ "common-error", "common-macro", @@ -2457,7 +2457,7 @@ dependencies = [ [[package]] name = "common-procedure" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-stream", "async-trait", @@ -2484,7 +2484,7 @@ dependencies = [ [[package]] name = "common-procedure-test" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-trait", "common-procedure", @@ -2493,7 +2493,7 @@ dependencies = [ [[package]] name = "common-query" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "async-trait", @@ -2519,7 +2519,7 @@ dependencies = [ [[package]] name = "common-recordbatch" -version = "0.14.1" +version = "0.14.2" dependencies = [ "arc-swap", "common-error", @@ -2539,7 +2539,7 @@ dependencies = [ [[package]] name = "common-runtime" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-trait", "clap 4.5.19", @@ -2569,14 +2569,14 @@ dependencies = [ [[package]] name = "common-session" -version = "0.14.1" +version = "0.14.2" dependencies = [ "strum 0.27.1", ] [[package]] name = "common-telemetry" -version = "0.14.1" +version = "0.14.2" dependencies = [ "atty", "backtrace", @@ -2604,7 +2604,7 @@ dependencies = [ [[package]] name = "common-test-util" -version = "0.14.1" +version = "0.14.2" dependencies = [ "client", "common-query", @@ -2616,7 +2616,7 @@ dependencies = [ [[package]] name = "common-time" -version = "0.14.1" +version = "0.14.2" dependencies = [ "arrow 54.2.1", "chrono", @@ -2634,7 +2634,7 @@ dependencies = [ [[package]] name = "common-version" -version = "0.14.1" +version = "0.14.2" dependencies = [ "build-data", "const_format", @@ -2644,7 +2644,7 @@ dependencies = [ [[package]] name = "common-wal" -version = "0.14.1" +version = "0.14.2" dependencies = [ "common-base", "common-error", @@ -3572,7 +3572,7 @@ dependencies = [ [[package]] name = "datanode" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "arrow-flight", @@ -3624,7 +3624,7 @@ dependencies = [ "session", "snafu 0.8.5", "store-api", - "substrait 0.14.1", + "substrait 0.14.2", "table", "tokio", "toml 0.8.19", @@ -3633,7 +3633,7 @@ dependencies = [ [[package]] name = "datatypes" -version = "0.14.1" +version = "0.14.2" dependencies = [ "arrow 54.2.1", "arrow-array 54.2.1", @@ -4259,7 +4259,7 @@ dependencies = [ [[package]] name = "file-engine" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "async-trait", @@ -4382,7 +4382,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8" [[package]] name = "flow" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "arrow 54.2.1", @@ -4444,7 +4444,7 @@ dependencies = [ "snafu 0.8.5", "store-api", "strum 0.27.1", - "substrait 0.14.1", + "substrait 0.14.2", "table", "tokio", "tonic 0.12.3", @@ -4499,7 +4499,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" [[package]] name = "frontend" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "arc-swap", @@ -4556,7 +4556,7 @@ dependencies = [ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)", "store-api", "strfmt", - "substrait 0.14.1", + "substrait 0.14.2", "table", "tokio", "toml 0.8.19", @@ -5795,7 +5795,7 @@ dependencies = [ [[package]] name = "index" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-trait", "asynchronous-codec", @@ -6605,7 +6605,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "log-query" -version = "0.14.1" +version = "0.14.2" dependencies = [ "chrono", "common-error", @@ -6617,7 +6617,7 @@ dependencies = [ [[package]] name = "log-store" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-stream", "async-trait", @@ -6911,7 +6911,7 @@ dependencies = [ [[package]] name = "meta-client" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "async-trait", @@ -6939,7 +6939,7 @@ dependencies = [ [[package]] name = "meta-srv" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "async-trait", @@ -7029,7 +7029,7 @@ dependencies = [ [[package]] name = "metric-engine" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "aquamarine", @@ -7118,7 +7118,7 @@ dependencies = [ [[package]] name = "mito2" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "aquamarine", @@ -7824,7 +7824,7 @@ dependencies = [ [[package]] name = "object-store" -version = "0.14.1" +version = "0.14.2" dependencies = [ "anyhow", "bytes", @@ -8119,7 +8119,7 @@ dependencies = [ [[package]] name = "operator" -version = "0.14.1" +version = "0.14.2" dependencies = [ "ahash 0.8.11", "api", @@ -8168,7 +8168,7 @@ dependencies = [ "sql", "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)", "store-api", - "substrait 0.14.1", + "substrait 0.14.2", "table", "tokio", "tokio-util", @@ -8423,7 +8423,7 @@ dependencies = [ [[package]] name = "partition" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "async-trait", @@ -8705,7 +8705,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipeline" -version = "0.14.1" +version = "0.14.2" dependencies = [ "ahash 0.8.11", "api", @@ -8847,7 +8847,7 @@ dependencies = [ [[package]] name = "plugins" -version = "0.14.1" +version = "0.14.2" dependencies = [ "auth", "clap 4.5.19", @@ -9127,7 +9127,7 @@ dependencies = [ [[package]] name = "promql" -version = "0.14.1" +version = "0.14.2" dependencies = [ "ahash 0.8.11", "async-trait", @@ -9373,7 +9373,7 @@ dependencies = [ [[package]] name = "puffin" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-compression 0.4.13", "async-trait", @@ -9414,7 +9414,7 @@ dependencies = [ [[package]] name = "query" -version = "0.14.1" +version = "0.14.2" dependencies = [ "ahash 0.8.11", "api", @@ -9480,7 +9480,7 @@ dependencies = [ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)", "statrs", "store-api", - "substrait 0.14.1", + "substrait 0.14.2", "table", "tokio", "tokio-stream", @@ -10830,7 +10830,7 @@ dependencies = [ [[package]] name = "servers" -version = "0.14.1" +version = "0.14.2" dependencies = [ "ahash 0.8.11", "api", @@ -10950,7 +10950,7 @@ dependencies = [ [[package]] name = "session" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "arc-swap", @@ -11275,7 +11275,7 @@ dependencies = [ [[package]] name = "sql" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "chrono", @@ -11330,7 +11330,7 @@ dependencies = [ [[package]] name = "sqlness-runner" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-trait", "clap 4.5.19", @@ -11649,7 +11649,7 @@ dependencies = [ [[package]] name = "store-api" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "aquamarine", @@ -11798,7 +11798,7 @@ dependencies = [ [[package]] name = "substrait" -version = "0.14.1" +version = "0.14.2" dependencies = [ "async-trait", "bytes", @@ -11978,7 +11978,7 @@ dependencies = [ [[package]] name = "table" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "async-trait", @@ -12229,7 +12229,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "tests-fuzz" -version = "0.14.1" +version = "0.14.2" dependencies = [ "arbitrary", "async-trait", @@ -12273,7 +12273,7 @@ dependencies = [ [[package]] name = "tests-integration" -version = "0.14.1" +version = "0.14.2" dependencies = [ "api", "arrow-flight", @@ -12340,7 +12340,7 @@ dependencies = [ "sql", "sqlx", "store-api", - "substrait 0.14.1", + "substrait 0.14.2", "table", "tempfile", "time", diff --git a/Cargo.toml b/Cargo.toml index f4d4af282d..1d4614b35a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ members = [ resolver = "2" [workspace.package] -version = "0.14.1" +version = "0.14.2" edition = "2021" license = "Apache-2.0" @@ -162,7 +162,7 @@ paste = "1.0" pin-project = "1.0" prometheus = { version = "0.13.3", features = ["process"] } promql-parser = { version = "0.5.1", features = ["ser"] } -prost = "0.13" +prost = { version = "0.13", features = ["no-recursion-limit"] } raft-engine = { version = "0.4.1", default-features = false } rand = "0.9" ratelimit = "0.10" diff --git a/src/client/src/database.rs b/src/client/src/database.rs index c9dc9b08e5..863b6b6419 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -36,8 +36,8 @@ use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_query::Output; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::RecordBatchStreamWrapper; -use common_telemetry::error; use common_telemetry::tracing_context::W3cTrace; +use common_telemetry::{error, warn}; use futures::future; use futures_util::{Stream, StreamExt, TryStreamExt}; use prost::Message; @@ -192,6 +192,36 @@ impl Database { from_grpc_response(response) } + /// Retry if connection fails, max_retries is the max number of retries, so the total wait time + /// is `max_retries * GRPC_CONN_TIMEOUT` + pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result { + let mut client = make_database_client(&self.client)?.inner; + let mut retries = 0; + let request = self.to_rpc_request(request); + loop { + let raw_response = client.handle(request.clone()).await; + match (raw_response, retries < max_retries) { + (Ok(resp), _) => return from_grpc_response(resp.into_inner()), + (Err(err), true) => { + // determine if the error is retryable + if is_grpc_retryable(&err) { + // retry + retries += 1; + warn!("Retrying {} times with error = {:?}", retries, err); + continue; + } + } + (Err(err), false) => { + error!( + "Failed to send request to grpc handle after {} retries, error = {:?}", + retries, err + ); + return Err(err.into()); + } + } + } + } + #[inline] fn to_rpc_request(&self, request: Request) -> GreptimeRequest { GreptimeRequest { @@ -368,6 +398,11 @@ impl Database { } } +/// by grpc standard, only `Unavailable` is retryable, see: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc +pub fn is_grpc_retryable(err: &tonic::Status) -> bool { + matches!(err.code(), tonic::Code::Unavailable) +} + #[derive(Default, Debug, Clone)] struct FlightContext { auth_header: Option, diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index da017e71cd..fcd8ca8fa9 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; use std::time::Duration; use async_trait::async_trait; @@ -131,7 +132,7 @@ impl SubCommand { } } -#[derive(Debug, Default, Parser)] +#[derive(Default, Parser)] pub struct StartCommand { /// The address to bind the gRPC server. #[clap(long, alias = "bind-addr")] @@ -171,6 +172,27 @@ pub struct StartCommand { backend: Option, } +impl fmt::Debug for StartCommand { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StartCommand") + .field("rpc_bind_addr", &self.rpc_bind_addr) + .field("rpc_server_addr", &self.rpc_server_addr) + .field("store_addrs", &self.sanitize_store_addrs()) + .field("config_file", &self.config_file) + .field("selector", &self.selector) + .field("use_memory_store", &self.use_memory_store) + .field("enable_region_failover", &self.enable_region_failover) + .field("http_addr", &self.http_addr) + .field("http_timeout", &self.http_timeout) + .field("env_prefix", &self.env_prefix) + .field("data_home", &self.data_home) + .field("store_key_prefix", &self.store_key_prefix) + .field("max_txn_ops", &self.max_txn_ops) + .field("backend", &self.backend) + .finish() + } +} + impl StartCommand { pub fn load_options(&self, global_options: &GlobalOptions) -> Result { let mut opts = MetasrvOptions::load_layered_options( @@ -184,6 +206,15 @@ impl StartCommand { Ok(opts) } + fn sanitize_store_addrs(&self) -> Option> { + self.store_addrs.as_ref().map(|addrs| { + addrs + .iter() + .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr)) + .collect() + }) + } + // The precedence order is: cli > config file > environment variables > default values. fn merge_with_cli_options( &self, diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index 05c7348fa4..747d1149c4 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -35,7 +35,7 @@ pub mod memory; pub mod rds; pub mod test; pub mod txn; - +pub mod util; pub type KvBackendRef = Arc + Send + Sync>; #[async_trait] diff --git a/src/common/meta/src/kv_backend/util.rs b/src/common/meta/src/kv_backend/util.rs new file mode 100644 index 0000000000..1021d78a60 --- /dev/null +++ b/src/common/meta/src/kv_backend/util.rs @@ -0,0 +1,85 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Removes sensitive information like passwords from connection strings. +/// +/// This function sanitizes connection strings by removing credentials: +/// - For URL format (mysql://user:password@host:port/db): Removes everything before '@' +/// - For parameter format (host=localhost password=secret): Removes the password parameter +/// - For URL format without credentials (mysql://host:port/db): Removes the protocol prefix +/// +/// # Arguments +/// +/// * `conn_str` - The connection string to sanitize +/// +/// # Returns +/// +/// A sanitized version of the connection string with sensitive information removed +pub fn sanitize_connection_string(conn_str: &str) -> String { + // Case 1: URL format with credentials (mysql://user:password@host:port/db) + // Extract everything after the '@' symbol + if let Some(at_pos) = conn_str.find('@') { + return conn_str[at_pos + 1..].to_string(); + } + + // Case 2: Parameter format with password (host=localhost password=secret dbname=mydb) + // Filter out any parameter that starts with "password=" + if conn_str.contains("password=") { + return conn_str + .split_whitespace() + .filter(|param| !param.starts_with("password=")) + .collect::>() + .join(" "); + } + + // Case 3: URL format without credentials (mysql://host:port/db) + // Extract everything after the protocol prefix + if let Some(host_part) = conn_str.split("://").nth(1) { + return host_part.to_string(); + } + + // Case 4: Already sanitized or unknown format + // Return as is + conn_str.to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sanitize_connection_string() { + // Test URL format with username/password + let conn_str = "mysql://user:password123@localhost:3306/db"; + assert_eq!(sanitize_connection_string(conn_str), "localhost:3306/db"); + + // Test URL format without credentials + let conn_str = "mysql://localhost:3306/db"; + assert_eq!(sanitize_connection_string(conn_str), "localhost:3306/db"); + + // Test parameter format with password + let conn_str = "host=localhost port=5432 user=postgres password=secret dbname=mydb"; + assert_eq!( + sanitize_connection_string(conn_str), + "host=localhost port=5432 user=postgres dbname=mydb" + ); + + // Test parameter format without password + let conn_str = "host=localhost port=5432 user=postgres dbname=mydb"; + assert_eq!( + sanitize_connection_string(conn_str), + "host=localhost port=5432 user=postgres dbname=mydb" + ); + } +} diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 25cb5a9d0c..b0a02eb46e 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -398,45 +398,46 @@ impl DatanodeBuilder { schema_metadata_manager: SchemaMetadataManagerRef, plugins: Plugins, ) -> Result> { - let mut engines = vec![]; - let mut metric_engine_config = opts.region_engine.iter().find_map(|c| match c { - RegionEngineConfig::Metric(config) => Some(config.clone()), - _ => None, - }); + let mut metric_engine_config = metric_engine::config::EngineConfig::default(); + let mut mito_engine_config = MitoConfig::default(); + let mut file_engine_config = file_engine::config::EngineConfig::default(); for engine in &opts.region_engine { match engine { RegionEngineConfig::Mito(config) => { - let mito_engine = Self::build_mito_engine( - opts, - object_store_manager.clone(), - config.clone(), - schema_metadata_manager.clone(), - plugins.clone(), - ) - .await?; - - let metric_engine = MetricEngine::try_new( - mito_engine.clone(), - metric_engine_config.take().unwrap_or_default(), - ) - .context(BuildMetricEngineSnafu)?; - engines.push(Arc::new(mito_engine) as _); - engines.push(Arc::new(metric_engine) as _); + mito_engine_config = config.clone(); } RegionEngineConfig::File(config) => { - let engine = FileRegionEngine::new( - config.clone(), - object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine - ); - engines.push(Arc::new(engine) as _); + file_engine_config = config.clone(); } - RegionEngineConfig::Metric(_) => { - // Already handled in `build_mito_engine`. + RegionEngineConfig::Metric(metric_config) => { + metric_engine_config = metric_config.clone(); } } } - Ok(engines) + + let mito_engine = Self::build_mito_engine( + opts, + object_store_manager.clone(), + mito_engine_config, + schema_metadata_manager.clone(), + plugins.clone(), + ) + .await?; + + let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config) + .context(BuildMetricEngineSnafu)?; + + let file_engine = FileRegionEngine::new( + file_engine_config, + object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine + ); + + Ok(vec![ + Arc::new(mito_engine) as _, + Arc::new(metric_engine) as _, + Arc::new(file_engine) as _, + ]) } /// Builds [MitoEngine] according to options. diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index c49cbb97ef..6abc88b6cb 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -37,11 +37,12 @@ use tokio::sync::{Mutex, RwLock}; use crate::adapter::{CreateFlowArgs, StreamingEngine}; use crate::batching_mode::engine::BatchingEngine; +use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION}; use crate::engine::FlowEngine; use crate::error::{ CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu, - InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, SyncCheckTaskSnafu, - UnexpectedSnafu, + InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu, + SyncCheckTaskSnafu, UnexpectedSnafu, }; use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; @@ -81,6 +82,11 @@ impl FlowDualEngine { } } + /// Determine if the engine is in distributed mode + pub fn is_distributed(&self) -> bool { + self.streaming_engine.node_id.is_some() + } + pub fn streaming_engine(&self) -> Arc { self.streaming_engine.clone() } @@ -89,6 +95,39 @@ impl FlowDualEngine { self.batching_engine.clone() } + /// In distributed mode, scan periodically(1s) until available frontend is found, or timeout, + /// in standalone mode, return immediately + /// notice here if any frontend appear in cluster info this function will return immediately + async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> { + if !self.is_distributed() { + return Ok(()); + } + let frontend_client = self.batching_engine().frontend_client.clone(); + let sleep_duration = std::time::Duration::from_millis(1_000); + let now = std::time::Instant::now(); + loop { + let frontend_list = frontend_client.scan_for_frontend().await?; + if !frontend_list.is_empty() { + let fe_list = frontend_list + .iter() + .map(|(_, info)| &info.peer.addr) + .collect::>(); + info!("Available frontend found: {:?}", fe_list); + return Ok(()); + } + let elapsed = now.elapsed(); + tokio::time::sleep(sleep_duration).await; + info!("Waiting for available frontend, elapsed={:?}", elapsed); + if elapsed >= timeout { + return NoAvailableFrontendSnafu { + timeout, + context: "No available frontend found in cluster info", + } + .fail(); + } + } + } + /// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required /// /// the need to sync is to make sure flush flow actually get called @@ -338,18 +377,36 @@ struct ConsistentCheckTask { impl ConsistentCheckTask { async fn start_check_task(engine: &Arc) -> Result { - // first do recover flows - engine.check_flow_consistent(true, false).await?; - - let inner = engine.clone(); + let engine = engine.clone(); let (tx, mut rx) = tokio::sync::mpsc::channel(1); let (trigger_tx, mut trigger_rx) = tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10); let handle = common_runtime::spawn_global(async move { + // first check if available frontend is found + if let Err(err) = engine + .wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT) + .await + { + warn!("No frontend is available yet:\n {err:?}"); + } + + // then do recover flows, if failed, always retry + let mut recover_retry = 0; + while let Err(err) = engine.check_flow_consistent(true, false).await { + recover_retry += 1; + error!( + "Failed to recover flows:\n {err:?}, retry {} in {}s", + recover_retry, + MIN_REFRESH_DURATION.as_secs() + ); + tokio::time::sleep(MIN_REFRESH_DURATION).await; + } + + // then do check flows, with configurable allow_create and allow_drop let (mut allow_create, mut allow_drop) = (false, false); let mut ret_signal: Option> = None; loop { - if let Err(err) = inner.check_flow_consistent(allow_create, allow_drop).await { + if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await { error!(err; "Failed to check flow consistent"); } if let Some(done) = ret_signal.take() { @@ -534,7 +591,12 @@ impl FlowEngine for FlowDualEngine { match flow_type { Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await, Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await, - None => Ok(0), + None => { + warn!( + "Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request" + ); + Ok(0) + } } } diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs index 031c7aad4b..0e6f3bbb25 100644 --- a/src/flow/src/batching_mode.rs +++ b/src/flow/src/batching_mode.rs @@ -31,10 +31,19 @@ pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs( pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60); /// The minimum duration between two queries execution by batching mode task -const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0); +pub const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0); /// Grpc connection timeout const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5); /// Grpc max retry number const GRPC_MAX_RETRIES: u32 = 3; + +/// Flow wait for available frontend timeout, +/// if failed to find available frontend after FRONTEND_SCAN_TIMEOUT elapsed, return error +/// which should prevent flownode from starting +pub const FRONTEND_SCAN_TIMEOUT: Duration = Duration::from_secs(30); + +/// Frontend activity timeout +/// if frontend is down(not sending heartbeat) for more than FRONTEND_ACTIVITY_TIMEOUT, it will be removed from the list that flownode use to connect +pub const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60); diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 6c667d56d5..fbfea7715c 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -49,7 +49,8 @@ use crate::{CreateFlowArgs, Error, FlowId, TableName}; pub struct BatchingEngine { tasks: RwLock>, shutdown_txs: RwLock>>, - frontend_client: Arc, + /// frontend client for insert request + pub(crate) frontend_client: Arc, flow_metadata_manager: FlowMetadataManagerRef, table_meta: TableMetadataManagerRef, catalog_manager: CatalogManagerRef, diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 9f16ea07fa..1821369f06 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -15,6 +15,7 @@ //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user use std::sync::{Arc, Weak}; +use std::time::SystemTime; use api::v1::greptime_request::Request; use api::v1::CreateTableExpr; @@ -26,15 +27,17 @@ use common_meta::peer::Peer; use common_meta::rpc::store::RangeRequest; use common_query::Output; use common_telemetry::warn; +use itertools::Itertools; use meta_client::client::MetaClient; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use crate::batching_mode::{ - DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, GRPC_CONN_TIMEOUT, GRPC_MAX_RETRIES, + DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT, + GRPC_MAX_RETRIES, }; -use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu}; +use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu}; use crate::Error; /// Just like [`GrpcQueryHandler`] but use BoxedError @@ -127,10 +130,24 @@ impl DatabaseWithPeer { fn new(database: Database, peer: Peer) -> Self { Self { database, peer } } + + /// Try sending a "SELECT 1" to the database + async fn try_select_one(&self) -> Result<(), Error> { + // notice here use `sql` for `SELECT 1` return 1 row + let _ = self + .database + .sql("SELECT 1") + .await + .with_context(|_| InvalidRequestSnafu { + context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer), + })?; + Ok(()) + } } impl FrontendClient { - async fn scan_for_frontend(&self) -> Result, Error> { + /// scan for available frontend from metadata + pub(crate) async fn scan_for_frontend(&self) -> Result, Error> { let Self::Distributed { meta_client, .. } = self else { return Ok(vec![]); }; @@ -160,8 +177,8 @@ impl FrontendClient { Ok(res) } - /// Get the database with max `last_activity_ts` - async fn get_last_active_frontend( + /// Get the database with maximum `last_activity_ts`& is able to process query + async fn get_latest_active_frontend( &self, catalog: &str, schema: &str, @@ -177,22 +194,50 @@ impl FrontendClient { .fail(); }; - let frontends = self.scan_for_frontend().await?; - let mut peer = None; + let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT); + interval.tick().await; + for retry in 0..GRPC_MAX_RETRIES { + let frontends = self.scan_for_frontend().await?; + let now_in_ms = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as i64; - if let Some((_, val)) = frontends.iter().max_by_key(|(_, val)| val.last_activity_ts) { - peer = Some(val.peer.clone()); + // found node with maximum last_activity_ts + for (_, node_info) in frontends + .iter() + .sorted_by_key(|(_, node_info)| node_info.last_activity_ts) + .rev() + // filter out frontend that have been down for more than 1 min + .filter(|(_, node_info)| { + node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64 + > now_in_ms + }) + { + let addr = &node_info.peer.addr; + let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]); + let database = Database::new(catalog, schema, client); + let db = DatabaseWithPeer::new(database, node_info.peer.clone()); + match db.try_select_one().await { + Ok(_) => return Ok(db), + Err(e) => { + warn!( + "Failed to connect to frontend {} on retry={}: \n{e:?}", + addr, retry + ); + } + } + } + // no available frontend + // sleep and retry + interval.tick().await; } - let Some(peer) = peer else { - UnexpectedSnafu { - reason: format!("No frontend available: {:?}", frontends), - } - .fail()? - }; - let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![peer.addr.clone()]); - let database = Database::new(catalog, schema, client); - Ok(DatabaseWithPeer::new(database, peer)) + NoAvailableFrontendSnafu { + timeout: GRPC_CONN_TIMEOUT, + context: "No available frontend found that is able to process query", + } + .fail() } pub async fn create( @@ -222,38 +267,18 @@ impl FrontendClient { ) -> Result { match self { FrontendClient::Distributed { .. } => { - let db = self.get_last_active_frontend(catalog, schema).await?; + let db = self.get_latest_active_frontend(catalog, schema).await?; *peer_desc = Some(PeerDesc::Dist { peer: db.peer.clone(), }); - let mut retry = 0; - - loop { - let ret = db.database.handle(req.clone()).await.with_context(|_| { - InvalidRequestSnafu { - context: format!("Failed to handle request: {:?}", req), - } - }); - if let Err(err) = ret { - if retry < GRPC_MAX_RETRIES { - retry += 1; - warn!( - "Failed to send request to grpc handle at Peer={:?}, retry = {}, error = {:?}", - db.peer, retry, err - ); - continue; - } else { - common_telemetry::error!( - "Failed to send request to grpc handle at Peer={:?} after {} retries, error = {:?}", - db.peer, retry, err - ); - return Err(err); - } - } - return ret; - } + db.database + .handle_with_retry(req.clone(), GRPC_MAX_RETRIES) + .await + .with_context(|_| InvalidRequestSnafu { + context: format!("Failed to handle request at {:?}: {:?}", db.peer, req), + }) } FrontendClient::Standalone { database_client } => { let ctx = QueryContextBuilder::default() diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 904a0a8fa7..06c776f9b5 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -61,6 +61,16 @@ pub enum Error { location: Location, }, + #[snafu(display( + "No available frontend found after timeout: {timeout:?}, context: {context}" + ))] + NoAvailableFrontend { + timeout: std::time::Duration, + context: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("External error"))] External { source: BoxedError, @@ -296,7 +306,8 @@ impl ErrorExt for Error { Self::Eval { .. } | Self::JoinTask { .. } | Self::Datafusion { .. } - | Self::InsertIntoFlow { .. } => StatusCode::Internal, + | Self::InsertIntoFlow { .. } + | Self::NoAvailableFrontend { .. } => StatusCode::Internal, Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists, Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 46c0d93301..e82a20dd81 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -172,6 +172,8 @@ impl FlownodeServer { } /// Start the background task for streaming computation. + /// + /// Should be called only after heartbeat is establish, hence can get cluster info async fn start_workers(&self) -> Result<(), Error> { let manager_ref = self.inner.flow_service.dual_engine.clone(); let handle = manager_ref diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 6c9111dd9c..41599ee083 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -14,7 +14,7 @@ pub mod builder; -use std::fmt::Display; +use std::fmt::{self, Display}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; @@ -96,7 +96,7 @@ pub enum BackendImpl { MysqlStore, } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetasrvOptions { /// The address the server listens on. @@ -166,6 +166,47 @@ pub struct MetasrvOptions { pub node_max_idle_time: Duration, } +impl fmt::Debug for MetasrvOptions { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut debug_struct = f.debug_struct("MetasrvOptions"); + debug_struct + .field("bind_addr", &self.bind_addr) + .field("server_addr", &self.server_addr) + .field("store_addrs", &self.sanitize_store_addrs()) + .field("selector", &self.selector) + .field("use_memory_store", &self.use_memory_store) + .field("enable_region_failover", &self.enable_region_failover) + .field( + "allow_region_failover_on_local_wal", + &self.allow_region_failover_on_local_wal, + ) + .field("http", &self.http) + .field("logging", &self.logging) + .field("procedure", &self.procedure) + .field("failure_detector", &self.failure_detector) + .field("datanode", &self.datanode) + .field("enable_telemetry", &self.enable_telemetry) + .field("data_home", &self.data_home) + .field("wal", &self.wal) + .field("export_metrics", &self.export_metrics) + .field("store_key_prefix", &self.store_key_prefix) + .field("max_txn_ops", &self.max_txn_ops) + .field("flush_stats_factor", &self.flush_stats_factor) + .field("tracing", &self.tracing) + .field("backend", &self.backend); + + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] + debug_struct.field("meta_table_name", &self.meta_table_name); + + #[cfg(feature = "pg_kvbackend")] + debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id); + + debug_struct + .field("node_max_idle_time", &self.node_max_idle_time) + .finish() + } +} + const DEFAULT_METASRV_ADDR_PORT: &str = "3002"; impl Default for MetasrvOptions { @@ -249,6 +290,13 @@ impl MetasrvOptions { common_telemetry::debug!("detect local IP is not supported on Android"); } } + + fn sanitize_store_addrs(&self) -> Vec { + self.store_addrs + .iter() + .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr)) + .collect() + } } pub struct MetasrvInfo { diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index afae466dc0..8ea9ff3917 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -37,7 +37,7 @@ use common_meta::rpc::ddl::{ }; use common_meta::rpc::router::{Partition, Partition as MetaPartition}; use common_query::Output; -use common_telemetry::{debug, info, tracing}; +use common_telemetry::{debug, info, tracing, warn}; use common_time::Timezone; use datafusion_common::tree_node::TreeNodeVisitor; use datafusion_expr::LogicalPlan; @@ -369,7 +369,7 @@ impl StatementExecutor { query_context: QueryContextRef, ) -> Result { let flow_type = self - .determine_flow_type(&expr.sql, query_context.clone()) + .determine_flow_type(&expr, query_context.clone()) .await?; info!("determined flow={} type: {:#?}", expr.flow_name, flow_type); @@ -398,9 +398,49 @@ impl StatementExecutor { /// Determine the flow type based on the SQL query /// /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow - async fn determine_flow_type(&self, sql: &str, query_ctx: QueryContextRef) -> Result { + async fn determine_flow_type( + &self, + expr: &CreateFlowExpr, + query_ctx: QueryContextRef, + ) -> Result { + // first check if source table's ttl is instant, if it is, force streaming mode + for src_table_name in &expr.source_table_names { + let table = self + .catalog_manager() + .table( + &src_table_name.catalog_name, + &src_table_name.schema_name, + &src_table_name.table_name, + Some(&query_ctx), + ) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name( + &src_table_name.catalog_name, + &src_table_name.schema_name, + &src_table_name.table_name, + ), + })?; + + // instant source table can only be handled by streaming mode + if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) { + warn!( + "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode", + format_full_table_name( + &src_table_name.catalog_name, + &src_table_name.schema_name, + &src_table_name.table_name + ), + expr.flow_name + ); + return Ok(FlowType::Streaming); + } + } + let engine = &self.query_engine; - let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx) + let stmt = QueryLanguageParser::parse_sql(&expr.sql, &query_ctx) .map_err(BoxedError::new) .context(ExternalSnafu)?; let plan = engine diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.result b/tests/cases/standalone/common/flow/flow_advance_ttl.result index 3f5a940977..05ae665be8 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.result +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.result @@ -8,7 +8,7 @@ CREATE TABLE distinct_basic ( Affected Rows: 0 --- should fail +-- should fallback to streaming mode -- SQLNESS REPLACE id=\d+ id=REDACTED CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS SELECT @@ -16,9 +16,151 @@ SELECT FROM distinct_basic; -Error: 3001(EngineExecuteQuery), Unsupported: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval +Affected Rows: 0 -ALTER TABLE distinct_basic SET 'ttl' = '5s'; +-- flow_options should have a flow_type:streaming +-- since source table's ttl=instant +SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS; + ++---------------------+---------------------------+ +| flow_name | options | ++---------------------+---------------------------+ +| test_distinct_basic | {"flow_type":"streaming"} | ++---------------------+---------------------------+ + +SHOW CREATE TABLE distinct_basic; + ++----------------+-----------------------------------------------------------+ +| Table | Create Table | ++----------------+-----------------------------------------------------------+ +| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( | +| | "number" INT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("number") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = 'instant' | +| | ) | ++----------------+-----------------------------------------------------------+ + +SHOW CREATE TABLE out_distinct_basic; + ++--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | +| | "dis" INT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("dis") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++--------------------+---------------------------------------------------+ + +-- SQLNESS SLEEP 3s +INSERT INTO + distinct_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 0 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('test_distinct_basic') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT + dis +FROM + out_distinct_basic; + ++-----+ +| dis | ++-----+ +| 20 | +| 22 | ++-----+ + +SELECT number FROM distinct_basic; + +++ +++ + +-- SQLNESS SLEEP 6s +ADMIN FLUSH_TABLE('distinct_basic'); + ++-------------------------------------+ +| ADMIN FLUSH_TABLE('distinct_basic') | ++-------------------------------------+ +| 0 | ++-------------------------------------+ + +INSERT INTO + distinct_basic +VALUES + (23, "2021-07-01 00:00:01.600"); + +Affected Rows: 0 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('test_distinct_basic') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT + dis +FROM + out_distinct_basic; + ++-----+ +| dis | ++-----+ +| 20 | +| 22 | +| 23 | ++-----+ + +SELECT number FROM distinct_basic; + +++ +++ + +DROP FLOW test_distinct_basic; + +Affected Rows: 0 + +DROP TABLE distinct_basic; + +Affected Rows: 0 + +DROP TABLE out_distinct_basic; + +Affected Rows: 0 + +-- test ttl = 5s +CREATE TABLE distinct_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +)WITH ('ttl' = '5s'); Affected Rows: 0 @@ -30,7 +172,26 @@ FROM Affected Rows: 0 +-- flow_options should have a flow_type:batching +-- since source table's ttl=instant +SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS; + ++---------------------+--------------------------+ +| flow_name | options | ++---------------------+--------------------------+ +| test_distinct_basic | {"flow_type":"batching"} | ++---------------------+--------------------------+ + -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s INSERT INTO distinct_basic VALUES @@ -130,41 +291,6 @@ ADMIN FLUSH_FLOW('test_distinct_basic'); | FLOW_FLUSHED | +-----------------------------------------+ -SHOW CREATE TABLE distinct_basic; - -+----------------+-----------------------------------------------------------+ -| Table | Create Table | -+----------------+-----------------------------------------------------------+ -| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( | -| | "number" INT NULL, | -| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("number") | -| | ) | -| | | -| | ENGINE=mito | -| | WITH( | -| | ttl = '5s' | -| | ) | -+----------------+-----------------------------------------------------------+ - -SHOW CREATE TABLE out_distinct_basic; - -+--------------------+---------------------------------------------------+ -| Table | Create Table | -+--------------------+---------------------------------------------------+ -| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | -| | "dis" INT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder"), | -| | PRIMARY KEY ("dis") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+--------------------+---------------------------------------------------+ - SELECT dis FROM diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.sql b/tests/cases/standalone/common/flow/flow_advance_ttl.sql index 2691af2b0c..141c595e89 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.sql +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.sql @@ -6,7 +6,7 @@ CREATE TABLE distinct_basic ( TIME INDEX(ts) )WITH ('ttl' = 'instant'); --- should fail +-- should fallback to streaming mode -- SQLNESS REPLACE id=\d+ id=REDACTED CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS SELECT @@ -14,7 +14,61 @@ SELECT FROM distinct_basic; -ALTER TABLE distinct_basic SET 'ttl' = '5s'; +-- flow_options should have a flow_type:streaming +-- since source table's ttl=instant +SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS; + +SHOW CREATE TABLE distinct_basic; + +SHOW CREATE TABLE out_distinct_basic; + +-- SQLNESS SLEEP 3s +INSERT INTO + distinct_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); + +SELECT + dis +FROM + out_distinct_basic; + +SELECT number FROM distinct_basic; + +-- SQLNESS SLEEP 6s +ADMIN FLUSH_TABLE('distinct_basic'); + +INSERT INTO + distinct_basic +VALUES + (23, "2021-07-01 00:00:01.600"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); + +SELECT + dis +FROM + out_distinct_basic; + +SELECT number FROM distinct_basic; + +DROP FLOW test_distinct_basic; +DROP TABLE distinct_basic; +DROP TABLE out_distinct_basic; + +-- test ttl = 5s +CREATE TABLE distinct_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +)WITH ('ttl' = '5s'); CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS SELECT @@ -22,7 +76,14 @@ SELECT FROM distinct_basic; +-- flow_options should have a flow_type:batching +-- since source table's ttl=instant +SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS; + -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s INSERT INTO distinct_basic VALUES @@ -55,10 +116,6 @@ VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_distinct_basic'); -SHOW CREATE TABLE distinct_basic; - -SHOW CREATE TABLE out_distinct_basic; - SELECT dis FROM diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.result b/tests/cases/standalone/common/flow/flow_auto_sink_table.result index de8a44fad7..f1d229e6e8 100644 --- a/tests/cases/standalone/common/flow/flow_auto_sink_table.result +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.result @@ -44,6 +44,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ @@ -101,6 +110,16 @@ GROUP BY Affected Rows: 0 +-- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ @@ -118,6 +137,15 @@ SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s SHOW CREATE FLOW test_numbers_basic; +--------------------+---------------------------------------------------------------------------------------+ diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.sql b/tests/cases/standalone/common/flow/flow_auto_sink_table.sql index ca76ba767e..61790272b8 100644 --- a/tests/cases/standalone/common/flow/flow_auto_sink_table.sql +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.sql @@ -20,6 +20,9 @@ SHOW CREATE TABLE out_num_cnt_basic; ADMIN FLUSH_FLOW('test_numbers_basic'); -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s SHOW CREATE TABLE out_num_cnt_basic; SHOW CREATE FLOW test_numbers_basic; @@ -44,10 +47,16 @@ FROM numbers_input_basic GROUP BY ts; +-- SQLNESS ARG restart=true +SELECT 1; +-- SQLNESS SLEEP 3s SHOW CREATE TABLE out_num_cnt_basic; -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s SHOW CREATE FLOW test_numbers_basic; SHOW CREATE TABLE out_num_cnt_basic; diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index 5b4e6b32ab..f222a9c331 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -62,6 +62,15 @@ SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s INSERT INTO numbers_input_basic VALUES @@ -206,6 +215,15 @@ SHOW CREATE TABLE out_basic; +-----------+---------------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s INSERT INTO input_basic VALUES @@ -306,6 +324,15 @@ ADMIN FLUSH_FLOW('test_distinct_basic'); +-----------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s INSERT INTO distinct_basic VALUES @@ -1665,6 +1692,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s INSERT INTO numbers_input_basic VALUES diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 32598927ab..92310f98c4 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -24,6 +24,9 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); SHOW CREATE TABLE out_num_cnt_basic; -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s INSERT INTO numbers_input_basic VALUES @@ -91,6 +94,9 @@ FROM SHOW CREATE TABLE out_basic; -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s INSERT INTO input_basic VALUES @@ -130,6 +136,9 @@ SHOW CREATE TABLE out_distinct_basic; ADMIN FLUSH_FLOW('test_distinct_basic'); -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s INSERT INTO distinct_basic VALUES @@ -788,6 +797,9 @@ SHOW CREATE TABLE out_num_cnt_basic; ADMIN FLUSH_FLOW('test_numbers_basic'); -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s INSERT INTO numbers_input_basic VALUES diff --git a/tests/cases/standalone/common/flow/flow_rebuild.result b/tests/cases/standalone/common/flow/flow_rebuild.result index 521bcdb5ae..70946a1739 100644 --- a/tests/cases/standalone/common/flow/flow_rebuild.result +++ b/tests/cases/standalone/common/flow/flow_rebuild.result @@ -730,10 +730,21 @@ SELECT key FROM api_stats; +-----+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 5s INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1); Affected Rows: 1 +-- wait more time so flownode have time to recover flows +-- SQLNESS SLEEP 5s -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('api_stats_flow'); diff --git a/tests/cases/standalone/common/flow/flow_rebuild.sql b/tests/cases/standalone/common/flow/flow_rebuild.sql index b2b6149761..170696ed26 100644 --- a/tests/cases/standalone/common/flow/flow_rebuild.sql +++ b/tests/cases/standalone/common/flow/flow_rebuild.sql @@ -399,8 +399,13 @@ ADMIN FLUSH_FLOW('api_stats_flow'); SELECT key FROM api_stats; -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 5s INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1); +-- wait more time so flownode have time to recover flows +-- SQLNESS SLEEP 5s -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('api_stats_flow'); diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index be58053824..2e28f09750 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -263,6 +263,15 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM -- makesure after recover should be the same -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; +---------------------+---------------+-------------------------------------------------------------+------------------------------------+ diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index 1c5eed8c4e..15a6b13936 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -108,6 +108,9 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM -- makesure after recover should be the same -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';