Compare commits

...

22 Commits

Author SHA1 Message Date
discord9
77e340270e Downgrade rust-toolchain.toml
DO NOT MERGE
2025-04-29 17:52:43 +08:00
Weny Xu
06e8d46ba9 feat: implement batch region opening in metric engine (#6017)
feat: implement batch open metric regions
2025-04-29 09:05:27 +00:00
zyy17
89661c0626 ci: fix the bugs of release-dev-builder-images and add update-dev-builder-image-tag (#6009)
* fix: the dev-builder release job is not triggered by merged event

* ci: add update-dev-builder-image-tag
2025-04-29 06:25:15 +00:00
Weny Xu
a3ae2d7b52 feat: flush leader region before downgrading (#5995)
* feat: flush leader region before downgrading

* test: add unit tests

* chore: apply suggestions from CR
2025-04-29 03:28:00 +00:00
Ruihang Xia
789f585a7f fix: disable recursion limit in prost (#6010)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-04-28 17:21:49 +00:00
jeremyhi
133f404547 fix: sanitize_connection_string (#6012) 2025-04-28 13:56:26 +00:00
discord9
bdd44fd7ec chore: only retry when retry-able in flow (#5987)
* chore: only retry when retry-able

* chore: revert dbg change

* refactor: per review

* fix: check for available frontend first

* docs: more explain&longer timeout&feat: more retry at every level&try send select 1

* fix: use `sql` method for "SELECT 1"

* fix: also put recover flows in spawned task and a dead loop

* test: update transient error in flow rebuild test

* chore: sleep after sqlness sleep

* chore: add a warning

* chore: wait even more time after reboot
2025-04-28 09:49:49 +00:00
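As a rough illustration of the "only retry when retry-able" approach described in this change, here is a minimal sketch of a bounded retry loop that gives up immediately on non-transient errors. FlowError, is_retryable, and run_with_retry are illustrative names, not the crate's real types.

#[derive(Debug)]
enum FlowError {
    FrontendUnavailable, // transient: a frontend may come back after a reboot
    InvalidPlan(String), // permanent: retrying cannot help
}

fn is_retryable(err: &FlowError) -> bool {
    matches!(err, FlowError::FrontendUnavailable)
}

fn run_with_retry<F>(mut attempt: F, max_retries: u32) -> Result<(), FlowError>
where
    F: FnMut() -> Result<(), FlowError>,
{
    let mut retries = 0;
    loop {
        match attempt() {
            Ok(()) => return Ok(()),
            Err(err) if is_retryable(&err) && retries < max_retries => {
                retries += 1; // a real implementation would also sleep / back off here
            }
            Err(err) => return Err(err),
        }
    }
}

fn main() {
    // Fails twice with a transient error, then succeeds; a permanent error such as
    // FlowError::InvalidPlan would be returned on the first attempt instead.
    let mut failures_left = 2;
    let result = run_with_retry(
        || {
            if failures_left > 0 {
                failures_left -= 1;
                Err(FlowError::FrontendUnavailable)
            } else {
                Ok(())
            }
        },
        3,
    );
    assert!(result.is_ok());
}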
Weny Xu
13ac4d5048 fix: only consider the datanode that reports the failure (#6004)
* fix: only consider the datanode that reports the failure

* chore: fix clippy
2025-04-28 06:08:02 +00:00
dennis zhuang
c6448a6ccc feat: remove own greatest fn (#5994) 2025-04-28 05:27:34 +00:00
Yingwen
86aae6733d fix: prune primary key with multiple columns may use default value as statistics (#5996)
* test: incorrect test result when filtering pk with multiple columns

* fix: prune non first tag correctly

Distinguish no column and no stats and only use default value when no
column

* test: update test result

* refactor: rename test file

* test: add test for null filter

* fix: use StatValues for null counts

* test: drop table

* test: fix unstable flow test
2025-04-28 04:53:30 +00:00
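The fix above hinges on telling two cases apart: a primary-key column that is absent from a file, where falling back to the column's default value is valid, versus a column that is present but has no statistics, where nothing may be assumed. A minimal illustration of that distinction, with Option<Option<_>> standing in for the real statistics lookup (names are illustrative):

// None          -> the column does not exist in this file; rows implicitly hold the default
// Some(None)    -> the column exists but carries no statistics; nothing can be assumed
// Some(Some(v)) -> the column exists and has statistics; use them as-is
fn pruning_stat(column_stats: Option<Option<i64>>, default_value: i64) -> Option<i64> {
    match column_stats {
        None => Some(default_value),
        Some(None) => None,
        Some(Some(stat)) => Some(stat),
    }
}

fn main() {
    assert_eq!(pruning_stat(None, 42), Some(42));
    assert_eq!(pruning_stat(Some(None), 42), None);
    assert_eq!(pruning_stat(Some(Some(7)), 42), Some(7));
}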
liyang
ed1ce8438f ci: update dev-builder image version to 2025-04-15-1a517ec8-202504280… (#6003)
ci: update dev-builder image version to 2025-04-15-1a517ec8-20250428023155
2025-04-28 03:34:31 +00:00
fys
4b921b8425 chore: make txn_helper pub (#6002)
chore: make txn_helper from pub(crate) to pub
2025-04-28 02:52:39 +00:00
Lei, HUANG
1a517ec8ac fix: check if memtable is empty by stats (#5989)
fix/checking-memtable-empty-and-stats:
 - **Refactor timestamp updates**: Simplified timestamp range updates in `PartitionTreeMemtable` and `TimeSeriesMemtable` by replacing `update_timestamp_range` with `fetch_max` and `fetch_min` methods for `max_timestamp` and `min_timestamp`.
   - Affected files: `partition_tree.rs`, `time_series.rs`

 - **Remove unused code**: Deleted the `update_timestamp_range` method from `WriteMetrics` and removed unnecessary imports.
   - Affected file: `stats.rs`

 - **Optimize memtable filtering**: Streamlined the check for empty memtables in `ScanRegion` by directly using `time_range`.
   - Affected file: `scan_region.rs`
2025-04-28 01:57:17 +00:00
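The refactor described above swaps a dedicated update_timestamp_range helper for atomic fetch_max / fetch_min calls. Below is a minimal sketch of that pattern using std atomics; the struct and field names are illustrative, not the real WriteMetrics.

use std::sync::atomic::{AtomicI64, Ordering};

// Each write stretches the observed timestamp range with one atomic
// operation per bound, so no lock is needed around the stats.
struct WriteStats {
    min_timestamp: AtomicI64,
    max_timestamp: AtomicI64,
}

impl WriteStats {
    fn new() -> Self {
        Self {
            min_timestamp: AtomicI64::new(i64::MAX),
            max_timestamp: AtomicI64::new(i64::MIN),
        }
    }

    fn observe(&self, ts: i64) {
        self.min_timestamp.fetch_min(ts, Ordering::Relaxed);
        self.max_timestamp.fetch_max(ts, Ordering::Relaxed);
    }

    // An untouched memtable still holds the initial MAX/MIN sentinels, which is
    // what allows emptiness to be derived from the stats alone.
    fn is_empty(&self) -> bool {
        self.min_timestamp.load(Ordering::Relaxed) == i64::MAX
    }
}

fn main() {
    let stats = WriteStats::new();
    assert!(stats.is_empty());
    stats.observe(1_000);
    stats.observe(500);
    assert!(!stats.is_empty());
}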
discord9
21044c7339 feat: uddsketch_merge udaf (#5992) 2025-04-27 12:43:21 +00:00
Ning Sun
8e1ec2a201 chore: update nix for new toolchain (#5991) 2025-04-27 11:40:44 +00:00
Weny Xu
5ed0a095b6 feat: introduce RegionStatAwareSelector trait (#5990)
* feat: introduce `RegionStatAwareSelector`

* feat: exclude all failed datanodes

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
2025-04-27 11:22:39 +00:00
shuiyisong
3c943be189 chore: update rust toolchain (#5818)
* chore: update nightly version

* chore: sort lint lines

* chore: minor fix

* chore: update nix

* chore: update toolchain to 2024-04-14

* chore: update toolchain to 2024-04-15

* chore: remove unnecessary test

* chore: do not assert oid in sqlness test

* chore: fix margin issue

* chore: fix cr issues

* chore: fix cr issues

---------

Co-authored-by: Ning Sun <sunning@greptime.com>
2025-04-27 09:02:36 +00:00
Ning Sun
eeba466717 ci: read next release version from toml by default (#5986)
* ci: read next release version from toml by default

* ci: send error message to stderr

* ci: take the first version only
2025-04-27 04:43:44 +00:00
Zhenchi
2ff54486d3 chore: bump main branch version to 0.15 (#5984)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-04-27 01:39:44 +00:00
discord9
66e2242e46 fix: conn timeout&refactor: better err msg (#5974)
* fix: conn timeout&refactor: better err msg

* chore: clippy

* chore: make test work

* chore: comment

* todo: fix null cast

* fix: retry conn&udd_calc

* chore: comment

* chore: apply suggestion

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
2025-04-25 19:12:30 +00:00
Ning Sun
489b16ae30 fix: security update (#5982) 2025-04-25 18:11:09 +00:00
dennis zhuang
85d564b0fb fix: upgrade sqlparse and validate align in range query (#5958)
* fix: upgrade sqlparse and validate align in range query

* update sqlparser to the merged commit

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
2025-04-25 17:34:49 +00:00
107 changed files with 2599 additions and 874 deletions

View File

@@ -10,17 +10,17 @@ set -e
function create_version() {
# Read from envrionment variables.
if [ -z "$GITHUB_EVENT_NAME" ]; then
echo "GITHUB_EVENT_NAME is empty"
echo "GITHUB_EVENT_NAME is empty" >&2
exit 1
fi
if [ -z "$NEXT_RELEASE_VERSION" ]; then
echo "NEXT_RELEASE_VERSION is empty"
exit 1
echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
export NEXT_RELEASE_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
fi
if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then
echo "NIGHTLY_RELEASE_PREFIX is empty"
echo "NIGHTLY_RELEASE_PREFIX is empty" >&2
exit 1
fi
@@ -35,7 +35,7 @@ function create_version() {
# It will be like 'dev-2023080819-f0e7216c'.
if [ "$NEXT_RELEASE_VERSION" = dev ]; then
if [ -z "$COMMIT_SHA" ]; then
echo "COMMIT_SHA is empty in dev build"
echo "COMMIT_SHA is empty in dev build" >&2
exit 1
fi
echo "dev-$(date "+%Y%m%d-%s")-$(echo "$COMMIT_SHA" | cut -c1-8)"
@@ -45,7 +45,7 @@ function create_version() {
# Note: Only output 'version=xxx' to stdout when everything is ok, so that it can be used in GitHub Actions Outputs.
if [ "$GITHUB_EVENT_NAME" = push ]; then
if [ -z "$GITHUB_REF_NAME" ]; then
echo "GITHUB_REF_NAME is empty in push event"
echo "GITHUB_REF_NAME is empty in push event" >&2
exit 1
fi
echo "$GITHUB_REF_NAME"
@@ -54,7 +54,7 @@ function create_version() {
elif [ "$GITHUB_EVENT_NAME" = schedule ]; then
echo "$NEXT_RELEASE_VERSION-$NIGHTLY_RELEASE_PREFIX-$(date "+%Y%m%d")"
else
echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME"
echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME" >&2
exit 1
fi
}

.github/scripts/update-dev-builder-version.sh (new executable file, 37 lines)

@@ -0,0 +1,37 @@
#!/bin/bash
DEV_BUILDER_IMAGE_TAG=$1
update_dev_builder_version() {
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: Should specify the dev-builder image tag"
exit 1
fi
# Configure Git configs.
git config --global user.email greptimedb-ci@greptime.com
git config --global user.name greptimedb-ci
# Checkout a new branch.
BRANCH_NAME="ci/update-dev-builder-$(date +%Y%m%d%H%M%S)"
git checkout -b $BRANCH_NAME
# Update the dev-builder image tag in the Makefile.
gsed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
# Commit the changes.
git add Makefile
git commit -m "ci: update dev-builder image tag"
git push origin $BRANCH_NAME
# Create a Pull Request.
gh pr create \
--title "ci: update dev-builder image tag" \
--body "This PR updates the dev-builder image tag" \
--base main \
--head $BRANCH_NAME \
--reviewer zyy17 \
--reviewer daviderli614
}
update_dev_builder_version

View File

@@ -24,11 +24,19 @@ on:
description: Release dev-builder-android image
required: false
default: false
update_dev_builder_image_tag:
type: boolean
description: Update the DEV_BUILDER_IMAGE_TAG in Makefile and create a PR
required: false
default: false
jobs:
release-dev-builder-images:
name: Release dev builder images
if: ${{ inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }} # Only manually trigger this job.
# The jobs are triggered by the following events:
# 1. Manually triggered workflow_dispatch event
# 2. Push event when the PR that modifies the `rust-toolchain.toml` or `docker/dev-builder/**` is merged to main
if: ${{ github.event_name == 'push' || inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }}
runs-on: ubuntu-latest
outputs:
version: ${{ steps.set-version.outputs.version }}
@@ -57,9 +65,9 @@ jobs:
version: ${{ env.VERSION }}
dockerhub-image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
dockerhub-image-registry-token: ${{ secrets.DOCKERHUB_TOKEN }}
build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image }}
build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image }}
build-dev-builder-android: ${{ inputs.release_dev_builder_android_image }}
build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
build-dev-builder-android: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
release-dev-builder-images-ecr:
name: Release dev builder images to AWS ECR
@@ -85,7 +93,7 @@ jobs:
- name: Push dev-builder-ubuntu image
shell: bash
if: ${{ inputs.release_dev_builder_ubuntu_image }}
if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -106,7 +114,7 @@ jobs:
- name: Push dev-builder-centos image
shell: bash
if: ${{ inputs.release_dev_builder_centos_image }}
if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -127,7 +135,7 @@ jobs:
- name: Push dev-builder-android image
shell: bash
if: ${{ inputs.release_dev_builder_android_image }}
if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -162,7 +170,7 @@ jobs:
- name: Push dev-builder-ubuntu image
shell: bash
if: ${{ inputs.release_dev_builder_ubuntu_image }}
if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -176,7 +184,7 @@ jobs:
- name: Push dev-builder-centos image
shell: bash
if: ${{ inputs.release_dev_builder_centos_image }}
if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -190,7 +198,7 @@ jobs:
- name: Push dev-builder-android image
shell: bash
if: ${{ inputs.release_dev_builder_android_image }}
if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -201,3 +209,24 @@ jobs:
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION \
docker://$ACR_IMAGE_REGISTRY/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION
update-dev-builder-image-tag:
name: Update dev-builder image tag
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
if: ${{ github.event_name == 'push' || inputs.update_dev_builder_image_tag }}
needs: [
release-dev-builder-images
]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Update dev-builder image tag
shell: bash
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
./.github/scripts/update-dev-builder-version.sh ${{ needs.release-dev-builder-images.outputs.version }}

View File

@@ -90,8 +90,6 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.14.0
jobs:
allocate-runners:
@@ -135,7 +133,6 @@ jobs:
env:
GITHUB_EVENT_NAME: ${{ github.event_name }}
GITHUB_REF_NAME: ${{ github.ref_name }}
NEXT_RELEASE_VERSION: ${{ env.NEXT_RELEASE_VERSION }}
NIGHTLY_RELEASE_PREFIX: ${{ env.NIGHTLY_RELEASE_PREFIX }}
- name: Allocate linux-amd64 runner

Cargo.lock (generated, 261 changed lines)

@@ -173,9 +173,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.89"
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]]
name = "anymap2"
@@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"common-base",
"common-decimal",
@@ -915,7 +915,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -1537,7 +1537,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"catalog",
"common-error",
@@ -1561,7 +1561,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"arrow 54.2.1",
@@ -1597,7 +1597,7 @@ dependencies = [
"partition",
"paste",
"prometheus",
"rustc-hash 2.0.0",
"rustc-hash 2.1.1",
"serde_json",
"session",
"snafu 0.8.5",
@@ -1619,9 +1619,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.1.24"
version = "1.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "812acba72f0a070b003d3697490d2b55b837230ae7c6c6497f05cc2ddbb8d938"
checksum = "04da6a0d40b948dfc4fa8f5bbf402b0fc1a64a28dbf7d12ffd683550f2c1b63a"
dependencies = [
"jobserver",
"libc",
@@ -1874,7 +1874,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"auth",
@@ -1917,7 +1917,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.14.0",
"substrait 0.15.0",
"table",
"tempfile",
"tokio",
@@ -1926,7 +1926,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"arc-swap",
@@ -1955,7 +1955,7 @@ dependencies = [
"rand 0.9.0",
"serde_json",
"snafu 0.8.5",
"substrait 0.14.0",
"substrait 0.15.0",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1996,7 +1996,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"auth",
@@ -2056,7 +2056,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.14.0",
"substrait 0.15.0",
"table",
"temp-env",
"tempfile",
@@ -2102,7 +2102,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"anymap2",
"async-trait",
@@ -2124,11 +2124,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.14.0"
version = "0.15.0"
[[package]]
name = "common-config"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"common-base",
"common-error",
@@ -2153,7 +2153,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -2190,7 +2190,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2203,7 +2203,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"common-macro",
"http 1.1.0",
@@ -2214,7 +2214,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"common-error",
@@ -2224,7 +2224,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2277,7 +2277,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"common-runtime",
@@ -2294,7 +2294,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"arrow-flight",
@@ -2325,7 +2325,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"common-base",
@@ -2344,7 +2344,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"arc-swap",
"common-query",
@@ -2358,7 +2358,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"common-error",
"common-macro",
@@ -2371,7 +2371,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"anymap2",
"api",
@@ -2432,7 +2432,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2441,11 +2441,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.14.0"
version = "0.15.0"
[[package]]
name = "common-pprof"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"common-error",
"common-macro",
@@ -2457,7 +2457,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-stream",
"async-trait",
@@ -2484,7 +2484,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -2493,7 +2493,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -2510,7 +2510,7 @@ dependencies = [
"futures-util",
"serde",
"snafu 0.8.5",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"sqlparser_derive 0.1.1",
"statrs",
"store-api",
@@ -2519,7 +2519,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"arc-swap",
"common-error",
@@ -2539,7 +2539,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2569,14 +2569,14 @@ dependencies = [
[[package]]
name = "common-session"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-telemetry"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"atty",
"backtrace",
@@ -2604,7 +2604,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"client",
"common-query",
@@ -2616,7 +2616,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -2634,7 +2634,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"build-data",
"const_format",
@@ -2644,7 +2644,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"common-base",
"common-error",
@@ -2946,9 +2946,9 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
@@ -3110,14 +3110,14 @@ dependencies = [
[[package]]
name = "data-encoding"
version = "2.6.0"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2"
checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
[[package]]
name = "datafusion"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3168,7 +3168,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"async-trait",
@@ -3188,7 +3188,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog-listing"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -3211,7 +3211,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3236,7 +3236,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"log",
"tokio",
@@ -3245,12 +3245,12 @@ dependencies = [
[[package]]
name = "datafusion-doc"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
[[package]]
name = "datafusion-execution"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"dashmap",
@@ -3268,7 +3268,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -3288,7 +3288,7 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"datafusion-common",
@@ -3299,7 +3299,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-buffer 54.3.1",
@@ -3328,7 +3328,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3349,7 +3349,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3361,7 +3361,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3383,7 +3383,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-table"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"async-trait",
@@ -3398,7 +3398,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-common",
"datafusion-doc",
@@ -3414,7 +3414,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-common",
"datafusion-physical-expr-common",
@@ -3423,7 +3423,7 @@ dependencies = [
[[package]]
name = "datafusion-macros"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-expr",
"quote",
@@ -3433,7 +3433,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -3451,7 +3451,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3474,7 +3474,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3487,7 +3487,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -3508,7 +3508,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3538,7 +3538,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3556,7 +3556,7 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"async-recursion",
"async-trait",
@@ -3572,7 +3572,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"arrow-flight",
@@ -3624,7 +3624,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.14.0",
"substrait 0.15.0",
"table",
"tokio",
"toml 0.8.19",
@@ -3633,7 +3633,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3656,7 +3656,7 @@ dependencies = [
"serde",
"serde_json",
"snafu 0.8.5",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"sqlparser_derive 0.1.1",
]
@@ -4259,7 +4259,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -4382,7 +4382,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"arrow 54.2.1",
@@ -4444,7 +4444,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.27.1",
"substrait 0.14.0",
"substrait 0.15.0",
"table",
"tokio",
"tonic 0.12.3",
@@ -4499,7 +4499,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"arc-swap",
@@ -4553,10 +4553,10 @@ dependencies = [
"session",
"snafu 0.8.5",
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"strfmt",
"substrait 0.14.0",
"substrait 0.15.0",
"table",
"tokio",
"toml 0.8.19",
@@ -5795,7 +5795,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6599,13 +6599,13 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.22"
version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "log-query"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"chrono",
"common-error",
@@ -6617,7 +6617,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-stream",
"async-trait",
@@ -6911,7 +6911,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -6939,7 +6939,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -7029,7 +7029,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"aquamarine",
@@ -7118,7 +7118,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"aquamarine",
@@ -7780,7 +7780,7 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56"
dependencies = [
"proc-macro-crate 1.3.1",
"proc-macro-crate 3.2.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -7824,7 +7824,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"anyhow",
"bytes",
@@ -8119,7 +8119,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8166,9 +8166,9 @@ dependencies = [
"session",
"snafu 0.8.5",
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"substrait 0.14.0",
"substrait 0.15.0",
"table",
"tokio",
"tokio-util",
@@ -8423,7 +8423,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -8443,7 +8443,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"table",
]
@@ -8705,7 +8705,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8847,7 +8847,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"auth",
"clap 4.5.19",
@@ -9127,7 +9127,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9373,7 +9373,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9414,7 +9414,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9477,10 +9477,10 @@ dependencies = [
"session",
"snafu 0.8.5",
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"statrs",
"store-api",
"substrait 0.14.0",
"substrait 0.15.0",
"table",
"tokio",
"tokio-stream",
@@ -9527,7 +9527,7 @@ dependencies = [
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash 2.0.0",
"rustc-hash 2.1.1",
"rustls",
"socket2",
"thiserror 1.0.64",
@@ -9544,7 +9544,7 @@ dependencies = [
"bytes",
"rand 0.8.5",
"ring",
"rustc-hash 2.0.0",
"rustc-hash 2.1.1",
"rustls",
"slab",
"thiserror 1.0.64",
@@ -9821,9 +9821,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.11.0"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
@@ -10005,15 +10005,14 @@ dependencies = [
[[package]]
name = "ring"
version = "0.17.8"
version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.15",
"libc",
"spin",
"untrusted",
"windows-sys 0.52.0",
]
@@ -10334,9 +10333,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hash"
version = "2.0.0"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
@@ -10831,7 +10830,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -10951,7 +10950,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"arc-swap",
@@ -11159,9 +11158,9 @@ dependencies = [
[[package]]
name = "smallbitvec"
version = "2.5.3"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc3fc564a4b53fd1e8589628efafe57602d91bde78be18186b5f61e8faea470"
checksum = "d31d263dd118560e1a492922182ab6ca6dc1d03a3bf54e7699993f31a4150e3f"
[[package]]
name = "smallvec"
@@ -11276,7 +11275,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"chrono",
@@ -11304,7 +11303,7 @@ dependencies = [
"serde",
"serde_json",
"snafu 0.8.5",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"sqlparser_derive 0.1.1",
"store-api",
"table",
@@ -11331,7 +11330,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11373,7 +11372,7 @@ dependencies = [
[[package]]
name = "sqlparser"
version = "0.54.0"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089#e98e6b322426a9d397a71efef17075966223c089"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e#0cf6c04490d59435ee965edd2078e8855bd8471e"
dependencies = [
"lazy_static",
"log",
@@ -11381,7 +11380,7 @@ dependencies = [
"regex",
"serde",
"sqlparser 0.54.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sqlparser_derive 0.3.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
"sqlparser_derive 0.3.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
]
[[package]]
@@ -11409,7 +11408,7 @@ dependencies = [
[[package]]
name = "sqlparser_derive"
version = "0.3.0"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089#e98e6b322426a9d397a71efef17075966223c089"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e#0cf6c04490d59435ee965edd2078e8855bd8471e"
dependencies = [
"proc-macro2",
"quote",
@@ -11650,7 +11649,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"aquamarine",
@@ -11799,7 +11798,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"bytes",
@@ -11979,7 +11978,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -12230,7 +12229,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"arbitrary",
"async-trait",
@@ -12264,7 +12263,7 @@ dependencies = [
"serde_yaml",
"snafu 0.8.5",
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"sqlx",
"store-api",
"strum 0.27.1",
@@ -12274,7 +12273,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"api",
"arrow-flight",
@@ -12341,7 +12340,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.14.0",
"substrait 0.15.0",
"table",
"tempfile",
"time",

View File

@@ -68,15 +68,16 @@ members = [
resolver = "2"
[workspace.package]
version = "0.14.0"
version = "0.15.0"
edition = "2021"
license = "Apache-2.0"
[workspace.lints]
clippy.print_stdout = "warn"
clippy.print_stderr = "warn"
clippy.dbg_macro = "warn"
clippy.implicit_clone = "warn"
clippy.result_large_err = "allow"
clippy.large_enum_variant = "allow"
clippy.doc_overindented_list_items = "allow"
rust.unknown_lints = "deny"
rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
@@ -112,15 +113,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
@@ -162,7 +163,7 @@ paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.5.1", features = ["ser"] }
prost = "0.13"
prost = { version = "0.13", features = ["no-recursion-limit"] }
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"
@@ -191,7 +192,7 @@ simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "e98e6b322426a9d397a71efef17075966223c089", features = [
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "0cf6c04490d59435ee965edd2078e8855bd8471e", features = [
"visitor",
"serde",
] } # branch = "v0.54.x"

View File

@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2024-12-25-a71b93dd-20250305072908
DEV_BUILDER_IMAGE_TAG ?= 2025-04-15-1a517ec8-20250428023155
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu

18
flake.lock generated
View File

@@ -8,11 +8,11 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1737613896,
"narHash": "sha256-ldqXIglq74C7yKMFUzrS9xMT/EVs26vZpOD68Sh7OcU=",
"lastModified": 1745735608,
"narHash": "sha256-L0jzm815XBFfF2wCFmR+M1CF+beIEFj6SxlqVKF59Ec=",
"owner": "nix-community",
"repo": "fenix",
"rev": "303a062fdd8e89f233db05868468975d17855d80",
"rev": "c39a78eba6ed2a022cc3218db90d485077101496",
"type": "github"
},
"original": {
@@ -41,11 +41,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1737569578,
"narHash": "sha256-6qY0pk2QmUtBT9Mywdvif0i/CLVgpCjMUn6g9vB+f3M=",
"lastModified": 1745487689,
"narHash": "sha256-FQoi3R0NjQeBAsEOo49b5tbDPcJSMWc3QhhaIi9eddw=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "47addd76727f42d351590c905d9d1905ca895b82",
"rev": "5630cf13cceac06cefe9fc607e8dfa8fb342dde3",
"type": "github"
},
"original": {
@@ -65,11 +65,11 @@
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1737581772,
"narHash": "sha256-t1P2Pe3FAX9TlJsCZbmJ3wn+C4qr6aSMypAOu8WNsN0=",
"lastModified": 1745694049,
"narHash": "sha256-fxvRYH/tS7hGQeg9zCVh5RBcSWT+JGJet7RA8Ss+rC0=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "582af7ee9c8d84f5d534272fc7de9f292bd849be",
"rev": "d8887c0758bbd2d5f752d5bd405d4491e90e7ed6",
"type": "github"
},
"original": {

View File

@@ -21,7 +21,7 @@
lib = nixpkgs.lib;
rustToolchain = fenix.packages.${system}.fromToolchainName {
name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
sha256 = "sha256-f/CVA1EC61EWbh0SjaRNhLL0Ypx2ObupbzigZp8NmL4=";
sha256 = "sha256-arzEYlWLGGYeOhECHpBxQd2joZ4rPKV3qLNnZ+eql6A=";
};
in
{

View File

@@ -84,12 +84,6 @@ mod tests {
let key1 = "3178510";
let key2 = "4215648";
// have collision
assert_eq!(
oid_map.hasher.hash_one(key1) as u32,
oid_map.hasher.hash_one(key2) as u32
);
// insert them into oid_map
let oid1 = oid_map.get_oid(key1);
let oid2 = oid_map.get_oid(key2);

View File

@@ -36,8 +36,8 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::error;
use common_telemetry::tracing_context::W3cTrace;
use common_telemetry::{error, warn};
use futures::future;
use futures_util::{Stream, StreamExt, TryStreamExt};
use prost::Message;
@@ -192,6 +192,36 @@ impl Database {
from_grpc_response(response)
}
/// Retry if connection fails, max_retries is the max number of retries, so the total wait time
/// is `max_retries * GRPC_CONN_TIMEOUT`
pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let mut retries = 0;
let request = self.to_rpc_request(request);
loop {
let raw_response = client.handle(request.clone()).await;
match (raw_response, retries < max_retries) {
(Ok(resp), _) => return from_grpc_response(resp.into_inner()),
(Err(err), true) => {
// determine if the error is retryable
if is_grpc_retryable(&err) {
// retry
retries += 1;
warn!("Retrying {} times with error = {:?}", retries, err);
continue;
}
}
(Err(err), false) => {
error!(
"Failed to send request to grpc handle after {} retries, error = {:?}",
retries, err
);
return Err(err.into());
}
}
}
}
#[inline]
fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
GreptimeRequest {
@@ -368,6 +398,11 @@ impl Database {
}
}
/// by grpc standard, only `Unavailable` is retryable, see: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc
pub fn is_grpc_retryable(err: &tonic::Status) -> bool {
matches!(err.code(), tonic::Code::Unavailable)
}
#[derive(Default, Debug, Clone)]
struct FlightContext {
auth_header: Option<AuthHeader>,
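
The hunk above adds handle_with_retry and is_grpc_retryable, which treats only Unavailable as transient, in line with the gRPC status-code guidance it links. A standalone illustration of the predicate (the assertions below are examples, not part of the change):

use tonic::{Code, Status};

// Restated here so the example compiles on its own; mirrors the function added above.
fn is_grpc_retryable(err: &Status) -> bool {
    matches!(err.code(), Code::Unavailable)
}

fn main() {
    assert!(is_grpc_retryable(&Status::new(Code::Unavailable, "frontend restarting")));
    assert!(!is_grpc_retryable(&Status::new(Code::Internal, "logic error")));
}

Combined with the bounded loop in handle_with_retry, the worst-case wait stays within max_retries * GRPC_CONN_TIMEOUT, as the doc comment notes.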

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::time::Duration;
use async_trait::async_trait;
@@ -131,7 +132,7 @@ impl SubCommand {
}
}
#[derive(Debug, Default, Parser)]
#[derive(Default, Parser)]
pub struct StartCommand {
/// The address to bind the gRPC server.
#[clap(long, alias = "bind-addr")]
@@ -171,6 +172,27 @@ pub struct StartCommand {
backend: Option<BackendImpl>,
}
impl fmt::Debug for StartCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StartCommand")
.field("rpc_bind_addr", &self.rpc_bind_addr)
.field("rpc_server_addr", &self.rpc_server_addr)
.field("store_addrs", &self.sanitize_store_addrs())
.field("config_file", &self.config_file)
.field("selector", &self.selector)
.field("use_memory_store", &self.use_memory_store)
.field("enable_region_failover", &self.enable_region_failover)
.field("http_addr", &self.http_addr)
.field("http_timeout", &self.http_timeout)
.field("env_prefix", &self.env_prefix)
.field("data_home", &self.data_home)
.field("store_key_prefix", &self.store_key_prefix)
.field("max_txn_ops", &self.max_txn_ops)
.field("backend", &self.backend)
.finish()
}
}
impl StartCommand {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
let mut opts = MetasrvOptions::load_layered_options(
@@ -184,6 +206,15 @@ impl StartCommand {
Ok(opts)
}
fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
self.store_addrs.as_ref().map(|addrs| {
addrs
.iter()
.map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
.collect()
})
}
// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
&self,
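
The hand-written Debug implementation above exists so that store_addrs is printed through sanitize_connection_string rather than verbatim. The stand-in below shows the general idea of such sanitization, dropping any user:password@ part before logging; it is illustrative only, and the real helper in common_meta::kv_backend::util may format its output differently.

// Hypothetical stand-in for sanitize_connection_string; not the real implementation.
fn sanitize_connection_string(addr: &str) -> String {
    match (addr.find("://"), addr.rfind('@')) {
        (Some(scheme_end), Some(at)) if at > scheme_end => {
            format!("{}{}", &addr[..scheme_end + 3], &addr[at + 1..])
        }
        _ => addr.to_string(),
    }
}

fn main() {
    assert_eq!(
        sanitize_connection_string("postgres://user:secret@10.0.0.1:5432/meta"),
        "postgres://10.0.0.1:5432/meta"
    );
    // Addresses without credentials pass through unchanged.
    assert_eq!(sanitize_connection_string("etcd://10.0.0.1:2379"), "etcd://10.0.0.1:2379");
}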

View File

@@ -19,4 +19,4 @@ mod uddsketch_state;
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_STATE_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};

View File

@@ -31,23 +31,28 @@ use datafusion::physical_plan::expressions::Literal;
use datafusion::prelude::create_udaf;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use serde::{Deserialize, Serialize};
use uddsketch::{SketchHashKey, UDDSketch};
pub const UDDSKETCH_STATE_NAME: &str = "uddsketch_state";
#[derive(Debug)]
pub const UDDSKETCH_MERGE_NAME: &str = "uddsketch_merge";
#[derive(Debug, Serialize, Deserialize)]
pub struct UddSketchState {
uddsketch: UDDSketch,
error_rate: f64,
}
impl UddSketchState {
pub fn new(bucket_size: u64, error_rate: f64) -> Self {
Self {
uddsketch: UDDSketch::new(bucket_size, error_rate),
error_rate,
}
}
pub fn udf_impl() -> AggregateUDF {
pub fn state_udf_impl() -> AggregateUDF {
create_udaf(
UDDSKETCH_STATE_NAME,
vec![DataType::Int64, DataType::Float64, DataType::Float64],
@@ -61,18 +66,55 @@ impl UddSketchState {
)
}
/// Create a UDF for the `uddsketch_merge` function.
///
/// `uddsketch_merge` accepts bucket size, error rate, and a binary column of states generated by `uddsketch_state`
/// and merges them into a single state.
///
/// The bucket size and error rate must be the same as the original state.
pub fn merge_udf_impl() -> AggregateUDF {
create_udaf(
UDDSKETCH_MERGE_NAME,
vec![DataType::Int64, DataType::Float64, DataType::Binary],
Arc::new(DataType::Binary),
Volatility::Immutable,
Arc::new(|args| {
let (bucket_size, error_rate) = downcast_accumulator_args(args)?;
Ok(Box::new(UddSketchState::new(bucket_size, error_rate)))
}),
Arc::new(vec![DataType::Binary]),
)
}
fn update(&mut self, value: f64) {
self.uddsketch.add_value(value);
}
fn merge(&mut self, raw: &[u8]) {
if let Ok(uddsketch) = bincode::deserialize::<UDDSketch>(raw) {
if uddsketch.count() != 0 {
self.uddsketch.merge_sketch(&uddsketch);
fn merge(&mut self, raw: &[u8]) -> DfResult<()> {
if let Ok(uddsketch) = bincode::deserialize::<Self>(raw) {
if uddsketch.uddsketch.count() != 0 {
if self.uddsketch.max_allowed_buckets() != uddsketch.uddsketch.max_allowed_buckets()
|| (self.error_rate - uddsketch.error_rate).abs() >= 1e-9
{
return Err(DataFusionError::Plan(format!(
"Merging UDDSketch with different parameters: arguments={:?} vs actual input={:?}",
(
self.uddsketch.max_allowed_buckets(),
self.error_rate
),
(uddsketch.uddsketch.max_allowed_buckets(), uddsketch.error_rate)
)));
}
self.uddsketch.merge_sketch(&uddsketch.uddsketch);
}
} else {
trace!("Warning: Failed to deserialize UDDSketch from {:?}", raw);
return Err(DataFusionError::Plan(
"Failed to deserialize UDDSketch from binary".to_string(),
));
}
Ok(())
}
}
@@ -113,9 +155,21 @@ fn downcast_accumulator_args(args: AccumulatorArgs) -> DfResult<(u64, f64)> {
impl DfAccumulator for UddSketchState {
fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
let array = &values[2]; // the third column is data value
let f64_array = as_primitive_array::<Float64Type>(array)?;
for v in f64_array.iter().flatten() {
self.update(v);
match array.data_type() {
DataType::Float64 => {
let f64_array = as_primitive_array::<Float64Type>(array)?;
for v in f64_array.iter().flatten() {
self.update(v);
}
}
// meaning instantiate as `uddsketch_merge`
DataType::Binary => self.merge_batch(&[array.clone()])?,
_ => {
return not_impl_err!(
"UDDSketch functions do not support data type: {}",
array.data_type()
)
}
}
Ok(())
@@ -123,7 +177,7 @@ impl DfAccumulator for UddSketchState {
fn evaluate(&mut self) -> DfResult<ScalarValue> {
Ok(ScalarValue::Binary(Some(
bincode::serialize(&self.uddsketch).map_err(|e| {
bincode::serialize(&self).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
})?,
)))
@@ -150,7 +204,7 @@ impl DfAccumulator for UddSketchState {
fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Binary(Some(
bincode::serialize(&self.uddsketch).map_err(|e| {
bincode::serialize(&self).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
})?,
))])
@@ -160,7 +214,7 @@ impl DfAccumulator for UddSketchState {
let array = &states[0];
let binary_array = as_binary_array(array)?;
for v in binary_array.iter().flatten() {
self.merge(v);
self.merge(v)?;
}
Ok(())
@@ -182,8 +236,8 @@ mod tests {
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.count(), 3);
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.uddsketch.count(), 3);
} else {
panic!("Expected binary scalar value");
}
@@ -201,13 +255,15 @@ mod tests {
// Create new state and merge the serialized data
let mut new_state = UddSketchState::new(10, 0.01);
if let ScalarValue::Binary(Some(bytes)) = &serialized {
new_state.merge(bytes);
new_state.merge(bytes).unwrap();
// Verify the merged state matches original by comparing deserialized values
let original_sketch: UDDSketch = bincode::deserialize(bytes).unwrap();
let original_sketch: UddSketchState = bincode::deserialize(bytes).unwrap();
let original_sketch = original_sketch.uddsketch;
let new_result = new_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(new_bytes)) = new_result {
let new_sketch: UDDSketch = bincode::deserialize(&new_bytes).unwrap();
let new_sketch: UddSketchState = bincode::deserialize(&new_bytes).unwrap();
let new_sketch = new_sketch.uddsketch;
assert_eq!(original_sketch.count(), new_sketch.count());
assert_eq!(original_sketch.sum(), new_sketch.sum());
assert_eq!(original_sketch.mean(), new_sketch.mean());
@@ -244,7 +300,8 @@ mod tests {
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
let deserialized = deserialized.uddsketch;
assert_eq!(deserialized.count(), 3);
} else {
panic!("Expected binary scalar value");
@@ -273,7 +330,8 @@ mod tests {
let result = merged_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
let deserialized = deserialized.uddsketch;
assert_eq!(deserialized.count(), 2);
} else {
panic!("Expected binary scalar value");

View File

@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::iter::repeat_n;
use std::sync::Arc;
use std::{fmt, iter};
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Volatility;
@@ -126,9 +127,10 @@ impl Function for MatchesTermFunction {
let term = term_column.get_ref(0).as_string().unwrap();
match term {
None => {
return Ok(Arc::new(BooleanVector::from_iter(
iter::repeat(None).take(text_column.len()),
)));
return Ok(Arc::new(BooleanVector::from_iter(repeat_n(
None,
text_column.len(),
))));
}
Some(term) => Some(MatchesTermFinder::new(term)),
}
@@ -217,7 +219,7 @@ impl MatchesTermFinder {
}
let mut pos = 0;
while let Some(found_pos) = self.finder.find(text[pos..].as_bytes()) {
while let Some(found_pos) = self.finder.find(&text.as_bytes()[pos..]) {
let actual_pos = pos + found_pos;
let prev_ok = self.starts_with_non_alnum

View File

@@ -13,10 +13,8 @@
// limitations under the License.
use std::sync::Arc;
mod greatest;
mod to_unixtime;
use greatest::GreatestFunction;
use to_unixtime::ToUnixtimeFunction;
use crate::function_registry::FunctionRegistry;
@@ -26,6 +24,5 @@ pub(crate) struct TimestampFunction;
impl TimestampFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(ToUnixtimeFunction));
registry.register(Arc::new(GreatestFunction));
}
}

View File

@@ -1,328 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self};
use common_query::error::{
self, ArrowComputeSnafu, InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use datafusion::arrow::compute::kernels::cmp::gt;
use datatypes::arrow::array::AsArray;
use datatypes::arrow::compute::cast;
use datatypes::arrow::compute::kernels::zip;
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Date32Type, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::vectors::{Helper, VectorRef};
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
#[derive(Clone, Debug, Default)]
pub struct GreatestFunction;
const NAME: &str = "greatest";
macro_rules! gt_time_types {
($ty: ident, $columns:expr) => {{
let column1 = $columns[0].to_arrow_array();
let column2 = $columns[1].to_arrow_array();
let column1 = column1.as_primitive::<$ty>();
let column2 = column2.as_primitive::<$ty>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result = zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)
}};
}
impl Function for GreatestFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
ensure!(
input_types.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
input_types.len()
)
}
);
match &input_types[0] {
ConcreteDataType::String(_) => Ok(ConcreteDataType::timestamp_millisecond_datatype()),
ConcreteDataType::Date(_) => Ok(ConcreteDataType::date_datatype()),
ConcreteDataType::Timestamp(ts_type) => Ok(ConcreteDataType::Timestamp(*ts_type)),
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: input_types,
}
.fail(),
}
}
fn signature(&self) -> Signature {
Signature::uniform(
2,
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_second_datatype(),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
match columns[0].data_type() {
ConcreteDataType::String(_) => {
let column1 = cast(
&columns[0].to_arrow_array(),
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
)
.context(ArrowComputeSnafu)?;
let column1 = column1.as_primitive::<TimestampMillisecondType>();
let column2 = cast(
&columns[1].to_arrow_array(),
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
)
.context(ArrowComputeSnafu)?;
let column2 = column2.as_primitive::<TimestampMillisecondType>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result =
zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Ok(Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)?)
}
ConcreteDataType::Date(_) => gt_time_types!(Date32Type, columns),
ConcreteDataType::Timestamp(ts_type) => match ts_type {
TimestampType::Second(_) => gt_time_types!(TimestampSecondType, columns),
TimestampType::Millisecond(_) => {
gt_time_types!(TimestampMillisecondType, columns)
}
TimestampType::Microsecond(_) => {
gt_time_types!(TimestampMicrosecondType, columns)
}
TimestampType::Nanosecond(_) => {
gt_time_types!(TimestampNanosecondType, columns)
}
},
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail(),
}
}
}
impl fmt::Display for GreatestFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "GREATEST")
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_time::timestamp::TimeUnit;
use common_time::{Date, Timestamp};
use datatypes::types::{
DateType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType,
};
use datatypes::value::Value;
use datatypes::vectors::{
DateVector, StringVector, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, Vector,
};
use paste::paste;
use super::*;
#[test]
fn test_greatest_takes_string_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype()
])
.unwrap(),
ConcreteDataType::timestamp_millisecond_datatype()
);
let columns = vec![
Arc::new(StringVector::from(vec![
"1970-01-01".to_string(),
"2012-12-23".to_string(),
])) as _,
Arc::new(StringVector::from(vec![
"2001-02-01".to_string(),
"1999-01-01".to_string(),
])) as _,
];
let result = function
.eval(&FunctionContext::default(), &columns)
.unwrap();
let result = result
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::from_str("2001-02-01 00:00:00", None).unwrap())
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::from_str("2012-12-23 00:00:00", None).unwrap())
);
}
#[test]
fn test_greatest_takes_date_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::date_datatype(),
ConcreteDataType::date_datatype()
])
.unwrap(),
ConcreteDataType::Date(DateType)
);
let columns = vec![
Arc::new(DateVector::from_slice(vec![-1, 2])) as _,
Arc::new(DateVector::from_slice(vec![0, 1])) as _,
];
let result = function
.eval(&FunctionContext::default(), &columns)
.unwrap();
let result = result.as_any().downcast_ref::<DateVector>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Date(Date::from_str_utc("1970-01-01").unwrap())
);
assert_eq!(
result.get(1),
Value::Date(Date::from_str_utc("1970-01-03").unwrap())
);
}
#[test]
fn test_greatest_takes_datetime_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_millisecond_datatype()
])
.unwrap(),
ConcreteDataType::timestamp_millisecond_datatype()
);
let columns = vec![
Arc::new(TimestampMillisecondVector::from_slice(vec![-1, 2])) as _,
Arc::new(TimestampMillisecondVector::from_slice(vec![0, 1])) as _,
];
let result = function
.eval(&FunctionContext::default(), &columns)
.unwrap();
let result = result
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::from_str("1970-01-01 00:00:00", None).unwrap())
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::from_str("1970-01-01 00:00:00.002", None).unwrap())
);
}
macro_rules! test_timestamp {
($type: expr,$unit: ident) => {
paste! {
#[test]
fn [<test_greatest_takes_ $unit:lower _vector>]() {
let function = GreatestFunction;
assert_eq!(
function.return_type(&[$type, $type]).unwrap(),
ConcreteDataType::Timestamp(TimestampType::$unit([<Timestamp $unit Type>]))
);
let columns = vec![
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![-1, 2])) as _,
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![0, 1])) as _,
];
let result = function.eval(&FunctionContext::default(), &columns).unwrap();
let result = result.as_any().downcast_ref::<[<Timestamp $unit Vector>]>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::new(0, TimeUnit::$unit))
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::new(2, TimeUnit::$unit))
);
}
}
}
}
test_timestamp!(
ConcreteDataType::timestamp_nanosecond_datatype(),
Nanosecond
);
test_timestamp!(
ConcreteDataType::timestamp_microsecond_datatype(),
Microsecond
);
test_timestamp!(
ConcreteDataType::timestamp_millisecond_datatype(),
Millisecond
);
test_timestamp!(ConcreteDataType::timestamp_second_datatype(), Second);
}

View File

@@ -115,6 +115,13 @@ impl Function for UddSketchCalcFunction {
}
};
// Check if the sketch is empty; if so, return null.
// This avoids panics when calling estimate_quantile on an empty sketch.
// In practice, this happens when the input is all null.
if sketch.bucket_iter().count() == 0 {
builder.push_null();
continue;
}
// Compute the estimated quantile from the sketch
let result = sketch.estimate_quantile(perc);
builder.push(Some(result));

View File

@@ -217,7 +217,9 @@ pub enum Instruction {
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegion(FlushRegions),
FlushRegions(FlushRegions),
/// Flushes a single region.
FlushRegion(RegionId),
}
/// The reply of [UpgradeRegion].
@@ -248,6 +250,7 @@ pub enum InstructionReply {
CloseRegion(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
FlushRegion(SimpleReply),
}
impl Display for InstructionReply {
@@ -259,6 +262,7 @@ impl Display for InstructionReply {
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
}
}
}
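For illustration, a hedged sketch of constructing the two flush variants side by side; the types and constructors come from the surrounding diff, and the JSON round trip assumes the serde derives the `Instruction` enum already carries.
use common_meta::instruction::{FlushRegions, Instruction};
use store_api::storage::RegionId;

// Flush a single region (new variant) vs. a batch of regions (renamed variant).
let single = Instruction::FlushRegion(RegionId::new(1024, 0));
let batch = Instruction::FlushRegions(FlushRegions {
    region_ids: vec![RegionId::new(1024, 0), RegionId::new(1024, 1)],
});

// Both travel through the heartbeat mailbox as JSON payloads.
let payloads = vec![
    serde_json::to_string(&single).unwrap(),
    serde_json::to_string(&batch).unwrap(),
];
assert_eq!(payloads.len(), 2);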

View File

@@ -112,7 +112,7 @@ pub mod test_utils;
mod tombstone;
pub mod topic_name;
pub mod topic_region;
pub(crate) mod txn_helper;
pub mod txn_helper;
pub mod view_info;
use std::collections::{BTreeMap, HashMap, HashSet};

View File

@@ -25,7 +25,7 @@ pub struct TxnOpGetResponseSet(Vec<KeyValue>);
impl TxnOpGetResponseSet {
/// Returns a filter to consume a [KeyValue] where the key equals `key`.
pub(crate) fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
pub fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
move |set| {
let pos = set.0.iter().position(|kv| kv.key == key);
match pos {
@@ -36,7 +36,7 @@ impl TxnOpGetResponseSet {
}
/// Returns a decoder to decode bytes to `DeserializedValueWithBytes<T>`.
pub(crate) fn decode_with<F, T>(
pub fn decode_with<F, T>(
mut f: F,
) -> impl FnMut(&mut TxnOpGetResponseSet) -> Result<Option<DeserializedValueWithBytes<T>>>
where

View File

@@ -35,7 +35,7 @@ pub mod memory;
pub mod rds;
pub mod test;
pub mod txn;
pub mod util;
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
#[async_trait]

View File

@@ -0,0 +1,85 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Removes sensitive information like passwords from connection strings.
///
/// This function sanitizes connection strings by removing credentials:
/// - For URL format (mysql://user:password@host:port/db): Removes everything before '@'
/// - For parameter format (host=localhost password=secret): Removes the password parameter
/// - For URL format without credentials (mysql://host:port/db): Removes the protocol prefix
///
/// # Arguments
///
/// * `conn_str` - The connection string to sanitize
///
/// # Returns
///
/// A sanitized version of the connection string with sensitive information removed
pub fn sanitize_connection_string(conn_str: &str) -> String {
// Case 1: URL format with credentials (mysql://user:password@host:port/db)
// Extract everything after the '@' symbol
if let Some(at_pos) = conn_str.find('@') {
return conn_str[at_pos + 1..].to_string();
}
// Case 2: Parameter format with password (host=localhost password=secret dbname=mydb)
// Filter out any parameter that starts with "password="
if conn_str.contains("password=") {
return conn_str
.split_whitespace()
.filter(|param| !param.starts_with("password="))
.collect::<Vec<_>>()
.join(" ");
}
// Case 3: URL format without credentials (mysql://host:port/db)
// Extract everything after the protocol prefix
if let Some(host_part) = conn_str.split("://").nth(1) {
return host_part.to_string();
}
// Case 4: Already sanitized or unknown format
// Return as is
conn_str.to_string()
}
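A short usage sketch (the address below is made up): this same helper is what `MetasrvOptions` later uses to redact `store_addrs` in its `Debug` output.
// Credentials before '@' are dropped, leaving only the host part.
let addr = "postgres://meta:s3cret@10.0.0.1:5432/greptime_meta";
assert_eq!(
    sanitize_connection_string(addr),
    "10.0.0.1:5432/greptime_meta"
);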
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sanitize_connection_string() {
// Test URL format with username/password
let conn_str = "mysql://user:password123@localhost:3306/db";
assert_eq!(sanitize_connection_string(conn_str), "localhost:3306/db");
// Test URL format without credentials
let conn_str = "mysql://localhost:3306/db";
assert_eq!(sanitize_connection_string(conn_str), "localhost:3306/db");
// Test parameter format with password
let conn_str = "host=localhost port=5432 user=postgres password=secret dbname=mydb";
assert_eq!(
sanitize_connection_string(conn_str),
"host=localhost port=5432 user=postgres dbname=mydb"
);
// Test parameter format without password
let conn_str = "host=localhost port=5432 user=postgres dbname=mydb";
assert_eq!(
sanitize_connection_string(conn_str),
"host=localhost port=5432 user=postgres dbname=mydb"
);
}
}

View File

@@ -15,8 +15,6 @@
#![feature(assert_matches)]
#![feature(btree_extract_if)]
#![feature(let_chains)]
#![feature(extract_if)]
#![feature(hash_extract_if)]
pub mod cache;
pub mod cache_invalidator;

View File

@@ -176,15 +176,12 @@ impl TableRoute {
})?
.into();
let leader_peer = peers
.get(region_route.leader_peer_index as usize)
.cloned()
.map(Into::into);
let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
let follower_peers = region_route
.follower_peer_indexes
.into_iter()
.filter_map(|x| peers.get(x as usize).cloned().map(Into::into))
.filter_map(|x| peers.get(x as usize).cloned())
.collect::<Vec<_>>();
region_routes.push(RegionRoute {

View File

@@ -24,7 +24,7 @@ use datatypes::prelude::*;
use datatypes::vectors::{Helper as VectorHelper, VectorRef};
use snafu::ResultExt;
use crate::error::{self, Error, FromScalarValueSnafu, IntoVectorSnafu, Result};
use crate::error::{self, FromScalarValueSnafu, IntoVectorSnafu, Result};
use crate::prelude::*;
pub type AggregateFunctionCreatorRef = Arc<dyn AggregateFunctionCreator>;
@@ -166,8 +166,7 @@ impl DfAccumulator for DfAccumulatorAdaptor {
let output_type = self.creator.output_type()?;
let scalar_value = value
.try_to_scalar_value(&output_type)
.context(error::ToScalarValueSnafu)
.map_err(Error::from)?;
.context(error::ToScalarValueSnafu)?;
Ok(scalar_value)
}

View File

@@ -39,6 +39,7 @@ pub struct RegionHeartbeatResponseHandler {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
}
/// Handler of the instruction.
@@ -50,6 +51,7 @@ pub struct HandlerContext {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
}
impl HandlerContext {
@@ -63,6 +65,7 @@ impl HandlerContext {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
}
}
}
@@ -74,6 +77,7 @@ impl RegionHeartbeatResponseHandler {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
}
}
@@ -95,8 +99,11 @@ impl RegionHeartbeatResponseHandler {
handler_context.handle_upgrade_region_instruction(upgrade_region)
})),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
Instruction::FlushRegion(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_regions)
Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_regions_instruction(flush_regions)
})),
Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_region)
})),
}
}
@@ -111,6 +118,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
| Some((_, Instruction::CloseRegion { .. }))
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
| Some((_, Instruction::FlushRegion { .. }))
)
}
@@ -124,12 +132,14 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let region_server = self.region_server.clone();
let catchup_tasks = self.catchup_tasks.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let flush_tasks = self.flush_tasks.clone();
let handler = Self::build_handler(instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler(HandlerContext {
region_server,
catchup_tasks,
downgrade_tasks,
flush_tasks,
})
.await;

View File

@@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{FlushRegions, InstructionReply};
use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
use common_telemetry::warn;
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::error;
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
pub(crate) fn handle_flush_region_instruction(
pub(crate) fn handle_flush_regions_instruction(
self,
flush_regions: FlushRegions,
) -> BoxFuture<'static, Option<InstructionReply>> {
@@ -49,6 +50,59 @@ impl HandlerContext {
None
})
}
pub(crate) fn handle_flush_region_instruction(
self,
region_id: RegionId,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not leader".to_string()),
}));
};
if !writable {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not writable".to_string()),
}));
}
let region_server_moved = self.region_server.clone();
let register_result = self
.flush_tasks
.try_register(
region_id,
Box::pin(async move {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
region_server_moved
.handle_request(region_id, request)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another flush task is running for the region: {region_id}");
}
let mut watcher = register_result.into_watcher();
let result = self.flush_tasks.wait_until_finish(&mut watcher).await;
match result {
Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply {
result: true,
error: None,
})),
Err(err) => Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some(format!("{err:?}")),
})),
}
})
}
}
#[cfg(test)]
@@ -84,7 +138,7 @@ mod tests {
let reply = handler_context
.clone()
.handle_flush_region_instruction(FlushRegions {
.handle_flush_regions_instruction(FlushRegions {
region_ids: region_ids.clone(),
})
.await;
@@ -94,7 +148,7 @@ mod tests {
flushed_region_ids.write().unwrap().clear();
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let reply = handler_context
.handle_flush_region_instruction(FlushRegions {
.handle_flush_regions_instruction(FlushRegions {
region_ids: not_found_region_ids.clone(),
})
.await;

View File

@@ -144,6 +144,11 @@ impl<T: Send + Sync + Clone + 'static> TaskTracker<T> {
}
}
/// Waits for the task tracked by the given [TaskWatcher] to finish and returns its result.
pub(crate) async fn wait_until_finish(&self, watcher: &mut TaskWatcher<T>) -> Result<T> {
wait(watcher).await
}
/// Tries to register a new async task, returns [RegisterResult::Busy] if previous task is running.
pub(crate) async fn try_register(
&self,

View File

@@ -253,9 +253,10 @@ fn create_current_timestamp_vector(
data_type: &ConcreteDataType,
num_rows: usize,
) -> Result<VectorRef> {
let current_timestamp_vector = TimestampMillisecondVector::from_values(
std::iter::repeat(util::current_time_millis()).take(num_rows),
);
let current_timestamp_vector = TimestampMillisecondVector::from_values(std::iter::repeat_n(
util::current_time_millis(),
num_rows,
));
if data_type.is_timestamp() {
current_timestamp_vector.cast(data_type)
} else {

View File

@@ -198,8 +198,7 @@ impl fmt::Debug for ConstantVector {
impl Serializable for ConstantVector {
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
std::iter::repeat(self.get(0))
.take(self.len())
std::iter::repeat_n(self.get(0), self.len())
.map(serde_json::Value::try_from)
.collect::<serde_json::Result<_>>()
.context(SerializeSnafu)

View File

@@ -412,7 +412,7 @@ pub(crate) fn replicate_decimal128(
// Safety: std::iter::Repeat and std::iter::Take implement TrustedLen.
builder
.mutable_array
.append_trusted_len_iter(std::iter::repeat(data).take(repeat_times));
.append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
}
}
None => {

View File

@@ -120,9 +120,7 @@ impl fmt::Debug for NullVector {
impl Serializable for NullVector {
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
Ok(std::iter::repeat(serde_json::Value::Null)
.take(self.len())
.collect())
Ok(std::iter::repeat_n(serde_json::Value::Null, self.len()).collect())
}
}

View File

@@ -388,7 +388,7 @@ pub(crate) fn replicate_primitive<T: LogicalPrimitiveType>(
// Safety: std::iter::Repeat and std::iter::Take implement TrustedLen.
builder
.mutable_array
.append_trusted_len_iter(std::iter::repeat(data).take(repeat_times));
.append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
}
}
None => {

View File

@@ -37,11 +37,12 @@ use tokio::sync::{Mutex, RwLock};
use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, SyncCheckTaskSnafu,
UnexpectedSnafu,
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu,
SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
@@ -81,6 +82,11 @@ impl FlowDualEngine {
}
}
/// Determine if the engine is in distributed mode
pub fn is_distributed(&self) -> bool {
self.streaming_engine.node_id.is_some()
}
pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
self.streaming_engine.clone()
}
@@ -89,6 +95,39 @@ impl FlowDualEngine {
self.batching_engine.clone()
}
/// In distributed mode, scans periodically (every 1s) until an available frontend is found or the timeout elapses;
/// in standalone mode, returns immediately.
/// Note that this function returns as soon as any frontend appears in the cluster info.
async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
if !self.is_distributed() {
return Ok(());
}
let frontend_client = self.batching_engine().frontend_client.clone();
let sleep_duration = std::time::Duration::from_millis(1_000);
let now = std::time::Instant::now();
loop {
let frontend_list = frontend_client.scan_for_frontend().await?;
if !frontend_list.is_empty() {
let fe_list = frontend_list
.iter()
.map(|(_, info)| &info.peer.addr)
.collect::<Vec<_>>();
info!("Available frontend found: {:?}", fe_list);
return Ok(());
}
let elapsed = now.elapsed();
tokio::time::sleep(sleep_duration).await;
info!("Waiting for available frontend, elapsed={:?}", elapsed);
if elapsed >= timeout {
return NoAvailableFrontendSnafu {
timeout,
context: "No available frontend found in cluster info",
}
.fail();
}
}
}
/// Tries to sync with the check task. This is only used in drop flow & flush flow, so a flow id is required.
///
/// The sync is needed to make sure flush flow actually gets called.
@@ -338,18 +377,36 @@ struct ConsistentCheckTask {
impl ConsistentCheckTask {
async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
// first do recover flows
engine.check_flow_consistent(true, false).await?;
let inner = engine.clone();
let engine = engine.clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (trigger_tx, mut trigger_rx) =
tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
let handle = common_runtime::spawn_global(async move {
// first, check whether an available frontend can be found
if let Err(err) = engine
.wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
.await
{
warn!("No frontend is available yet:\n {err:?}");
}
// then recover flows; on failure, keep retrying
let mut recover_retry = 0;
while let Err(err) = engine.check_flow_consistent(true, false).await {
recover_retry += 1;
error!(
"Failed to recover flows:\n {err:?}, retry {} in {}s",
recover_retry,
MIN_REFRESH_DURATION.as_secs()
);
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
// then do check flows, with configurable allow_create and allow_drop
let (mut allow_create, mut allow_drop) = (false, false);
let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
loop {
if let Err(err) = inner.check_flow_consistent(allow_create, allow_drop).await {
if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
error!(err; "Failed to check flow consistent");
}
if let Some(done) = ret_signal.take() {
@@ -534,7 +591,12 @@ impl FlowEngine for FlowDualEngine {
match flow_type {
Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
None => Ok(0),
None => {
warn!(
"Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
);
Ok(0)
}
}
}

View File

@@ -31,4 +31,19 @@ pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
/// The minimum duration between two query executions of a batching mode task
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
pub const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
/// Grpc connection timeout
const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5);
/// Grpc max retry number
const GRPC_MAX_RETRIES: u32 = 3;
/// Timeout for a flow to wait for an available frontend.
/// If no available frontend is found after FRONTEND_SCAN_TIMEOUT has elapsed, an error is returned,
/// which should prevent the flownode from starting.
pub const FRONTEND_SCAN_TIMEOUT: Duration = Duration::from_secs(30);
/// Frontend activity timeout.
/// If a frontend has been down (not sending heartbeats) for more than FRONTEND_ACTIVITY_TIMEOUT, it is removed from the list of frontends the flownode connects to.
pub const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60);

View File

@@ -49,7 +49,8 @@ use crate::{CreateFlowArgs, Error, FlowId, TableName};
pub struct BatchingEngine {
tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
frontend_client: Arc<FrontendClient>,
/// Frontend client for insert requests
pub(crate) frontend_client: Arc<FrontendClient>,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,

View File

@@ -15,6 +15,7 @@
//! Frontend client used to run a flow as a batching task: a time-window-aware normal query triggered on every tick configured by the user
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
@@ -25,13 +26,18 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use itertools::Itertools;
use meta_client::client::MetaClient;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT;
use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu};
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
GRPC_MAX_RETRIES,
};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::Error;
/// Just like [`GrpcQueryHandler`] but use BoxedError
@@ -99,7 +105,9 @@ impl FrontendClient {
Self::Distributed {
meta_client,
chnl_mgr: {
let cfg = ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
let cfg = ChannelConfig::new()
.connect_timeout(GRPC_CONN_TIMEOUT)
.timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
ChannelManager::with_config(cfg)
},
}
@@ -122,10 +130,24 @@ impl DatabaseWithPeer {
fn new(database: Database, peer: Peer) -> Self {
Self { database, peer }
}
/// Try sending a "SELECT 1" to the database
async fn try_select_one(&self) -> Result<(), Error> {
// note: use the `sql` method here so that `SELECT 1` returns one row
let _ = self
.database
.sql("SELECT 1")
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
})?;
Ok(())
}
}
impl FrontendClient {
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
/// Scans for available frontends from metadata
pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else {
return Ok(vec![]);
};
@@ -155,8 +177,8 @@ impl FrontendClient {
Ok(res)
}
/// Get the database with max `last_activity_ts`
async fn get_last_active_frontend(
/// Get the database with the maximum `last_activity_ts` that is able to process queries
async fn get_latest_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -172,22 +194,50 @@ impl FrontendClient {
.fail();
};
let frontends = self.scan_for_frontend().await?;
let mut peer = None;
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
interval.tick().await;
for retry in 0..GRPC_MAX_RETRIES {
let frontends = self.scan_for_frontend().await?;
let now_in_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
if let Some((_, val)) = frontends.iter().max_by_key(|(_, val)| val.last_activity_ts) {
peer = Some(val.peer.clone());
// iterate over nodes starting from the maximum last_activity_ts
for (_, node_info) in frontends
.iter()
.sorted_by_key(|(_, node_info)| node_info.last_activity_ts)
.rev()
// filter out frontends that have been down for more than 1 minute
.filter(|(_, node_info)| {
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
> now_in_ms
})
{
let addr = &node_info.peer.addr;
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
let database = Database::new(catalog, schema, client);
let db = DatabaseWithPeer::new(database, node_info.peer.clone());
match db.try_select_one().await {
Ok(_) => return Ok(db),
Err(e) => {
warn!(
"Failed to connect to frontend {} on retry={}: \n{e:?}",
addr, retry
);
}
}
}
// no available frontend
// sleep and retry
interval.tick().await;
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![peer.addr.clone()]);
let database = Database::new(catalog, schema, client);
Ok(DatabaseWithPeer::new(database, peer))
NoAvailableFrontendSnafu {
timeout: GRPC_CONN_TIMEOUT,
context: "No available frontend found that is able to process query",
}
.fail()
}
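A small worked check of the liveness filter above, with made-up millisecond timestamps: a frontend whose `last_activity_ts` falls within `FRONTEND_ACTIVITY_TIMEOUT` of now is probed, anything staler is skipped.
let now_in_ms: i64 = 1_700_000_120_000;
let timeout_ms = 60_000_i64; // FRONTEND_ACTIVITY_TIMEOUT = 60s

let fresh_ts: i64 = 1_700_000_100_000; // heartbeat 20s ago
assert!(fresh_ts + timeout_ms > now_in_ms); // kept

let stale_ts: i64 = 1_700_000_000_000; // heartbeat 120s ago
assert!(!(stale_ts + timeout_ms > now_in_ms)); // filtered out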
pub async fn create(
@@ -217,17 +267,17 @@ impl FrontendClient {
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { .. } => {
let db = self.get_last_active_frontend(catalog, schema).await?;
let db = self.get_latest_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
db.database
.handle(req.clone())
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to handle request: {:?}", req),
context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
})
}
FrontendClient::Standalone { database_client } => {

View File

@@ -53,6 +53,7 @@ use crate::batching_mode::utils::{
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
@@ -541,7 +542,10 @@ impl BatchingTask {
.clone()
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan {:?}", self.config.plan),
context: format!(
"Failed to rewrite plan:\n {}\n",
self.config.plan
),
})?
.data;
let schema_len = plan.schema().fields().len();
@@ -573,16 +577,19 @@ impl BatchingTask {
let mut add_filter = AddFilterRewriter::new(expr);
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
// make an unoptimized plan for clearer unparsing
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
.await?;
plan.clone()
let rewrite = plan
.clone()
.rewrite(&mut add_filter)
.and_then(|p| p.data.rewrite(&mut add_auto_column))
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan {plan:?}"),
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data
.data;
// only apply optimize after complex rewrite is done
apply_df_optimizer(rewrite).await?
};
Ok(Some((new_plan, schema_len)))

View File

@@ -704,6 +704,28 @@ mod test {
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
),
// complex time window index with where
(
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number in (2, 3, 4) GROUP BY time_window;",
Timestamp::new(1740394109, TimeUnit::Second),
(
"ts".to_string(),
Some(Timestamp::new(1740394080, TimeUnit::Second)),
Some(Timestamp::new(1740394140, TimeUnit::Second)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE numbers_with_ts.number IN (2, 3, 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
),
// complex time window index with between and
(
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number BETWEEN 2 AND 4 GROUP BY time_window;",
Timestamp::new(1740394109, TimeUnit::Second),
(
"ts".to_string(),
Some(Timestamp::new(1740394080, TimeUnit::Second)),
Some(Timestamp::new(1740394140, TimeUnit::Second)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE (numbers_with_ts.number BETWEEN 2 AND 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
),
// no time index
(
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",

View File

@@ -342,8 +342,8 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
}
} else {
return Err(DataFusionError::Plan(format!(
"Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?} at node {:?}",
query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas(), node
"Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?}",
query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas()
)));
}
@@ -406,7 +406,9 @@ mod test {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use pretty_assertions::assert_eq;
use query::query_engine::DefaultSerializer;
use session::context::QueryContext;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use super::*;
use crate::test_utils::create_test_query_engine;
@@ -701,4 +703,18 @@ mod test {
);
}
}
#[tokio::test]
async fn test_null_cast() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sql = "SELECT NULL::DOUBLE FROM numbers_with_ts";
let plan = sql_to_df_plan(ctx, query_engine.clone(), sql, false)
.await
.unwrap();
let _sub_plan = DFLogicalSubstraitConvertor {}
.encode(&plan, DefaultSerializer)
.unwrap();
}
}

View File

@@ -25,7 +25,6 @@ use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use datafusion::optimizer::optimize_projections::OptimizeProjections;
@@ -42,6 +41,7 @@ use datafusion_expr::{
BinaryExpr, ColumnarValue, Expr, Operator, Projection, ScalarFunctionArgs, ScalarUDFImpl,
Signature, TypeSignature, Volatility,
};
use query::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use query::parser::QueryLanguageParser;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
@@ -61,9 +61,9 @@ pub async fn apply_df_optimizer(
) -> Result<datafusion_expr::LogicalPlan, Error> {
let cfg = ConfigOptions::new();
let analyzer = Analyzer::with_rules(vec![
Arc::new(CountWildcardRule::new()),
Arc::new(AvgExpandRule::new()),
Arc::new(TumbleExpandRule::new()),
Arc::new(CountWildcardToTimeIndexRule),
Arc::new(AvgExpandRule),
Arc::new(TumbleExpandRule),
Arc::new(CheckGroupByRule::new()),
Arc::new(TypeCoercion::new()),
]);
@@ -128,13 +128,7 @@ pub async fn sql_to_flow_plan(
}
#[derive(Debug)]
struct AvgExpandRule {}
impl AvgExpandRule {
pub fn new() -> Self {
Self {}
}
}
struct AvgExpandRule;
impl AnalyzerRule for AvgExpandRule {
fn analyze(
@@ -331,13 +325,7 @@ impl TreeNodeRewriter for ExpandAvgRewriter<'_> {
/// expand tumble in aggr expr to tumble_start and tumble_end with column name like `window_start`
#[derive(Debug)]
struct TumbleExpandRule {}
impl TumbleExpandRule {
pub fn new() -> Self {
Self {}
}
}
struct TumbleExpandRule;
impl AnalyzerRule for TumbleExpandRule {
fn analyze(

View File

@@ -61,6 +61,16 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"No available frontend found after timeout: {timeout:?}, context: {context}"
))]
NoAvailableFrontend {
timeout: std::time::Duration,
context: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -296,7 +306,8 @@ impl ErrorExt for Error {
Self::Eval { .. }
| Self::JoinTask { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
| Self::InsertIntoFlow { .. }
| Self::NoAvailableFrontend { .. } => StatusCode::Internal,
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }

View File

@@ -172,6 +172,8 @@ impl FlownodeServer {
}
/// Start the background task for streaming computation.
///
/// Should be called only after the heartbeat is established, so that the cluster info is available
async fn start_workers(&self) -> Result<(), Error> {
let manager_ref = self.inner.flow_service.dual_engine.clone();
let handle = manager_ref

View File

@@ -481,7 +481,7 @@ mod tests {
let mock_values = dic_values
.iter()
.flat_map(|(value, size)| iter::repeat(value.clone()).take(*size))
.flat_map(|(value, size)| std::iter::repeat_n(value.clone(), *size))
.collect::<Vec<_>>();
let sorted_result = sorted_result(&mock_values, segment_row_count);

View File

@@ -14,7 +14,6 @@
#![feature(result_flattening)]
#![feature(assert_matches)]
#![feature(extract_if)]
#![feature(hash_set_entry)]
pub mod bootstrap;

View File

@@ -14,7 +14,7 @@
pub mod builder;
use std::fmt::Display;
use std::fmt::{self, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
@@ -48,6 +48,7 @@ use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::http::HttpOptions;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::broadcast::error::RecvError;
@@ -65,7 +66,7 @@ use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{Selector, SelectorType};
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{become_follower, become_leader, StateRef};
@@ -96,7 +97,7 @@ pub enum BackendImpl {
MysqlStore,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
/// The address the server listens on.
@@ -166,6 +167,47 @@ pub struct MetasrvOptions {
pub node_max_idle_time: Duration,
}
impl fmt::Debug for MetasrvOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("MetasrvOptions");
debug_struct
.field("bind_addr", &self.bind_addr)
.field("server_addr", &self.server_addr)
.field("store_addrs", &self.sanitize_store_addrs())
.field("selector", &self.selector)
.field("use_memory_store", &self.use_memory_store)
.field("enable_region_failover", &self.enable_region_failover)
.field(
"allow_region_failover_on_local_wal",
&self.allow_region_failover_on_local_wal,
)
.field("http", &self.http)
.field("logging", &self.logging)
.field("procedure", &self.procedure)
.field("failure_detector", &self.failure_detector)
.field("datanode", &self.datanode)
.field("enable_telemetry", &self.enable_telemetry)
.field("data_home", &self.data_home)
.field("wal", &self.wal)
.field("export_metrics", &self.export_metrics)
.field("store_key_prefix", &self.store_key_prefix)
.field("max_txn_ops", &self.max_txn_ops)
.field("flush_stats_factor", &self.flush_stats_factor)
.field("tracing", &self.tracing)
.field("backend", &self.backend);
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
debug_struct.field("meta_table_name", &self.meta_table_name);
#[cfg(feature = "pg_kvbackend")]
debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
debug_struct
.field("node_max_idle_time", &self.node_max_idle_time)
.finish()
}
}
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
impl Default for MetasrvOptions {
@@ -249,6 +291,13 @@ impl MetasrvOptions {
common_telemetry::debug!("detect local IP is not supported on Android");
}
}
fn sanitize_store_addrs(&self) -> Vec<String> {
self.store_addrs
.iter()
.map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
.collect()
}
}
pub struct MetasrvInfo {
@@ -338,6 +387,8 @@ pub struct SelectorContext {
}
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
pub struct MetaStateHandler {

View File

@@ -40,7 +40,7 @@ use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use common_telemetry::warn;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use crate::cache_invalidator::MetasrvCacheInvalidator;
@@ -54,16 +54,16 @@ use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, Regi
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
use crate::lease::MetaPeerLookupService;
use crate::metasrv::{
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectTarget, SelectorContext, SelectorRef,
FLOW_ID_SEQ, TABLE_ID_SEQ,
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
SelectorContext, SelectorRef, FLOW_ID_SEQ, TABLE_ID_SEQ,
};
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
use crate::procedure::wal_prune::Context as WalPruneContext;
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker,
DEFAULT_TICK_INTERVAL,
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
RegionSupervisorTicker, DEFAULT_TICK_INTERVAL,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
@@ -320,13 +320,24 @@ impl MetasrvBuilder {
),
));
region_migration_manager.try_start()?;
let region_supervisor_selector = plugins
.as_ref()
.and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
let supervisor_selector = match region_supervisor_selector {
Some(selector) => {
info!("Using region stat aware selector");
RegionSupervisorSelector::RegionStatAwareSelector(selector)
}
None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
};
let region_failover_handler = if options.enable_region_failover {
let region_supervisor = RegionSupervisor::new(
rx,
options.failure_detector,
selector_ctx.clone(),
selector.clone(),
supervisor_selector,
region_migration_manager.clone(),
maintenance_mode_manager.clone(),
peer_lookup_service.clone(),

View File

@@ -141,10 +141,7 @@ pub async fn mock(
if let Some(client) = client {
Ok(TokioIo::new(client))
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Client already taken",
))
Err(std::io::Error::other("Client already taken"))
}
}
}),

View File

@@ -14,6 +14,7 @@
pub(crate) mod close_downgraded_region;
pub(crate) mod downgrade_leader_region;
pub(crate) mod flush_leader_region;
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
@@ -111,6 +112,8 @@ impl PersistentContext {
pub struct Metrics {
/// Elapsed time of downgrading region and upgrading region.
operations_elapsed: Duration,
/// Elapsed time of flushing leader region.
flush_leader_region_elapsed: Duration,
/// Elapsed time of downgrading leader region.
downgrade_leader_region_elapsed: Duration,
/// Elapsed time of open candidate region.
@@ -121,10 +124,15 @@ pub struct Metrics {
impl Display for Metrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let total = self.flush_leader_region_elapsed
+ self.downgrade_leader_region_elapsed
+ self.open_candidate_region_elapsed
+ self.upgrade_candidate_region_elapsed;
write!(
f,
"operations_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
self.operations_elapsed,
"total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
total,
self.flush_leader_region_elapsed,
self.downgrade_leader_region_elapsed,
self.open_candidate_region_elapsed,
self.upgrade_candidate_region_elapsed
@@ -138,6 +146,11 @@ impl Metrics {
self.operations_elapsed += elapsed;
}
/// Updates the elapsed time of flushing leader region.
pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
self.flush_leader_region_elapsed += elapsed;
}
/// Updates the elapsed time of downgrading leader region.
pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
self.downgrade_leader_region_elapsed += elapsed;
@@ -156,10 +169,18 @@ impl Metrics {
impl Drop for Metrics {
fn drop(&mut self) {
if !self.operations_elapsed.is_zero() {
let total = self.flush_leader_region_elapsed
+ self.downgrade_leader_region_elapsed
+ self.open_candidate_region_elapsed
+ self.upgrade_candidate_region_elapsed;
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
.with_label_values(&["total"])
.observe(total.as_secs_f64());
if !self.flush_leader_region_elapsed.is_zero() {
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
.with_label_values(&["operations"])
.observe(self.operations_elapsed.as_secs_f64());
.with_label_values(&["flush_leader_region"])
.observe(self.flush_leader_region_elapsed.as_secs_f64());
}
if !self.downgrade_leader_region_elapsed.is_zero() {
@@ -320,6 +341,13 @@ impl Context {
.update_operations_elapsed(instant.elapsed());
}
/// Updates the elapsed time of flushing leader region.
pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
self.volatile_ctx
.metrics
.update_flush_leader_region_elapsed(instant.elapsed());
}
/// Updates the elapsed time of downgrading leader region.
pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
self.volatile_ctx
@@ -700,7 +728,8 @@ mod tests {
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::test_util::*;
use crate::procedure::test_util::{
new_downgrade_region_reply, new_open_region_reply, new_upgrade_region_reply,
new_downgrade_region_reply, new_flush_region_reply, new_open_region_reply,
new_upgrade_region_reply,
};
use crate::service::mailbox::Channel;
@@ -1208,6 +1237,15 @@ mod tests {
to_peer_id,
Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
)),
Assertion::simple(assert_flush_leader_region, assert_no_persist),
),
// Flush Leader Region
Step::next(
"Should be the flush leader region",
Some(mock_datanode_reply(
from_peer_id,
Arc::new(|id| Ok(new_flush_region_reply(id, true, None))),
)),
Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
),
// UpdateMetadata::Downgrade

View File

@@ -170,7 +170,7 @@ impl DowngradeLeaderRegion {
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to downgrade the region {} on Datanode {:?}, error: {:?}, elapsed: {:?}",
"Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
region_id, leader, error, now.elapsed()
),
}
@@ -179,13 +179,14 @@ impl DowngradeLeaderRegion {
if !exists {
warn!(
"Trying to downgrade the region {} on Datanode {}, but region doesn't exist!, elapsed: {:?}",
"Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}",
region_id, leader, now.elapsed()
);
} else {
info!(
"Region {} leader is downgraded, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
"Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
region_id,
leader,
last_entry_id,
metadata_last_entry_id,
now.elapsed()

View File

@@ -0,0 +1,285 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::Instant;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Flushes the leader region before downgrading it.
///
/// This can minimize the time window where the region is not writable.
#[derive(Debug, Serialize, Deserialize)]
pub struct PreFlushRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for PreFlushRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let timer = Instant::now();
self.flush_region(ctx).await?;
ctx.update_flush_leader_region_elapsed(timer);
// We intentionally don't update `operations_elapsed` here to prevent
// the `next_operation_timeout` from being reduced by the flush operation.
// This ensures sufficient time for subsequent critical operations.
Ok((
Box::new(UpdateMetadata::Downgrade),
Status::executing(false),
))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl PreFlushRegion {
/// Builds flush leader region instruction.
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
Instruction::FlushRegion(region_id)
}
/// Tries to flush a leader region.
///
/// Ignore:
/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
/// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
/// - Failed to flush region on the Datanode.
///
/// Abort:
/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
/// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
/// - [ExceededDeadline](error::Error::ExceededDeadline)
/// - Invalid JSON.
async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Flush leader region",
})?;
let flush_instruction = self.build_flush_leader_region_instruction(ctx);
let region_id = ctx.persistent_ctx.region_id;
let leader = &ctx.persistent_ctx.from_peer;
let msg = MailboxMessage::json_message(
&format!("Flush leader region: {}", region_id),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", leader.id, leader.addr),
common_time::util::current_time_millis(),
&flush_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(leader.id);
let now = Instant::now();
let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
match result {
Ok(receiver) => match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
reply,
region_id,
now.elapsed()
);
let InstructionReply::FlushRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
};
if error.is_some() {
warn!(
"Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
region_id, leader, error
);
} else if result {
info!(
"The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
region_id,
leader,
now.elapsed()
);
}
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush leader region",
}
.fail(),
Err(err) => Err(err),
},
Err(Error::PusherNotFound { .. }) => {
warn!(
"Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_id,
leader
);
Ok(())
}
Err(err) => Err(err),
}
}
}
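For context, a minimal sketch of the ignore-vs-abort policy that the `flush_region` doc comment above describes. `FlushOutcome`, `classify`, and the plain string errors are hypothetical stand-ins used only for illustration; the real code returns `Result<()>` and maps mailbox errors through snafu. The point is that flushing is best-effort: a failed or skipped flush lets migration proceed, while deadline and protocol errors abort the step.

enum FlushOutcome {
    Proceed,       // flush succeeded, failed, or was skipped; migration continues
    Abort(String), // the migration step itself fails
}

fn classify(reply: Result<bool, &'static str>) -> FlushOutcome {
    match reply {
        // A successful flush and a flush the datanode reports as failed are
        // both tolerated (the latter is only logged as a warning).
        Ok(_) => FlushOutcome::Proceed,
        // An unreachable datanode (PusherNotFound) or a dropped receiver
        // (PushMessage) is also tolerated.
        Err("PusherNotFound") | Err("PushMessage") => FlushOutcome::Proceed,
        // Timeouts, unexpected replies, and exceeded deadlines abort.
        Err(other) => FlushOutcome::Abort(other.to_string()),
    }
}

fn main() {
    assert!(matches!(classify(Ok(false)), FlushOutcome::Proceed));
    assert!(matches!(classify(Err("MailboxTimeout")), FlushOutcome::Abort(_)));
}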
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use store_api::storage::RegionId;
use super::*;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_flush_region_reply, send_mock_reply,
};
fn new_persistent_context() -> PersistentContext {
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
}
#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
// Should be ok: if the leader region is unreachable, the flush operation is skipped.
state.flush_region(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_unexpected_instruction_reply() {
common_telemetry::init_default_ut_logging();
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
// Sends an incorrect reply.
send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
let err = state.flush_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_instruction_exceeded_deadline() {
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
// Sends a timeout error.
send_mock_reply(mailbox, rx, |id| {
Err(error::MailboxTimeoutSnafu { id }.build())
});
let err = state.flush_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::ExceededDeadline { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_flush_region_failed() {
common_telemetry::init_default_ut_logging();
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| {
Ok(new_flush_region_reply(
id,
false,
Some("test mocked".to_string()),
))
});
// Should be ok: if flushing the leader region fails, the flush operation is skipped.
state.flush_region(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_next_update_metadata_downgrade_state() {
common_telemetry::init_default_ut_logging();
let mut state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None)));
let (next, _) = state.next(&mut ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
}
}

View File

@@ -28,7 +28,7 @@ use tokio::time::Instant;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
@@ -47,10 +47,7 @@ impl State for OpenCandidateRegion {
self.open_candidate_region(ctx, instruction).await?;
ctx.update_open_candidate_region_elapsed(now);
Ok((
Box::new(UpdateMetadata::Downgrade),
Status::executing(false),
))
Ok((Box::new(PreFlushRegion), Status::executing(false)))
}
fn as_any(&self) -> &dyn Any {
@@ -399,7 +396,7 @@ mod tests {
}
#[tokio::test]
async fn test_next_update_metadata_downgrade_state() {
async fn test_next_flush_leader_region_state() {
let mut state = Box::new(OpenCandidateRegion);
// from_peer: 1
// to_peer: 2
@@ -445,8 +442,7 @@ mod tests {
(to_peer_id, region_id)
);
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
let flush_leader_region = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
assert_matches!(flush_leader_region, PreFlushRegion);
}
}
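Taken together, the hunks above add one hop to the migration state chain between opening the candidate region and downgrading the leader's metadata. A hedged, simplified sketch of the new ordering, assuming a plain enum; the real states are `Box<dyn State>` trait objects, and the stages after `UpdateMetadata::Downgrade` are unchanged and omitted here.

#[derive(Debug, PartialEq)]
enum MigrationStep {
    OpenCandidateRegion,
    PreFlushRegion,
    UpdateMetadataDowngrade,
}

fn next_step(step: MigrationStep) -> Option<MigrationStep> {
    match step {
        // Previously OpenCandidateRegion transitioned straight to UpdateMetadata::Downgrade.
        MigrationStep::OpenCandidateRegion => Some(MigrationStep::PreFlushRegion),
        MigrationStep::PreFlushRegion => Some(MigrationStep::UpdateMetadataDowngrade),
        // Later stages (downgrade leader, upgrade candidate, ...) are omitted here.
        MigrationStep::UpdateMetadataDowngrade => None,
    }
}

fn main() {
    assert_eq!(
        next_step(MigrationStep::OpenCandidateRegion),
        Some(MigrationStep::PreFlushRegion)
    );
}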

View File

@@ -44,6 +44,7 @@ use crate::error::{self, Error, Result};
use crate::metasrv::MetasrvInfo;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
@@ -415,6 +416,11 @@ pub(crate) fn assert_open_candidate_region(next: &dyn State) {
let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
}
/// Asserts the [State] should be [FlushLeaderRegion].
pub(crate) fn assert_flush_leader_region(next: &dyn State) {
let _ = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
}
/// Asserts the [State] should be [UpdateMetadata::Downgrade].
pub(crate) fn assert_update_metadata_downgrade(next: &dyn State) {
let state = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

View File

@@ -101,6 +101,24 @@ pub fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> Ma
}
}
/// Generates a [InstructionReply::FlushRegion] reply.
pub fn new_flush_region_reply(id: u64, result: bool, error: Option<String>) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::FlushRegion(SimpleReply {
result,
error,
}))
.unwrap(),
)),
}
}
/// Generates a [InstructionReply::CloseRegion] reply.
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {

View File

@@ -181,7 +181,7 @@ impl WalPruneProcedure {
let peer_and_instructions = peer_region_ids_map
.into_iter()
.map(|(peer, region_ids)| {
let flush_instruction = Instruction::FlushRegion(FlushRegions { region_ids });
let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids });
(peer.clone(), flush_instruction)
})
.collect();
@@ -536,7 +536,7 @@ mod tests {
let msg = resp.mailbox_message.unwrap();
let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
let mut flush_requested_region_ids = match flush_instruction {
Instruction::FlushRegion(FlushRegions { region_ids, .. }) => region_ids,
Instruction::FlushRegions(FlushRegions { region_ids, .. }) => region_ids,
_ => unreachable!(),
};
let sorted_region_ids = region_ids

View File

@@ -22,20 +22,20 @@ use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::PeerLookupServiceRef;
use common_meta::peer::{Peer, PeerLookupServiceRef};
use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{interval, MissedTickBehavior};
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::region_migration::{
RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT,
@@ -203,6 +203,12 @@ pub type RegionSupervisorRef = Arc<RegionSupervisor>;
/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// Selector for region supervisor.
pub enum RegionSupervisorSelector {
NaiveSelector(SelectorRef),
RegionStatAwareSelector(RegionStatAwareSelectorRef),
}
/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
pub struct RegionSupervisor {
@@ -215,7 +221,7 @@ pub struct RegionSupervisor {
/// The context of [`SelectorRef`]
selector_context: SelectorContext,
/// Candidate node selector.
selector: SelectorRef,
selector: RegionSupervisorSelector,
/// Region migration manager.
region_migration_manager: RegionMigrationManagerRef,
/// The maintenance mode manager.
@@ -288,7 +294,7 @@ impl RegionSupervisor {
event_receiver: Receiver<Event>,
options: PhiAccrualFailureDetectorOptions,
selector_context: SelectorContext,
selector: SelectorRef,
selector: RegionSupervisorSelector,
region_migration_manager: RegionMigrationManagerRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
peer_lookup: PeerLookupServiceRef,
@@ -362,6 +368,7 @@ impl RegionSupervisor {
}
}
// Extracts regions that are already migrating (failing over), i.e. failover has already been triggered for them.
let migrating_regions = regions
.extract_if(.., |(_, region_id)| {
self.region_migration_manager.tracker().contains(*region_id)
@@ -374,10 +381,43 @@ impl RegionSupervisor {
);
}
warn!("Detects region failures: {:?}", regions);
if regions.is_empty() {
// If all detected regions are already failing over or migrating, just return.
return;
}
let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
HashMap::with_capacity(regions.len());
for (datanode_id, region_id) in regions {
if let Err(err) = self.do_failover(datanode_id, region_id).await {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
grouped_regions
.entry(datanode_id)
.or_default()
.push(region_id);
}
for (datanode_id, regions) in grouped_regions {
warn!(
"Detects region failures on datanode: {}, regions: {:?}",
datanode_id, regions
);
// We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here
// because there may be false positives in failure detection on the datanode.
// So we only consider the datanode that reports the failure.
let failed_datanodes = [datanode_id];
match self
.generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
.await
{
Ok(tasks) => {
for (task, count) in tasks {
let region_id = task.region_id;
let datanode_id = task.from_peer.id;
if let Err(err) = self.do_failover(task, count).await {
error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
}
}
}
Err(err) => error!(err; "Failed to generate failover tasks"),
}
}
}
@@ -389,49 +429,107 @@ impl RegionSupervisor {
.context(error::MaintenanceModeManagerSnafu)
}
async fn do_failover(&mut self, datanode_id: DatanodeId, region_id: RegionId) -> Result<()> {
let count = *self
.failover_counts
.entry((datanode_id, region_id))
.and_modify(|count| *count += 1)
.or_insert(1);
async fn select_peers(
&self,
from_peer_id: DatanodeId,
regions: &[RegionId],
failure_datanodes: &[DatanodeId],
) -> Result<Vec<(RegionId, Peer)>> {
let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
match &self.selector {
RegionSupervisorSelector::NaiveSelector(selector) => {
let opt = SelectorOptions {
min_required_items: regions.len(),
allow_duplication: true,
exclude_peer_ids,
};
let peers = selector.select(&self.selector_context, opt).await?;
ensure!(
peers.len() == regions.len(),
error::NoEnoughAvailableNodeSnafu {
required: regions.len(),
available: peers.len(),
select_target: SelectTarget::Datanode,
}
);
let region_peers = regions
.iter()
.zip(peers)
.map(|(region_id, peer)| (*region_id, peer))
.collect::<Vec<_>>();
Ok(region_peers)
}
RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
let peers = selector
.select(
&self.selector_context,
from_peer_id,
regions,
exclude_peer_ids,
)
.await?;
ensure!(
peers.len() == regions.len(),
error::NoEnoughAvailableNodeSnafu {
required: regions.len(),
available: peers.len(),
select_target: SelectTarget::Datanode,
}
);
Ok(peers)
}
}
}
async fn generate_failover_tasks(
&mut self,
from_peer_id: DatanodeId,
regions: &[RegionId],
failed_datanodes: &[DatanodeId],
) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
let mut tasks = Vec::with_capacity(regions.len());
let from_peer = self
.peer_lookup
.datanode(datanode_id)
.datanode(from_peer_id)
.await
.context(error::LookupPeerSnafu {
peer_id: datanode_id,
peer_id: from_peer_id,
})?
.context(error::PeerUnavailableSnafu {
peer_id: datanode_id,
peer_id: from_peer_id,
})?;
let mut peers = self
.selector
.select(
&self.selector_context,
SelectorOptions {
min_required_items: 1,
allow_duplication: false,
exclude_peer_ids: HashSet::from([from_peer.id]),
},
)
let region_peers = self
.select_peers(from_peer_id, regions, failed_datanodes)
.await?;
let to_peer = peers.remove(0);
if to_peer.id == from_peer.id {
warn!(
"Skip failover for region: {region_id}, from_peer: {from_peer}, trying to failover to the same peer."
);
return Ok(());
for (region_id, peer) in region_peers {
let count = *self
.failover_counts
.entry((from_peer_id, region_id))
.and_modify(|count| *count += 1)
.or_insert(1);
let task = RegionMigrationProcedureTask {
region_id,
from_peer: from_peer.clone(),
to_peer: peer,
timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
};
tasks.push((task, count));
}
Ok(tasks)
}
async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
let from_peer_id = task.from_peer.id;
let region_id = task.region_id;
info!(
"Failover for region: {region_id}, from_peer: {from_peer}, to_peer: {to_peer}, tries: {count}"
"Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
task.region_id, task.from_peer, task.to_peer, task.timeout, count
);
let task = RegionMigrationProcedureTask {
region_id,
from_peer,
to_peer,
timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
};
if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
return match err {
@@ -439,25 +537,25 @@ impl RegionSupervisor {
MigrationRunning { .. } => {
info!(
"Another region migration is running, skip failover for region: {}, datanode: {}",
region_id, datanode_id
region_id, from_peer_id
);
Ok(())
}
TableRouteNotFound { .. } => {
self.deregister_failure_detectors(vec![(datanode_id, region_id)])
self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
.await;
info!(
"Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
region_id, datanode_id
region_id, from_peer_id
);
Ok(())
}
LeaderPeerChanged { .. } => {
self.deregister_failure_detectors(vec![(datanode_id, region_id)])
self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
.await;
info!(
"Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
region_id, datanode_id
region_id, from_peer_id
);
Ok(())
}
@@ -521,6 +619,7 @@ pub(crate) mod tests {
use tokio::sync::oneshot;
use tokio::time::sleep;
use super::RegionSupervisorSelector;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::region::supervisor::{
@@ -548,7 +647,7 @@ pub(crate) mod tests {
rx,
Default::default(),
selector_context,
selector,
RegionSupervisorSelector::NaiveSelector(selector),
region_migration_manager,
maintenance_mode_manager,
peer_lookup,

View File

@@ -23,6 +23,7 @@ pub mod weighted_choose;
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use strum::AsRefStr;
use crate::error;
@@ -36,6 +37,24 @@ pub trait Selector: Send + Sync {
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output>;
}
/// A selector that is aware of region statistics.
///
/// It selects the best destination peer for a list of regions.
/// The selection is based on region statistics, such as the region leader's write throughput.
#[async_trait::async_trait]
pub trait RegionStatAwareSelector: Send + Sync {
type Context;
type Output;
async fn select(
&self,
ctx: &Self::Context,
from_peer_id: u64,
region_ids: &[RegionId],
exclude_peer_ids: HashSet<u64>,
) -> Result<Self::Output>;
}
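For illustration only, a minimal sketch of how an implementation in the spirit of `RegionStatAwareSelector` might pick destination peers by write load. Every name below (`Peer`, `PeerStat`, `WriteLoadSelector`, the plain `u64` aliases) is a hypothetical stand-in; the real trait is async, takes a `SelectorContext`, and returns the project's `Peer` type.

use std::collections::HashSet;

type DatanodeId = u64;
type RegionId = u64;

#[derive(Clone, Debug)]
struct Peer {
    id: DatanodeId,
    addr: String,
}

/// Hypothetical per-peer statistic: lower write throughput means less loaded.
struct PeerStat {
    peer: Peer,
    write_bytes_per_sec: u64,
}

struct WriteLoadSelector;

impl WriteLoadSelector {
    /// Assigns each region to the least-loaded candidate peer that is not excluded.
    fn select(
        &self,
        mut candidates: Vec<PeerStat>,
        from_peer_id: DatanodeId,
        region_ids: &[RegionId],
        exclude_peer_ids: &HashSet<DatanodeId>,
    ) -> Vec<(RegionId, Peer)> {
        // Never pick the failing datanode or an explicitly excluded one.
        candidates.retain(|c| c.peer.id != from_peer_id && !exclude_peer_ids.contains(&c.peer.id));
        // Least write throughput first.
        candidates.sort_by_key(|c| c.write_bytes_per_sec);
        // Assign regions round-robin over the sorted candidates; if no candidate
        // survives, the result is shorter than `region_ids` and a caller-side
        // length check (like the supervisor's ensure! earlier in this diff) rejects it.
        region_ids
            .iter()
            .zip(candidates.iter().cycle())
            .map(|(region_id, stat)| (*region_id, stat.peer.clone()))
            .collect()
    }
}

fn main() {
    let selector = WriteLoadSelector;
    let candidates = vec![
        PeerStat { peer: Peer { id: 2, addr: "dn-2:4001".into() }, write_bytes_per_sec: 500 },
        PeerStat { peer: Peer { id: 3, addr: "dn-3:4001".into() }, write_bytes_per_sec: 100 },
    ];
    let picked = selector.select(candidates, 1, &[1024, 1025], &HashSet::new());
    assert_eq!(picked[0].1.id, 3); // the least-loaded peer gets the first region
    assert_eq!(picked[1].1.id, 2);
}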
#[derive(Debug)]
pub struct SelectorOptions {
/// Minimum number of selected results.

View File

@@ -278,7 +278,7 @@ impl KvBackend for LeaderCachedKvBackend {
let remote_res = self.store.batch_get(remote_req).await?;
let put_req = BatchPutRequest {
kvs: remote_res.kvs.clone().into_iter().map(Into::into).collect(),
kvs: remote_res.kvs.clone().into_iter().collect(),
..Default::default()
};
let _ = self.cache.batch_put(put_req).await?;

View File

@@ -42,11 +42,11 @@ pub(crate) use state::MetricEngineState;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SyncManifestResponse,
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use crate::config::EngineConfig;
@@ -131,6 +131,17 @@ impl RegionEngine for MetricEngine {
METRIC_ENGINE_NAME
}
async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<BatchResponses, BoxedError> {
self.inner
.handle_batch_open_requests(parallelism, requests)
.await
.map_err(BoxedError::new)
}
async fn handle_batch_ddl_requests(
&self,
batch_request: BatchRegionDdlRequest,

View File

@@ -14,24 +14,80 @@
//! Open a metric region.
use std::collections::HashSet;
use common_telemetry::info;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{BatchResponses, RegionEngine};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::engine::create::region_options_for_metadata_region;
use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
use crate::engine::MetricEngineInner;
use crate::error::{OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result};
use crate::error::{
BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result,
};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils;
impl MetricEngineInner {
pub async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<BatchResponses> {
// We need to open metadata region and data region for each request.
let mut all_requests = Vec::with_capacity(requests.len() * 2);
let mut physical_region_ids = Vec::with_capacity(requests.len());
let mut data_region_ids = HashSet::with_capacity(requests.len());
for (region_id, request) in requests {
if !request.is_physical_table() {
continue;
}
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
let (open_metadata_region_request, open_data_region_request) =
self.transform_open_physical_region_request(request);
all_requests.push((metadata_region_id, open_metadata_region_request));
all_requests.push((data_region_id, open_data_region_request));
physical_region_ids.push((region_id, physical_region_options));
data_region_ids.insert(data_region_id);
}
let results = self
.mito
.handle_batch_open_requests(parallelism, all_requests)
.await
.context(BatchOpenMitoRegionSnafu {})?
.into_iter()
.filter(|(region_id, _)| data_region_ids.contains(region_id))
.collect::<Vec<_>>();
for (physical_region_id, physical_region_options) in physical_region_ids {
let primary_key_encoding = self
.mito
.get_primary_key_encoding(physical_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?;
self.recover_states(
physical_region_id,
primary_key_encoding,
physical_region_options,
)
.await?;
}
Ok(results)
}
/// Open a metric region.
///
/// Only open requests to a physical region matter. Those to logical regions are
@@ -69,12 +125,15 @@ impl MetricEngineInner {
}
}
/// Invokes mito engine to open physical regions (data and metadata).
async fn open_physical_region(
/// Transforms the open request into open requests for the metadata region and the data region.
///
/// Returns:
/// - The open request for metadata region.
/// - The open request for data region.
fn transform_open_physical_region_request(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<AffectedRows> {
) -> (RegionOpenRequest, RegionOpenRequest) {
let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
@@ -98,8 +157,19 @@ impl MetricEngineInner {
skip_wal_replay: request.skip_wal_replay,
};
(open_metadata_region_request, open_data_region_request)
}
/// Invokes mito engine to open physical regions (data and metadata).
async fn open_physical_region(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<AffectedRows> {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
let (open_metadata_region_request, open_data_region_request) =
self.transform_open_physical_region_request(request);
self.mito
.handle_request(

View File

@@ -42,6 +42,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to batch open mito region"))]
BatchOpenMitoRegion {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to close mito region, region id: {}", region_id))]
CloseMitoRegion {
region_id: RegionId,
@@ -337,7 +344,8 @@ impl ErrorExt for Error {
| MitoCatchupOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoDeleteOperation { source, .. }
| MitoSyncOperation { source, .. } => source.status_code(),
| MitoSyncOperation { source, .. }
| BatchOpenMitoRegion { source, .. } => source.status_code(),
EncodePrimaryKey { source, .. } => source.status_code(),

View File

@@ -302,7 +302,10 @@ impl PartitionTreeMemtable {
fn update_stats(&self, metrics: &WriteMetrics) {
// Only let the tracker tracks value bytes.
self.alloc_tracker.on_allocation(metrics.value_bytes);
metrics.update_timestamp_range(&self.max_timestamp, &self.min_timestamp);
self.max_timestamp
.fetch_max(metrics.max_ts, Ordering::SeqCst);
self.min_timestamp
.fetch_min(metrics.min_ts, Ordering::SeqCst);
}
}
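This hunk, the matching one in `TimeSeriesMemtable` below, and the removal of `WriteMetrics::update_timestamp_range` replace a hand-rolled compare-exchange loop with `AtomicI64::fetch_max`/`fetch_min`. A small self-contained sketch of the pattern; `TimestampRangeTracker` is a stand-in for the memtable's atomic fields.

use std::sync::atomic::{AtomicI64, Ordering};

struct TimestampRangeTracker {
    // Initialized so the first update always wins.
    min_timestamp: AtomicI64, // starts at i64::MAX
    max_timestamp: AtomicI64, // starts at i64::MIN
}

impl TimestampRangeTracker {
    fn new() -> Self {
        Self {
            min_timestamp: AtomicI64::new(i64::MAX),
            max_timestamp: AtomicI64::new(i64::MIN),
        }
    }

    /// Updates the range with the min/max timestamps of one write batch.
    fn update(&self, min_ts: i64, max_ts: i64) {
        // fetch_max/fetch_min atomically keep the largest/smallest value seen,
        // doing the work of the removed compare_exchange loop in one call.
        self.max_timestamp.fetch_max(max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(min_ts, Ordering::SeqCst);
    }
}

fn main() {
    let tracker = TimestampRangeTracker::new();
    tracker.update(10, 20);
    tracker.update(5, 15);
    assert_eq!(tracker.min_timestamp.load(Ordering::SeqCst), 5);
    assert_eq!(tracker.max_timestamp.load(Ordering::SeqCst), 20);
}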

View File

@@ -14,8 +14,6 @@
//! Internal metrics of the memtable.
use std::sync::atomic::{AtomicI64, Ordering};
/// Metrics of writing memtables.
pub(crate) struct WriteMetrics {
/// Size allocated by keys.
@@ -28,51 +26,6 @@ pub(crate) struct WriteMetrics {
pub(crate) max_ts: i64,
}
impl WriteMetrics {
/// Update the min/max timestamp range according to current write metric.
pub(crate) fn update_timestamp_range(&self, prev_max_ts: &AtomicI64, prev_min_ts: &AtomicI64) {
loop {
let current_min = prev_min_ts.load(Ordering::Relaxed);
if self.min_ts >= current_min {
break;
}
let Err(updated) = prev_min_ts.compare_exchange(
current_min,
self.min_ts,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == self.min_ts {
break;
}
}
loop {
let current_max = prev_max_ts.load(Ordering::Relaxed);
if self.max_ts <= current_max {
break;
}
let Err(updated) = prev_max_ts.compare_exchange(
current_max,
self.max_ts,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == self.max_ts {
break;
}
}
}
}
impl Default for WriteMetrics {
fn default() -> Self {
Self {

View File

@@ -147,7 +147,8 @@ impl TimeSeriesMemtable {
fn update_stats(&self, stats: WriteMetrics) {
self.alloc_tracker
.on_allocation(stats.key_bytes + stats.value_bytes);
stats.update_timestamp_range(&self.max_timestamp, &self.min_timestamp);
self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
}
fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {

View File

@@ -363,9 +363,9 @@ mod tests {
builder
.push_field_array(
*column_id,
Arc::new(Int64Array::from_iter_values(
std::iter::repeat(*field).take(num_rows),
)),
Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
*field, num_rows,
))),
)
.unwrap();
}
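This and the later test hunks swap `std::iter::repeat(x).take(n)` for `std::iter::repeat_n(x, n)` (available in recent stable Rust). A tiny standalone check of the equivalence, independent of the arrow array types used above:

fn main() {
    let n = 3;
    let a: Vec<i64> = std::iter::repeat(7).take(n).collect();
    let b: Vec<i64> = std::iter::repeat_n(7, n).collect();
    // Both produce the value repeated exactly n times.
    assert_eq!(a, b);
}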

View File

@@ -322,13 +322,10 @@ impl ScanRegion {
let memtables: Vec<_> = memtables
.into_iter()
.filter(|mem| {
if mem.is_empty() {
// check if memtable is empty by reading stats.
let Some((start, end)) = mem.stats().time_range() else {
return false;
}
let stats = mem.stats();
// Safety: the memtable is not empty.
let (start, end) = stats.time_range().unwrap();
};
// The time range of the memtable is inclusive.
let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
memtable_range.intersects(&time_range)

View File

@@ -346,7 +346,6 @@ impl BloomFilterIndexer {
#[cfg(test)]
pub(crate) mod tests {
use std::iter;
use api::v1::SemanticType;
use datatypes::data_type::ConcreteDataType;
@@ -461,15 +460,15 @@ pub(crate) mod tests {
Batch::new(
primary_key,
Arc::new(UInt64Vector::from_iter_values(
iter::repeat(0).take(num_rows),
)),
Arc::new(UInt64Vector::from_iter_values(
iter::repeat(0).take(num_rows),
)),
Arc::new(UInt8Vector::from_iter_values(
iter::repeat(1).take(num_rows),
)),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
0, num_rows,
))),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
0, num_rows,
))),
Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
1, num_rows,
))),
vec![u64_field],
)
.unwrap()

View File

@@ -489,12 +489,12 @@ mod tests {
Arc::new(UInt64Vector::from_iter_values(
(0..num_rows).map(|n| n as u64),
)),
Arc::new(UInt64Vector::from_iter_values(
std::iter::repeat(0).take(num_rows),
)),
Arc::new(UInt8Vector::from_iter_values(
std::iter::repeat(1).take(num_rows),
)),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
0, num_rows,
))),
Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
1, num_rows,
))),
vec![
BatchColumn {
column_id: 1,

View File

@@ -326,7 +326,6 @@ impl InvertedIndexer {
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::iter;
use api::v1::SemanticType;
use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
@@ -424,15 +423,15 @@ mod tests {
Batch::new(
primary_key,
Arc::new(UInt64Vector::from_iter_values(
iter::repeat(0).take(num_rows),
)),
Arc::new(UInt64Vector::from_iter_values(
iter::repeat(0).take(num_rows),
)),
Arc::new(UInt8Vector::from_iter_values(
iter::repeat(1).take(num_rows),
)),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
0, num_rows,
))),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
0, num_rows,
))),
Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
1, num_rows,
))),
vec![u64_field],
)
.unwrap()

View File

@@ -134,6 +134,7 @@ impl WriteFormat {
/// Helper for reading the SST format.
pub struct ReadFormat {
/// The metadata stored in the SST.
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
@@ -305,17 +306,23 @@ impl ReadFormat {
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
) -> StatValues {
let Some(column) = self.metadata.column_by_id(column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
match column.semantic_type {
SemanticType::Tag => self.tag_values(row_groups, column, true),
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_values(row_groups, column, *index, true)
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_values(row_groups, column, *index, true);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_values(row_groups, column, index, true)
let stats = Self::column_values(row_groups, column, index, true);
StatValues::from_stats_opt(stats)
}
}
}
@@ -325,17 +332,23 @@ impl ReadFormat {
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
) -> StatValues {
let Some(column) = self.metadata.column_by_id(column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
match column.semantic_type {
SemanticType::Tag => self.tag_values(row_groups, column, false),
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_values(row_groups, column, *index, false)
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_values(row_groups, column, *index, false);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_values(row_groups, column, index, false)
let stats = Self::column_values(row_groups, column, index, false);
StatValues::from_stats_opt(stats)
}
}
}
@@ -345,17 +358,23 @@ impl ReadFormat {
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
) -> StatValues {
let Some(column) = self.metadata.column_by_id(column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
match column.semantic_type {
SemanticType::Tag => None,
SemanticType::Tag => StatValues::NoStats,
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_null_counts(row_groups, *index)
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_null_counts(row_groups, *index);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_null_counts(row_groups, index)
let stats = Self::column_null_counts(row_groups, index);
StatValues::from_stats_opt(stats)
}
}
}
@@ -390,8 +409,7 @@ impl ReadFormat {
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
is_min: bool,
) -> Option<ArrayRef> {
let primary_key_encoding = self.metadata.primary_key_encoding;
) -> StatValues {
let is_first_tag = self
.metadata
.primary_key
@@ -400,9 +418,28 @@ impl ReadFormat {
.unwrap_or(false);
if !is_first_tag {
// Only the min-max of the first tag is available in the primary key.
return None;
return StatValues::NoStats;
}
StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
}
/// Returns min/max values of the first tag.
/// Returns None if the tag does not have statistics.
fn first_tag_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
is_min: bool,
) -> Option<ArrayRef> {
debug_assert!(self
.metadata
.primary_key
.first()
.map(|id| *id == column.column_id)
.unwrap_or(false));
let primary_key_encoding = self.metadata.primary_key_encoding;
let converter = build_primary_key_codec_with_fields(
primary_key_encoding,
[(
@@ -452,6 +489,7 @@ impl ReadFormat {
}
/// Returns min/max values of specific non-tag columns.
/// Returns None if the column does not have statistics.
fn column_values(
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
@@ -544,6 +582,29 @@ impl ReadFormat {
}
}
/// Values of column statistics of the SST.
///
/// It also distinguishes between the case where the column is not found and
/// the case where the column exists but has no statistics.
pub enum StatValues {
/// Values of each row group.
Values(ArrayRef),
/// No such column.
NoColumn,
/// Column exists but has no statistics.
NoStats,
}
impl StatValues {
/// Creates a new `StatValues` instance from optional statistics.
pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
match stats {
Some(stats) => StatValues::Values(stats),
None => StatValues::NoStats,
}
}
}
#[cfg(test)]
impl ReadFormat {
/// Creates a helper with existing `metadata` and all columns.
@@ -755,7 +816,7 @@ mod tests {
));
let mut keys = vec![];
for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
keys.extend(std::iter::repeat(index as u32).take(num_rows));
keys.extend(std::iter::repeat_n(index as u32, num_rows));
}
let keys = UInt32Array::from(keys);
Arc::new(DictionaryArray::new(keys, values))

View File

@@ -25,7 +25,7 @@ use parquet::file::metadata::RowGroupMetaData;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::format::{ReadFormat, StatValues};
/// Statistics for pruning row groups.
pub(crate) struct RowGroupPruningStats<'a, T> {
@@ -100,16 +100,18 @@ impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_,
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
match self.read_format.min_values(self.row_groups, column_id) {
Some(values) => Some(values),
None => self.compat_default_value(&column.name),
StatValues::Values(values) => Some(values),
StatValues::NoColumn => self.compat_default_value(&column.name),
StatValues::NoStats => None,
}
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
match self.read_format.max_values(self.row_groups, column_id) {
Some(values) => Some(values),
None => self.compat_default_value(&column.name),
StatValues::Values(values) => Some(values),
StatValues::NoColumn => self.compat_default_value(&column.name),
StatValues::NoStats => None,
}
}
@@ -118,10 +120,12 @@ impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_,
}
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let Some(column_id) = self.column_id_to_prune(&column.name) else {
return self.compat_null_count(&column.name);
};
self.read_format.null_counts(self.row_groups, column_id)
let column_id = self.column_id_to_prune(&column.name)?;
match self.read_format.null_counts(self.row_groups, column_id) {
StatValues::Values(values) => Some(values),
StatValues::NoColumn => self.compat_null_count(&column.name),
StatValues::NoStats => None,
}
}
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
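The pruning change above only falls back to a column's default value when the column is missing from the SST entirely; a column that exists but carries no statistics now yields `None`, so the row group is kept. A hedged sketch of that decision; `min_or_max` and `default_value` are hypothetical stand-ins for the real `min_values`/`max_values` and `compat_default_value` paths.

enum StatValues<T> {
    Values(T),
    NoColumn,
    NoStats,
}

fn min_or_max<T>(stats: StatValues<T>, default_value: Option<T>) -> Option<T> {
    match stats {
        StatValues::Values(v) => Some(v),
        // The column was added after this SST was written: its default value
        // is a valid statistic for every row group.
        StatValues::NoColumn => default_value,
        // The column exists but has no statistics: treating this as "unknown"
        // is the fix; using a default here could prune row groups that
        // actually contain matching values.
        StatValues::NoStats => None,
    }
}

fn main() {
    assert_eq!(min_or_max(StatValues::Values(7), Some(0)), Some(7));
    assert_eq!(min_or_max(StatValues::NoColumn, Some(0)), Some(0));
    assert!(min_or_max(StatValues::<i64>::NoStats, Some(0)).is_none());
}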

View File

@@ -85,11 +85,9 @@ impl ImpureDefaultFiller {
.schema
.iter()
.filter_map(|schema| {
if self.impure_columns.contains_key(&schema.column_name) {
Some(&schema.column_name)
} else {
None
}
self.impure_columns
.contains_key(&schema.column_name)
.then_some(&schema.column_name)
})
.collect();

View File

@@ -325,7 +325,7 @@ impl std::str::FromStr for Pattern {
impl Pattern {
fn check(&self) -> Result<()> {
if self.len() == 0 {
if self.is_empty() {
return DissectEmptyPatternSnafu.fail();
}

View File

@@ -20,7 +20,7 @@ use arrow_schema::DataType;
use catalog::table_source::DfTableSourceProvider;
use common_function::aggr::{
GeoPathAccumulator, HllState, UddSketchState, GEO_PATH_NAME, HLL_MERGE_NAME, HLL_NAME,
UDDSKETCH_STATE_NAME,
UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME,
};
use common_function::scalars::udf::create_udf;
use common_query::logical_plan::create_aggregate_function;
@@ -165,7 +165,9 @@ impl ContextProvider for DfContextProviderAdapter {
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
if name == UDDSKETCH_STATE_NAME {
return Some(Arc::new(UddSketchState::udf_impl()));
return Some(Arc::new(UddSketchState::state_udf_impl()));
} else if name == UDDSKETCH_MERGE_NAME {
return Some(Arc::new(UddSketchState::merge_udf_impl()));
} else if name == HLL_NAME {
return Some(Arc::new(HllState::state_udf_impl()));
} else if name == HLL_MERGE_NAME {

View File

@@ -14,7 +14,6 @@
#![feature(let_chains)]
#![feature(int_roundings)]
#![feature(trait_upcasting)]
#![feature(try_blocks)]
#![feature(stmt_expr_attributes)]
#![feature(iterator_try_collect)]
@@ -28,7 +27,7 @@ pub mod error;
pub mod executor;
pub mod log_query;
pub mod metrics;
mod optimizer;
pub mod optimizer;
pub mod options;
pub mod parser;
mod part_sort;

View File

@@ -348,7 +348,7 @@ impl PartSortStream {
&self,
sort_column: &ArrayRef,
) -> datafusion_common::Result<Option<usize>> {
if sort_column.len() == 0 {
if sort_column.is_empty() {
return Ok(Some(0));
}

View File

@@ -128,7 +128,8 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
session_state
.register_udf(udf)
.context(RegisterUdfSnafu { name: func.name() })?;
let _ = session_state.register_udaf(Arc::new(UddSketchState::udf_impl()));
let _ = session_state.register_udaf(Arc::new(UddSketchState::state_udf_impl()));
let _ = session_state.register_udaf(Arc::new(UddSketchState::merge_udf_impl()));
let _ = session_state.register_udaf(Arc::new(HllState::state_udf_impl()));
let _ = session_state.register_udaf(Arc::new(HllState::merge_udf_impl()));
let _ = session_state.register_udaf(Arc::new(GeoPathAccumulator::udf_impl()));

View File

@@ -117,7 +117,7 @@ where
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.inner.poll_ready(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)),
Poll::Ready(r) => Poll::Ready(r),
}
}

View File

@@ -17,7 +17,6 @@
#![feature(exclusive_wrapper)]
#![feature(let_chains)]
#![feature(if_let_guard)]
#![feature(trait_upcasting)]
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;

View File

@@ -55,7 +55,7 @@ pub fn transform_statements(stmts: &mut Vec<Statement>) -> Result<()> {
}
}
visit_expressions_mut(stmts, |expr| {
let _ = visit_expressions_mut(stmts, |expr| {
for rule in RULES.iter() {
rule.visit_expr(expr)?;
}

View File

@@ -290,7 +290,7 @@ impl RegionMetadata {
pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
// check time index
ensure!(
projection.iter().any(|id| *id == self.time_index),
projection.contains(&self.time_index),
TimeIndexNotFoundSnafu
);

View File

@@ -95,7 +95,7 @@ impl TableProvider for DfTableProviderAdapter {
filters: &[Expr],
limit: Option<usize>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let filters: Vec<Expr> = filters.iter().map(Clone::clone).map(Into::into).collect();
let filters: Vec<Expr> = filters.iter().map(Clone::clone).collect();
let request = {
let mut request = self.scan_req.lock().unwrap();
request.filters = filters;

View File

@@ -85,11 +85,7 @@ pub struct UnstableTestVariables {
pub fn load_unstable_test_env_variables() -> UnstableTestVariables {
let _ = dotenv::dotenv();
let binary_path = env::var(GT_FUZZ_BINARY_PATH).expect("GT_FUZZ_BINARY_PATH not found");
let root_dir = if let Ok(root) = env::var(GT_FUZZ_INSTANCE_ROOT_DIR) {
Some(root)
} else {
None
};
let root_dir = env::var(GT_FUZZ_INSTANCE_ROOT_DIR).ok();
UnstableTestVariables {
binary_path,

View File

@@ -157,7 +157,7 @@ async fn execute_unstable_create_table(
}
Err(err) => {
// FIXME(weny): support to retry it later.
if matches!(err, sqlx::Error::PoolTimedOut { .. }) {
if matches!(err, sqlx::Error::PoolTimedOut) {
warn!("ignore pool timeout, sql: {sql}");
continue;
}

View File

@@ -489,10 +489,7 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {
if let Some(client) = client {
Ok(TokioIo::new(client))
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Client already taken",
))
Err(std::io::Error::other("Client already taken"))
}
}
}),

View File

@@ -52,7 +52,41 @@ select uddsketch_calc(0.95, uddsketch_state(128, 0.01, `value`)) from test_uddsk
| 100.49456770856492 |
+----------------------------------------------------------------------------------------------+
CREATE TABLE grouped_uddsketch (
`state` BINARY,
id_group INT PRIMARY KEY,
`ts` timestamp time index default now()
);
Affected Rows: 0
INSERT INTO grouped_uddsketch (`state`, id_group) SELECT uddsketch_state(128, 0.01, `value`), `id`/5*5 as id_group FROM test_uddsketch GROUP BY id_group;
Affected Rows: 3
SELECT uddsketch_calc(0.1, uddsketch_merge(128, 0.01, `state`)) FROM grouped_uddsketch;
+------------------------------------------------------------------------------------------------+
| uddsketch_calc(Float64(0.1),uddsketch_merge(Int64(128),Float64(0.01),grouped_uddsketch.state)) |
+------------------------------------------------------------------------------------------------+
| 19.886670240866184 |
+------------------------------------------------------------------------------------------------+
-- should fail
SELECT uddsketch_calc(0.1, uddsketch_merge(128, 0.1, `state`)) FROM grouped_uddsketch;
Error: 3001(EngineExecuteQuery), Error during planning: Merging UDDSketch with different parameters: arguments=(128, 0.1) vs actual input=(128, 0.01)
-- should fail
SELECT uddsketch_calc(0.1, uddsketch_merge(64, 0.01, `state`)) FROM grouped_uddsketch;
Error: 3001(EngineExecuteQuery), Error during planning: Merging UDDSketch with different parameters: arguments=(64, 0.01) vs actual input=(128, 0.01)
drop table test_uddsketch;
Affected Rows: 0
drop table grouped_uddsketch;
Affected Rows: 0

View File

@@ -24,4 +24,21 @@ select uddsketch_calc(0.75, uddsketch_state(128, 0.01, `value`)) from test_uddsk
select uddsketch_calc(0.95, uddsketch_state(128, 0.01, `value`)) from test_uddsketch;
CREATE TABLE grouped_uddsketch (
`state` BINARY,
id_group INT PRIMARY KEY,
`ts` timestamp time index default now()
);
INSERT INTO grouped_uddsketch (`state`, id_group) SELECT uddsketch_state(128, 0.01, `value`), `id`/5*5 as id_group FROM test_uddsketch GROUP BY id_group;
SELECT uddsketch_calc(0.1, uddsketch_merge(128, 0.01, `state`)) FROM grouped_uddsketch;
-- should fail
SELECT uddsketch_calc(0.1, uddsketch_merge(128, 0.1, `state`)) FROM grouped_uddsketch;
-- should fail
SELECT uddsketch_calc(0.1, uddsketch_merge(64, 0.01, `state`)) FROM grouped_uddsketch;
drop table test_uddsketch;
drop table grouped_uddsketch;

View File

@@ -31,6 +31,15 @@ FROM
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES

View File

@@ -23,6 +23,9 @@ FROM
distinct_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES

View File

@@ -44,6 +44,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
@@ -101,6 +110,16 @@ GROUP BY
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
@@ -118,6 +137,15 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SHOW CREATE FLOW test_numbers_basic;
+--------------------+---------------------------------------------------------------------------------------+

View File

@@ -20,6 +20,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
ADMIN FLUSH_FLOW('test_numbers_basic');
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
SHOW CREATE FLOW test_numbers_basic;
@@ -44,10 +47,16 @@ FROM
numbers_input_basic
GROUP BY
ts;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE FLOW test_numbers_basic;
SHOW CREATE TABLE out_num_cnt_basic;

View File

@@ -62,6 +62,15 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES
@@ -206,6 +215,15 @@ SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -306,6 +324,15 @@ ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES
@@ -1665,6 +1692,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES

View File

@@ -24,6 +24,9 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
SHOW CREATE TABLE out_num_cnt_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES
@@ -91,6 +94,9 @@ FROM
SHOW CREATE TABLE out_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -130,6 +136,9 @@ SHOW CREATE TABLE out_distinct_basic;
ADMIN FLUSH_FLOW('test_distinct_basic');
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES
@@ -788,6 +797,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
ADMIN FLUSH_FLOW('test_numbers_basic');
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES

View File

@@ -730,10 +730,21 @@ SELECT key FROM api_stats;
+-----+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 5s
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
Affected Rows: 1
-- wait more time so the flownode has time to recover flows
-- SQLNESS SLEEP 5s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');

View File

@@ -399,8 +399,13 @@ ADMIN FLUSH_FLOW('api_stats_flow');
SELECT key FROM api_stats;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 5s
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
-- wait more time so the flownode has time to recover flows
-- SQLNESS SLEEP 5s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');

View File

@@ -0,0 +1,320 @@
CREATE TABLE access_log (
"url" STRING,
user_id BIGINT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY ("url", user_id)
);
Affected Rows: 0
CREATE TABLE access_log_10s (
"url" STRING,
time_window timestamp time INDEX,
state BINARY,
PRIMARY KEY ("url")
);
Affected Rows: 0
CREATE FLOW calc_access_log_10s SINK TO access_log_10s
AS
SELECT
"url",
date_bin('10s'::INTERVAL, ts) AS time_window,
hll(user_id) AS state
FROM
access_log
GROUP BY
"url",
time_window;
Affected Rows: 0
-- insert 4 rows of data
INSERT INTO access_log VALUES
("/dashboard", 1, "2025-03-04 00:00:00"),
("/dashboard", 1, "2025-03-04 00:00:01"),
("/dashboard", 2, "2025-03-04 00:00:05"),
("/not_found", 3, "2025-03-04 00:00:11"),
("/dashboard", 4, "2025-03-04 00:00:15");
Affected Rows: 5
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_access_log_10s');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('calc_access_log_10s') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- query should return 3 rows
-- SQLNESS SORT_RESULT 3 1
SELECT "url", time_window FROM access_log_10s
ORDER BY
time_window;
+------------+---------------------+
| url | time_window |
+------------+---------------------+
| /dashboard | 2025-03-04T00:00:00 |
| /dashboard | 2025-03-04T00:00:10 |
| /not_found | 2025-03-04T00:00:10 |
+------------+---------------------+
-- use hll_count to query the approximate data in access_log_10s
-- SQLNESS SORT_RESULT 3 1
SELECT "url", time_window, hll_count(state) FROM access_log_10s
ORDER BY
time_window;
+------------+---------------------+---------------------------------+
| url | time_window | hll_count(access_log_10s.state) |
+------------+---------------------+---------------------------------+
| /dashboard | 2025-03-04T00:00:00 | 2 |
| /dashboard | 2025-03-04T00:00:10 | 1 |
| /not_found | 2025-03-04T00:00:10 | 1 |
+------------+---------------------+---------------------------------+
-- further, we can aggregate 10 seconds of data to every minute, by using hll_merge to merge 10 seconds of hyperloglog state
-- SQLNESS SORT_RESULT 3 1
SELECT
"url",
date_bin('1 minute'::INTERVAL, time_window) AS time_window_1m,
hll_count(hll_merge(state)) as uv_per_min
FROM
access_log_10s
GROUP BY
"url",
time_window_1m
ORDER BY
time_window_1m;
+------------+---------------------+------------+
| url | time_window_1m | uv_per_min |
+------------+---------------------+------------+
| /dashboard | 2025-03-04T00:00:00 | 3 |
| /not_found | 2025-03-04T00:00:00 | 1 |
+------------+---------------------+------------+
DROP FLOW calc_access_log_10s;
Affected Rows: 0
DROP TABLE access_log_10s;
Affected Rows: 0
DROP TABLE access_log;
Affected Rows: 0
CREATE TABLE percentile_base (
"id" INT PRIMARY KEY,
"value" DOUBLE,
ts timestamp(0) time index
);
Affected Rows: 0
CREATE TABLE percentile_5s (
"percentile_state" BINARY,
time_window timestamp(0) time index
);
Affected Rows: 0
CREATE FLOW calc_percentile_5s SINK TO percentile_5s
AS
SELECT
uddsketch_state(128, 0.01, "value") AS "value",
date_bin('5 seconds'::INTERVAL, ts) AS time_window
FROM
percentile_base
WHERE
"value" > 0 AND "value" < 70
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO percentile_base ("id", "value", ts) VALUES
(1, 10.0, 1),
(2, 20.0, 2),
(3, 30.0, 3),
(4, 40.0, 4),
(5, 50.0, 5),
(6, 60.0, 6),
(7, 70.0, 7),
(8, 80.0, 8),
(9, 90.0, 9),
(10, 100.0, 10);
Affected Rows: 10
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_percentile_5s');
+----------------------------------------+
| ADMIN FLUSH_FLOW('calc_percentile_5s') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT
time_window,
uddsketch_calc(0.99, `percentile_state`) AS p99
FROM
percentile_5s
ORDER BY
time_window;
+---------------------+--------------------+
| time_window | p99 |
+---------------------+--------------------+
| 1970-01-01T00:00:00 | 40.04777053326359 |
| 1970-01-01T00:00:05 | 59.745049810145126 |
+---------------------+--------------------+
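-- illustrative sketch, not part of the recorded test output: the stored uddsketch state is
-- not tied to a single quantile, so other percentiles can be read from the same column;
-- quantiles other than 0.99 are an assumption here and are not covered by this test.
-- SELECT
--     time_window,
--     uddsketch_calc(0.5, percentile_state) AS p50,
--     uddsketch_calc(0.9, percentile_state) AS p90
-- FROM percentile_5s
-- ORDER BY time_window;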
DROP FLOW calc_percentile_5s;
Affected Rows: 0
DROP TABLE percentile_5s;
Affected Rows: 0
DROP TABLE percentile_base;
Affected Rows: 0
CREATE TABLE percentile_base (
"id" INT PRIMARY KEY,
"value" DOUBLE,
ts timestamp(0) time index
);
Affected Rows: 0
CREATE TABLE percentile_5s (
"percentile_state" BINARY,
time_window timestamp(0) time index
);
Affected Rows: 0
CREATE TABLE percentile_10s (
"percentile_state" BINARY,
time_window timestamp(0) time index
);
Affected Rows: 0
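-- unlike the earlier calc_percentile_5s flow above, which filtered rows with a WHERE clause,
-- the flow below applies the filter inside uddsketch_state via CASE WHEN, so a time window
-- whose values are all filtered out still produces a row (with a NULL state) instead of
-- being dropped entirely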
CREATE FLOW calc_percentile_5s SINK TO percentile_5s
AS
SELECT
uddsketch_state(128, 0.01, CASE WHEN "value" > 0 AND "value" < 70 THEN "value" ELSE NULL END) AS "value",
date_bin('5 seconds'::INTERVAL, ts) AS time_window
FROM
percentile_base
GROUP BY
time_window;
Affected Rows: 0
CREATE FLOW calc_percentile_10s SINK TO percentile_10s
AS
SELECT
uddsketch_merge(128, 0.01, percentile_state),
date_bin('10 seconds'::INTERVAL, time_window) AS time_window
FROM
percentile_5s
GROUP BY
date_bin('10 seconds'::INTERVAL, time_window);
Affected Rows: 0
INSERT INTO percentile_base ("id", "value", ts) VALUES
(1, 10.0, 1),
(2, 20.0, 2),
(3, 30.0, 3),
(4, 40.0, 4),
(5, 50.0, 5),
(6, 60.0, 6),
(7, 70.0, 7),
(8, 80.0, 8),
(9, 90.0, 9),
(10, 100.0, 10);
Affected Rows: 10
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_percentile_5s');
+----------------------------------------+
| ADMIN FLUSH_FLOW('calc_percentile_5s') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_percentile_10s');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('calc_percentile_10s') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT
time_window,
uddsketch_calc(0.99, percentile_state) AS p99
FROM
percentile_5s
ORDER BY
time_window;
+---------------------+--------------------+
| time_window | p99 |
+---------------------+--------------------+
| 1970-01-01T00:00:00 | 40.04777053326359 |
| 1970-01-01T00:00:05 | 59.745049810145126 |
| 1970-01-01T00:00:10 | |
+---------------------+--------------------+
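-- the 1970-01-01T00:00:10 window only contains value 100.0, which the CASE WHEN maps to
-- NULL, so its sketch state is NULL and uddsketch_calc returns NULL for that row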
SELECT
time_window,
uddsketch_calc(0.99, percentile_state) AS p99
FROM
percentile_10s
ORDER BY
time_window;
+---------------------+--------------------+
| time_window | p99 |
+---------------------+--------------------+
| 1970-01-01T00:00:00 | 59.745049810145126 |
| 1970-01-01T00:00:10 | |
+---------------------+--------------------+
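-- the 10-second flow merges the two 5-second sketches (covering values 10.0-60.0) into the
-- 1970-01-01T00:00:00 window, so its p99 matches that of the larger 5-second bucket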
DROP FLOW calc_percentile_5s;
Affected Rows: 0
DROP FLOW calc_percentile_10s;
Affected Rows: 0
DROP TABLE percentile_5s;
Affected Rows: 0
DROP TABLE percentile_10s;
Affected Rows: 0
DROP TABLE percentile_base;
Affected Rows: 0

View File

@@ -0,0 +1,192 @@
CREATE TABLE access_log (
"url" STRING,
user_id BIGINT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY ("url", user_id)
);
CREATE TABLE access_log_10s (
"url" STRING,
time_window timestamp time INDEX,
state BINARY,
PRIMARY KEY ("url")
);
CREATE FLOW calc_access_log_10s SINK TO access_log_10s
AS
SELECT
"url",
date_bin('10s'::INTERVAL, ts) AS time_window,
hll(user_id) AS state
FROM
access_log
GROUP BY
"url",
time_window;
-- insert 5 rows of data
INSERT INTO access_log VALUES
("/dashboard", 1, "2025-03-04 00:00:00"),
("/dashboard", 1, "2025-03-04 00:00:01"),
("/dashboard", 2, "2025-03-04 00:00:05"),
("/not_found", 3, "2025-03-04 00:00:11"),
("/dashboard", 4, "2025-03-04 00:00:15");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_access_log_10s');
-- query should return 3 rows
-- SQLNESS SORT_RESULT 3 1
SELECT "url", time_window FROM access_log_10s
ORDER BY
time_window;
-- use hll_count to query the approximate distinct user counts from the state column in access_log_10s
-- SQLNESS SORT_RESULT 3 1
SELECT "url", time_window, hll_count(state) FROM access_log_10s
ORDER BY
time_window;
-- further, we can roll the 10-second buckets up to one-minute buckets by using hll_merge to combine the 10-second hyperloglog states
-- SQLNESS SORT_RESULT 3 1
SELECT
"url",
date_bin('1 minute'::INTERVAL, time_window) AS time_window_1m,
hll_count(hll_merge(state)) as uv_per_min
FROM
access_log_10s
GROUP BY
"url",
time_window_1m
ORDER BY
time_window_1m;
DROP FLOW calc_access_log_10s;
DROP TABLE access_log_10s;
DROP TABLE access_log;
CREATE TABLE percentile_base (
"id" INT PRIMARY KEY,
"value" DOUBLE,
ts timestamp(0) time index
);
CREATE TABLE percentile_5s (
"percentile_state" BINARY,
time_window timestamp(0) time index
);
CREATE FLOW calc_percentile_5s SINK TO percentile_5s
AS
SELECT
uddsketch_state(128, 0.01, "value") AS "value",
date_bin('5 seconds'::INTERVAL, ts) AS time_window
FROM
percentile_base
WHERE
"value" > 0 AND "value" < 70
GROUP BY
time_window;
INSERT INTO percentile_base ("id", "value", ts) VALUES
(1, 10.0, 1),
(2, 20.0, 2),
(3, 30.0, 3),
(4, 40.0, 4),
(5, 50.0, 5),
(6, 60.0, 6),
(7, 70.0, 7),
(8, 80.0, 8),
(9, 90.0, 9),
(10, 100.0, 10);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_percentile_5s');
SELECT
time_window,
uddsketch_calc(0.99, `percentile_state`) AS p99
FROM
percentile_5s
ORDER BY
time_window;
DROP FLOW calc_percentile_5s;
DROP TABLE percentile_5s;
DROP TABLE percentile_base;
CREATE TABLE percentile_base (
"id" INT PRIMARY KEY,
"value" DOUBLE,
ts timestamp(0) time index
);
CREATE TABLE percentile_5s (
"percentile_state" BINARY,
time_window timestamp(0) time index
);
CREATE TABLE percentile_10s (
"percentile_state" BINARY,
time_window timestamp(0) time index
);
CREATE FLOW calc_percentile_5s SINK TO percentile_5s
AS
SELECT
uddsketch_state(128, 0.01, CASE WHEN "value" > 0 AND "value" < 70 THEN "value" ELSE NULL END) AS "value",
date_bin('5 seconds'::INTERVAL, ts) AS time_window
FROM
percentile_base
GROUP BY
time_window;
CREATE FLOW calc_percentile_10s SINK TO percentile_10s
AS
SELECT
uddsketch_merge(128, 0.01, percentile_state),
date_bin('10 seconds'::INTERVAL, time_window) AS time_window
FROM
percentile_5s
GROUP BY
date_bin('10 seconds'::INTERVAL, time_window);
INSERT INTO percentile_base ("id", "value", ts) VALUES
(1, 10.0, 1),
(2, 20.0, 2),
(3, 30.0, 3),
(4, 40.0, 4),
(5, 50.0, 5),
(6, 60.0, 6),
(7, 70.0, 7),
(8, 80.0, 8),
(9, 90.0, 9),
(10, 100.0, 10);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_percentile_5s');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_percentile_10s');
SELECT
time_window,
uddsketch_calc(0.99, percentile_state) AS p99
FROM
percentile_5s
ORDER BY
time_window;
SELECT
time_window,
uddsketch_calc(0.99, percentile_state) AS p99
FROM
percentile_10s
ORDER BY
time_window;
DROP FLOW calc_percentile_5s;
DROP FLOW calc_percentile_10s;
DROP TABLE percentile_5s;
DROP TABLE percentile_10s;
DROP TABLE percentile_base;

View File

@@ -263,6 +263,15 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM
-- make sure the result is the same after recovery
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+-------------------------------------------------------------+------------------------------------+

View File

@@ -108,6 +108,9 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM
-- make sure the result is the same after recovery
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

Some files were not shown because too many files have changed in this diff.