diff --git a/.coderabbit.yaml b/.coderabbit.yaml
deleted file mode 100644
index 01bc346444..0000000000
--- a/.coderabbit.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
-# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json
-language: "en-US"
-early_access: false
-reviews:
- profile: "chill"
- request_changes_workflow: false
- high_level_summary: true
- poem: true
- review_status: true
- collapse_walkthrough: false
- auto_review:
- enabled: false
- drafts: false
-chat:
- auto_reply: true
diff --git a/.github/scripts/create-version.sh b/.github/scripts/create-version.sh
index 1de37df190..0e8218ba01 100755
--- a/.github/scripts/create-version.sh
+++ b/.github/scripts/create-version.sh
@@ -10,17 +10,17 @@ set -e
function create_version() {
# Read from envrionment variables.
if [ -z "$GITHUB_EVENT_NAME" ]; then
- echo "GITHUB_EVENT_NAME is empty"
+ echo "GITHUB_EVENT_NAME is empty" >&2
exit 1
fi
if [ -z "$NEXT_RELEASE_VERSION" ]; then
- echo "NEXT_RELEASE_VERSION is empty"
- exit 1
+ echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
+ export NEXT_RELEASE_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
fi
if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then
- echo "NIGHTLY_RELEASE_PREFIX is empty"
+ echo "NIGHTLY_RELEASE_PREFIX is empty" >&2
exit 1
fi
@@ -35,7 +35,7 @@ function create_version() {
# It will be like 'dev-2023080819-f0e7216c'.
if [ "$NEXT_RELEASE_VERSION" = dev ]; then
if [ -z "$COMMIT_SHA" ]; then
- echo "COMMIT_SHA is empty in dev build"
+ echo "COMMIT_SHA is empty in dev build" >&2
exit 1
fi
echo "dev-$(date "+%Y%m%d-%s")-$(echo "$COMMIT_SHA" | cut -c1-8)"
@@ -45,7 +45,7 @@ function create_version() {
# Note: Only output 'version=xxx' to stdout when everything is ok, so that it can be used in GitHub Actions Outputs.
if [ "$GITHUB_EVENT_NAME" = push ]; then
if [ -z "$GITHUB_REF_NAME" ]; then
- echo "GITHUB_REF_NAME is empty in push event"
+ echo "GITHUB_REF_NAME is empty in push event" >&2
exit 1
fi
echo "$GITHUB_REF_NAME"
@@ -54,7 +54,7 @@ function create_version() {
elif [ "$GITHUB_EVENT_NAME" = schedule ]; then
echo "$NEXT_RELEASE_VERSION-$NIGHTLY_RELEASE_PREFIX-$(date "+%Y%m%d")"
else
- echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME"
+ echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME" >&2
exit 1
fi
}
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index fe85a6f2c8..b3c7ee4cdd 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -90,8 +90,6 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
- # Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
- NEXT_RELEASE_VERSION: v0.14.0
jobs:
allocate-runners:
@@ -135,7 +133,6 @@ jobs:
env:
GITHUB_EVENT_NAME: ${{ github.event_name }}
GITHUB_REF_NAME: ${{ github.ref_name }}
- NEXT_RELEASE_VERSION: ${{ env.NEXT_RELEASE_VERSION }}
NIGHTLY_RELEASE_PREFIX: ${{ env.NIGHTLY_RELEASE_PREFIX }}
- name: Allocate linux-amd64 runner
diff --git a/Cargo.lock b/Cargo.lock
index 839ceafddc..1603528b22 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -173,9 +173,9 @@ dependencies = [
[[package]]
name = "anyhow"
-version = "1.0.89"
+version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
+checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]]
name = "anymap2"
@@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"common-base",
"common-decimal",
@@ -915,7 +915,7 @@ dependencies = [
[[package]]
name = "auth"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -1537,7 +1537,7 @@ dependencies = [
[[package]]
name = "cache"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"catalog",
"common-error",
@@ -1561,7 +1561,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"arrow 54.2.1",
@@ -1597,7 +1597,7 @@ dependencies = [
"partition",
"paste",
"prometheus",
- "rustc-hash 2.0.0",
+ "rustc-hash 2.1.1",
"serde_json",
"session",
"snafu 0.8.5",
@@ -1619,9 +1619,9 @@ dependencies = [
[[package]]
name = "cc"
-version = "1.1.24"
+version = "1.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "812acba72f0a070b003d3697490d2b55b837230ae7c6c6497f05cc2ddbb8d938"
+checksum = "04da6a0d40b948dfc4fa8f5bbf402b0fc1a64a28dbf7d12ffd683550f2c1b63a"
dependencies = [
"jobserver",
"libc",
@@ -1874,7 +1874,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-trait",
"auth",
@@ -1917,7 +1917,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
- "substrait 0.14.0",
+ "substrait 0.15.0",
"table",
"tempfile",
"tokio",
@@ -1926,7 +1926,7 @@ dependencies = [
[[package]]
name = "client"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"arc-swap",
@@ -1955,7 +1955,7 @@ dependencies = [
"rand 0.9.0",
"serde_json",
"snafu 0.8.5",
- "substrait 0.14.0",
+ "substrait 0.15.0",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1996,7 +1996,7 @@ dependencies = [
[[package]]
name = "cmd"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-trait",
"auth",
@@ -2056,7 +2056,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
- "substrait 0.14.0",
+ "substrait 0.15.0",
"table",
"temp-env",
"tempfile",
@@ -2102,7 +2102,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"anymap2",
"async-trait",
@@ -2124,11 +2124,11 @@ dependencies = [
[[package]]
name = "common-catalog"
-version = "0.14.0"
+version = "0.15.0"
[[package]]
name = "common-config"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"common-base",
"common-error",
@@ -2153,7 +2153,7 @@ dependencies = [
[[package]]
name = "common-datasource"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -2190,7 +2190,7 @@ dependencies = [
[[package]]
name = "common-decimal"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2203,7 +2203,7 @@ dependencies = [
[[package]]
name = "common-error"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"common-macro",
"http 1.1.0",
@@ -2214,7 +2214,7 @@ dependencies = [
[[package]]
name = "common-frontend"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-trait",
"common-error",
@@ -2224,7 +2224,7 @@ dependencies = [
[[package]]
name = "common-function"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2277,7 +2277,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-trait",
"common-runtime",
@@ -2294,7 +2294,7 @@ dependencies = [
[[package]]
name = "common-grpc"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"arrow-flight",
@@ -2325,7 +2325,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"common-base",
@@ -2344,7 +2344,7 @@ dependencies = [
[[package]]
name = "common-macro"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"arc-swap",
"common-query",
@@ -2358,7 +2358,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"common-error",
"common-macro",
@@ -2371,7 +2371,7 @@ dependencies = [
[[package]]
name = "common-meta"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"anymap2",
"api",
@@ -2432,7 +2432,7 @@ dependencies = [
[[package]]
name = "common-options"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2441,11 +2441,11 @@ dependencies = [
[[package]]
name = "common-plugins"
-version = "0.14.0"
+version = "0.15.0"
[[package]]
name = "common-pprof"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"common-error",
"common-macro",
@@ -2457,7 +2457,7 @@ dependencies = [
[[package]]
name = "common-procedure"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-stream",
"async-trait",
@@ -2484,7 +2484,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -2493,7 +2493,7 @@ dependencies = [
[[package]]
name = "common-query"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -2510,7 +2510,7 @@ dependencies = [
"futures-util",
"serde",
"snafu 0.8.5",
- "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
+ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"sqlparser_derive 0.1.1",
"statrs",
"store-api",
@@ -2519,7 +2519,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"arc-swap",
"common-error",
@@ -2539,7 +2539,7 @@ dependencies = [
[[package]]
name = "common-runtime"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2569,14 +2569,14 @@ dependencies = [
[[package]]
name = "common-session"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-telemetry"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"atty",
"backtrace",
@@ -2604,7 +2604,7 @@ dependencies = [
[[package]]
name = "common-test-util"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"client",
"common-query",
@@ -2616,7 +2616,7 @@ dependencies = [
[[package]]
name = "common-time"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -2634,7 +2634,7 @@ dependencies = [
[[package]]
name = "common-version"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"build-data",
"const_format",
@@ -2644,7 +2644,7 @@ dependencies = [
[[package]]
name = "common-wal"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"common-base",
"common-error",
@@ -2946,9 +2946,9 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
-version = "0.5.13"
+version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
+checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
@@ -3110,14 +3110,14 @@ dependencies = [
[[package]]
name = "data-encoding"
-version = "2.6.0"
+version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2"
+checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
[[package]]
name = "datafusion"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3168,7 +3168,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"async-trait",
@@ -3188,7 +3188,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog-listing"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -3211,7 +3211,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3236,7 +3236,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"log",
"tokio",
@@ -3245,12 +3245,12 @@ dependencies = [
[[package]]
name = "datafusion-doc"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
[[package]]
name = "datafusion-execution"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"dashmap",
@@ -3268,7 +3268,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -3288,7 +3288,7 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"datafusion-common",
@@ -3299,7 +3299,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-buffer 54.3.1",
@@ -3328,7 +3328,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3349,7 +3349,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate-common"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3361,7 +3361,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3383,7 +3383,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-table"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"async-trait",
@@ -3398,7 +3398,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-common",
"datafusion-doc",
@@ -3414,7 +3414,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window-common"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-common",
"datafusion-physical-expr-common",
@@ -3423,7 +3423,7 @@ dependencies = [
[[package]]
name = "datafusion-macros"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-expr",
"quote",
@@ -3433,7 +3433,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -3451,7 +3451,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3474,7 +3474,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3487,7 +3487,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -3508,7 +3508,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3538,7 +3538,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3556,7 +3556,7 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"async-recursion",
"async-trait",
@@ -3572,7 +3572,7 @@ dependencies = [
[[package]]
name = "datanode"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"arrow-flight",
@@ -3624,7 +3624,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
- "substrait 0.14.0",
+ "substrait 0.15.0",
"table",
"tokio",
"toml 0.8.19",
@@ -3633,7 +3633,7 @@ dependencies = [
[[package]]
name = "datatypes"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3656,7 +3656,7 @@ dependencies = [
"serde",
"serde_json",
"snafu 0.8.5",
- "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
+ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"sqlparser_derive 0.1.1",
]
@@ -4259,7 +4259,7 @@ dependencies = [
[[package]]
name = "file-engine"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -4382,7 +4382,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"arrow 54.2.1",
@@ -4444,7 +4444,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.27.1",
- "substrait 0.14.0",
+ "substrait 0.15.0",
"table",
"tokio",
"tonic 0.12.3",
@@ -4499,7 +4499,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"arc-swap",
@@ -4553,10 +4553,10 @@ dependencies = [
"session",
"snafu 0.8.5",
"sql",
- "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
+ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"strfmt",
- "substrait 0.14.0",
+ "substrait 0.15.0",
"table",
"tokio",
"toml 0.8.19",
@@ -5795,7 +5795,7 @@ dependencies = [
[[package]]
name = "index"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6599,13 +6599,13 @@ dependencies = [
[[package]]
name = "log"
-version = "0.4.22"
+version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
+checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "log-query"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"chrono",
"common-error",
@@ -6617,7 +6617,7 @@ dependencies = [
[[package]]
name = "log-store"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-stream",
"async-trait",
@@ -6911,7 +6911,7 @@ dependencies = [
[[package]]
name = "meta-client"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -6939,7 +6939,7 @@ dependencies = [
[[package]]
name = "meta-srv"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -7029,7 +7029,7 @@ dependencies = [
[[package]]
name = "metric-engine"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"aquamarine",
@@ -7118,7 +7118,7 @@ dependencies = [
[[package]]
name = "mito2"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"aquamarine",
@@ -7780,7 +7780,7 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56"
dependencies = [
- "proc-macro-crate 1.3.1",
+ "proc-macro-crate 3.2.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -7824,7 +7824,7 @@ dependencies = [
[[package]]
name = "object-store"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"anyhow",
"bytes",
@@ -8119,7 +8119,7 @@ dependencies = [
[[package]]
name = "operator"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8166,9 +8166,9 @@ dependencies = [
"session",
"snafu 0.8.5",
"sql",
- "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
+ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
- "substrait 0.14.0",
+ "substrait 0.15.0",
"table",
"tokio",
"tokio-util",
@@ -8423,7 +8423,7 @@ dependencies = [
[[package]]
name = "partition"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -8443,7 +8443,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"sql",
- "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
+ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"table",
]
@@ -8705,7 +8705,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8847,7 +8847,7 @@ dependencies = [
[[package]]
name = "plugins"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"auth",
"clap 4.5.19",
@@ -9127,7 +9127,7 @@ dependencies = [
[[package]]
name = "promql"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9373,7 +9373,7 @@ dependencies = [
[[package]]
name = "puffin"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9414,7 +9414,7 @@ dependencies = [
[[package]]
name = "query"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9477,10 +9477,10 @@ dependencies = [
"session",
"snafu 0.8.5",
"sql",
- "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
+ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"statrs",
"store-api",
- "substrait 0.14.0",
+ "substrait 0.15.0",
"table",
"tokio",
"tokio-stream",
@@ -9527,7 +9527,7 @@ dependencies = [
"pin-project-lite",
"quinn-proto",
"quinn-udp",
- "rustc-hash 2.0.0",
+ "rustc-hash 2.1.1",
"rustls",
"socket2",
"thiserror 1.0.64",
@@ -9544,7 +9544,7 @@ dependencies = [
"bytes",
"rand 0.8.5",
"ring",
- "rustc-hash 2.0.0",
+ "rustc-hash 2.1.1",
"rustls",
"slab",
"thiserror 1.0.64",
@@ -9821,9 +9821,9 @@ dependencies = [
[[package]]
name = "regex"
-version = "1.11.0"
+version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
+checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
@@ -10005,15 +10005,14 @@ dependencies = [
[[package]]
name = "ring"
-version = "0.17.8"
+version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
+checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.15",
"libc",
- "spin",
"untrusted",
"windows-sys 0.52.0",
]
@@ -10334,9 +10333,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hash"
-version = "2.0.0"
+version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152"
+checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
@@ -10831,7 +10830,7 @@ dependencies = [
[[package]]
name = "servers"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -10951,7 +10950,7 @@ dependencies = [
[[package]]
name = "session"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"arc-swap",
@@ -11159,9 +11158,9 @@ dependencies = [
[[package]]
name = "smallbitvec"
-version = "2.5.3"
+version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fcc3fc564a4b53fd1e8589628efafe57602d91bde78be18186b5f61e8faea470"
+checksum = "d31d263dd118560e1a492922182ab6ca6dc1d03a3bf54e7699993f31a4150e3f"
[[package]]
name = "smallvec"
@@ -11276,7 +11275,7 @@ dependencies = [
[[package]]
name = "sql"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"chrono",
@@ -11304,7 +11303,7 @@ dependencies = [
"serde",
"serde_json",
"snafu 0.8.5",
- "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
+ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"sqlparser_derive 0.1.1",
"store-api",
"table",
@@ -11331,7 +11330,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11373,7 +11372,7 @@ dependencies = [
[[package]]
name = "sqlparser"
version = "0.54.0"
-source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089#e98e6b322426a9d397a71efef17075966223c089"
+source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e#0cf6c04490d59435ee965edd2078e8855bd8471e"
dependencies = [
"lazy_static",
"log",
@@ -11381,7 +11380,7 @@ dependencies = [
"regex",
"serde",
"sqlparser 0.54.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "sqlparser_derive 0.3.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
+ "sqlparser_derive 0.3.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
]
[[package]]
@@ -11409,7 +11408,7 @@ dependencies = [
[[package]]
name = "sqlparser_derive"
version = "0.3.0"
-source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089#e98e6b322426a9d397a71efef17075966223c089"
+source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e#0cf6c04490d59435ee965edd2078e8855bd8471e"
dependencies = [
"proc-macro2",
"quote",
@@ -11650,7 +11649,7 @@ dependencies = [
[[package]]
name = "store-api"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"aquamarine",
@@ -11799,7 +11798,7 @@ dependencies = [
[[package]]
name = "substrait"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-trait",
"bytes",
@@ -11979,7 +11978,7 @@ dependencies = [
[[package]]
name = "table"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"async-trait",
@@ -12230,7 +12229,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"arbitrary",
"async-trait",
@@ -12264,7 +12263,7 @@ dependencies = [
"serde_yaml",
"snafu 0.8.5",
"sql",
- "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e98e6b322426a9d397a71efef17075966223c089)",
+ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"sqlx",
"store-api",
"strum 0.27.1",
@@ -12274,7 +12273,7 @@ dependencies = [
[[package]]
name = "tests-integration"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"api",
"arrow-flight",
@@ -12341,7 +12340,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
- "substrait 0.14.0",
+ "substrait 0.15.0",
"table",
"tempfile",
"time",
diff --git a/Cargo.toml b/Cargo.toml
index 92dba96d00..b0a049bbd1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -68,15 +68,16 @@ members = [
resolver = "2"
[workspace.package]
-version = "0.14.0"
+version = "0.15.0"
edition = "2021"
license = "Apache-2.0"
[workspace.lints]
-clippy.print_stdout = "warn"
-clippy.print_stderr = "warn"
clippy.dbg_macro = "warn"
clippy.implicit_clone = "warn"
+clippy.result_large_err = "allow"
+clippy.large_enum_variant = "allow"
+clippy.doc_overindented_list_items = "allow"
rust.unknown_lints = "deny"
rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
@@ -112,15 +113,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
-datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
+datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
@@ -191,7 +192,7 @@ simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
-sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "e98e6b322426a9d397a71efef17075966223c089", features = [
+sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "0cf6c04490d59435ee965edd2078e8855bd8471e", features = [
"visitor",
"serde",
] } # branch = "v0.54.x"
diff --git a/config/config.md b/config/config.md
index f34a41d861..f3230190c9 100644
--- a/config/config.md
+++ b/config/config.md
@@ -319,6 +319,7 @@
| `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running on cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). |
+| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.
**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 89c92352b2..0e7f9b74f0 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -50,6 +50,10 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false
+## Whether to allow region failover on local WAL.
+## **This option is not recommended to be set to true, because it may lead to data loss during failover.**
+allow_region_failover_on_local_wal = false
+
## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"
diff --git a/flake.lock b/flake.lock
index cfea27d34b..f2b2521130 100644
--- a/flake.lock
+++ b/flake.lock
@@ -8,11 +8,11 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
- "lastModified": 1737613896,
- "narHash": "sha256-ldqXIglq74C7yKMFUzrS9xMT/EVs26vZpOD68Sh7OcU=",
+ "lastModified": 1742452566,
+ "narHash": "sha256-sVuLDQ2UIWfXUBbctzrZrXM2X05YjX08K7XHMztt36E=",
"owner": "nix-community",
"repo": "fenix",
- "rev": "303a062fdd8e89f233db05868468975d17855d80",
+ "rev": "7d9ba794daf5e8cc7ee728859bc688d8e26d5f06",
"type": "github"
},
"original": {
@@ -41,11 +41,11 @@
},
"nixpkgs": {
"locked": {
- "lastModified": 1737569578,
- "narHash": "sha256-6qY0pk2QmUtBT9Mywdvif0i/CLVgpCjMUn6g9vB+f3M=",
+ "lastModified": 1743576891,
+ "narHash": "sha256-vXiKURtntURybE6FMNFAVpRPr8+e8KoLPrYs9TGuAKc=",
"owner": "NixOS",
"repo": "nixpkgs",
- "rev": "47addd76727f42d351590c905d9d1905ca895b82",
+ "rev": "44a69ed688786e98a101f02b712c313f1ade37ab",
"type": "github"
},
"original": {
@@ -65,11 +65,11 @@
"rust-analyzer-src": {
"flake": false,
"locked": {
- "lastModified": 1737581772,
- "narHash": "sha256-t1P2Pe3FAX9TlJsCZbmJ3wn+C4qr6aSMypAOu8WNsN0=",
+ "lastModified": 1742296961,
+ "narHash": "sha256-gCpvEQOrugHWLimD1wTFOJHagnSEP6VYBDspq96Idu0=",
"owner": "rust-lang",
"repo": "rust-analyzer",
- "rev": "582af7ee9c8d84f5d534272fc7de9f292bd849be",
+ "rev": "15d87419f1a123d8f888d608129c3ce3ff8f13d4",
"type": "github"
},
"original": {
diff --git a/flake.nix b/flake.nix
index a6d9fbc0df..225f631721 100644
--- a/flake.nix
+++ b/flake.nix
@@ -21,7 +21,7 @@
lib = nixpkgs.lib;
rustToolchain = fenix.packages.${system}.fromToolchainName {
name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
- sha256 = "sha256-f/CVA1EC61EWbh0SjaRNhLL0Ypx2ObupbzigZp8NmL4=";
+ sha256 = "sha256-i0Sh/ZFFsHlZ3oFZFc24qdk6Cd8Do8OPU4HJQsrKOeM=";
};
in
{
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index eb2546003b..5d547223f2 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -1,2 +1,2 @@
[toolchain]
-channel = "nightly-2024-12-25"
+channel = "nightly-2025-04-15"
diff --git a/src/catalog/src/system_schema/pg_catalog/pg_namespace/oid_map.rs b/src/catalog/src/system_schema/pg_catalog/pg_namespace/oid_map.rs
index edbdac25c7..a2165d731c 100644
--- a/src/catalog/src/system_schema/pg_catalog/pg_namespace/oid_map.rs
+++ b/src/catalog/src/system_schema/pg_catalog/pg_namespace/oid_map.rs
@@ -84,12 +84,6 @@ mod tests {
let key1 = "3178510";
let key2 = "4215648";
- // have collision
- assert_eq!(
- oid_map.hasher.hash_one(key1) as u32,
- oid_map.hasher.hash_one(key2) as u32
- );
-
// insert them into oid_map
let oid1 = oid_map.get_oid(key1);
let oid2 = oid_map.get_oid(key2);
diff --git a/src/common/function/src/scalars/matches_term.rs b/src/common/function/src/scalars/matches_term.rs
index c99c5ca572..54cf556e85 100644
--- a/src/common/function/src/scalars/matches_term.rs
+++ b/src/common/function/src/scalars/matches_term.rs
@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::fmt;
+use std::iter::repeat_n;
use std::sync::Arc;
-use std::{fmt, iter};
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Volatility;
@@ -126,9 +127,10 @@ impl Function for MatchesTermFunction {
let term = term_column.get_ref(0).as_string().unwrap();
match term {
None => {
- return Ok(Arc::new(BooleanVector::from_iter(
- iter::repeat(None).take(text_column.len()),
- )));
+ return Ok(Arc::new(BooleanVector::from_iter(repeat_n(
+ None,
+ text_column.len(),
+ ))));
}
Some(term) => Some(MatchesTermFinder::new(term)),
}
@@ -217,7 +219,7 @@ impl MatchesTermFinder {
}
let mut pos = 0;
- while let Some(found_pos) = self.finder.find(text[pos..].as_bytes()) {
+ while let Some(found_pos) = self.finder.find(&text.as_bytes()[pos..]) {
let actual_pos = pos + found_pos;
let prev_ok = self.starts_with_non_alnum
diff --git a/src/common/function/src/scalars/math/rate.rs b/src/common/function/src/scalars/math/rate.rs
index e296fb9496..cbe4c92550 100644
--- a/src/common/function/src/scalars/math/rate.rs
+++ b/src/common/function/src/scalars/math/rate.rs
@@ -37,7 +37,7 @@ impl fmt::Display for RateFunction {
impl Function for RateFunction {
fn name(&self) -> &str {
- "prom_rate"
+ "rate"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result {
@@ -82,7 +82,7 @@ mod tests {
#[test]
fn test_rate_function() {
let rate = RateFunction;
- assert_eq!("prom_rate", rate.name());
+ assert_eq!("rate", rate.name());
assert_eq!(
ConcreteDataType::float64_datatype(),
rate.return_type(&[]).unwrap()
diff --git a/src/common/function/src/scalars/uddsketch_calc.rs b/src/common/function/src/scalars/uddsketch_calc.rs
index 5c0beb4fec..f429766eb7 100644
--- a/src/common/function/src/scalars/uddsketch_calc.rs
+++ b/src/common/function/src/scalars/uddsketch_calc.rs
@@ -115,6 +115,13 @@ impl Function for UddSketchCalcFunction {
}
};
+ // Check if the sketch is empty, if so, return null
+ // This is important to avoid panics when calling estimate_quantile on an empty sketch
+ // In practice, this will happen if input is all null
+ if sketch.bucket_iter().count() == 0 {
+ builder.push_null();
+ continue;
+ }
// Compute the estimated quantile from the sketch
let result = sketch.estimate_quantile(perc);
builder.push(Some(result));
diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs
index b1cc18d5e4..7bfbd78f9c 100644
--- a/src/common/meta/src/lib.rs
+++ b/src/common/meta/src/lib.rs
@@ -15,8 +15,6 @@
#![feature(assert_matches)]
#![feature(btree_extract_if)]
#![feature(let_chains)]
-#![feature(extract_if)]
-#![feature(hash_extract_if)]
pub mod cache;
pub mod cache_invalidator;
diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs
index 7ddd104c61..2386ca73a7 100644
--- a/src/common/meta/src/rpc/router.rs
+++ b/src/common/meta/src/rpc/router.rs
@@ -176,15 +176,12 @@ impl TableRoute {
})?
.into();
- let leader_peer = peers
- .get(region_route.leader_peer_index as usize)
- .cloned()
- .map(Into::into);
+ let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
let follower_peers = region_route
.follower_peer_indexes
.into_iter()
- .filter_map(|x| peers.get(x as usize).cloned().map(Into::into))
+ .filter_map(|x| peers.get(x as usize).cloned())
.collect::>();
region_routes.push(RegionRoute {
diff --git a/src/common/query/src/logical_plan/accumulator.rs b/src/common/query/src/logical_plan/accumulator.rs
index 32f1b4587c..a9c499d323 100644
--- a/src/common/query/src/logical_plan/accumulator.rs
+++ b/src/common/query/src/logical_plan/accumulator.rs
@@ -24,7 +24,7 @@ use datatypes::prelude::*;
use datatypes::vectors::{Helper as VectorHelper, VectorRef};
use snafu::ResultExt;
-use crate::error::{self, Error, FromScalarValueSnafu, IntoVectorSnafu, Result};
+use crate::error::{self, FromScalarValueSnafu, IntoVectorSnafu, Result};
use crate::prelude::*;
pub type AggregateFunctionCreatorRef = Arc;
@@ -166,8 +166,7 @@ impl DfAccumulator for DfAccumulatorAdaptor {
let output_type = self.creator.output_type()?;
let scalar_value = value
.try_to_scalar_value(&output_type)
- .context(error::ToScalarValueSnafu)
- .map_err(Error::from)?;
+ .context(error::ToScalarValueSnafu)?;
Ok(scalar_value)
}
diff --git a/src/datatypes/src/schema/constraint.rs b/src/datatypes/src/schema/constraint.rs
index 1a2128c200..560500810f 100644
--- a/src/datatypes/src/schema/constraint.rs
+++ b/src/datatypes/src/schema/constraint.rs
@@ -253,9 +253,10 @@ fn create_current_timestamp_vector(
data_type: &ConcreteDataType,
num_rows: usize,
) -> Result {
- let current_timestamp_vector = TimestampMillisecondVector::from_values(
- std::iter::repeat(util::current_time_millis()).take(num_rows),
- );
+ let current_timestamp_vector = TimestampMillisecondVector::from_values(std::iter::repeat_n(
+ util::current_time_millis(),
+ num_rows,
+ ));
if data_type.is_timestamp() {
current_timestamp_vector.cast(data_type)
} else {
diff --git a/src/datatypes/src/vectors/constant.rs b/src/datatypes/src/vectors/constant.rs
index 66587cf1d7..3ccade1392 100644
--- a/src/datatypes/src/vectors/constant.rs
+++ b/src/datatypes/src/vectors/constant.rs
@@ -198,8 +198,7 @@ impl fmt::Debug for ConstantVector {
impl Serializable for ConstantVector {
fn serialize_to_json(&self) -> Result> {
- std::iter::repeat(self.get(0))
- .take(self.len())
+ std::iter::repeat_n(self.get(0), self.len())
.map(serde_json::Value::try_from)
.collect::>()
.context(SerializeSnafu)
diff --git a/src/datatypes/src/vectors/decimal.rs b/src/datatypes/src/vectors/decimal.rs
index cce26e3e3e..e446b36de3 100644
--- a/src/datatypes/src/vectors/decimal.rs
+++ b/src/datatypes/src/vectors/decimal.rs
@@ -412,7 +412,7 @@ pub(crate) fn replicate_decimal128(
// Safety: std::iter::Repeat and std::iter::Take implement TrustedLen.
builder
.mutable_array
- .append_trusted_len_iter(std::iter::repeat(data).take(repeat_times));
+ .append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
}
}
None => {
diff --git a/src/datatypes/src/vectors/dictionary.rs b/src/datatypes/src/vectors/dictionary.rs
index e6831d2ed7..07994d13bd 100644
--- a/src/datatypes/src/vectors/dictionary.rs
+++ b/src/datatypes/src/vectors/dictionary.rs
@@ -16,8 +16,8 @@ use std::any::Any;
use std::sync::Arc;
use arrow::array::Array;
-use arrow::datatypes::Int32Type;
-use arrow_array::{ArrayRef, DictionaryArray, Int32Array};
+use arrow::datatypes::Int64Type;
+use arrow_array::{ArrayRef, DictionaryArray, Int64Array};
use serde_json::Value as JsonValue;
use snafu::ResultExt;
@@ -32,7 +32,7 @@ use crate::vectors::{self, Helper, Validity, Vector, VectorRef};
/// Vector of dictionaries, basically backed by Arrow's `DictionaryArray`.
#[derive(Debug, PartialEq)]
pub struct DictionaryVector {
- array: DictionaryArray,
+ array: DictionaryArray,
/// The datatype of the items in the dictionary.
item_type: ConcreteDataType,
/// The vector of items in the dictionary.
@@ -41,7 +41,7 @@ pub struct DictionaryVector {
impl DictionaryVector {
/// Create a new instance of `DictionaryVector` from a dictionary array and item type
- pub fn new(array: DictionaryArray, item_type: ConcreteDataType) -> Result {
+ pub fn new(array: DictionaryArray, item_type: ConcreteDataType) -> Result {
let item_vector = Helper::try_into_vector(array.values())?;
Ok(Self {
@@ -52,12 +52,12 @@ impl DictionaryVector {
}
/// Returns the underlying Arrow dictionary array
- pub fn array(&self) -> &DictionaryArray {
+ pub fn array(&self) -> &DictionaryArray {
&self.array
}
/// Returns the keys array of this dictionary
- pub fn keys(&self) -> &arrow_array::PrimitiveArray {
+ pub fn keys(&self) -> &arrow_array::PrimitiveArray {
self.array.keys()
}
@@ -74,7 +74,7 @@ impl DictionaryVector {
impl Vector for DictionaryVector {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::Dictionary(DictionaryType::new(
- ConcreteDataType::int32_datatype(),
+ ConcreteDataType::int64_datatype(),
self.item_type.clone(),
))
}
@@ -163,10 +163,10 @@ impl Serializable for DictionaryVector {
}
}
-impl TryFrom> for DictionaryVector {
+impl TryFrom> for DictionaryVector {
type Error = crate::error::Error;
- fn try_from(array: DictionaryArray) -> Result {
+ fn try_from(array: DictionaryArray) -> Result {
let item_type = ConcreteDataType::from_arrow_type(array.values().data_type());
let item_vector = Helper::try_into_vector(array.values())?;
@@ -243,7 +243,7 @@ impl VectorOp for DictionaryVector {
previous_offset = offset;
}
- let new_keys = Int32Array::from(replicated_keys);
+ let new_keys = Int64Array::from(replicated_keys);
let new_array = DictionaryArray::try_new(new_keys, self.values().clone())
.expect("Failed to create replicated dictionary array");
@@ -261,7 +261,7 @@ impl VectorOp for DictionaryVector {
let filtered_key_array = filtered_key_vector.to_arrow_array();
let filtered_key_array = filtered_key_array
.as_any()
- .downcast_ref::()
+ .downcast_ref::()
.unwrap();
let new_array = DictionaryArray::try_new(filtered_key_array.clone(), self.values().clone())
@@ -291,7 +291,7 @@ impl VectorOp for DictionaryVector {
let key_vector = Helper::try_into_vector(&key_array)?;
let new_key_vector = key_vector.take(indices)?;
let new_key_array = new_key_vector.to_arrow_array();
- let new_key_array = new_key_array.as_any().downcast_ref::().unwrap();
+ let new_key_array = new_key_array.as_any().downcast_ref::().unwrap();
let new_array = DictionaryArray::try_new(new_key_array.clone(), self.values().clone())
.expect("Failed to create filtered dictionary array");
@@ -318,7 +318,7 @@ mod tests {
// Keys: [0, 1, 2, null, 1, 3]
// Resulting in: ["a", "b", "c", null, "b", "d"]
let values = StringArray::from(vec!["a", "b", "c", "d"]);
- let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(1), Some(3)]);
+ let keys = Int64Array::from(vec![Some(0), Some(1), Some(2), None, Some(1), Some(3)]);
let dict_array = DictionaryArray::new(keys, Arc::new(values));
DictionaryVector::try_from(dict_array).unwrap()
}
@@ -404,7 +404,7 @@ mod tests {
assert_eq!(
casted.data_type(),
ConcreteDataType::Dictionary(DictionaryType::new(
- ConcreteDataType::int32_datatype(),
+ ConcreteDataType::int64_datatype(),
ConcreteDataType::string_datatype(),
))
);
diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs
index cb8c8972a7..4e23d56809 100644
--- a/src/datatypes/src/vectors/helper.rs
+++ b/src/datatypes/src/vectors/helper.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
use arrow::array::{Array, ArrayRef, StringArray};
use arrow::compute;
use arrow::compute::kernels::comparison;
-use arrow::datatypes::{DataType as ArrowDataType, Int32Type, TimeUnit};
+use arrow::datatypes::{DataType as ArrowDataType, Int64Type, TimeUnit};
use arrow_array::DictionaryArray;
use arrow_schema::IntervalUnit;
use datafusion_common::ScalarValue;
@@ -348,11 +348,11 @@ impl Helper {
ArrowDataType::Decimal128(_, _) => {
Arc::new(Decimal128Vector::try_from_arrow_array(array)?)
}
- ArrowDataType::Dictionary(key, value) if matches!(&**key, ArrowDataType::Int32) => {
+ ArrowDataType::Dictionary(key, value) if matches!(&**key, ArrowDataType::Int64) => {
let array = array
.as_ref()
.as_any()
- .downcast_ref::>()
+ .downcast_ref::>()
.unwrap(); // Safety: the type is guarded by match arm condition
Arc::new(DictionaryVector::new(
array.clone(),
diff --git a/src/datatypes/src/vectors/null.rs b/src/datatypes/src/vectors/null.rs
index 292e2c5e33..e745ee13d6 100644
--- a/src/datatypes/src/vectors/null.rs
+++ b/src/datatypes/src/vectors/null.rs
@@ -120,9 +120,7 @@ impl fmt::Debug for NullVector {
impl Serializable for NullVector {
fn serialize_to_json(&self) -> Result> {
- Ok(std::iter::repeat(serde_json::Value::Null)
- .take(self.len())
- .collect())
+ Ok(std::iter::repeat_n(serde_json::Value::Null, self.len()).collect())
}
}
diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs
index 7b059e0d07..f3e49183f5 100644
--- a/src/datatypes/src/vectors/primitive.rs
+++ b/src/datatypes/src/vectors/primitive.rs
@@ -388,7 +388,7 @@ pub(crate) fn replicate_primitive(
// Safety: std::iter::Repeat and std::iter::Take implement TrustedLen.
builder
.mutable_array
- .append_trusted_len_iter(std::iter::repeat(data).take(repeat_times));
+ .append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
}
}
None => {
diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs
index 152ad5781c..031c7aad4b 100644
--- a/src/flow/src/batching_mode.rs
+++ b/src/flow/src/batching_mode.rs
@@ -32,3 +32,9 @@ pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
/// The minimum duration between two queries execution by batching mode task
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
+
+/// Grpc connection timeout
+const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5);
+
+/// Grpc max retry number
+const GRPC_MAX_RETRIES: u32 = 3;
diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs
index 2454e86251..9f16ea07fa 100644
--- a/src/flow/src/batching_mode/frontend_client.rs
+++ b/src/flow/src/batching_mode/frontend_client.rs
@@ -25,12 +25,15 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
+use common_telemetry::warn;
use meta_client::client::MetaClient;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
-use crate::batching_mode::DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT;
+use crate::batching_mode::{
+ DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, GRPC_CONN_TIMEOUT, GRPC_MAX_RETRIES,
+};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu};
use crate::Error;
@@ -99,7 +102,9 @@ impl FrontendClient {
Self::Distributed {
meta_client,
chnl_mgr: {
- let cfg = ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
+ let cfg = ChannelConfig::new()
+ .connect_timeout(GRPC_CONN_TIMEOUT)
+ .timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
ChannelManager::with_config(cfg)
},
}
@@ -223,12 +228,32 @@ impl FrontendClient {
peer: db.peer.clone(),
});
- db.database
- .handle(req.clone())
- .await
- .with_context(|_| InvalidRequestSnafu {
- context: format!("Failed to handle request: {:?}", req),
- })
+ let mut retry = 0;
+
+ loop {
+ let ret = db.database.handle(req.clone()).await.with_context(|_| {
+ InvalidRequestSnafu {
+ context: format!("Failed to handle request: {:?}", req),
+ }
+ });
+ if let Err(err) = ret {
+ if retry < GRPC_MAX_RETRIES {
+ retry += 1;
+ warn!(
+ "Failed to send request to grpc handle at Peer={:?}, retry = {}, error = {:?}",
+ db.peer, retry, err
+ );
+ continue;
+ } else {
+ common_telemetry::error!(
+ "Failed to send request to grpc handle at Peer={:?} after {} retries, error = {:?}",
+ db.peer, retry, err
+ );
+ return Err(err);
+ }
+ }
+ return ret;
+ }
}
FrontendClient::Standalone { database_client } => {
let ctx = QueryContextBuilder::default()
diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs
index 1547faae11..bb1f296c90 100644
--- a/src/flow/src/batching_mode/task.rs
+++ b/src/flow/src/batching_mode/task.rs
@@ -53,6 +53,7 @@ use crate::batching_mode::utils::{
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
};
+use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
@@ -541,7 +542,10 @@ impl BatchingTask {
.clone()
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
- context: format!("Failed to rewrite plan {:?}", self.config.plan),
+ context: format!(
+ "Failed to rewrite plan:\n {}\n",
+ self.config.plan
+ ),
})?
.data;
let schema_len = plan.schema().fields().len();
@@ -573,16 +577,19 @@ impl BatchingTask {
let mut add_filter = AddFilterRewriter::new(expr);
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
- // make a not optimized plan for clearer unparse
+
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
.await?;
- plan.clone()
+ let rewrite = plan
+ .clone()
.rewrite(&mut add_filter)
.and_then(|p| p.data.rewrite(&mut add_auto_column))
.with_context(|_| DatafusionSnafu {
- context: format!("Failed to rewrite plan {plan:?}"),
+ context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
- .data
+ .data;
+ // only apply optimize after complex rewrite is done
+ apply_df_optimizer(rewrite).await?
};
Ok(Some((new_plan, schema_len)))
diff --git a/src/flow/src/batching_mode/time_window.rs b/src/flow/src/batching_mode/time_window.rs
index e6a0d6ad8c..398250fc8b 100644
--- a/src/flow/src/batching_mode/time_window.rs
+++ b/src/flow/src/batching_mode/time_window.rs
@@ -704,6 +704,28 @@ mod test {
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
),
+ // complex time window index with where
+ (
+ "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number in (2, 3, 4) GROUP BY time_window;",
+ Timestamp::new(1740394109, TimeUnit::Second),
+ (
+ "ts".to_string(),
+ Some(Timestamp::new(1740394080, TimeUnit::Second)),
+ Some(Timestamp::new(1740394140, TimeUnit::Second)),
+ ),
+ "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE numbers_with_ts.number IN (2, 3, 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
+ ),
+ // complex time window index with between and
+ (
+ "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number BETWEEN 2 AND 4 GROUP BY time_window;",
+ Timestamp::new(1740394109, TimeUnit::Second),
+ (
+ "ts".to_string(),
+ Some(Timestamp::new(1740394080, TimeUnit::Second)),
+ Some(Timestamp::new(1740394140, TimeUnit::Second)),
+ ),
+ "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE (numbers_with_ts.number BETWEEN 2 AND 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
+ ),
// no time index
(
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs
index 7aa6a8b12f..117db03665 100644
--- a/src/flow/src/batching_mode/utils.rs
+++ b/src/flow/src/batching_mode/utils.rs
@@ -342,8 +342,8 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
}
} else {
return Err(DataFusionError::Plan(format!(
- "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?} at node {:?}",
- query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas(), node
+ "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?}",
+ query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas()
)));
}
@@ -406,7 +406,9 @@ mod test {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use pretty_assertions::assert_eq;
+ use query::query_engine::DefaultSerializer;
use session::context::QueryContext;
+ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use super::*;
use crate::test_utils::create_test_query_engine;
@@ -701,4 +703,18 @@ mod test {
);
}
}
+
+ #[tokio::test]
+ async fn test_null_cast() {
+ let query_engine = create_test_query_engine();
+ let ctx = QueryContext::arc();
+ let sql = "SELECT NULL::DOUBLE FROM numbers_with_ts";
+ let plan = sql_to_df_plan(ctx, query_engine.clone(), sql, false)
+ .await
+ .unwrap();
+
+ let _sub_plan = DFLogicalSubstraitConvertor {}
+ .encode(&plan, DefaultSerializer)
+ .unwrap();
+ }
}
diff --git a/src/flow/src/df_optimizer.rs b/src/flow/src/df_optimizer.rs
index d83bb77718..bef5b3ed79 100644
--- a/src/flow/src/df_optimizer.rs
+++ b/src/flow/src/df_optimizer.rs
@@ -25,7 +25,6 @@ use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
-use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use datafusion::optimizer::optimize_projections::OptimizeProjections;
@@ -42,6 +41,7 @@ use datafusion_expr::{
BinaryExpr, ColumnarValue, Expr, Operator, Projection, ScalarFunctionArgs, ScalarUDFImpl,
Signature, TypeSignature, Volatility,
};
+use query::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use query::parser::QueryLanguageParser;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
@@ -61,9 +61,9 @@ pub async fn apply_df_optimizer(
) -> Result {
let cfg = ConfigOptions::new();
let analyzer = Analyzer::with_rules(vec![
- Arc::new(CountWildcardRule::new()),
- Arc::new(AvgExpandRule::new()),
- Arc::new(TumbleExpandRule::new()),
+ Arc::new(CountWildcardToTimeIndexRule),
+ Arc::new(AvgExpandRule),
+ Arc::new(TumbleExpandRule),
Arc::new(CheckGroupByRule::new()),
Arc::new(TypeCoercion::new()),
]);
@@ -128,13 +128,7 @@ pub async fn sql_to_flow_plan(
}
#[derive(Debug)]
-struct AvgExpandRule {}
-
-impl AvgExpandRule {
- pub fn new() -> Self {
- Self {}
- }
-}
+struct AvgExpandRule;
impl AnalyzerRule for AvgExpandRule {
fn analyze(
@@ -331,13 +325,7 @@ impl TreeNodeRewriter for ExpandAvgRewriter<'_> {
/// expand tumble in aggr expr to tumble_start and tumble_end with column name like `window_start`
#[derive(Debug)]
-struct TumbleExpandRule {}
-
-impl TumbleExpandRule {
- pub fn new() -> Self {
- Self {}
- }
-}
+struct TumbleExpandRule;
impl AnalyzerRule for TumbleExpandRule {
fn analyze(
diff --git a/src/index/src/fulltext_index/tokenizer.rs b/src/index/src/fulltext_index/tokenizer.rs
index b00e7fda9c..54aa33edc8 100644
--- a/src/index/src/fulltext_index/tokenizer.rs
+++ b/src/index/src/fulltext_index/tokenizer.rs
@@ -46,7 +46,11 @@ pub struct ChineseTokenizer;
impl Tokenizer for ChineseTokenizer {
fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str> {
- JIEBA.cut(text, false)
+ if text.is_ascii() {
+ EnglishTokenizer {}.tokenize(text)
+ } else {
+ JIEBA.cut(text, false)
+ }
}
}
diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs
index e8f67b7b7b..3b4eaebc5c 100644
--- a/src/index/src/inverted_index/create/sort/external_sort.rs
+++ b/src/index/src/inverted_index/create/sort/external_sort.rs
@@ -481,7 +481,7 @@ mod tests {
let mock_values = dic_values
.iter()
- .flat_map(|(value, size)| iter::repeat(value.clone()).take(*size))
+ .flat_map(|(value, size)| std::iter::repeat_n(value.clone(), *size))
.collect::>();
let sorted_result = sorted_result(&mock_values, segment_row_count);
diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs
index 3b27295301..40e41bb815 100644
--- a/src/meta-srv/src/bootstrap.rs
+++ b/src/meta-srv/src/bootstrap.rs
@@ -66,10 +66,12 @@ use crate::election::postgres::PgElection;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::metasrv::builder::MetasrvBuilder;
-use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
+use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
+use crate::node_excluder::NodeExcluderRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
+use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
use crate::selector::SelectorType;
use crate::service::admin;
use crate::{error, Result};
@@ -294,14 +296,25 @@ pub async fn metasrv_builder(
let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
+ let node_excluder = plugins
+ .get::()
+ .unwrap_or_else(|| Arc::new(Vec::new()) as NodeExcluderRef);
let selector = if let Some(selector) = plugins.get::() {
info!("Using selector from plugins");
selector
} else {
let selector = match opts.selector {
- SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef,
- SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
- SelectorType::RoundRobin => Arc::new(RoundRobinSelector::default()) as SelectorRef,
+ SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
+ RegionNumsBasedWeightCompute,
+ node_excluder,
+ )) as SelectorRef,
+ SelectorType::LeaseBased => {
+ Arc::new(LeaseBasedSelector::new(node_excluder)) as SelectorRef
+ }
+ SelectorType::RoundRobin => Arc::new(RoundRobinSelector::new(
+ SelectTarget::Datanode,
+ node_excluder,
+ )) as SelectorRef,
};
info!(
"Using selector from options, selector type: {}",
diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs
index ebd3b7b54f..20b9285723 100644
--- a/src/meta-srv/src/lib.rs
+++ b/src/meta-srv/src/lib.rs
@@ -14,7 +14,6 @@
#![feature(result_flattening)]
#![feature(assert_matches)]
-#![feature(extract_if)]
#![feature(hash_set_entry)]
pub mod bootstrap;
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 34b3cac25e..6c9111dd9c 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -111,6 +111,11 @@ pub struct MetasrvOptions {
pub use_memory_store: bool,
/// Whether to enable region failover.
pub enable_region_failover: bool,
+ /// Whether to allow region failover on local WAL.
+ ///
+ /// If it's true, the region failover will be allowed even if the local WAL is used.
+ /// Note that this option is not recommended to be set to true, because it may lead to data loss during failover.
+ pub allow_region_failover_on_local_wal: bool,
/// The HTTP server options.
pub http: HttpOptions,
/// The logging options.
@@ -173,6 +178,7 @@ impl Default for MetasrvOptions {
selector: SelectorType::default(),
use_memory_store: false,
enable_region_failover: false,
+ allow_region_failover_on_local_wal: false,
http: HttpOptions::default(),
logging: LoggingOptions {
dir: format!("{METASRV_HOME}/logs"),
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index ec8f6ef253..0c93e4e4c7 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -40,7 +40,8 @@ use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
-use snafu::ResultExt;
+use common_telemetry::warn;
+use snafu::{ensure, ResultExt};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
@@ -190,7 +191,7 @@ impl MetasrvBuilder {
let meta_peer_client = meta_peer_client
.unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
- let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
+ let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
let pushers = Pushers::default();
let mailbox = build_mailbox(&kv_backend, &pushers);
let procedure_manager = build_procedure_manager(&options, &kv_backend);
@@ -234,13 +235,17 @@ impl MetasrvBuilder {
))
});
+ let flow_selector = Arc::new(RoundRobinSelector::new(
+ SelectTarget::Flownode,
+ Arc::new(Vec::new()),
+ )) as SelectorRef;
+
let flow_metadata_allocator = {
// for now flownode just use round-robin selector
- let flow_selector = RoundRobinSelector::new(SelectTarget::Flownode);
let flow_selector_ctx = selector_ctx.clone();
let peer_allocator = Arc::new(FlowPeerAllocator::new(
flow_selector_ctx,
- Arc::new(flow_selector),
+ flow_selector.clone(),
));
let seq = Arc::new(
SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
@@ -272,18 +277,25 @@ impl MetasrvBuilder {
},
));
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
+
if !is_remote_wal && options.enable_region_failover {
- return error::UnexpectedSnafu {
- violated: "Region failover is not supported in the local WAL implementation!",
+ ensure!(
+ options.allow_region_failover_on_local_wal,
+ error::UnexpectedSnafu {
+ violated: "Region failover is not supported in the local WAL implementation!
+ If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
+ }
+ );
+ if options.allow_region_failover_on_local_wal {
+ warn!("Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!");
}
- .fail();
}
let (tx, rx) = RegionSupervisor::channel();
let (region_failure_detector_controller, region_supervisor_ticker): (
RegionFailureDetectorControllerRef,
Option>,
- ) = if options.enable_region_failover && is_remote_wal {
+ ) = if options.enable_region_failover {
(
Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
Some(Arc::new(RegionSupervisorTicker::new(
@@ -309,7 +321,7 @@ impl MetasrvBuilder {
));
region_migration_manager.try_start()?;
- let region_failover_handler = if options.enable_region_failover && is_remote_wal {
+ let region_failover_handler = if options.enable_region_failover {
let region_supervisor = RegionSupervisor::new(
rx,
options.failure_detector,
@@ -420,7 +432,7 @@ impl MetasrvBuilder {
meta_peer_client: meta_peer_client.clone(),
selector,
// TODO(jeremy): We do not allow configuring the flow selector.
- flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
+ flow_selector,
handler_group: RwLock::new(None),
handler_group_builder: Mutex::new(Some(handler_group_builder)),
election,
diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs
index ffbe986d72..2984a91a1c 100644
--- a/src/meta-srv/src/metrics.rs
+++ b/src/meta-srv/src/metrics.rs
@@ -71,4 +71,13 @@ lazy_static! {
/// The remote WAL prune execute counter.
pub static ref METRIC_META_REMOTE_WAL_PRUNE_EXECUTE: IntCounterVec =
register_int_counter_vec!("greptime_meta_remote_wal_prune_execute", "meta remote wal prune execute", &["topic_name"]).unwrap();
+ /// The migration stage elapsed histogram.
+ pub static ref METRIC_META_REGION_MIGRATION_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
+ "greptime_meta_region_migration_stage_elapsed",
+ "meta region migration stage elapsed",
+ &["stage"],
+ // 0.01 ~ 1000
+ exponential_buckets(0.01, 10.0, 7).unwrap(),
+ )
+ .unwrap();
}
diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs
index 656ceeb3d9..29ed3a5ae8 100644
--- a/src/meta-srv/src/mocks.rs
+++ b/src/meta-srv/src/mocks.rs
@@ -141,10 +141,7 @@ pub async fn mock(
if let Some(client) = client {
Ok(TokioIo::new(client))
} else {
- Err(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Client already taken",
- ))
+ Err(std::io::Error::other("Client already taken"))
}
}
}),
diff --git a/src/meta-srv/src/node_excluder.rs b/src/meta-srv/src/node_excluder.rs
index f9e892f092..a7bc6e0f69 100644
--- a/src/meta-srv/src/node_excluder.rs
+++ b/src/meta-srv/src/node_excluder.rs
@@ -24,3 +24,9 @@ pub trait NodeExcluder: Send + Sync {
/// Returns the excluded datanode ids.
fn excluded_datanode_ids(&self) -> &Vec;
}
+
+impl NodeExcluder for Vec {
+ fn excluded_datanode_ids(&self) -> &Vec {
+ self
+ }
+}
diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs
index b2f1eed711..43b444d3b1 100644
--- a/src/meta-srv/src/procedure/region_migration.rs
+++ b/src/meta-srv/src/procedure/region_migration.rs
@@ -25,7 +25,7 @@ pub(crate) mod update_metadata;
pub(crate) mod upgrade_candidate_region;
use std::any::Any;
-use std::fmt::Debug;
+use std::fmt::{Debug, Display};
use std::time::Duration;
use common_error::ext::BoxedError;
@@ -43,7 +43,7 @@ use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
-use common_telemetry::info;
+use common_telemetry::{error, info};
use manager::RegionMigrationProcedureGuard;
pub use manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
@@ -55,7 +55,10 @@ use tokio::time::Instant;
use self::migration_start::RegionMigrationStart;
use crate::error::{self, Result};
-use crate::metrics::{METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE};
+use crate::metrics::{
+ METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
+ METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
+};
use crate::service::mailbox::MailboxRef;
/// The default timeout for region migration.
@@ -103,6 +106,82 @@ impl PersistentContext {
}
}
+/// Metrics of region migration.
+#[derive(Debug, Clone, Default)]
+pub struct Metrics {
+ /// Elapsed time of downgrading region and upgrading region.
+ operations_elapsed: Duration,
+ /// Elapsed time of downgrading leader region.
+ downgrade_leader_region_elapsed: Duration,
+ /// Elapsed time of open candidate region.
+ open_candidate_region_elapsed: Duration,
+ /// Elapsed time of upgrade candidate region.
+ upgrade_candidate_region_elapsed: Duration,
+}
+
+impl Display for Metrics {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "operations_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
+ self.operations_elapsed,
+ self.downgrade_leader_region_elapsed,
+ self.open_candidate_region_elapsed,
+ self.upgrade_candidate_region_elapsed
+ )
+ }
+}
+
+impl Metrics {
+ /// Updates the elapsed time of downgrading region and upgrading region.
+ pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
+ self.operations_elapsed += elapsed;
+ }
+
+ /// Updates the elapsed time of downgrading leader region.
+ pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
+ self.downgrade_leader_region_elapsed += elapsed;
+ }
+
+ /// Updates the elapsed time of open candidate region.
+ pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
+ self.open_candidate_region_elapsed += elapsed;
+ }
+
+ /// Updates the elapsed time of upgrade candidate region.
+ pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
+ self.upgrade_candidate_region_elapsed += elapsed;
+ }
+}
+
+impl Drop for Metrics {
+ fn drop(&mut self) {
+ if !self.operations_elapsed.is_zero() {
+ METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
+ .with_label_values(&["operations"])
+ .observe(self.operations_elapsed.as_secs_f64());
+ }
+
+ if !self.downgrade_leader_region_elapsed.is_zero() {
+ METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
+ .with_label_values(&["downgrade_leader_region"])
+ .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
+ }
+
+ if !self.open_candidate_region_elapsed.is_zero() {
+ METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
+ .with_label_values(&["open_candidate_region"])
+ .observe(self.open_candidate_region_elapsed.as_secs_f64());
+ }
+
+ if !self.upgrade_candidate_region_elapsed.is_zero() {
+ METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
+ .with_label_values(&["upgrade_candidate_region"])
+ .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
+ }
+ }
+}
+
/// It's shared in each step and available in executing (including retrying).
///
/// It will be dropped if the procedure runner crashes.
@@ -132,8 +211,8 @@ pub struct VolatileContext {
leader_region_last_entry_id: Option,
/// The last_entry_id of leader metadata region (Only used for metric engine).
leader_region_metadata_last_entry_id: Option,
- /// Elapsed time of downgrading region and upgrading region.
- operations_elapsed: Duration,
+ /// Metrics of region migration.
+ metrics: Metrics,
}
impl VolatileContext {
@@ -231,12 +310,35 @@ impl Context {
pub fn next_operation_timeout(&self) -> Option {
self.persistent_ctx
.timeout
- .checked_sub(self.volatile_ctx.operations_elapsed)
+ .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
}
/// Updates operations elapsed.
pub fn update_operations_elapsed(&mut self, instant: Instant) {
- self.volatile_ctx.operations_elapsed += instant.elapsed();
+ self.volatile_ctx
+ .metrics
+ .update_operations_elapsed(instant.elapsed());
+ }
+
+ /// Updates the elapsed time of downgrading leader region.
+ pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
+ self.volatile_ctx
+ .metrics
+ .update_downgrade_leader_region_elapsed(instant.elapsed());
+ }
+
+ /// Updates the elapsed time of open candidate region.
+ pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
+ self.volatile_ctx
+ .metrics
+ .update_open_candidate_region_elapsed(instant.elapsed());
+ }
+
+ /// Updates the elapsed time of upgrade candidate region.
+ pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
+ self.volatile_ctx
+ .metrics
+ .update_upgrade_candidate_region_elapsed(instant.elapsed());
}
/// Returns address of meta server.
@@ -550,6 +652,14 @@ impl Procedure for RegionMigrationProcedure {
.inc();
ProcedureError::retry_later(e)
} else {
+ error!(
+ e;
+ "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
+ self.context.region_id(),
+ self.context.persistent_ctx.from_peer,
+ self.context.persistent_ctx.to_peer,
+ self.context.volatile_ctx.metrics,
+ );
METRIC_META_REGION_MIGRATION_ERROR
.with_label_values(&[name, "external"])
.inc();
diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs
index 94256ba5ec..ba13f7cdea 100644
--- a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs
@@ -46,7 +46,13 @@ impl State for CloseDowngradedRegion {
let region_id = ctx.region_id();
warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode);
}
-
+ info!(
+ "Region migration is finished: region_id: {}, from_peer: {}, to_peer: {}, {}",
+ ctx.region_id(),
+ ctx.persistent_ctx.from_peer,
+ ctx.persistent_ctx.to_peer,
+ ctx.volatile_ctx.metrics,
+ );
Ok((Box::new(RegionMigrationEnd), Status::done()))
}
diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
index 02b7216fe7..93481adc54 100644
--- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
@@ -54,6 +54,7 @@ impl Default for DowngradeLeaderRegion {
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> {
+ let now = Instant::now();
// Ensures the `leader_region_lease_deadline` must exist after recovering.
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
@@ -77,6 +78,7 @@ impl State for DowngradeLeaderRegion {
}
}
}
+ ctx.update_downgrade_leader_region_elapsed(now);
Ok((
Box::new(UpgradeCandidateRegion::default()),
@@ -348,7 +350,8 @@ mod tests {
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
- ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1);
+ ctx.volatile_ctx.metrics.operations_elapsed =
+ ctx.persistent_ctx.timeout + Duration::from_secs(1);
let err = state.downgrade_region(&mut ctx).await.unwrap_err();
@@ -591,7 +594,8 @@ mod tests {
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
- ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1);
+ ctx.volatile_ctx.metrics.operations_elapsed =
+ ctx.persistent_ctx.timeout + Duration::from_secs(1);
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
diff --git a/src/meta-srv/src/procedure/region_migration/migration_abort.rs b/src/meta-srv/src/procedure/region_migration/migration_abort.rs
index af56843045..d364f0c8b9 100644
--- a/src/meta-srv/src/procedure/region_migration/migration_abort.rs
+++ b/src/meta-srv/src/procedure/region_migration/migration_abort.rs
@@ -15,6 +15,7 @@
use std::any::Any;
use common_procedure::Status;
+use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use crate::error::{self, Result};
@@ -37,7 +38,15 @@ impl RegionMigrationAbort {
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationAbort {
- async fn next(&mut self, _: &mut Context) -> Result<(Box, Status)> {
+ async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> {
+ warn!(
+ "Region migration is aborted: {}, region_id: {}, from_peer: {}, to_peer: {}, {}",
+ self.reason,
+ ctx.region_id(),
+ ctx.persistent_ctx.from_peer,
+ ctx.persistent_ctx.to_peer,
+ ctx.volatile_ctx.metrics,
+ );
error::MigrationAbortSnafu {
reason: &self.reason,
}
diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
index 6cacf75063..6d1c81d3ed 100644
--- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
@@ -13,7 +13,7 @@
// limitations under the License.
use std::any::Any;
-use std::time::{Duration, Instant};
+use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
@@ -24,6 +24,7 @@ use common_procedure::Status;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
+use tokio::time::Instant;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
@@ -42,7 +43,9 @@ pub struct OpenCandidateRegion;
impl State for OpenCandidateRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> {
let instruction = self.build_open_region_instruction(ctx).await?;
+ let now = Instant::now();
self.open_candidate_region(ctx, instruction).await?;
+ ctx.update_open_candidate_region_elapsed(now);
Ok((
Box::new(UpdateMetadata::Downgrade),
diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
index 552b9d3863..8f3741dbac 100644
--- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
@@ -54,9 +54,12 @@ impl Default for UpgradeCandidateRegion {
#[typetag::serde]
impl State for UpgradeCandidateRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> {
+ let now = Instant::now();
if self.upgrade_region_with_retry(ctx).await {
+ ctx.update_upgrade_candidate_region_elapsed(now);
Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
} else {
+ ctx.update_upgrade_candidate_region_elapsed(now);
Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
}
}
@@ -288,7 +291,8 @@ mod tests {
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
- ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1);
+ ctx.volatile_ctx.metrics.operations_elapsed =
+ ctx.persistent_ctx.timeout + Duration::from_secs(1);
let err = state.upgrade_region(&ctx).await.unwrap_err();
@@ -558,7 +562,8 @@ mod tests {
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
- ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1);
+ ctx.volatile_ctx.metrics.operations_elapsed =
+ ctx.persistent_ctx.timeout + Duration::from_secs(1);
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs
index ce166ae05c..96fbda241d 100644
--- a/src/meta-srv/src/selector.rs
+++ b/src/meta-srv/src/selector.rs
@@ -18,7 +18,7 @@ pub mod load_based;
pub mod round_robin;
#[cfg(test)]
pub(crate) mod test_utils;
-mod weight_compute;
+pub mod weight_compute;
pub mod weighted_choose;
use std::collections::HashSet;
diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs
index e60157989d..448c26b08e 100644
--- a/src/meta-srv/src/selector/lease_based.rs
+++ b/src/meta-srv/src/selector/lease_based.rs
@@ -12,17 +12,37 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::collections::HashSet;
+use std::sync::Arc;
+
use common_meta::peer::Peer;
use crate::error::Result;
use crate::lease;
use crate::metasrv::SelectorContext;
+use crate::node_excluder::NodeExcluderRef;
use crate::selector::common::{choose_items, filter_out_excluded_peers};
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
use crate::selector::{Selector, SelectorOptions};
/// Select all alive datanodes based using a random weighted choose.
-pub struct LeaseBasedSelector;
+pub struct LeaseBasedSelector {
+ node_excluder: NodeExcluderRef,
+}
+
+impl LeaseBasedSelector {
+ pub fn new(node_excluder: NodeExcluderRef) -> Self {
+ Self { node_excluder }
+ }
+}
+
+impl Default for LeaseBasedSelector {
+ fn default() -> Self {
+ Self {
+ node_excluder: Arc::new(Vec::new()),
+ }
+ }
+}
#[async_trait::async_trait]
impl Selector for LeaseBasedSelector {
@@ -47,7 +67,14 @@ impl Selector for LeaseBasedSelector {
.collect();
// 3. choose peers by weight_array.
- filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids);
+ let mut exclude_peer_ids = self
+ .node_excluder
+ .excluded_datanode_ids()
+ .iter()
+ .cloned()
+ .collect::>();
+ exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
+ filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_items(&opts, &mut weighted_choose)?;
diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs
index d98a4ace5d..4f33245a28 100644
--- a/src/meta-srv/src/selector/load_based.rs
+++ b/src/meta-srv/src/selector/load_based.rs
@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
use common_meta::key::TableMetadataManager;
@@ -26,6 +27,7 @@ use crate::error::{self, Result};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lease;
use crate::metasrv::SelectorContext;
+use crate::node_excluder::NodeExcluderRef;
use crate::selector::common::{choose_items, filter_out_excluded_peers};
use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
use crate::selector::weighted_choose::RandomWeightedChoose;
@@ -33,11 +35,15 @@ use crate::selector::{Selector, SelectorOptions};
pub struct LoadBasedSelector {
weight_compute: C,
+ node_excluder: NodeExcluderRef,
}
impl LoadBasedSelector {
- pub fn new(weight_compute: C) -> Self {
- Self { weight_compute }
+ pub fn new(weight_compute: C, node_excluder: NodeExcluderRef) -> Self {
+ Self {
+ weight_compute,
+ node_excluder,
+ }
}
}
@@ -45,6 +51,7 @@ impl Default for LoadBasedSelector {
fn default() -> Self {
Self {
weight_compute: RegionNumsBasedWeightCompute,
+ node_excluder: Arc::new(Vec::new()),
}
}
}
@@ -88,7 +95,14 @@ where
let mut weight_array = self.weight_compute.compute(&stat_kvs);
// 5. choose peers by weight_array.
- filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids);
+ let mut exclude_peer_ids = self
+ .node_excluder
+ .excluded_datanode_ids()
+ .iter()
+ .cloned()
+ .collect::>();
+ exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
+ filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_items(&opts, &mut weighted_choose)?;
diff --git a/src/meta-srv/src/selector/round_robin.rs b/src/meta-srv/src/selector/round_robin.rs
index 2c849cb194..d930ca06cb 100644
--- a/src/meta-srv/src/selector/round_robin.rs
+++ b/src/meta-srv/src/selector/round_robin.rs
@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::collections::HashSet;
use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
use common_meta::peer::Peer;
use snafu::ensure;
@@ -20,6 +22,7 @@ use snafu::ensure;
use crate::error::{NoEnoughAvailableNodeSnafu, Result};
use crate::lease;
use crate::metasrv::{SelectTarget, SelectorContext};
+use crate::node_excluder::NodeExcluderRef;
use crate::selector::{Selector, SelectorOptions};
/// Round-robin selector that returns the next peer in the list in sequence.
@@ -32,6 +35,7 @@ use crate::selector::{Selector, SelectorOptions};
pub struct RoundRobinSelector {
select_target: SelectTarget,
counter: AtomicUsize,
+ node_excluder: NodeExcluderRef,
}
impl Default for RoundRobinSelector {
@@ -39,32 +43,38 @@ impl Default for RoundRobinSelector {
Self {
select_target: SelectTarget::Datanode,
counter: AtomicUsize::new(0),
+ node_excluder: Arc::new(Vec::new()),
}
}
}
impl RoundRobinSelector {
- pub fn new(select_target: SelectTarget) -> Self {
+ pub fn new(select_target: SelectTarget, node_excluder: NodeExcluderRef) -> Self {
Self {
select_target,
+ node_excluder,
..Default::default()
}
}
- async fn get_peers(
- &self,
- min_required_items: usize,
- ctx: &SelectorContext,
- ) -> Result> {
+ async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result> {
let mut peers = match self.select_target {
SelectTarget::Datanode => {
// 1. get alive datanodes.
let lease_kvs =
lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
+ let mut exclude_peer_ids = self
+ .node_excluder
+ .excluded_datanode_ids()
+ .iter()
+ .cloned()
+ .collect::>();
+ exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
// 2. map into peers
lease_kvs
.into_iter()
+ .filter(|(k, _)| !exclude_peer_ids.contains(&k.node_id))
.map(|(k, v)| Peer::new(k.node_id, v.node_addr))
.collect::>()
}
@@ -84,8 +94,8 @@ impl RoundRobinSelector {
ensure!(
!peers.is_empty(),
NoEnoughAvailableNodeSnafu {
- required: min_required_items,
- available: 0usize,
+ required: opts.min_required_items,
+ available: peers.len(),
select_target: self.select_target
}
);
@@ -103,7 +113,7 @@ impl Selector for RoundRobinSelector {
type Output = Vec;
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result> {
- let peers = self.get_peers(opts.min_required_items, ctx).await?;
+ let peers = self.get_peers(&opts, ctx).await?;
// choose peers
let mut selected = Vec::with_capacity(opts.min_required_items);
for _ in 0..opts.min_required_items {
@@ -176,4 +186,42 @@ mod test {
assert_eq!(peers.len(), 2);
assert_eq!(peers, vec![peer2.clone(), peer3.clone()]);
}
+
+ #[tokio::test]
+ async fn test_round_robin_selector_with_exclude_peer_ids() {
+ let selector = RoundRobinSelector::new(SelectTarget::Datanode, Arc::new(vec![5]));
+ let ctx = create_selector_context();
+ // add three nodes
+ let peer1 = Peer {
+ id: 2,
+ addr: "node1".to_string(),
+ };
+ let peer2 = Peer {
+ id: 5,
+ addr: "node2".to_string(),
+ };
+ let peer3 = Peer {
+ id: 8,
+ addr: "node3".to_string(),
+ };
+ put_datanodes(
+ &ctx.meta_peer_client,
+ vec![peer1.clone(), peer2.clone(), peer3.clone()],
+ )
+ .await;
+
+ let peers = selector
+ .select(
+ &ctx,
+ SelectorOptions {
+ min_required_items: 1,
+ allow_duplication: true,
+ exclude_peer_ids: HashSet::from([2]),
+ },
+ )
+ .await
+ .unwrap();
+ assert_eq!(peers.len(), 1);
+ assert_eq!(peers, vec![peer3.clone()]);
+ }
}
diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs
index b26c2a558f..f86b42a9e2 100644
--- a/src/meta-srv/src/service/store/cached_kv.rs
+++ b/src/meta-srv/src/service/store/cached_kv.rs
@@ -278,7 +278,7 @@ impl KvBackend for LeaderCachedKvBackend {
let remote_res = self.store.batch_get(remote_req).await?;
let put_req = BatchPutRequest {
- kvs: remote_res.kvs.clone().into_iter().map(Into::into).collect(),
+ kvs: remote_res.kvs.clone().into_iter().collect(),
..Default::default()
};
let _ = self.cache.batch_put(put_req).await?;
diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs
index 883f554066..6d6de78a74 100644
--- a/src/mito2/src/read/projection.rs
+++ b/src/mito2/src/read/projection.rs
@@ -363,9 +363,9 @@ mod tests {
builder
.push_field_array(
*column_id,
- Arc::new(Int64Array::from_iter_values(
- std::iter::repeat(*field).take(num_rows),
- )),
+ Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
+ *field, num_rows,
+ ))),
)
.unwrap();
}
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index aefec6a983..e48ca633da 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -206,6 +206,14 @@ impl SeqScan {
.build(),
));
}
+ if self.properties.partitions[partition].is_empty() {
+ return Ok(Box::pin(RecordBatchStreamWrapper::new(
+ self.stream_ctx.input.mapper.output_schema(),
+ common_recordbatch::EmptyRecordBatchStream::new(
+ self.stream_ctx.input.mapper.output_schema(),
+ ),
+ )));
+ }
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.new_semaphore();
diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs
index 5c00b1a19c..53821c7cf2 100644
--- a/src/mito2/src/sst/index/bloom_filter/creator.rs
+++ b/src/mito2/src/sst/index/bloom_filter/creator.rs
@@ -346,7 +346,6 @@ impl BloomFilterIndexer {
#[cfg(test)]
pub(crate) mod tests {
- use std::iter;
use api::v1::SemanticType;
use datatypes::data_type::ConcreteDataType;
@@ -461,15 +460,15 @@ pub(crate) mod tests {
Batch::new(
primary_key,
- Arc::new(UInt64Vector::from_iter_values(
- iter::repeat(0).take(num_rows),
- )),
- Arc::new(UInt64Vector::from_iter_values(
- iter::repeat(0).take(num_rows),
- )),
- Arc::new(UInt8Vector::from_iter_values(
- iter::repeat(1).take(num_rows),
- )),
+ Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
+ 0, num_rows,
+ ))),
+ Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
+ 0, num_rows,
+ ))),
+ Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
+ 1, num_rows,
+ ))),
vec![u64_field],
)
.unwrap()
diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs
index 1d884ac3a5..fc9aae9f42 100644
--- a/src/mito2/src/sst/index/fulltext_index/creator.rs
+++ b/src/mito2/src/sst/index/fulltext_index/creator.rs
@@ -489,12 +489,12 @@ mod tests {
Arc::new(UInt64Vector::from_iter_values(
(0..num_rows).map(|n| n as u64),
)),
- Arc::new(UInt64Vector::from_iter_values(
- std::iter::repeat(0).take(num_rows),
- )),
- Arc::new(UInt8Vector::from_iter_values(
- std::iter::repeat(1).take(num_rows),
- )),
+ Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
+ 0, num_rows,
+ ))),
+ Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
+ 1, num_rows,
+ ))),
vec![
BatchColumn {
column_id: 1,
diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs
index 8991b72aec..6f44979c78 100644
--- a/src/mito2/src/sst/index/inverted_index/creator.rs
+++ b/src/mito2/src/sst/index/inverted_index/creator.rs
@@ -326,7 +326,6 @@ impl InvertedIndexer {
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
- use std::iter;
use api::v1::SemanticType;
use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
@@ -424,15 +423,15 @@ mod tests {
Batch::new(
primary_key,
- Arc::new(UInt64Vector::from_iter_values(
- iter::repeat(0).take(num_rows),
- )),
- Arc::new(UInt64Vector::from_iter_values(
- iter::repeat(0).take(num_rows),
- )),
- Arc::new(UInt8Vector::from_iter_values(
- iter::repeat(1).take(num_rows),
- )),
+ Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
+ 0, num_rows,
+ ))),
+ Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
+ 0, num_rows,
+ ))),
+ Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
+ 1, num_rows,
+ ))),
vec![u64_field],
)
.unwrap()
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index c90907f0eb..005e276bbd 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -755,7 +755,7 @@ mod tests {
));
let mut keys = vec![];
for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
- keys.extend(std::iter::repeat(index as u32).take(num_rows));
+ keys.extend(std::iter::repeat_n(index as u32, num_rows));
}
let keys = UInt32Array::from(keys);
Arc::new(DictionaryArray::new(keys, values))
diff --git a/src/operator/src/req_convert/insert/fill_impure_default.rs b/src/operator/src/req_convert/insert/fill_impure_default.rs
index a60138c6e5..cf1e1565a8 100644
--- a/src/operator/src/req_convert/insert/fill_impure_default.rs
+++ b/src/operator/src/req_convert/insert/fill_impure_default.rs
@@ -85,11 +85,9 @@ impl ImpureDefaultFiller {
.schema
.iter()
.filter_map(|schema| {
- if self.impure_columns.contains_key(&schema.column_name) {
- Some(&schema.column_name)
- } else {
- None
- }
+ self.impure_columns
+ .contains_key(&schema.column_name)
+ .then_some(&schema.column_name)
})
.collect();
diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs
index 8c31f42ace..8034d984d2 100644
--- a/src/pipeline/src/etl/processor/dissect.rs
+++ b/src/pipeline/src/etl/processor/dissect.rs
@@ -325,7 +325,7 @@ impl std::str::FromStr for Pattern {
impl Pattern {
fn check(&self) -> Result<()> {
- if self.len() == 0 {
+ if self.is_empty() {
return DissectEmptyPatternSnafu.fail();
}
diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs
index 1071e94f9a..a3339a4e95 100644
--- a/src/promql/src/extension_plan/instant_manipulate.rs
+++ b/src/promql/src/extension_plan/instant_manipulate.rs
@@ -91,9 +91,9 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
_exprs: Vec,
inputs: Vec,
) -> DataFusionResult {
- if inputs.is_empty() {
+ if inputs.len() != 1 {
return Err(DataFusionError::Internal(
- "InstantManipulate should have at least one input".to_string(),
+ "InstantManipulate should have exact one input".to_string(),
));
}
@@ -354,6 +354,9 @@ impl Stream for InstantManipulateStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll